Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package shard registers the "shard" blobserver storage type,
    19	predictably spraying out blobs out over the provided backends
    20	based on their blobref. Each blob maps to exactly one backend.
    21	
    22	Example low-level config:
    23	
    24	     "/foo/": {
    25	         "handler": "storage-shard",
    26	         "handlerArgs": {
    27	             "backends": ["/s1/", "/s2/"]
    28	          }
    29	     },
    30	
    31	*/
    32	package shard // import "perkeep.org/pkg/blobserver/shard"
    33	
    34	import (
    35		"context"
    36		"errors"
    37		"io"
    38		"sync"
    39	
    40		"go4.org/jsonconfig"
    41		"perkeep.org/pkg/blob"
    42		"perkeep.org/pkg/blobserver"
    43	)
    44	
// shardStorage is a blobserver.Storage that deterministically routes
// each blob to exactly one of a fixed, ordered list of backend stores,
// chosen by hashing the blob's ref (see shardNum).
type shardStorage struct {
	shardPrefixes []string             // configured backend handler prefixes, e.g. "/s1/"
	shards        []blobserver.Storage // loaded storage per prefix; same order/length as shardPrefixes
}
    49	
    50	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
    51		sto := &shardStorage{
    52			shardPrefixes: config.RequiredList("backends"),
    53		}
    54		if err := config.Validate(); err != nil {
    55			return nil, err
    56		}
    57		if len(sto.shardPrefixes) == 0 {
    58			return nil, errors.New("shard: need at least one shard")
    59		}
    60		sto.shards = make([]blobserver.Storage, len(sto.shardPrefixes))
    61		for i, prefix := range sto.shardPrefixes {
    62			shardSto, err := ld.GetStorage(prefix)
    63			if err != nil {
    64				return nil, err
    65			}
    66			sto.shards[i] = shardSto
    67		}
    68		return sto, nil
    69	}
    70	
    71	func (sto *shardStorage) shard(b blob.Ref) blobserver.Storage {
    72		return sto.shards[int(sto.shardNum(b))]
    73	}
    74	
    75	func (sto *shardStorage) shardNum(b blob.Ref) uint32 {
    76		return b.Sum32() % uint32(len(sto.shards))
    77	}
    78	
    79	func (sto *shardStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
    80		return sto.shard(b).Fetch(ctx, b)
    81	}
    82	
    83	func (sto *shardStorage) ReceiveBlob(ctx context.Context, b blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
    84		return sto.shard(b).ReceiveBlob(ctx, b, source)
    85	}
    86	
    87	func (sto *shardStorage) batchedShards(ctx context.Context, blobs []blob.Ref, fn func(blobserver.Storage, []blob.Ref) error) error {
    88		m := make(map[uint32][]blob.Ref)
    89		for _, b := range blobs {
    90			sn := sto.shardNum(b)
    91			m[sn] = append(m[sn], b)
    92		}
    93		ch := make(chan error, len(m))
    94		for sn := range m {
    95			sblobs := m[sn]
    96			s := sto.shards[sn]
    97			go func() {
    98				ch <- fn(s, sblobs)
    99			}()
   100		}
   101		var reterr error
   102		for range m {
   103			if err := <-ch; err != nil {
   104				reterr = err
   105			}
   106		}
   107		return reterr
   108	}
   109	
   110	func (sto *shardStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   111		return sto.batchedShards(context.TODO(), blobs, func(s blobserver.Storage, blobs []blob.Ref) error {
   112			return s.RemoveBlobs(ctx, blobs)
   113		})
   114	}
   115	
// StatBlobs stats the given blobs, fanning out to the owning shards
// concurrently via batchedShards. Calls to fn are serialized by a
// mutex, as required by the blobserver.BlobStatter contract; after fn
// first returns an error, results from still-running shards are
// silently dropped rather than passed to fn.
func (sto *shardStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	var (
		fnMu   sync.Mutex // serializes calls to fn, guards failed
		failed bool
	)
	// TODO: do a context.WithCancel and abort all shards' context
	// once one fails, but don't do that until we can guarantee
	// that the first failure we report is the real one, not
	// another goroutine getting its context canceled before our
	// real first failure returns from its goroutine. That is, we
	// should use golang.org/x/sync/errgroup, but we need to
	// integrate it with batchedShards and audit callers. Or not
	// use batchedShards here, or only use batchedShards to
	// collect work to do and then use errgroup directly ourselves
	// here.
	return sto.batchedShards(ctx, blobs, func(s blobserver.Storage, blobs []blob.Ref) error {
		return s.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
			fnMu.Lock()
			defer fnMu.Unlock()
			if failed {
				// A previous fn call errored; swallow further
				// results but keep draining so shards finish.
				return nil
			}
			if err := fn(sb); err != nil {
				failed = true
				return err
			}
			return nil
		})
	})
}
   146	
   147	func (sto *shardStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   148		return blobserver.MergedEnumerateStorage(ctx, dest, sto.shards, after, limit)
   149	}
   150	
   151	func init() {
   152		blobserver.RegisterStorageConstructor("shard", blobserver.StorageConstructor(newFromConfig))
   153	}
Website layout inspired by memcached.
Content by the authors.