1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18 package storagetest
19
20 import (
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "reflect"
26 "sort"
27 "strconv"
28 "strings"
29 "testing"
30 "time"
31
32 "perkeep.org/pkg/blob"
33 "perkeep.org/pkg/blobserver"
34 "perkeep.org/pkg/test"
35
36 "go4.org/syncutil"
37 )
38
39 type Opts struct {
40
41
42 New func(*testing.T) (sto blobserver.Storage, cleanup func())
43
44
45
46 Retries []time.Duration
47
48 SkipEnum bool
49 }
50
51 func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup func())) {
52 TestOpt(t, Opts{New: fn})
53 }
54
55 type run struct {
56 t *testing.T
57 opt Opts
58 sto blobserver.Storage
59 }
60
61 func TestOpt(t *testing.T, opt Opts) {
62 sto, cleanup := opt.New(t)
63 defer func() {
64 if t.Failed() {
65 t.Logf("test %T FAILED, skipping cleanup!", sto)
66 } else {
67 if cleanup != nil {
68 cleanup()
69 }
70 }
71 }()
72 r := &run{
73 t: t,
74 opt: opt,
75 sto: sto,
76 }
77 t.Logf("Testing blobserver storage %T", sto)
78
79 t.Logf("Testing Enumerate for empty")
80 r.testEnumerate(nil)
81
82 t.Logf("Test stat of blob not existing")
83 {
84 b := &test.Blob{Contents: "not exist"}
85 blobRefs := []blob.Ref{b.BlobRef()}
86 testStat(t, sto, blobRefs, nil)
87 }
88
89 var blobs []*test.Blob
90 var blobRefs []blob.Ref
91 var blobSizedRefs []blob.SizedRef
92
93 contents := []string{"foo", "quux", "asdf", "qwerty", "0123456789"}
94 if !testing.Short() {
95 for i := 0; i < 95; i++ {
96 contents = append(contents, "foo-"+strconv.Itoa(i))
97 }
98 }
99
100 t.Logf("Testing receive")
101 for i, x := range contents {
102 b1 := &test.Blob{Contents: x}
103 if testing.Short() {
104 t.Logf("blob[%d] = %s: %q", i, b1.BlobRef(), x)
105 }
106 b1s, err := sto.ReceiveBlob(context.Background(), b1.BlobRef(), b1.Reader())
107 if err != nil {
108 t.Fatalf("ReceiveBlob of %s: %v", b1, err)
109 }
110 if b1s != b1.SizedRef() {
111 t.Fatalf("Received %v; want %v", b1s, b1.SizedRef())
112 }
113 blobs = append(blobs, b1)
114 blobRefs = append(blobRefs, b1.BlobRef())
115 blobSizedRefs = append(blobSizedRefs, b1.SizedRef())
116
117 switch len(blobSizedRefs) {
118 case 1, 5, 100:
119 t.Logf("Testing Enumerate for %d blobs", len(blobSizedRefs))
120 r.testEnumerate(blobSizedRefs)
121 }
122 }
123
124 t.Logf("Testing Fetch")
125 for i, b2 := range blobs {
126 rc, size, err := sto.Fetch(context.Background(), b2.BlobRef())
127 if err != nil {
128 t.Fatalf("error fetching %d. %s: %v", i, b2, err)
129 }
130 testSizedBlob(t, rc, b2.BlobRef(), int64(size))
131 rc.Close()
132 }
133
134 t.Logf("Testing Stat")
135 testStat(t, sto, blobRefs, blobSizedRefs)
136
137
138 sort.Sort(blob.SizedByRef(blobSizedRefs))
139
140 t.Logf("Testing Enumerate on all")
141 r.testEnumerate(blobSizedRefs)
142
143 t.Logf("Testing Enumerate 'limit' param")
144 r.testEnumerate(blobSizedRefs[:3], 3)
145
146
147 {
148 after := blobSizedRefs[2].Ref.String()
149 t.Logf("Testing Enumerate 'after' param; after %q", after)
150 r.testEnumerate(blobSizedRefs[3:], after)
151 }
152
153
154 {
155 after := blobSizedRefs[2].Ref.String()
156 t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
157 r.testEnumerate(blobSizedRefs[3:4], after, 1)
158 }
159
160
161 {
162 after := "a"
163 t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
164 r.testEnumerate(blobSizedRefs[:1], after, 1)
165 }
166
167 r.testRemove(blobRefs)
168
169 r.testSubFetcher()
170 }
171
172 func (r *run) testRemove(blobRefs []blob.Ref) {
173 ctx := context.Background()
174 t, sto := r.t, r.sto
175 t.Logf("Testing Remove")
176 if err := sto.RemoveBlobs(ctx, blobRefs); err != nil {
177 if strings.Contains(err.Error(), "not implemented") {
178 t.Logf("RemoveBlobs: %v", err)
179 return
180 }
181 t.Fatalf("RemoveBlobs: %v", err)
182 }
183 r.testEnumerate(nil)
184 if len(blobRefs) > 0 {
185 t.Logf("Testing double-delete")
186 if err := sto.RemoveBlobs(ctx, []blob.Ref{blobRefs[0]}); err != nil {
187 t.Fatalf("Double RemoveBlobs: %v", err)
188 }
189 }
190 }
191
192 func (r *run) testSubFetcher() {
193 t, sto := r.t, r.sto
194 sf, ok := sto.(blob.SubFetcher)
195 if !ok {
196 t.Logf("%T is not a SubFetcher", sto)
197 return
198 }
199 t.Logf("Testing SubFetch")
200 big := &test.Blob{Contents: "Some big blob"}
201 if _, err := sto.ReceiveBlob(context.Background(), big.BlobRef(), big.Reader()); err != nil {
202 t.Fatal(err)
203 }
204 regions := []struct {
205 off, limit int64
206 want string
207 errok bool
208 }{
209 {5, 3, "big", false},
210 {5, 8, "big blob", false},
211 {5, 100, "big blob", true},
212 }
213 for _, tt := range regions {
214 r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
215 if err == blob.ErrUnimplemented {
216 t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
217 return
218 }
219 if err != nil {
220 t.Fatalf("Error fetching big blob for SubFetch: %v", err)
221 }
222 if r == nil {
223 t.Fatal("SubFetch returned nil, nil")
224 }
225 all, err := io.ReadAll(r)
226 r.Close()
227 if err != nil && !tt.errok {
228 t.Errorf("Unexpected error reading SubFetch region %+v: %v", tt, err)
229 }
230 if string(all) != tt.want {
231 t.Errorf("SubFetch region %+v got %q; want %q", tt, all, tt.want)
232 }
233 }
234
235
236 invalids := []struct {
237 off, limit int64
238 }{
239 {int64(len(big.Contents)) + 1, 1},
240 {-1, 1},
241 {1, -1},
242 }
243 for _, tt := range invalids {
244 r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
245 if err == blob.ErrUnimplemented {
246 t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
247 return
248 }
249 if err == nil {
250 r.Close()
251 t.Errorf("No error fetching with off=%d limit=%d; wanted an error", tt.off, tt.limit)
252 continue
253 }
254 if err != blob.ErrNegativeSubFetch && err != blob.ErrOutOfRangeOffsetSubFetch {
255 t.Errorf("Unexpected error fetching with off=%d limit=%d: %v", tt.off, tt.limit, err)
256 }
257 }
258 }
259
260 func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) {
261 h := b1.Hash()
262 n, err := io.Copy(h, r)
263 if err != nil {
264 t.Fatalf("error reading from %s: %v", r, err)
265 }
266 if n != size {
267 t.Fatalf("read %d bytes from %s, metadata said %d!", n, r, size)
268 }
269 b2 := blob.RefFromHash(h)
270 if b2 != b1 {
271 t.Fatalf("content mismatch (awaited %s, got %s)", b1, b2)
272 }
273 }
274
275 func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) error {
276 var after string
277 var n = 1000
278 for _, opt := range opts {
279 switch v := opt.(type) {
280 case string:
281 after = v
282 case int:
283 n = v
284 default:
285 panic("bad option of type " + fmt.Sprintf("%T", v))
286 }
287 }
288
289 want := append([]blob.SizedRef(nil), wantUnsorted...)
290 sort.Sort(blob.SizedByRef(want))
291
292 sbc := make(chan blob.SizedRef, 10)
293
294 var got []blob.SizedRef
295 var grp syncutil.Group
296 sawEnd := make(chan bool, 1)
297 grp.Go(func() error {
298 ctx, cancel := context.WithCancel(context.TODO())
299 defer cancel()
300 if err := sto.EnumerateBlobs(ctx, sbc, after, n); err != nil {
301 return fmt.Errorf("EnumerateBlobs(%q, %d): %v", after, n, err)
302 }
303 return nil
304 })
305 grp.Go(func() error {
306 var lastRef blob.Ref
307 for sb := range sbc {
308 if !sb.Valid() {
309 return fmt.Errorf("invalid blobref %#v received in enumerate", sb)
310 }
311 got = append(got, sb)
312 if lastRef.Valid() && sb.Ref.Less(lastRef) {
313 return fmt.Errorf("blobs appearing out of order")
314 }
315 lastRef = sb.Ref
316 }
317 sawEnd <- true
318 return nil
319
320 })
321 grp.Go(func() error {
322 select {
323 case <-sawEnd:
324 return nil
325 case <-time.After(10 * time.Second):
326 return errors.New("timeout waiting for EnumerateBlobs to close its channel")
327 }
328
329 })
330 if err := grp.Err(); err != nil {
331 return fmt.Errorf("Enumerate error: %v", err)
332 }
333 if len(got) == 0 && len(want) == 0 {
334 return nil
335 }
336 var gotSet = map[blob.SizedRef]bool{}
337 for _, sb := range got {
338 if gotSet[sb] {
339 return fmt.Errorf("duplicate blob %v returned in enumerate", sb)
340 }
341 gotSet[sb] = true
342 }
343
344 if !reflect.DeepEqual(got, want) {
345 return fmt.Errorf("enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n",
346 len(got), len(want), got, want)
347 }
348 return nil
349 }
350
351 func (r *run) testEnumerate(wantUnsorted []blob.SizedRef, opts ...interface{}) {
352 if r.opt.SkipEnum {
353 r.t.Log("Skipping enum test")
354 return
355 }
356 if err := r.withRetries(func() error {
357 return CheckEnumerate(r.sto, wantUnsorted, opts...)
358 }); err != nil {
359 r.t.Fatalf("%v", err)
360 }
361 }
362
363 func (r *run) withRetries(fn func() error) error {
364 delays := r.opt.Retries
365 for {
366 err := fn()
367 if err == nil || len(delays) == 0 {
368 return err
369 }
370 r.t.Logf("(operation failed; retrying after %v)", delays[0])
371 time.Sleep(delays[0])
372 delays = delays[1:]
373 }
374 }
375
376 func testStat(t *testing.T, sto blobserver.BlobStatter, blobs []blob.Ref, want []blob.SizedRef) {
377
378 pos := make(map[blob.Ref]int)
379 need := make(map[blob.Ref]bool)
380 for i, sb := range want {
381 pos[sb.Ref] = i
382 need[sb.Ref] = true
383 }
384
385 err := sto.StatBlobs(context.Background(), blobs, func(sb blob.SizedRef) error {
386 if !sb.Valid() {
387 t.Errorf("StatBlobs func called with invalid/zero blob.SizedRef")
388 return nil
389 }
390 wantPos, ok := pos[sb.Ref]
391 if !ok {
392 t.Errorf("StatBlobs func called with unrequested ref %v (size %d)", sb.Ref, sb.Size)
393 return nil
394 }
395 if !need[sb.Ref] {
396 t.Errorf("StatBlobs func called with ref %v multiple times", sb.Ref)
397 return nil
398 }
399 delete(need, sb.Ref)
400 w := want[wantPos]
401 if sb != w {
402 t.Errorf("StatBlobs returned %v; want %v", sb, w)
403 }
404 return nil
405 })
406 for br := range need {
407 t.Errorf("StatBlobs never returned results for %v", br)
408 }
409 if err != nil {
410 t.Errorf("StatBlobs: %v", err)
411 }
412 }
413
414 type StreamerTestOpt interface {
415 verify(got []blob.SizedRef) error
416 }
417
418
419 type WantN int
420
421 func (want WantN) verify(got []blob.SizedRef) error {
422 if int(want) != len(got) {
423 return fmt.Errorf("got %d streamed blobs; want %d", len(got), int(want))
424 }
425 return nil
426 }
427
428 type WantSizedRefs []blob.SizedRef
429
430 func (s WantSizedRefs) verify(got []blob.SizedRef) error {
431 want := []blob.SizedRef(s)
432 if !reflect.DeepEqual(got, want) {
433 return fmt.Errorf("mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want)
434 }
435 return nil
436 }
437
438
439
440
441
442
443
444 func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
445
446 var sawEnum map[blob.SizedRef]bool
447 if enumer, ok := bs.(blobserver.BlobEnumerator); ok {
448 sawEnum = make(map[blob.SizedRef]bool)
449
450
451 enumCtx, cancel := context.WithCancel(context.TODO())
452 defer cancel()
453 if err := blobserver.EnumerateAll(enumCtx, enumer, func(sb blob.SizedRef) error {
454 sawEnum[sb] = true
455 return nil
456 }); err != nil {
457 t.Fatalf("Enumerate: %v", err)
458 }
459 }
460
461
462
463 ch := make(chan blobserver.BlobAndToken)
464 errCh := make(chan error, 1)
465 go func() {
466 ctx, cancel := context.WithCancel(context.TODO())
467 defer cancel()
468 errCh <- bs.StreamBlobs(ctx, ch, "")
469 }()
470 var gotRefs []blob.SizedRef
471 sawStreamed := map[blob.Ref]int{}
472 for b := range ch {
473 sawStreamed[b.Ref()]++
474 sbr := b.SizedRef()
475 if sawEnum != nil {
476 if _, ok := sawEnum[sbr]; ok {
477 delete(sawEnum, sbr)
478 } else {
479 t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr)
480 }
481 }
482 gotRefs = append(gotRefs, sbr)
483 }
484 if err := <-errCh; err != nil {
485 t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
486 }
487 for br, n := range sawStreamed {
488 if n > 1 {
489 t.Errorf("Streamed returned duplicate %v, %d times", br, n)
490 }
491 }
492 nMissing := 0
493 for sbr := range sawEnum {
494 t.Errorf("Enumerate found %v but Streamer didn't return it", sbr)
495 nMissing++
496 if nMissing == 10 && len(sawEnum) > 10 {
497 t.Errorf("... etc ...")
498 break
499 }
500 }
501 for _, opt := range opts {
502 if err := opt.verify(gotRefs); err != nil {
503 t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)
504 }
505 }
506 if t.Failed() {
507 return
508 }
509
510
511
512
513
514
515
516
517
518
519 wantRefs := append([]blob.SizedRef(nil), gotRefs...)
520 sawStreamed = map[blob.Ref]int{}
521 gotRefs = gotRefs[:0]
522 contToken := ""
523 for i := 0; i < len(wantRefs); i++ {
524 ctx, cancel := context.WithCancel(context.TODO())
525 defer cancel()
526 ch := make(chan blobserver.BlobAndToken)
527 errc := make(chan error, 1)
528 go func() {
529 errc <- bs.StreamBlobs(ctx, ch, contToken)
530 }()
531 nrecv := 0
532 nextToken := ""
533 for bt := range ch {
534 nrecv++
535 sbr := bt.Blob.SizedRef()
536 isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1]
537 if isNew {
538 if sawStreamed[sbr.Ref] > 0 {
539 t.Fatalf("In complex pass, returned duplicate blob %v\n\nSo far, before interrupting:\n%v\n\nWant:\n%v", sbr, gotRefs, wantRefs)
540 }
541 sawStreamed[sbr.Ref]++
542 gotRefs = append(gotRefs, sbr)
543 nextToken = bt.Token
544 cancel()
545 break
546 } else if i == 0 {
547 t.Fatalf("first iteration should receive a new value")
548 } else if nrecv == 2 {
549 t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i)
550 }
551 }
552 err := <-errc
553 if err != nil && err != context.Canceled {
554 t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err)
555 }
556 if err == nil {
557 break
558 }
559 contToken = nextToken
560 }
561 if !reflect.DeepEqual(gotRefs, wantRefs) {
562 t.Errorf("Mismatch on complex pass (got %d, want %d):\n got %q\nwant %q\n", len(gotRefs), len(wantRefs), gotRefs, wantRefs)
563 wantMap := map[blob.SizedRef]bool{}
564 for _, sbr := range wantRefs {
565 wantMap[sbr] = true
566 }
567 for _, sbr := range gotRefs {
568 if _, ok := wantMap[sbr]; ok {
569 delete(wantMap, sbr)
570 } else {
571 t.Errorf("got has unwanted: %v", sbr)
572 }
573 }
574 missing := wantMap
575 for sbr := range missing {
576 t.Errorf("got is missing: %v", sbr)
577 }
578
579 t.FailNow()
580 }
581 }