package schema

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"

	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/env"

	"go4.org/readerutil"
	"go4.org/syncutil"
	"go4.org/syncutil/singleflight"
	"go4.org/types"
)

// A FileReader reads the bytes of "file" and "bytes" schema blobrefs.
type FileReader struct {
	// Immutable state:
	*io.SectionReader             // provides Read, Seek, and Size
	parent            *FileReader // or nil; lets sub-readers find the root reader
	rootOff           int64       // this FileReader's offset from the root
	fetcher           blob.Fetcher
	ss                *superset
	size              int64 // total number of bytes

	sfg singleflight.Group // deduplicates concurrent loads of "bytes" schema blobs into ssm

	blobmu   sync.Mutex // guards lastBlob
	lastBlob *blob.Blob // most recently fetched chunk blob; avoids duplicate fetches

	ssmmu sync.Mutex             // guards ssm
	ssm   map[blob.Ref]*superset // blobref -> parsed schema blob
}

// Compile-time check that *FileReader provides the interfaces callers rely on.
var _ interface {
	io.Seeker
	io.ReaderAt
	io.Reader
	io.Closer
	Size() int64
} = (*FileReader)(nil)

// NewFileReader returns a new FileReader reading the contents of fileBlobRef,
// fetching blobs from fetcher. The fileBlobRef must refer to a "file" or
// "bytes" schema blob.
//
// The caller should call Close on the FileReader when done reading.
func NewFileReader(ctx context.Context, fetcher blob.Fetcher, fileBlobRef blob.Ref) (*FileReader, error) {
	if !fileBlobRef.Valid() {
		return nil, errors.New("schema/filereader: NewFileReader blobref invalid")
	}
	rc, _, err := fetcher.Fetch(ctx, fileBlobRef)
	if err != nil {
		return nil, fmt.Errorf("schema/filereader: fetching file schema blob: %w", err)
	}
	defer rc.Close()
	ss, err := parseSuperset(rc)
	if err != nil {
		return nil, fmt.Errorf("schema/filereader: decoding file schema blob: %w", err)
	}
	ss.BlobRef = fileBlobRef
	if ss.Type != "file" && ss.Type != "bytes" {
		return nil, fmt.Errorf("schema/filereader: expected \"file\" or \"bytes\" schema blob, got %q", ss.Type)
	}
	fr, err := ss.NewFileReader(fetcher)
	if err != nil {
		return nil, fmt.Errorf("schema/filereader: creating FileReader for %s: %w", fileBlobRef, err)
	}
	return fr, nil
}

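// Illustrative sketch (not part of this file's API): a caller might read a
// whole file like this, where ctx, a blob.Fetcher named src, and a "file"
// schema blobref fileRef are assumed to already exist in the caller's code:
//
//	fr, err := schema.NewFileReader(ctx, src, fileRef)
//	if err != nil {
//		// handle error
//	}
//	defer fr.Close()
//	data, err := io.ReadAll(fr) // fr embeds *io.SectionReader, so it is an io.Reader
//
// The names src and fileRef above are placeholders, not identifiers defined
// in this package.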

// NewFileReader returns a FileReader for b, whose schema must be of type
// "file" or "bytes".
func (b *Blob) NewFileReader(fetcher blob.Fetcher) (*FileReader, error) {
	return b.ss.NewFileReader(fetcher)
}

// NewFileReader returns a FileReader that reads the bytes described by ss,
// which must be a superset of type "file" or "bytes", fetching any referenced
// blobs from fetcher.
func (ss *superset) NewFileReader(fetcher blob.Fetcher) (*FileReader, error) {
	if ss.Type != "file" && ss.Type != "bytes" {
		return nil, fmt.Errorf("schema/filereader: Superset not of type \"file\" or \"bytes\"")
	}
	size := int64(ss.SumPartsSize())
	fr := &FileReader{
		fetcher: fetcher,
		ss:      ss,
		size:    size,
		ssm:     make(map[blob.Ref]*superset),
	}
	fr.SectionReader = io.NewSectionReader(fr, 0, size)
	return fr, nil
}

// LoadAllChunks starts a background process that loads all of the file's
// chunks as quickly as possible. The chunk contents are read and discarded,
// so this is useful mainly for pre-fetching or warming caches.
func (fr *FileReader) LoadAllChunks() {
	go fr.loadAllChunksSync(context.Background())
}

// loadAllChunksSync fetches every chunk referenced by the file, at most 20 at
// a time, reading one byte of each to force the fetch. Fetch and read errors
// are ignored; this is only a prefetch.
func (fr *FileReader) loadAllChunksSync(ctx context.Context) {
	gate := syncutil.NewGate(20)
	fr.ForeachChunk(ctx, func(_ []blob.Ref, p BytesPart) error {
		if !p.BlobRef.Valid() {
			return nil
		}
		gate.Start()
		go func(br blob.Ref) {
			defer gate.Done()
			rc, _, err := fr.fetcher.Fetch(ctx, br)
			if err == nil {
				defer rc.Close()
				var b [1]byte
				rc.Read(b[:])
			}
		}(p.BlobRef)
		return nil
	})
}

// UnixMtime returns the file schema's UnixMtime field as a time.Time,
// or the zero Time if the field is absent or unparseable.
func (fr *FileReader) UnixMtime() time.Time {
	t, err := time.Parse(time.RFC3339, fr.ss.UnixMtime)
	if err != nil {
		return time.Time{}
	}
	return t
}

// FileName returns the file's name, as declared in the schema blob.
func (fr *FileReader) FileName() string { return fr.ss.FileNameString() }

// ModTime returns the file's modification time, as recorded in the schema blob.
func (fr *FileReader) ModTime() time.Time { return fr.ss.ModTime() }

// FileMode returns the file's mode bits, as recorded in the schema blob.
func (fr *FileReader) FileMode() os.FileMode { return fr.ss.FileMode() }

// SchemaBlobRef returns the blobref of the schema blob this FileReader reads.
func (fr *FileReader) SchemaBlobRef() blob.Ref { return fr.ss.BlobRef }

// Close currently does nothing.
func (fr *FileReader) Close() error { return nil }

// ReadAt implements io.ReaderAt, reading from the file at the given offset
// and fetching chunks on demand.
func (fr *FileReader) ReadAt(p []byte, offset int64) (n int, err error) {
	if offset < 0 {
		return 0, errors.New("schema/filereader: negative offset")
	}
	if offset >= fr.Size() {
		return 0, io.EOF
	}
	want := len(p)
	for len(p) > 0 && err == nil {
		var rc io.ReadCloser
		rc, err = fr.readerForOffset(context.TODO(), offset)
		if err != nil {
			return n, err
		}
		var n1 int
		n1, err = io.ReadFull(rc, p)
		rc.Close()
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			// The reader for this offset covers only one chunk; hitting its
			// end is not an error for the file as a whole.
			err = nil
		}
		if n1 == 0 {
			break
		}
		p = p[n1:]
		offset += int64(n1)
		n += n1
	}
	if n < want && err == nil {
		err = io.ErrUnexpectedEOF
	}
	return n, err
}
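
// Illustrative sketch: because ReadAt stitches chunk reads together, callers
// can do random access without worrying about chunk boundaries. Assuming fr
// is a *FileReader obtained as above (error handling elided):
//
//	buf := make([]byte, 4096)
//	n, err := fr.ReadAt(buf, 1<<20) // read up to 4 KiB starting at offset 1 MiB
//
// buf, n, and err here are local placeholders for the caller's own variables.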

// ForeachChunk calls fn for each chunk of fr, in order.
//
// The schemaPath argument to fn is the path of "file" and "bytes" schema
// blobrefs from the root down to the schema blob directly containing the
// chunk, and p is that chunk's BytesPart. fn is never called for parts that
// only reference another "bytes" schema blob (a bytesRef); those are
// descended into instead. fn must not retain or modify schemaPath. If fn
// returns an error, iteration stops and that error is returned.
func (fr *FileReader) ForeachChunk(ctx context.Context, fn func(schemaPath []blob.Ref, p BytesPart) error) error {
	return fr.foreachChunk(ctx, fn, nil)
}
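
// Illustrative sketch: ForeachChunk can be used to walk a file's parts, for
// example to count chunks and bytes. ctx and fr are assumed to exist as
// above; chunks and total are local placeholders:
//
//	var chunks int
//	var total uint64
//	err := fr.ForeachChunk(ctx, func(_ []blob.Ref, p schema.BytesPart) error {
//		chunks++
//		total += p.Size
//		return nil
//	})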

// foreachChunk is the recursive implementation of ForeachChunk. path is the
// chain of schema blobrefs leading to fr, which fr appends itself to before
// walking its parts.
func (fr *FileReader) foreachChunk(ctx context.Context, fn func([]blob.Ref, BytesPart) error, path []blob.Ref) error {
	path = append(path, fr.ss.BlobRef)
	for _, bp := range fr.ss.Parts {
		if bp.BytesRef.Valid() && bp.BlobRef.Valid() {
			return fmt.Errorf("part in %v illegally contained both a blobRef and bytesRef", fr.ss.BlobRef)
		}
		if bp.BytesRef.Valid() {
			ss, err := fr.getSuperset(ctx, bp.BytesRef)
			if err != nil {
				return err
			}
			subfr, err := ss.NewFileReader(fr.fetcher)
			if err != nil {
				return err
			}
			subfr.parent = fr
			if err := subfr.foreachChunk(ctx, fn, path); err != nil {
				return err
			}
		} else {
			if err := fn(path, *bp); err != nil {
				return err
			}
		}
	}
	return nil
}

// rootReader returns the root FileReader, which owns the caches shared by
// all sub-readers of the same file.
func (fr *FileReader) rootReader() *FileReader {
	if fr.parent != nil {
		return fr.parent.rootReader()
	}
	return fr
}

// getBlob fetches br, caching the most recently fetched blob on the root
// reader so repeated reads of the same chunk don't refetch it.
func (fr *FileReader) getBlob(ctx context.Context, br blob.Ref) (*blob.Blob, error) {
	if root := fr.rootReader(); root != fr {
		return root.getBlob(ctx, br)
	}
	fr.blobmu.Lock()
	last := fr.lastBlob
	fr.blobmu.Unlock()
	if last != nil && last.Ref() == br {
		return last, nil
	}
	b, err := blob.FromFetcher(ctx, fr.fetcher, br)
	if err != nil {
		return nil, err
	}

	fr.blobmu.Lock()
	fr.lastBlob = b
	fr.blobmu.Unlock()
	return b, nil
}

// getSuperset returns the parsed "file"/"bytes" schema blob for br. Parsed
// supersets are cached on the root reader, and concurrent requests for the
// same blobref are deduplicated through a singleflight group.
func (fr *FileReader) getSuperset(ctx context.Context, br blob.Ref) (*superset, error) {
	if root := fr.rootReader(); root != fr {
		return root.getSuperset(ctx, br)
	}
	brStr := br.String()
	ssi, err := fr.sfg.Do(brStr, func() (interface{}, error) {
		fr.ssmmu.Lock()
		ss, ok := fr.ssm[br]
		fr.ssmmu.Unlock()
		if ok {
			return ss, nil
		}
		rc, _, err := fr.fetcher.Fetch(ctx, br)
		if err != nil {
			return nil, fmt.Errorf("schema/filereader: fetching file schema blob: %w", err)
		}
		defer rc.Close()
		ss, err = parseSuperset(rc)
		if err != nil {
			return nil, err
		}
		ss.BlobRef = br
		fr.ssmmu.Lock()
		defer fr.ssmmu.Unlock()
		fr.ssm[br] = ss
		return ss, nil
	})
	if err != nil {
		return nil, err
	}
	return ssi.(*superset), nil
}

var debug = env.IsDebug()

// readerForOffset returns a ReadCloser that reads some number of bytes and
// then EOF from the provided offset. Seeing EOF doesn't mean the end of the
// whole file; it just means the end of the chunk (or zero-filled region)
// containing that offset. The caller must close the ReadCloser when done
// reading.
func (fr *FileReader) readerForOffset(ctx context.Context, off int64) (io.ReadCloser, error) {
	if debug {
		log.Printf("(%p) readerForOffset %d + %d = %d", fr, fr.rootOff, off, fr.rootOff+off)
	}
	if off < 0 {
		panic("negative offset")
	}
	if off >= fr.size {
		return types.EmptyBody, nil
	}
	offRemain := off
	var skipped int64
	parts := fr.ss.Parts
	// Skip whole parts that end before the requested offset.
	for len(parts) > 0 && parts[0].Size <= uint64(offRemain) {
		offRemain -= int64(parts[0].Size)
		skipped += int64(parts[0].Size)
		parts = parts[1:]
	}
	if len(parts) == 0 {
		return types.EmptyBody, nil
	}
	p0 := parts[0]
	var rsc readerutil.ReadSeekCloser
	var err error
	switch {
	case p0.BlobRef.Valid() && p0.BytesRef.Valid():
		return nil, fmt.Errorf("part illegally contained both a blobRef and bytesRef")
	case !p0.BlobRef.Valid() && !p0.BytesRef.Valid():
		// A part with neither ref is a zero-filled (sparse) region.
		return io.NopCloser(
			io.LimitReader(zeroReader{},
				int64(p0.Size-uint64(offRemain)))), nil
	case p0.BlobRef.Valid():
		b, err := fr.getBlob(ctx, p0.BlobRef)
		if err != nil {
			return nil, err
		}
		byteReader, err := b.ReadAll(ctx)
		if err != nil {
			return nil, err
		}
		rsc = struct {
			io.ReadSeeker
			io.Closer
		}{
			byteReader,
			io.NopCloser(nil),
		}
	case p0.BytesRef.Valid():
		var ss *superset
		ss, err = fr.getSuperset(ctx, p0.BytesRef)
		if err != nil {
			return nil, err
		}
		rsc, err = ss.NewFileReader(fr.fetcher)
		if err == nil {
			subFR := rsc.(*FileReader)
			subFR.parent = fr.rootReader()
			subFR.rootOff = fr.rootOff + skipped
		}
	}
	if err != nil {
		return nil, err
	}
	offRemain += int64(p0.Offset)
	if offRemain > 0 {
		newPos, err := rsc.Seek(offRemain, io.SeekStart)
		if err != nil {
			return nil, err
		}
		if newPos != offRemain {
			panic("Seek didn't work")
		}
	}
	return struct {
		io.Reader
		io.Closer
	}{
		io.LimitReader(rsc, int64(p0.Size)),
		rsc,
	}, nil
}

// zeroReader is an io.Reader that reads an endless stream of zero bytes,
// used for the zero-filled regions of sparse files.
type zeroReader struct{}

func (zeroReader) Read(p []byte) (n int, err error) {
	for i := range p {
		p[i] = 0
	}
	return len(p), nil
}