Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package shard registers the "shard" blobserver storage type,
     19	predictably spraying blobs out over the provided backends
    20	based on their blobref. Each blob maps to exactly one backend.
    21	
    22	Example low-level config:
    23	
    24		"/foo/": {
    25		    "handler": "storage-shard",
    26		    "handlerArgs": {
    27		        "backends": ["/s1/", "/s2/"]
    28		     }
    29		},
    30	*/
    31	package shard // import "perkeep.org/pkg/blobserver/shard"
    32	
import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"

	"go4.org/jsonconfig"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
)
    43	
// shardStorage is a blobserver.Storage that spreads blobs across several
// backend storages. Every blob is owned by exactly one backend, chosen
// from the blob ref by shardNum.
type shardStorage struct {
	shardPrefixes []string             // backend prefixes from the "backends" config list, in config order
	shards        []blobserver.Storage // shards[i] is the backend loaded from shardPrefixes[i]
}
    48	
    49	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
    50		sto := &shardStorage{
    51			shardPrefixes: config.RequiredList("backends"),
    52		}
    53		if err := config.Validate(); err != nil {
    54			return nil, err
    55		}
    56		if len(sto.shardPrefixes) == 0 {
    57			return nil, errors.New("shard: need at least one shard")
    58		}
    59		sto.shards = make([]blobserver.Storage, len(sto.shardPrefixes))
    60		for i, prefix := range sto.shardPrefixes {
    61			shardSto, err := ld.GetStorage(prefix)
    62			if err != nil {
    63				return nil, err
    64			}
    65			sto.shards[i] = shardSto
    66		}
    67		return sto, nil
    68	}
    69	
    70	func (sto *shardStorage) shard(b blob.Ref) blobserver.Storage {
    71		return sto.shards[int(sto.shardNum(b))]
    72	}
    73	
    74	func (sto *shardStorage) shardNum(b blob.Ref) uint32 {
    75		return b.Sum32() % uint32(len(sto.shards))
    76	}
    77	
    78	func (sto *shardStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
    79		return sto.shard(b).Fetch(ctx, b)
    80	}
    81	
    82	func (sto *shardStorage) ReceiveBlob(ctx context.Context, b blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
    83		return sto.shard(b).ReceiveBlob(ctx, b, source)
    84	}
    85	
    86	func (sto *shardStorage) batchedShards(ctx context.Context, blobs []blob.Ref, fn func(blobserver.Storage, []blob.Ref) error) error {
    87		m := make(map[uint32][]blob.Ref)
    88		for _, b := range blobs {
    89			sn := sto.shardNum(b)
    90			m[sn] = append(m[sn], b)
    91		}
    92		ch := make(chan error, len(m))
    93		for sn := range m {
    94			sblobs := m[sn]
    95			s := sto.shards[sn]
    96			go func() {
    97				ch <- fn(s, sblobs)
    98			}()
    99		}
   100		var reterr error
   101		for range m {
   102			if err := <-ch; err != nil {
   103				reterr = err
   104			}
   105		}
   106		return reterr
   107	}
   108	
   109	func (sto *shardStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   110		return sto.batchedShards(context.TODO(), blobs, func(s blobserver.Storage, blobs []blob.Ref) error {
   111			return s.RemoveBlobs(ctx, blobs)
   112		})
   113	}
   114	
   115	func (sto *shardStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   116		var (
   117			fnMu   sync.Mutex // serializes calls to fn, guards failed
   118			failed bool
   119		)
   120		// TODO: do a context.WithCancel and abort all shards' context
   121		// once one fails, but don't do that until we can guarantee
   122		// that the first failure we report is the real one, not
   123		// another goroutine getting its context canceled before our
   124		// real first failure returns from its goroutine. That is, we
   125		// should use golang.org/x/sync/errgroup, but we need to
   126		// integrate it with batchedShards and audit callers. Or not
   127		// use batchedShards here, or only use batchedShards to
   128		// collect work to do and then use errgroup directly ourselves
   129		// here.
   130		return sto.batchedShards(ctx, blobs, func(s blobserver.Storage, blobs []blob.Ref) error {
   131			return s.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
   132				fnMu.Lock()
   133				defer fnMu.Unlock()
   134				if failed {
   135					return nil
   136				}
   137				if err := fn(sb); err != nil {
   138					failed = true
   139					return err
   140				}
   141				return nil
   142			})
   143		})
   144	}
   145	
   146	func (sto *shardStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   147		return blobserver.MergedEnumerateStorage(ctx, dest, sto.shards, after, limit)
   148	}
   149	
   150	func init() {
   151		blobserver.RegisterStorageConstructor("shard", blobserver.StorageConstructor(newFromConfig))
   152	}
Website layout inspired by memcached.
Content by the authors.