1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package test
18
19 import (
20 "context"
21 "io"
22
23 "perkeep.org/pkg/blob"
24 "perkeep.org/pkg/blobserver"
25 "perkeep.org/pkg/blobserver/memory"
26 )
27
28
29
30
31 type Fetcher struct {
32 memory.Storage
33
34
35 ReceiveErr error
36
37
38
39 FetchErr func() error
40 }
41
42 var (
43 _ blobserver.Storage = (*Fetcher)(nil)
44 _ blobserver.BlobStreamer = (*Fetcher)(nil)
45 )
46
47 func (tf *Fetcher) Fetch(ctx context.Context, ref blob.Ref) (file io.ReadCloser, size uint32, err error) {
48 if tf.FetchErr != nil {
49 if err = tf.FetchErr(); err != nil {
50 return
51 }
52 }
53 file, size, err = tf.Storage.Fetch(ctx, ref)
54 if err != nil {
55 return
56 }
57 return file, size, nil
58 }
59
60 func (tf *Fetcher) SubFetch(ctx context.Context, ref blob.Ref, offset, length int64) (io.ReadCloser, error) {
61 if tf.FetchErr != nil {
62 if err := tf.FetchErr(); err != nil {
63 return nil, err
64 }
65 }
66 rc, err := tf.Storage.SubFetch(ctx, ref, offset, length)
67 if err != nil {
68 return rc, err
69 }
70 return rc, nil
71 }
72
73 func (tf *Fetcher) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error) {
74 sb, err := tf.Storage.ReceiveBlob(ctx, br, source)
75 if err != nil {
76 return sb, err
77 }
78 if err := tf.ReceiveErr; err != nil {
79 tf.RemoveBlobs(ctx, []blob.Ref{br})
80 return sb, err
81 }
82 return sb, nil
83 }
84
85 func (tf *Fetcher) AddBlob(b *Blob) {
86 _, err := tf.ReceiveBlob(context.Background(), b.BlobRef(), b.Reader())
87 if err != nil {
88 panic(err)
89 }
90 }