Home Download Docs Code Community
     1	/*
     2	Copyright 2018 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package overlay registers the "overlay" blobserver storage type
    19	that presents storage that is the result of overlaying a
    20	storage ("upper") on top of another storage ("lower").
    21	All changes go to the upper storage. The lower storage is never changed.
    22	
    23	The optional "deleted" KeyValue store may be provided to keep track of
    24	deleted blobs. When "deleted" is missing, deletion returns an error.
    25	
    26	Example usage:
    27	
    28		"/bs/": {
    29		  "handler": "storage-overlay",
    30		  "handlerArgs": {
    31		    "lower": "/sto-base/",
    32		    "upper": "/bs-local-changes/",
    33		    "deleted": {
    34		      "file": "/volume1/camlistore/home/var/camlistore/blobs/deleted.leveldb",
    35		      "type": "leveldb"
    36		    }
    37		  }
    38		}
    39	*/
    40	package overlay // import "perkeep.org/pkg/blobserver/overlay"
    41	
    42	import (
    43		"context"
    44		"fmt"
    45		"io"
    46		"log"
    47		"os"
    48		"time"
    49	
    50		"go4.org/jsonconfig"
    51		"perkeep.org/pkg/blob"
    52		"perkeep.org/pkg/blobserver"
    53		"perkeep.org/pkg/sorted"
    54	)
    55	
    56	func init() {
    57		blobserver.RegisterStorageConstructor("overlay", blobserver.StorageConstructor(newFromConfig))
    58	}
    59	
    60	// readOnlyStorage is a blobserver.Storage with write methods removed.
    61	type readOnlyStorage interface {
    62		blob.Fetcher
    63		blobserver.BlobEnumerator
    64		blobserver.BlobStatter
    65	}
    66	
// overlayStorage is the storage built by newFromConfig. Its methods read
// from upper and lower, and send writes and removals to upper and to the
// deleted store only.
type overlayStorage struct {
	// lower is the base layer. It is only fetched from, statted and
	// enumerated.
	lower readOnlyStorage

	// deleted stores refs deleted from lower.
	// It is nil when no "deleted" store was configured.
	deleted sorted.KeyValue

	// upper is the read-write storage that receives all changes.
	upper blobserver.Storage
}
    76	
    77	func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
    78		var (
    79			lowerPrefix = conf.RequiredString("lower")
    80			upperPrefix = conf.RequiredString("upper")
    81			deletedConf = conf.OptionalObject("deleted")
    82		)
    83		if err := conf.Validate(); err != nil {
    84			return nil, err
    85		}
    86	
    87		lower, err := ld.GetStorage(lowerPrefix)
    88		if err != nil {
    89			return nil, fmt.Errorf("failed to load lower at %s: %v", lowerPrefix, err)
    90		}
    91		upper, err := ld.GetStorage(upperPrefix)
    92		if err != nil {
    93			return nil, fmt.Errorf("failed to load upper at %s: %v", upperPrefix, err)
    94		}
    95		var deleted sorted.KeyValue
    96		if len(deletedConf) != 0 {
    97			deleted, err = sorted.NewKeyValueMaybeWipe(deletedConf)
    98			if err != nil {
    99				return nil, fmt.Errorf("failed to setup deleted: %v", err)
   100			}
   101		}
   102	
   103		sto := &overlayStorage{
   104			lower:   lower,
   105			upper:   upper,
   106			deleted: deleted,
   107		}
   108	
   109		return sto, nil
   110	}
   111	
   112	func (sto *overlayStorage) Close() error {
   113		if sto.deleted == nil {
   114			return nil
   115		}
   116		return sto.deleted.Close()
   117	}
   118	
   119	// ReceiveBlob stores received blobs on the upper layer.
   120	func (sto *overlayStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
   121		sb, err = sto.upper.ReceiveBlob(ctx, br, src)
   122		if err == nil && sto.deleted != nil {
   123			err = sto.deleted.Delete(br.String())
   124		}
   125		return sb, err
   126	}
   127	
   128	// RemoveBlobs marks the given blobs as deleted, and removes them if they are in the upper layer.
   129	func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   130		if sto.deleted == nil {
   131			return blobserver.ErrNotImplemented
   132		}
   133	
   134		err := sto.upper.RemoveBlobs(ctx, blobs)
   135		if err != nil {
   136			return err
   137		}
   138	
   139		m := sto.deleted.BeginBatch()
   140		for _, br := range blobs {
   141			m.Set(br.String(), "1")
   142		}
   143		return sto.deleted.CommitBatch(m)
   144	}
   145	
   146	func (sto *overlayStorage) isDeleted(br blob.Ref) bool {
   147		if sto.deleted == nil {
   148			return false
   149		}
   150	
   151		_, err := sto.deleted.Get(br.String())
   152		if err == nil {
   153			return true
   154		}
   155	
   156		if err != sorted.ErrNotFound {
   157			log.Printf("overlayStorage error accessing deleted: %v", err)
   158		}
   159	
   160		return false
   161	}
   162	
   163	// Fetch the blob by trying first the upper and then lower.
   164	// The lower storage is checked only if the blob was not deleleted in sto itself.
   165	func (sto *overlayStorage) Fetch(ctx context.Context, br blob.Ref) (file io.ReadCloser, size uint32, err error) {
   166		if sto.isDeleted(br) {
   167			return nil, 0, os.ErrNotExist
   168		}
   169	
   170		file, size, err = sto.upper.Fetch(ctx, br)
   171		if err != os.ErrNotExist {
   172			return file, size, err
   173		}
   174	
   175		return sto.lower.Fetch(ctx, br)
   176	}
   177	
   178	// StatBlobs on all BlobStatter reads sequentially, returning the first error.
   179	func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
   180		exists := make([]blob.Ref, 0, len(blobs))
   181		for _, br := range blobs {
   182			if !sto.isDeleted(br) {
   183				exists = append(exists, br)
   184			}
   185		}
   186	
   187		seen := make(map[blob.Ref]struct{}, len(exists))
   188	
   189		err := sto.upper.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error {
   190			seen[sbr.Ref] = struct{}{}
   191			return f(sbr)
   192		})
   193	
   194		if err != nil {
   195			return err
   196		}
   197	
   198		lowerBlobs := make([]blob.Ref, 0, len(exists))
   199		for _, br := range exists {
   200			if _, s := seen[br]; !s {
   201				lowerBlobs = append(lowerBlobs, br)
   202			}
   203		}
   204	
   205		return sto.lower.StatBlobs(ctx, lowerBlobs, f)
   206	}
   207	
   208	// EnumerateBlobs enumerates blobs of the lower and upper layers.
   209	func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   210		defer close(dest)
   211	
   212		enums := []blobserver.BlobEnumerator{sto.lower, sto.upper}
   213	
   214		// Ensure that we send limit blobs if possible.
   215		sent := 0
   216		for sent < limit {
   217			ch := make(chan blob.SizedRef)
   218			errch := make(chan error, 1)
   219			go func() {
   220				errch <- blobserver.MergedEnumerate(ctx, ch, enums, after, limit-sent)
   221			}()
   222	
   223			var last blob.Ref
   224	
   225			// Yield all blobs that weren't deleted from ch to destch.
   226			seen := 0
   227			for sbr := range ch {
   228				seen++
   229				if !sto.isDeleted(sbr.Ref) {
   230					log.Println(sent, sbr.Ref)
   231					dest <- sbr
   232					sent++
   233				}
   234				last = sbr.Ref
   235			}
   236	
   237			if err := <-errch; err != nil {
   238				return err
   239			}
   240	
   241			// if no blob was received, enumeration is finished
   242			if seen == 0 {
   243				return nil
   244			}
   245	
   246			// resume enumeration after the last blob seen
   247			after = last.String()
   248		}
   249	
   250		return nil
   251	}
   252	
   253	func (sto *overlayStorage) StorageGeneration() (initTime time.Time, random string, err error) {
   254		if gener, ok := sto.upper.(blobserver.Generationer); ok {
   255			return gener.StorageGeneration()
   256		}
   257		err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
   258		return
   259	}
   260	
   261	func (sto *overlayStorage) ResetStorageGeneration() error {
   262		if gener, ok := sto.upper.(blobserver.Generationer); ok {
   263			return gener.ResetStorageGeneration()
   264		}
   265		return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
   266	}
Website layout inspired by memcached.
Content by the authors.