1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blobpacked
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "strconv"
25 "strings"
26
27 "perkeep.org/pkg/blob"
28 "perkeep.org/pkg/blobserver"
29 )
30
31
32
33 func (s *storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
34 return blobserver.NewMultiBlobStreamer(
35 smallBlobStreamer{s},
36 largeBlobStreamer{s},
37 ).StreamBlobs(ctx, dest, contToken)
38 }
39
40 type smallBlobStreamer struct{ sto *storage }
41 type largeBlobStreamer struct{ sto *storage }
42
43
44 func (st smallBlobStreamer) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
45 small := st.sto.small
46 if bs, ok := small.(blobserver.BlobStreamer); ok {
47 return bs.StreamBlobs(ctx, dest, contToken)
48 }
49 defer close(dest)
50 donec := ctx.Done()
51 return blobserver.EnumerateAllFrom(ctx, small, contToken, func(sb blob.SizedRef) error {
52 select {
53 case dest <- blobserver.BlobAndToken{
54 Blob: blob.NewBlob(sb.Ref, sb.Size, func(ctx context.Context) ([]byte, error) {
55 return slurpSizedRef(ctx, small, sb)
56 }),
57 Token: sb.Ref.StringMinusOne(),
58 }:
59 return nil
60 case <-donec:
61 return ctx.Err()
62 }
63 })
64 }
65
var (
	// errContToken is the error returned when a continuation token cannot
	// be parsed.
	errContToken = errors.New("blobpacked: bad continuation token")
)
67
68
69
70
71
72 func (st largeBlobStreamer) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
73 defer close(dest)
74 s := st.sto
75 large := s.large
76
77 var after string
78 var skipFiles int
79 var firstRef blob.Ref
80
81 if contToken != "" {
82 f := strings.SplitN(contToken, ":", 2)
83 if len(f) != 2 {
84 return errContToken
85 }
86 firstRef, _ = blob.Parse(f[0])
87 skipFiles, err = strconv.Atoi(f[1])
88 if !firstRef.Valid() || err != nil {
89 return errContToken
90 }
91
92
93
94 after = firstRef.StringMinusOne()
95 }
96 return blobserver.EnumerateAllFrom(ctx, large, after, func(sb blob.SizedRef) error {
97 if firstRef.Valid() {
98 if sb.Ref.Less(firstRef) {
99
100 return nil
101 }
102 if firstRef.Less(sb.Ref) {
103 skipFiles = 0
104 }
105 }
106 fileN := 0
107 return s.foreachZipBlob(ctx, sb.Ref, func(bap BlobAndPos) error {
108 if skipFiles > 0 {
109 skipFiles--
110 fileN++
111 return nil
112 }
113 select {
114 case dest <- blobserver.BlobAndToken{
115 Blob: blob.NewBlob(bap.Ref, bap.Size, func(ctx context.Context) ([]byte, error) {
116 return slurpSizedRef(ctx, s, bap.SizedRef)
117 }),
118 Token: fmt.Sprintf("%s:%d", sb.Ref, fileN),
119 }:
120 fileN++
121 return nil
122 case <-ctx.Done():
123 return ctx.Err()
124 }
125 })
126 })
127 }
128
129 func slurpSizedRef(ctx context.Context, f blob.Fetcher, sb blob.SizedRef) ([]byte, error) {
130 rc, size, err := f.Fetch(ctx, sb.Ref)
131 if err != nil {
132 return nil, err
133 }
134 defer rc.Close()
135 if size != sb.Size {
136 return nil, fmt.Errorf("blobpacked fetch of %v reported %d bytes; expected %d", sb.Ref, size, sb.Size)
137 }
138 slurp, err := io.ReadAll(rc)
139 if err != nil {
140 return nil, err
141 }
142 if uint32(len(slurp)) != sb.Size {
143 return nil, fmt.Errorf("blobpacked read %d bytes of %v; expected %d", len(slurp), sb.Ref, sb.Size)
144 }
145 return slurp, nil
146 }