Home Download Docs Code Community
     1	/*
     2	Copyright 2017 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package union registers the "union" read-only blobserver storage type
    18	// to read from the given subsets, serving the first responding.
    19	package union // import "perkeep.org/pkg/blobserver/union"
    20	
    21	import (
    22		"context"
    23		"errors"
    24		"io"
    25		"sync"
    26	
    27		"go4.org/jsonconfig"
    28		"perkeep.org/pkg/blob"
    29		"perkeep.org/pkg/blobserver"
    30	)
    31	
    32	type unionStorage struct {
    33		subsets []blobserver.Storage
    34	}
    35	
    36	func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
    37		sto := &unionStorage{}
    38	
    39		reads := conf.RequiredList("subsets")
    40		if err := conf.Validate(); err != nil {
    41			return nil, err
    42		}
    43	
    44		for _, s := range reads {
    45			rs, err := ld.GetStorage(s)
    46			if err != nil {
    47				return nil, err
    48			}
    49			sto.subsets = append(sto.subsets, rs)
    50		}
    51	
    52		return sto, nil
    53	}
    54	
    55	// ReceiveBlob would receive the blobs, but now just returns ErrReadonly.
    56	func (sto *unionStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
    57		return blob.SizedRef{}, blobserver.ErrReadonly
    58	}
    59	
    60	// RemoveBlobs would remove the given blobs, but now just returns ErrReadonly.
    61	func (sto *unionStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
    62		return blobserver.ErrReadonly
    63	}
    64	
    65	// Fetch the blob by trying all configured read Storage concurrently,
    66	// returning the first successful response, or the first error if there's no match.
    67	func (sto *unionStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
    68		type result struct {
    69			file io.ReadCloser
    70			size uint32
    71			err  error
    72		}
    73		results := make(chan result, len(sto.subsets))
    74		var wg sync.WaitGroup
    75		for _, bs := range sto.subsets {
    76			bs := bs
    77			wg.Add(1)
    78			go func() {
    79				defer wg.Done()
    80				var res result
    81				res.file, res.size, res.err = bs.Fetch(ctx, b)
    82				results <- res
    83			}()
    84		}
    85		go func() {
    86			wg.Wait()
    87			close(results)
    88		}()
    89		var firstErr error
    90		var firstRes result
    91		for r := range results {
    92			if r.err != nil {
    93				if firstErr == nil {
    94					firstErr = r.err
    95				}
    96				continue
    97			}
    98			if firstRes.file != nil {
    99				if r.file != nil {
   100					r.file.Close() // don't need, we already have a successful Fetch
   101				}
   102				continue
   103			}
   104	
   105			firstRes = r
   106		}
   107		if firstRes.file != nil {
   108			return firstRes.file, firstRes.size, nil
   109		}
   110		return nil, 0, firstErr
   111	}
   112	
   113	// StatBlobs on all BlobStatter reads sequentially, returning the first error.
   114	func (sto *unionStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
   115		if err := ctx.Err(); err != nil {
   116			return err
   117		}
   118		// need to dedup the blobs
   119		maybeDup := make(chan blob.SizedRef)
   120		errCh := make(chan error, 1)
   121		var wg sync.WaitGroup
   122		var any bool
   123		for _, s := range sto.subsets {
   124			if bs, ok := s.(blobserver.BlobStatter); ok {
   125				any = true
   126				wg.Add(1)
   127				go func() {
   128					defer wg.Done()
   129					if err := bs.StatBlobs(ctx, blobs, func(sr blob.SizedRef) error {
   130						maybeDup <- sr
   131						return nil
   132					}); err != nil {
   133						errCh <- err
   134					}
   135				}()
   136			}
   137		}
   138		if !any {
   139			return errors.New("union: No BlobStatter reader configured")
   140		}
   141	
   142		var closeChanOnce sync.Once
   143		go func() {
   144			wg.Wait()
   145			closeChanOnce.Do(func() { close(maybeDup) })
   146		}()
   147	
   148		seen := make(map[blob.Ref]struct{}, len(blobs))
   149		for {
   150			select {
   151			case <-ctx.Done():
   152				return ctx.Err()
   153			case err := <-errCh:
   154				closeChanOnce.Do(func() { close(maybeDup) })
   155				return err
   156			case sr, ok := <-maybeDup:
   157				if !ok {
   158					return nil
   159				}
   160				if _, ok = seen[sr.Ref]; !ok {
   161					seen[sr.Ref] = struct{}{}
   162					if err := f(sr); err != nil {
   163						return err
   164					}
   165				}
   166			}
   167		}
   168	}
   169	
   170	// EnumerateBlobs concurrently on the readers, returning one of the errors.
   171	func (sto *unionStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   172		return blobserver.MergedEnumerateStorage(ctx, dest, sto.subsets, after, limit)
   173	}
   174	
   175	func init() {
   176		blobserver.RegisterStorageConstructor("union", blobserver.StorageConstructor(newFromConfig))
   177	}
Website layout inspired by memcached.
Content by the authors.