Home Download Docs Code Community
     1	/*
     2	Copyright 2018 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package overlay registers the "overlay" blobserver storage type
    19	that presents storage that is the result of overlaying a
    20	storage ("upper") on top of another storage ("lower").
    21	All changes go to the upper storage. The lower storage is never changed.
    22	
    23	The optional "deleted" KeyValue store may be provided to keep track of
    24	deleted blobs. When "deleted" is missing, deletion returns an error.
    25	
    26	Example usage:
    27	
    28		"/bs/": {
    29		  "handler": "storage-overlay",
    30		  "handlerArgs": {
    31		    "lower": "/sto-base/",
    32		    "upper": "/bs-local-changes/",
    33		    "deleted": {
    34		      "file": "/volume1/camlistore/home/var/camlistore/blobs/deleted.leveldb",
    35		      "type": "leveldb"
    36		    }
    37		  }
    38		}
    39	*/
    40	package overlay // import "perkeep.org/pkg/blobserver/overlay"
    41	
    42	import (
    43		"context"
    44		"errors"
    45		"fmt"
    46		"io"
    47		"log"
    48		"os"
    49		"time"
    50	
    51		"go4.org/jsonconfig"
    52	
    53		"perkeep.org/pkg/blob"
    54		"perkeep.org/pkg/blobserver"
    55		"perkeep.org/pkg/sorted"
    56	)
    57	
    58	func init() {
    59		blobserver.RegisterStorageConstructor("overlay", blobserver.StorageConstructor(newFromConfig))
    60	}
    61	
    62	// readOnlyStorage is a blobserver.Storage with write methods removed.
    63	type readOnlyStorage interface {
    64		blob.Fetcher
    65		blobserver.BlobEnumerator
    66		blobserver.BlobStatter
    67	}
    68	
    69	type overlayStorage struct {
    70		lower readOnlyStorage
    71	
    72		// deleted stores refs deleted from lower
    73		deleted sorted.KeyValue
    74	
    75		// read-write storage for changes
    76		upper blobserver.Storage
    77	}
    78	
    79	func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
    80		var (
    81			lowerPrefix = conf.RequiredString("lower")
    82			upperPrefix = conf.RequiredString("upper")
    83			deletedConf = conf.OptionalObject("deleted")
    84		)
    85		if err := conf.Validate(); err != nil {
    86			return nil, err
    87		}
    88	
    89		lower, err := ld.GetStorage(lowerPrefix)
    90		if err != nil {
    91			return nil, fmt.Errorf("failed to load lower at %s: %w", lowerPrefix, err)
    92		}
    93		upper, err := ld.GetStorage(upperPrefix)
    94		if err != nil {
    95			return nil, fmt.Errorf("failed to load upper at %s: %w", upperPrefix, err)
    96		}
    97		var deleted sorted.KeyValue
    98		if len(deletedConf) != 0 {
    99			deleted, err = sorted.NewKeyValueMaybeWipe(deletedConf)
   100			if err != nil {
   101				return nil, fmt.Errorf("failed to setup deleted: %w", err)
   102			}
   103		}
   104	
   105		sto := &overlayStorage{
   106			lower:   lower,
   107			upper:   upper,
   108			deleted: deleted,
   109		}
   110	
   111		return sto, nil
   112	}
   113	
   114	func (sto *overlayStorage) Close() error {
   115		if sto.deleted == nil {
   116			return nil
   117		}
   118		return sto.deleted.Close()
   119	}
   120	
   121	// ReceiveBlob stores received blobs on the upper layer.
   122	func (sto *overlayStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
   123		sb, err = sto.upper.ReceiveBlob(ctx, br, src)
   124		if err == nil && sto.deleted != nil {
   125			err = sto.deleted.Delete(br.String())
   126		}
   127		return sb, err
   128	}
   129	
   130	// RemoveBlobs marks the given blobs as deleted, and removes them if they are in the upper layer.
   131	func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   132		if sto.deleted == nil {
   133			return blobserver.ErrNotImplemented
   134		}
   135	
   136		err := sto.upper.RemoveBlobs(ctx, blobs)
   137		if err != nil {
   138			return err
   139		}
   140	
   141		m := sto.deleted.BeginBatch()
   142		for _, br := range blobs {
   143			m.Set(br.String(), "1")
   144		}
   145		return sto.deleted.CommitBatch(m)
   146	}
   147	
   148	func (sto *overlayStorage) isDeleted(br blob.Ref) bool {
   149		if sto.deleted == nil {
   150			return false
   151		}
   152	
   153		_, err := sto.deleted.Get(br.String())
   154		if err == nil {
   155			return true
   156		}
   157	
   158		if !errors.Is(err, sorted.ErrNotFound) {
   159			log.Printf("overlayStorage error accessing deleted: %v", err)
   160		}
   161	
   162		return false
   163	}
   164	
   165	// Fetch the blob by trying first the upper and then lower.
   166	// The lower storage is checked only if the blob was not deleleted in sto itself.
   167	func (sto *overlayStorage) Fetch(ctx context.Context, br blob.Ref) (file io.ReadCloser, size uint32, err error) {
   168		if sto.isDeleted(br) {
   169			return nil, 0, os.ErrNotExist
   170		}
   171	
   172		file, size, err = sto.upper.Fetch(ctx, br)
   173		if !errors.Is(err, os.ErrNotExist) {
   174			return file, size, err
   175		}
   176	
   177		return sto.lower.Fetch(ctx, br)
   178	}
   179	
   180	// StatBlobs on all BlobStatter reads sequentially, returning the first error.
   181	func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
   182		exists := make([]blob.Ref, 0, len(blobs))
   183		for _, br := range blobs {
   184			if !sto.isDeleted(br) {
   185				exists = append(exists, br)
   186			}
   187		}
   188	
   189		seen := make(map[blob.Ref]struct{}, len(exists))
   190	
   191		err := sto.upper.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error {
   192			seen[sbr.Ref] = struct{}{}
   193			return f(sbr)
   194		})
   195	
   196		if err != nil {
   197			return err
   198		}
   199	
   200		lowerBlobs := make([]blob.Ref, 0, len(exists))
   201		for _, br := range exists {
   202			if _, s := seen[br]; !s {
   203				lowerBlobs = append(lowerBlobs, br)
   204			}
   205		}
   206	
   207		return sto.lower.StatBlobs(ctx, lowerBlobs, f)
   208	}
   209	
   210	// EnumerateBlobs enumerates blobs of the lower and upper layers.
   211	func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   212		defer close(dest)
   213	
   214		enums := []blobserver.BlobEnumerator{sto.lower, sto.upper}
   215	
   216		// Ensure that we send limit blobs if possible.
   217		sent := 0
   218		for sent < limit {
   219			ch := make(chan blob.SizedRef)
   220			errch := make(chan error, 1)
   221			go func() {
   222				errch <- blobserver.MergedEnumerate(ctx, ch, enums, after, limit-sent)
   223			}()
   224	
   225			var last blob.Ref
   226	
   227			// Yield all blobs that weren't deleted from ch to destch.
   228			seen := 0
   229			for sbr := range ch {
   230				seen++
   231				if !sto.isDeleted(sbr.Ref) {
   232					log.Println(sent, sbr.Ref)
   233					dest <- sbr
   234					sent++
   235				}
   236				last = sbr.Ref
   237			}
   238	
   239			if err := <-errch; err != nil {
   240				return err
   241			}
   242	
   243			// if no blob was received, enumeration is finished
   244			if seen == 0 {
   245				return nil
   246			}
   247	
   248			// resume enumeration after the last blob seen
   249			after = last.String()
   250		}
   251	
   252		return nil
   253	}
   254	
   255	func (sto *overlayStorage) StorageGeneration() (initTime time.Time, random string, err error) {
   256		if gener, ok := sto.upper.(blobserver.Generationer); ok {
   257			return gener.StorageGeneration()
   258		}
   259		err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
   260		return
   261	}
   262	
   263	func (sto *overlayStorage) ResetStorageGeneration() error {
   264		if gener, ok := sto.upper.(blobserver.Generationer); ok {
   265			return gener.ResetStorageGeneration()
   266		}
   267		return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
   268	}
Website layout inspired by memcached.
Content by the authors.