1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30
31 package shard
32
33 import (
34 "context"
35 "errors"
36 "io"
37 "sync"
38
39 "go4.org/jsonconfig"
40 "perkeep.org/pkg/blob"
41 "perkeep.org/pkg/blobserver"
42 )
43
44 type shardStorage struct {
45 shardPrefixes []string
46 shards []blobserver.Storage
47 }
48
49 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
50 sto := &shardStorage{
51 shardPrefixes: config.RequiredList("backends"),
52 }
53 if err := config.Validate(); err != nil {
54 return nil, err
55 }
56 if len(sto.shardPrefixes) == 0 {
57 return nil, errors.New("shard: need at least one shard")
58 }
59 sto.shards = make([]blobserver.Storage, len(sto.shardPrefixes))
60 for i, prefix := range sto.shardPrefixes {
61 shardSto, err := ld.GetStorage(prefix)
62 if err != nil {
63 return nil, err
64 }
65 sto.shards[i] = shardSto
66 }
67 return sto, nil
68 }
69
70 func (sto *shardStorage) shard(b blob.Ref) blobserver.Storage {
71 return sto.shards[int(sto.shardNum(b))]
72 }
73
74 func (sto *shardStorage) shardNum(b blob.Ref) uint32 {
75 return b.Sum32() % uint32(len(sto.shards))
76 }
77
78 func (sto *shardStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
79 return sto.shard(b).Fetch(ctx, b)
80 }
81
82 func (sto *shardStorage) ReceiveBlob(ctx context.Context, b blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
83 return sto.shard(b).ReceiveBlob(ctx, b, source)
84 }
85
86 func (sto *shardStorage) batchedShards(ctx context.Context, blobs []blob.Ref, fn func(blobserver.Storage, []blob.Ref) error) error {
87 m := make(map[uint32][]blob.Ref)
88 for _, b := range blobs {
89 sn := sto.shardNum(b)
90 m[sn] = append(m[sn], b)
91 }
92 ch := make(chan error, len(m))
93 for sn := range m {
94 sblobs := m[sn]
95 s := sto.shards[sn]
96 go func() {
97 ch <- fn(s, sblobs)
98 }()
99 }
100 var reterr error
101 for range m {
102 if err := <-ch; err != nil {
103 reterr = err
104 }
105 }
106 return reterr
107 }
108
109 func (sto *shardStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
110 return sto.batchedShards(context.TODO(), blobs, func(s blobserver.Storage, blobs []blob.Ref) error {
111 return s.RemoveBlobs(ctx, blobs)
112 })
113 }
114
115 func (sto *shardStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
116 var (
117 fnMu sync.Mutex
118 failed bool
119 )
120
121
122
123
124
125
126
127
128
129
130 return sto.batchedShards(ctx, blobs, func(s blobserver.Storage, blobs []blob.Ref) error {
131 return s.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
132 fnMu.Lock()
133 defer fnMu.Unlock()
134 if failed {
135 return nil
136 }
137 if err := fn(sb); err != nil {
138 failed = true
139 return err
140 }
141 return nil
142 })
143 })
144 }
145
146 func (sto *shardStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
147 return blobserver.MergedEnumerateStorage(ctx, dest, sto.shards, after, limit)
148 }
149
150 func init() {
151 blobserver.RegisterStorageConstructor("shard", blobserver.StorageConstructor(newFromConfig))
152 }