Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package proxycache registers the "proxycache" blobserver storage type,
    19	which uses a provided blobserver as a cache for a second origin
    20	blobserver.
    21	
    22	If the provided maxCacheBytes is unspecified, the default is 512MB.
    23	
    24	Example config:
    25	
    26		      "/cache/": {
    27		          "handler": "storage-proxycache",
    28		          "handlerArgs": {
    29				  "origin": "/cloud-blobs/",
    30				  "cache": "/local-ssd/",
    31				  "maxCacheBytes": 536870912
    32		          }
    33		      },
    34	*/
    35	package proxycache // import "perkeep.org/pkg/blobserver/proxycache"
    36	
    37	import (
    38		"bytes"
    39		"context"
    40		"io"
    41		"log"
    42		"os"
    43		"sync"
    44	
    45		"go4.org/jsonconfig"
    46		"go4.org/syncutil"
    47		"perkeep.org/internal/lru"
    48		"perkeep.org/pkg/blob"
    49		"perkeep.org/pkg/blobserver"
    50	)
    51	
// Storage implements the "proxycache" blob storage.
//
// Reads are served from cache when possible and otherwise from origin.
// Blobs read from origin (Fetch) or written through this storage
// (ReceiveBlob) are also copied into cache. An in-memory LRU records the
// blobs believed to be in cache, so that cache can be trimmed back toward
// maxCacheBytes by evicting the least recently used ones.
type Storage struct {
	// origin receives writes first (ReceiveBlob) and serves cache misses.
	origin blobserver.Storage
	// cache is consulted before origin and populated from it.
	cache  blobserver.Storage

	// debug, when true, logs every successful eviction. It is not set by
	// New or newFromConfig.
	debug         bool
	// maxCacheBytes is the threshold above which touch starts evicting.
	maxCacheBytes int64

	mu         sync.Mutex // guards following
	// lru maps blob.Ref.String() to the blob's blob.SizedRef.
	lru        *lru.Cache
	// cacheBytes is the total size of the blobs currently recorded in lru.
	cacheBytes int64
}
    64	
    65	var (
    66		_ blobserver.Storage = (*Storage)(nil)
    67		_ blob.SubFetcher    = (*Storage)(nil)
    68		// TODO:
    69		// _ blobserver.Generationer = (*Storage)(nil)
    70	)
    71	
    72	// New returns a proxycache blob storage that reads from cache,
    73	// then origin, populating cache as needed, up to a total of maxBytes.
    74	func New(maxBytes int64, cache, origin blobserver.Storage) *Storage {
    75		sto := &Storage{
    76			origin:        origin,
    77			cache:         cache,
    78			lru:           lru.NewUnlocked(0),
    79			maxCacheBytes: maxBytes,
    80		}
    81		return sto
    82	}
    83	
    84	func init() {
    85		blobserver.RegisterStorageConstructor("proxycache", blobserver.StorageConstructor(newFromConfig))
    86	}
    87	
    88	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
    89		var (
    90			origin        = config.RequiredString("origin")
    91			cache         = config.RequiredString("cache")
    92			maxCacheBytes = config.OptionalInt64("maxCacheBytes", 512<<20)
    93		)
    94		if err := config.Validate(); err != nil {
    95			return nil, err
    96		}
    97		cacheSto, err := ld.GetStorage(cache)
    98		if err != nil {
    99			return nil, err
   100		}
   101		originSto, err := ld.GetStorage(origin)
   102		if err != nil {
   103			return nil, err
   104		}
   105		return New(maxCacheBytes, cacheSto, originSto), nil
   106	}
   107	
   108	// must hold sto.mu.
   109	// Reports whether an item was removed.
   110	func (sto *Storage) removeOldest() bool {
   111		ctx := context.TODO()
   112		k, v := sto.lru.RemoveOldest()
   113		if v == nil {
   114			return false
   115		}
   116		sb := v.(blob.SizedRef)
   117		// TODO: run these without sto.mu held in background
   118		// goroutine? at least pass a context?
   119		err := sto.cache.RemoveBlobs(ctx, []blob.Ref{sb.Ref})
   120		if err != nil {
   121			log.Printf("proxycache: could not remove oldest blob %v (%d bytes): %v", sb.Ref, sb.Size, err)
   122			sto.lru.Add(k, v)
   123			return false
   124		}
   125		if sto.debug {
   126			log.Printf("proxycache: removed blob %v (%d bytes)", sb.Ref, sb.Size)
   127		}
   128		sto.cacheBytes -= int64(sb.Size)
   129		return true
   130	}
   131	
   132	func (sto *Storage) touch(sb blob.SizedRef) {
   133		key := sb.Ref.String()
   134	
   135		sto.mu.Lock()
   136		defer sto.mu.Unlock()
   137	
   138		_, old := sto.lru.Get(key)
   139		if !old {
   140			sto.lru.Add(key, sb)
   141			sto.cacheBytes += int64(sb.Size)
   142	
   143			// Clean while needed.
   144			for sto.cacheBytes > sto.maxCacheBytes {
   145				if !sto.removeOldest() {
   146					break
   147				}
   148			}
   149		}
   150	}
   151	
   152	func (sto *Storage) Fetch(ctx context.Context, b blob.Ref) (rc io.ReadCloser, size uint32, err error) {
   153		rc, size, err = sto.cache.Fetch(ctx, b)
   154		if err == nil {
   155			sto.touch(blob.SizedRef{Ref: b, Size: size})
   156			return
   157		}
   158		if err != os.ErrNotExist {
   159			log.Printf("warning: proxycache cache fetch error for %v: %v", b, err)
   160		}
   161		rc, size, err = sto.origin.Fetch(ctx, b)
   162		if err != nil {
   163			return
   164		}
   165		all, err := io.ReadAll(rc)
   166		if err != nil {
   167			return
   168		}
   169		if _, err := blobserver.Receive(ctx, sto.cache, b, bytes.NewReader(all)); err != nil {
   170			log.Printf("populating proxycache cache for %v: %v", b, err)
   171		} else {
   172			sto.touch(blob.SizedRef{Ref: b, Size: size})
   173		}
   174		return io.NopCloser(bytes.NewReader(all)), size, nil
   175	}
   176	
   177	func (sto *Storage) SubFetch(ctx context.Context, ref blob.Ref, offset, length int64) (io.ReadCloser, error) {
   178		if sf, ok := sto.cache.(blob.SubFetcher); ok {
   179			rc, err := sf.SubFetch(ctx, ref, offset, length)
   180			if err == nil {
   181				return rc, nil
   182			}
   183			if err != os.ErrNotExist && err != blob.ErrUnimplemented {
   184				log.Printf("proxycache: error fetching from cache %T: %v", sto.cache, err)
   185			}
   186		}
   187		if sf, ok := sto.origin.(blob.SubFetcher); ok {
   188			return sf.SubFetch(ctx, ref, offset, length)
   189		}
   190		return nil, blob.ErrUnimplemented
   191	}
   192	
   193	func (sto *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   194		need := map[blob.Ref]bool{}
   195		for _, br := range blobs {
   196			need[br] = true
   197		}
   198		err := sto.cache.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
   199			sto.touch(sb)
   200			delete(need, sb.Ref)
   201			return fn(sb)
   202		})
   203		if err != nil {
   204			return err
   205		}
   206		if len(need) == 0 {
   207			// Cache had them all.
   208			return nil
   209		}
   210		// And now any missing ones:
   211		blobs = make([]blob.Ref, 0, len(need))
   212		for br := range need {
   213			blobs = append(blobs, br)
   214		}
   215		return sto.origin.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
   216			sto.touch(sb)
   217			return fn(sb)
   218		})
   219	}
   220	
   221	func (sto *Storage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (blob.SizedRef, error) {
   222		// Slurp the whole blob before replicating. Bounded by 16 MB anyway.
   223		var buf bytes.Buffer
   224		if _, err := io.Copy(&buf, src); err != nil {
   225			return blob.SizedRef{}, err
   226		}
   227	
   228		sb, err := sto.origin.ReceiveBlob(ctx, br, bytes.NewReader(buf.Bytes()))
   229		if err != nil {
   230			return sb, err
   231		}
   232	
   233		if _, err := sto.cache.ReceiveBlob(ctx, br, bytes.NewReader(buf.Bytes())); err != nil {
   234			log.Printf("proxycache: ignoring error populating blob %v in cache: %v", br, err)
   235		} else {
   236			sto.touch(sb)
   237		}
   238		return sb, err
   239	}
   240	
   241	func (sto *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   242		var gr syncutil.Group
   243		gr.Go(func() error {
   244			return sto.cache.RemoveBlobs(ctx, blobs)
   245		})
   246		gr.Go(func() error {
   247			return sto.origin.RemoveBlobs(ctx, blobs)
   248		})
   249		gr.Wait()
   250		return gr.Err()
   251	}
   252	
   253	func (sto *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   254		return sto.origin.EnumerateBlobs(ctx, dest, after, limit)
   255	}
Website layout inspired by memcached.
Content by the authors.