1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18 package storagetest
19
20 import (
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "reflect"
26 "sort"
27 "strconv"
28 "strings"
29 "testing"
30 "time"
31
32 "perkeep.org/pkg/blob"
33 "perkeep.org/pkg/blobserver"
34 "perkeep.org/pkg/test"
35
36 "go4.org/syncutil"
37 )
38
// Opts configures a storage conformance run started with TestOpt.
type Opts struct {
	// New builds the storage implementation under test. TestOpt calls it
	// once, at the start of the run, so it must be non-nil.
	New func(*testing.T) blobserver.Storage

	// Retries lists the delays to sleep between successive retries of a
	// failing check (in this file, the enumeration checks). Each delay is
	// used at most once; once the list is exhausted the last error is
	// reported. A nil or empty slice disables retrying.
	Retries []time.Duration

	// SkipEnum disables every enumeration check of the run.
	SkipEnum bool
}
49
50 func Test(t *testing.T, fn func(*testing.T) blobserver.Storage) {
51 TestOpt(t, Opts{New: fn})
52 }
53
// run carries the shared state of a single TestOpt invocation so the
// individual checks can be written as methods.
type run struct {
	t   *testing.T         // test used for logging and failure reporting
	opt Opts               // options the run was started with
	sto blobserver.Storage // storage under test, as returned by opt.New
}
59
60 func TestOpt(t *testing.T, opt Opts) {
61 sto := opt.New(t)
62 r := &run{
63 t: t,
64 opt: opt,
65 sto: sto,
66 }
67 t.Logf("Testing blobserver storage %T", sto)
68
69 t.Logf("Testing Enumerate for empty")
70 r.testEnumerate(nil)
71
72 t.Logf("Test stat of blob not existing")
73 {
74 b := &test.Blob{Contents: "not exist"}
75 blobRefs := []blob.Ref{b.BlobRef()}
76 testStat(t, sto, blobRefs, nil)
77 }
78
79 var blobs []*test.Blob
80 var blobRefs []blob.Ref
81 var blobSizedRefs []blob.SizedRef
82
83 contents := []string{"foo", "quux", "asdf", "qwerty", "0123456789"}
84 if !testing.Short() {
85 for i := 0; i < 95; i++ {
86 contents = append(contents, "foo-"+strconv.Itoa(i))
87 }
88 }
89
90 t.Logf("Testing receive")
91 for i, x := range contents {
92 b1 := &test.Blob{Contents: x}
93 if testing.Short() {
94 t.Logf("blob[%d] = %s: %q", i, b1.BlobRef(), x)
95 }
96 b1s, err := sto.ReceiveBlob(context.Background(), b1.BlobRef(), b1.Reader())
97 if err != nil {
98 t.Fatalf("ReceiveBlob of %s: %v", b1, err)
99 }
100 if b1s != b1.SizedRef() {
101 t.Fatalf("Received %v; want %v", b1s, b1.SizedRef())
102 }
103 blobs = append(blobs, b1)
104 blobRefs = append(blobRefs, b1.BlobRef())
105 blobSizedRefs = append(blobSizedRefs, b1.SizedRef())
106
107 switch len(blobSizedRefs) {
108 case 1, 5, 100:
109 t.Logf("Testing Enumerate for %d blobs", len(blobSizedRefs))
110 r.testEnumerate(blobSizedRefs)
111 }
112 }
113
114 t.Logf("Testing Fetch")
115 for i, b2 := range blobs {
116 rc, size, err := sto.Fetch(context.Background(), b2.BlobRef())
117 if err != nil {
118 t.Fatalf("error fetching %d. %s: %v", i, b2, err)
119 }
120 testSizedBlob(t, rc, b2.BlobRef(), int64(size))
121 rc.Close()
122 }
123
124 t.Logf("Testing Stat")
125 testStat(t, sto, blobRefs, blobSizedRefs)
126
127
128 sort.Sort(blob.SizedByRef(blobSizedRefs))
129
130 t.Logf("Testing Enumerate on all")
131 r.testEnumerate(blobSizedRefs)
132
133 t.Logf("Testing Enumerate 'limit' param")
134 r.testEnumerate(blobSizedRefs[:3], 3)
135
136
137 {
138 after := blobSizedRefs[2].Ref.String()
139 t.Logf("Testing Enumerate 'after' param; after %q", after)
140 r.testEnumerate(blobSizedRefs[3:], after)
141 }
142
143
144 {
145 after := blobSizedRefs[2].Ref.String()
146 t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
147 r.testEnumerate(blobSizedRefs[3:4], after, 1)
148 }
149
150
151 {
152 after := "a"
153 t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
154 r.testEnumerate(blobSizedRefs[:1], after, 1)
155 }
156
157 r.testRemove(blobRefs)
158
159 r.testSubFetcher()
160 }
161
162 func (r *run) testRemove(blobRefs []blob.Ref) {
163 ctx := context.Background()
164 t, sto := r.t, r.sto
165 t.Logf("Testing Remove")
166 if err := sto.RemoveBlobs(ctx, blobRefs); err != nil {
167 if strings.Contains(err.Error(), "not implemented") {
168 t.Logf("RemoveBlobs: %v", err)
169 return
170 }
171 t.Fatalf("RemoveBlobs: %v", err)
172 }
173 r.testEnumerate(nil)
174 if len(blobRefs) > 0 {
175 t.Logf("Testing double-delete")
176 if err := sto.RemoveBlobs(ctx, []blob.Ref{blobRefs[0]}); err != nil {
177 t.Fatalf("Double RemoveBlobs: %v", err)
178 }
179 }
180 }
181
182 func (r *run) testSubFetcher() {
183 t, sto := r.t, r.sto
184 sf, ok := sto.(blob.SubFetcher)
185 if !ok {
186 t.Logf("%T is not a SubFetcher", sto)
187 return
188 }
189 t.Logf("Testing SubFetch")
190 big := &test.Blob{Contents: "Some big blob"}
191 if _, err := sto.ReceiveBlob(context.Background(), big.BlobRef(), big.Reader()); err != nil {
192 t.Fatal(err)
193 }
194 regions := []struct {
195 off, limit int64
196 want string
197 errok bool
198 }{
199 {5, 3, "big", false},
200 {5, 8, "big blob", false},
201 {5, 100, "big blob", true},
202 }
203 for _, tt := range regions {
204 r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
205 if errors.Is(err, blob.ErrUnimplemented) {
206 t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
207 return
208 }
209 if err != nil {
210 t.Fatalf("Error fetching big blob for SubFetch: %v", err)
211 }
212 if r == nil {
213 t.Fatal("SubFetch returned nil, nil")
214 }
215 all, err := io.ReadAll(r)
216 r.Close()
217 if err != nil && !tt.errok {
218 t.Errorf("Unexpected error reading SubFetch region %+v: %v", tt, err)
219 }
220 if string(all) != tt.want {
221 t.Errorf("SubFetch region %+v got %q; want %q", tt, all, tt.want)
222 }
223 }
224
225
226 invalids := []struct {
227 off, limit int64
228 }{
229 {int64(len(big.Contents)) + 1, 1},
230 {-1, 1},
231 {1, -1},
232 }
233 for _, tt := range invalids {
234 r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
235 if errors.Is(err, blob.ErrUnimplemented) {
236 t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
237 return
238 }
239 if err == nil {
240 r.Close()
241 t.Errorf("No error fetching with off=%d limit=%d; wanted an error", tt.off, tt.limit)
242 continue
243 }
244 if !errors.Is(err, blob.ErrNegativeSubFetch) && !errors.Is(err, blob.ErrOutOfRangeOffsetSubFetch) {
245 t.Errorf("Unexpected error fetching with off=%d limit=%d: %v", tt.off, tt.limit, err)
246 }
247 }
248 }
249
250 func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) {
251 h := b1.Hash()
252 n, err := io.Copy(h, r)
253 if err != nil {
254 t.Fatalf("error reading from %s: %v", r, err)
255 }
256 if n != size {
257 t.Fatalf("read %d bytes from %s, metadata said %d!", n, r, size)
258 }
259 b2 := blob.RefFromHash(h)
260 if b2 != b1 {
261 t.Fatalf("content mismatch (awaited %s, got %s)", b1, b2)
262 }
263 }
264
265 func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) error {
266 var after string
267 var n = 1000
268 for _, opt := range opts {
269 switch v := opt.(type) {
270 case string:
271 after = v
272 case int:
273 n = v
274 default:
275 panic("bad option of type " + fmt.Sprintf("%T", v))
276 }
277 }
278
279 want := append([]blob.SizedRef(nil), wantUnsorted...)
280 sort.Sort(blob.SizedByRef(want))
281
282 sbc := make(chan blob.SizedRef, 10)
283
284 var got []blob.SizedRef
285 var grp syncutil.Group
286 sawEnd := make(chan bool, 1)
287 grp.Go(func() error {
288 ctx, cancel := context.WithCancel(context.TODO())
289 defer cancel()
290 if err := sto.EnumerateBlobs(ctx, sbc, after, n); err != nil {
291 return fmt.Errorf("EnumerateBlobs(%q, %d): %w", after, n, err)
292 }
293 return nil
294 })
295 grp.Go(func() error {
296 var lastRef blob.Ref
297 for sb := range sbc {
298 if !sb.Valid() {
299 return fmt.Errorf("invalid blobref %#v received in enumerate", sb)
300 }
301 got = append(got, sb)
302 if lastRef.Valid() && sb.Ref.Less(lastRef) {
303 return fmt.Errorf("blobs appearing out of order")
304 }
305 lastRef = sb.Ref
306 }
307 sawEnd <- true
308 return nil
309
310 })
311 grp.Go(func() error {
312 select {
313 case <-sawEnd:
314 return nil
315 case <-time.After(10 * time.Second):
316 return errors.New("timeout waiting for EnumerateBlobs to close its channel")
317 }
318
319 })
320 if err := grp.Err(); err != nil {
321 return fmt.Errorf("Enumerate error: %w", err)
322 }
323 if len(got) == 0 && len(want) == 0 {
324 return nil
325 }
326 var gotSet = map[blob.SizedRef]bool{}
327 for _, sb := range got {
328 if gotSet[sb] {
329 return fmt.Errorf("duplicate blob %v returned in enumerate", sb)
330 }
331 gotSet[sb] = true
332 }
333
334 if !reflect.DeepEqual(got, want) {
335 return fmt.Errorf("enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n",
336 len(got), len(want), got, want)
337 }
338 return nil
339 }
340
341 func (r *run) testEnumerate(wantUnsorted []blob.SizedRef, opts ...interface{}) {
342 if r.opt.SkipEnum {
343 r.t.Log("Skipping enum test")
344 return
345 }
346 if err := r.withRetries(func() error {
347 return CheckEnumerate(r.sto, wantUnsorted, opts...)
348 }); err != nil {
349 r.t.Fatalf("%v", err)
350 }
351 }
352
353 func (r *run) withRetries(fn func() error) error {
354 delays := r.opt.Retries
355 for {
356 err := fn()
357 if err == nil || len(delays) == 0 {
358 return err
359 }
360 r.t.Logf("(operation failed; retrying after %v)", delays[0])
361 time.Sleep(delays[0])
362 delays = delays[1:]
363 }
364 }
365
366 func testStat(t *testing.T, sto blobserver.BlobStatter, blobs []blob.Ref, want []blob.SizedRef) {
367
368 pos := make(map[blob.Ref]int)
369 need := make(map[blob.Ref]bool)
370 for i, sb := range want {
371 pos[sb.Ref] = i
372 need[sb.Ref] = true
373 }
374
375 err := sto.StatBlobs(context.Background(), blobs, func(sb blob.SizedRef) error {
376 if !sb.Valid() {
377 t.Errorf("StatBlobs func called with invalid/zero blob.SizedRef")
378 return nil
379 }
380 wantPos, ok := pos[sb.Ref]
381 if !ok {
382 t.Errorf("StatBlobs func called with unrequested ref %v (size %d)", sb.Ref, sb.Size)
383 return nil
384 }
385 if !need[sb.Ref] {
386 t.Errorf("StatBlobs func called with ref %v multiple times", sb.Ref)
387 return nil
388 }
389 delete(need, sb.Ref)
390 w := want[wantPos]
391 if sb != w {
392 t.Errorf("StatBlobs returned %v; want %v", sb, w)
393 }
394 return nil
395 })
396 for br := range need {
397 t.Errorf("StatBlobs never returned results for %v", br)
398 }
399 if err != nil {
400 t.Errorf("StatBlobs: %v", err)
401 }
402 }
403
// StreamerTestOpt is an optional expectation passed to TestStreamer. Its
// only method is unexported, so it can be implemented only by types in this
// package (WantN and WantSizedRefs).
type StreamerTestOpt interface {
	// verify returns an error if got, the blobs collected from the first
	// uninterrupted StreamBlobs pass in the order they were streamed, does
	// not meet the expectation.
	verify(got []blob.SizedRef) error
}
407
408
409 type WantN int
410
411 func (want WantN) verify(got []blob.SizedRef) error {
412 if int(want) != len(got) {
413 return fmt.Errorf("got %d streamed blobs; want %d", len(got), int(want))
414 }
415 return nil
416 }
417
418 type WantSizedRefs []blob.SizedRef
419
420 func (s WantSizedRefs) verify(got []blob.SizedRef) error {
421 want := []blob.SizedRef(s)
422 if !reflect.DeepEqual(got, want) {
423 return fmt.Errorf("mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want)
424 }
425 return nil
426 }
427
428
429
430
431
432
433
434 func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
435
436 var sawEnum map[blob.SizedRef]bool
437 if enumer, ok := bs.(blobserver.BlobEnumerator); ok {
438 sawEnum = make(map[blob.SizedRef]bool)
439
440
441 enumCtx, cancel := context.WithCancel(context.TODO())
442 defer cancel()
443 if err := blobserver.EnumerateAll(enumCtx, enumer, func(sb blob.SizedRef) error {
444 sawEnum[sb] = true
445 return nil
446 }); err != nil {
447 t.Fatalf("Enumerate: %v", err)
448 }
449 }
450
451
452
453 ch := make(chan blobserver.BlobAndToken)
454 errCh := make(chan error, 1)
455 go func() {
456 ctx, cancel := context.WithCancel(context.TODO())
457 defer cancel()
458 errCh <- bs.StreamBlobs(ctx, ch, "")
459 }()
460 var gotRefs []blob.SizedRef
461 sawStreamed := map[blob.Ref]int{}
462 for b := range ch {
463 sawStreamed[b.Ref()]++
464 sbr := b.SizedRef()
465 if sawEnum != nil {
466 if _, ok := sawEnum[sbr]; ok {
467 delete(sawEnum, sbr)
468 } else {
469 t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr)
470 }
471 }
472 gotRefs = append(gotRefs, sbr)
473 }
474 if err := <-errCh; err != nil {
475 t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
476 }
477 for br, n := range sawStreamed {
478 if n > 1 {
479 t.Errorf("Streamed returned duplicate %v, %d times", br, n)
480 }
481 }
482 nMissing := 0
483 for sbr := range sawEnum {
484 t.Errorf("Enumerate found %v but Streamer didn't return it", sbr)
485 nMissing++
486 if nMissing == 10 && len(sawEnum) > 10 {
487 t.Errorf("... etc ...")
488 break
489 }
490 }
491 for _, opt := range opts {
492 if err := opt.verify(gotRefs); err != nil {
493 t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)
494 }
495 }
496 if t.Failed() {
497 return
498 }
499
500
501
502
503
504
505
506
507
508
509 wantRefs := append([]blob.SizedRef(nil), gotRefs...)
510 sawStreamed = map[blob.Ref]int{}
511 gotRefs = gotRefs[:0]
512 contToken := ""
513 for i := 0; i < len(wantRefs); i++ {
514 ctx, cancel := context.WithCancel(context.TODO())
515 defer cancel()
516 ch := make(chan blobserver.BlobAndToken)
517 errc := make(chan error, 1)
518 go func() {
519 errc <- bs.StreamBlobs(ctx, ch, contToken)
520 }()
521 nrecv := 0
522 nextToken := ""
523 for bt := range ch {
524 nrecv++
525 sbr := bt.Blob.SizedRef()
526 isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1]
527 if isNew {
528 if sawStreamed[sbr.Ref] > 0 {
529 t.Fatalf("In complex pass, returned duplicate blob %v\n\nSo far, before interrupting:\n%v\n\nWant:\n%v", sbr, gotRefs, wantRefs)
530 }
531 sawStreamed[sbr.Ref]++
532 gotRefs = append(gotRefs, sbr)
533 nextToken = bt.Token
534 cancel()
535 break
536 } else if i == 0 {
537 t.Fatalf("first iteration should receive a new value")
538 } else if nrecv == 2 {
539 t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i)
540 }
541 }
542 err := <-errc
543 if err != nil && err != context.Canceled {
544 t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err)
545 }
546 if err == nil {
547 break
548 }
549 contToken = nextToken
550 }
551 if !reflect.DeepEqual(gotRefs, wantRefs) {
552 t.Errorf("Mismatch on complex pass (got %d, want %d):\n got %q\nwant %q\n", len(gotRefs), len(wantRefs), gotRefs, wantRefs)
553 wantMap := map[blob.SizedRef]bool{}
554 for _, sbr := range wantRefs {
555 wantMap[sbr] = true
556 }
557 for _, sbr := range gotRefs {
558 if _, ok := wantMap[sbr]; ok {
559 delete(wantMap, sbr)
560 } else {
561 t.Errorf("got has unwanted: %v", sbr)
562 }
563 }
564 missing := wantMap
565 for sbr := range missing {
566 t.Errorf("got is missing: %v", sbr)
567 }
568
569 t.FailNow()
570 }
571 }