1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
35 package replica
36
37 import (
38 "bytes"
39 "context"
40 "errors"
41 "fmt"
42 "io"
43 "log"
44 "os"
45 "sync"
46 "time"
47
48 "golang.org/x/sync/errgroup"
49
50 "go4.org/jsonconfig"
51 "perkeep.org/pkg/blob"
52 "perkeep.org/pkg/blobserver"
53 )
54
55 var (
56 _ blobserver.Generationer = (*replicaStorage)(nil)
57 _ blobserver.WholeRefFetcher = (*replicaStorage)(nil)
58 )
59
const (
	// buffered is the capacity of the error channel that RemoveBlobs uses to
	// collect per-replica results.
	buffered = 8
)
61
// replicaStorage is a blobserver.Storage that fans operations out to a set of
// underlying storages: writes and removals go to replicas, reads go to
// readReplicas.
type replicaStorage struct {
	// replicaPrefixes holds the configured "backends" prefixes; replicas holds
	// the storage for each of them, in the same order. ReceiveBlob and
	// RemoveBlobs act on replicas.
	replicaPrefixes []string
	replicas        []blobserver.Storage

	// readPrefixes holds the configured "readBackends" prefixes (defaulting to
	// replicaPrefixes); readReplicas holds the storage for each of them.
	// Fetch, StatBlobs, EnumerateBlobs and OpenWholeRef act on readReplicas.
	readPrefixes []string
	readReplicas []blobserver.Storage

	// minWritesForSuccess is the number of successful replica writes at which
	// ReceiveBlob reports success. newFromConfig defaults it to the number of
	// replicas.
	minWritesForSuccess int
}
75
76
77
78 func NewForTest(sto []blobserver.Storage) blobserver.Storage {
79 sto = append([]blobserver.Storage(nil), sto...)
80 names := make([]string, len(sto))
81 for i := range names {
82 names[i] = "/unknown-prefix/"
83 }
84 return &replicaStorage{
85 replicaPrefixes: names,
86 replicas: sto,
87 readPrefixes: names,
88 readReplicas: sto,
89 minWritesForSuccess: len(sto),
90 }
91 }
92
93 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
94 sto := &replicaStorage{
95 replicaPrefixes: config.RequiredList("backends"),
96 readPrefixes: config.OptionalList("readBackends"),
97 }
98 nReplicas := len(sto.replicaPrefixes)
99 sto.minWritesForSuccess = config.OptionalInt("minWritesForSuccess", nReplicas)
100 if err := config.Validate(); err != nil {
101 return nil, err
102 }
103 if nReplicas == 0 {
104 return nil, errors.New("replica: need at least one replica")
105 }
106 if sto.minWritesForSuccess == 0 {
107 sto.minWritesForSuccess = nReplicas
108 }
109
110 if len(sto.readPrefixes) == 0 {
111 sto.readPrefixes = sto.replicaPrefixes
112 }
113
114 for _, prefix := range sto.replicaPrefixes {
115 s, err := ld.GetStorage(prefix)
116 if err != nil {
117
118
119 h, _ := ld.GetHandler(prefix)
120 var ok bool
121 if s, ok = h.(blobserver.Storage); !ok {
122 return nil, err
123 }
124 }
125 sto.replicas = append(sto.replicas, s)
126 }
127 for _, prefix := range sto.readPrefixes {
128 s, err := ld.GetStorage(prefix)
129 if err != nil {
130 return nil, err
131 }
132 sto.readReplicas = append(sto.readReplicas, s)
133 }
134 return sto, nil
135 }
136
137 func (sto *replicaStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
138
139 for _, replica := range sto.readReplicas {
140 file, size, err = replica.Fetch(ctx, b)
141 if err == nil {
142 return
143 }
144 }
145 return
146 }
147
148
149 func (sto *replicaStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
150 var (
151 mu sync.Mutex
152 need = make(map[blob.Ref]bool)
153 failed bool
154 )
155 for _, br := range blobs {
156 need[br] = true
157 }
158
159 group, ctx := errgroup.WithContext(ctx)
160
161 for _, replica := range sto.readReplicas {
162 replica := replica
163 group.Go(func() error {
164 return replica.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
165 mu.Lock()
166 defer mu.Unlock()
167 if failed {
168 return nil
169 }
170 if !need[sb.Ref] {
171
172 return nil
173 }
174 delete(need, sb.Ref)
175 if err := fn(sb); err != nil {
176 failed = true
177 return err
178 }
179 return nil
180 })
181 })
182 }
183
184 return group.Wait()
185 }
186
// sizedBlobAndError is the outcome of uploading one blob to one replica, as
// sent over the result channel in ReceiveBlob.
type sizedBlobAndError struct {
	idx int           // index of the replica in replicaStorage.replicas
	sb  blob.SizedRef // what the replica reported for the upload
	err error         // upload error, or nil
}
192
193 func (sto *replicaStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (_ blob.SizedRef, err error) {
194
195 var buf bytes.Buffer
196 size, err := io.Copy(&buf, src)
197 if err != nil {
198 return
199 }
200
201 nReplicas := len(sto.replicas)
202 resc := make(chan sizedBlobAndError, nReplicas)
203 uploadToReplica := func(idx int, dst blobserver.BlobReceiver) {
204
205
206 sb, err := blobserver.ReceiveNoHash(ctx, dst, br, bytes.NewReader(buf.Bytes()))
207 resc <- sizedBlobAndError{idx, sb, err}
208 }
209 for idx, replica := range sto.replicas {
210 go uploadToReplica(idx, replica)
211 }
212
213 nSuccess := 0
214 var fails []sizedBlobAndError
215 for range sto.replicas {
216 res := <-resc
217 switch {
218 case res.err == nil && int64(res.sb.Size) == size:
219 nSuccess++
220 if nSuccess == sto.minWritesForSuccess {
221 return res.sb, nil
222 }
223 case res.err == nil:
224 err = fmt.Errorf("replica: upload shard reported size %d, expected %d", res.sb.Size, size)
225 res.err = err
226 fails = append(fails, res)
227 default:
228 err = res.err
229 fails = append(fails, res)
230 }
231 }
232 for _, res := range fails {
233 log.Printf("replica: receiving blob %v, %d successes, %d failures; backend %s reported: %v",
234 br,
235 nSuccess, len(fails),
236 sto.replicaPrefixes[res.idx], res.err)
237 }
238 return
239 }
240
241 func (sto *replicaStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
242 errch := make(chan error, buffered)
243 removeFrom := func(s blobserver.Storage) {
244 errch <- s.RemoveBlobs(ctx, blobs)
245 }
246 for _, replica := range sto.replicas {
247 go removeFrom(replica)
248 }
249 var reterr error
250 nSuccess := 0
251 for range sto.replicas {
252 if err := <-errch; err != nil {
253 reterr = err
254 } else {
255 nSuccess++
256 }
257 }
258 if nSuccess > 0 {
259
260
261
262 return nil
263 }
264 return reterr
265 }
266
267 func (sto *replicaStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
268 return blobserver.MergedEnumerateStorage(ctx, dest, sto.readReplicas, after, limit)
269 }
270
271 func (sto *replicaStorage) ResetStorageGeneration() error {
272 var ret error
273 n := 0
274 for _, replica := range sto.replicas {
275 if g, ok := replica.(blobserver.Generationer); ok {
276 n++
277 if err := g.ResetStorageGeneration(); err != nil && ret == nil {
278 ret = err
279 }
280 }
281 }
282 if n == 0 {
283 return errors.New("ResetStorageGeneration not supported")
284 }
285 return ret
286 }
287
288 func (sto *replicaStorage) StorageGeneration() (initTime time.Time, random string, err error) {
289 var buf bytes.Buffer
290 n := 0
291 for _, replica := range sto.replicas {
292 if g, ok := replica.(blobserver.Generationer); ok {
293 n++
294 rt, rrand, rerr := g.StorageGeneration()
295 if rerr != nil {
296 err = rerr
297 } else {
298 if rt.After(initTime) {
299
300
301 initTime = rt
302 }
303 if buf.Len() != 0 {
304 buf.WriteByte('/')
305 }
306 buf.WriteString(rrand)
307 }
308 }
309 }
310 if n == 0 {
311 err = errors.New("No replicas support StorageGeneration")
312 }
313 return initTime, buf.String(), err
314 }
315
316 func (sto *replicaStorage) OpenWholeRef(wholeRef blob.Ref, offset int64) (rc io.ReadCloser, wholeSize int64, err error) {
317
318 for _, replica := range sto.readReplicas {
319 if v, ok := replica.(blobserver.WholeRefFetcher); ok {
320 rc, wholeSize, err = v.OpenWholeRef(wholeRef, offset)
321 if err == nil {
322 return
323 }
324 }
325 }
326 if err == nil {
327 err = os.ErrNotExist
328 }
329 return
330 }
331
332 func init() {
333 blobserver.RegisterStorageConstructor("replica", blobserver.StorageConstructor(newFromConfig))
334 }