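// Package diskpacked registers the "diskpacked" blobserver storage type,
// storing blobs in a sequence of monolithic pack files (pack-NNNNN.blobs)
// indexed by a sorted key/value index.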
package diskpacked

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"expvar"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"sync"

	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	"perkeep.org/pkg/blobserver/local"
	"perkeep.org/pkg/sorted"

	"go4.org/jsonconfig"
	"go4.org/lock"
	"go4.org/readerutil"
	"go4.org/strutil"
	"go4.org/syncutil"
	"go4.org/types"
)

// debugT controls optional verbose logging in this package.
type debugT bool

var debug = debugT(false)

func (d debugT) Printf(format string, args ...interface{}) {
	if bool(d) {
		log.Printf(format, args...)
	}
}

func (d debugT) Println(args ...interface{}) {
	if bool(d) {
		log.Println(args...)
	}
}

// defaultMaxFileSize is the default upper bound on a pack file's size: 512 MiB.
const defaultMaxFileSize = 512 << 20

type storage struct {
	root        string
	index       sorted.KeyValue
	maxFileSize int64

	writeLock io.Closer

	*local.Generationer

	mu     sync.Mutex
	closed bool
	writer *os.File
	fds    []*os.File
	size   int64
}

func (s *storage) String() string {
	return fmt.Sprintf("\"diskpacked\" blob packs at %s", s.root)
}

var (
	readVar     = expvar.NewMap("diskpacked-read-bytes")
	readTotVar  = expvar.NewMap("diskpacked-total-read-bytes")
	openFdsVar  = expvar.NewMap("diskpacked-open-fds")
	writeVar    = expvar.NewMap("diskpacked-write-bytes")
	writeTotVar = expvar.NewMap("diskpacked-total-write-bytes")
)

const defaultIndexType = sorted.DefaultKVFileType
const defaultIndexFile = "index." + defaultIndexType

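// IsDir reports whether dir looks like a diskpacked directory, i.e. whether
// its default index file exists.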
func IsDir(dir string) (bool, error) {
	_, err := os.Stat(filepath.Join(dir, defaultIndexFile))
	if os.IsNotExist(err) {
		return false, nil
	}
	return err == nil, err
}

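// New returns a diskpacked storage implementation, storing blobs in the
// provided directory. It doesn't delete any existing blob pack files.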
func New(dir string) (blobserver.Storage, error) {
	var maxSize int64
	if dh, err := os.Open(dir); err == nil {
		var nBlobFiles, atMax int
		if fis, err := dh.Readdir(-1); err == nil {
			// Detect the pack file size limit from the sizes of the
			// existing pack files, if there are any.
			for _, fi := range fis {
				if nm := fi.Name(); strings.HasPrefix(nm, "pack-") && strings.HasSuffix(nm, ".blobs") {
					nBlobFiles++
					if s := fi.Size(); s > maxSize {
						maxSize, atMax = s, 0
					} else if s == maxSize {
						atMax++
					}
				}
			}
		}
		// Only believe the detected size if all the pack files (at least
		// three of them) share it; otherwise fall back to the default.
		if !(atMax > 1 && nBlobFiles == atMax+1) {
			maxSize = 0
		}
	}
	return newStorage(dir, maxSize, nil)
}

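// newIndex returns a sorted.KeyValue for the blob index, built either from
// the given metaIndex config or, if that is empty, from the default
// file-based index under root.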
func newIndex(root string, indexConf jsonconfig.Obj) (sorted.KeyValue, error) {
	if len(indexConf) > 0 {
		return sorted.NewKeyValueMaybeWipe(indexConf)
	}
	return sorted.NewKeyValueMaybeWipe(jsonconfig.Obj{
		"type": defaultIndexType,
		"file": filepath.Join(root, defaultIndexFile),
	})
}

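// newStorage returns a new storage rooted at root with the given maxFileSize,
// or defaultMaxFileSize if maxFileSize <= 0.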
func newStorage(root string, maxFileSize int64, indexConf jsonconfig.Obj) (s *storage, err error) {
	fi, err := os.Stat(root)
	if os.IsNotExist(err) {
		return nil, fmt.Errorf("storage root %q doesn't exist", root)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to stat directory %q: %w", root, err)
	}
	if !fi.IsDir() {
		return nil, fmt.Errorf("storage root %q exists but is not a directory", root)
	}
	index, err := newIndex(root, indexConf)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			index.Close()
		}
	}()
	if maxFileSize <= 0 {
		maxFileSize = defaultMaxFileSize
	}

	// Normalize root: strip any trailing slash or backslash so that
	// filepath.Join and the pack file names stay consistent.
	root = strings.TrimRight(root, `\/`)
	s = &storage{
		root:         root,
		index:        index,
		maxFileSize:  maxFileSize,
		Generationer: local.NewGenerationer(root),
	}
	if err := s.openAllPacks(); err != nil {
		s.Close()
		return nil, err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, _, err := s.StorageGeneration(); err != nil {
		return nil, fmt.Errorf("error initializing generation for %q: %w", root, err)
	}
	return s, nil
}

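// newFromConfig constructs the "diskpacked" storage from its JSON
// configuration; it is registered with blobserver in init below.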
func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
	var (
		path        = config.RequiredString("path")
		maxFileSize = config.OptionalInt("maxFileSize", 0)
		indexConf   = config.OptionalObject("metaIndex")
	)
	if err := config.Validate(); err != nil {
		return nil, err
	}
	return newStorage(path, int64(maxFileSize), indexConf)
}

func init() {
	blobserver.RegisterStorageConstructor("diskpacked", blobserver.StorageConstructor(newFromConfig))
}

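// openForRead opens pack file n read-only and appends its handle to s.fds.
// Pack files are opened in order, so n is expected to equal len(s.fds).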
func (s *storage) openForRead(n int) error {
	if n > len(s.fds) {
		panic(fmt.Sprintf("openForRead called out of order: got %d, expected %d", n, len(s.fds)))
	}

	fn := s.filename(n)
	f, err := os.Open(fn)
	if err != nil {
		return err
	}
	openFdsVar.Add(s.root, 1)
	debug.Printf("diskpacked: opened for read %q", fn)
	s.fds = append(s.fds, f)
	return nil
}

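// openForWrite opens pack file n for appending (creating it if needed),
// takes the corresponding .lock file, and records the file's current size
// in s.size.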
func (s *storage) openForWrite(n int) error {
	fn := s.filename(n)
	l, err := lock.Lock(fn + ".lock")
	if err != nil {
		return err
	}
	f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		l.Close()
		return err
	}
	openFdsVar.Add(s.root, 1)
	debug.Printf("diskpacked: opened for write %q", fn)

	s.size, err = f.Seek(0, io.SeekEnd)
	if err != nil {
		f.Close()
		l.Close()
		return err
	}

	s.writer = f
	s.writeLock = l
	return nil
}

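// closePack closes the pack file currently open for writing and releases its
// lock, if any.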
func (s *storage) closePack() error {
	var err error
	if s.writer != nil {
		err = s.writer.Close()
		openFdsVar.Add(s.root, -1)
		s.writer = nil
	}
	if s.writeLock != nil {
		lerr := s.writeLock.Close()
		if err == nil {
			err = lerr
		}
		s.writeLock = nil
	}
	return err
}

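// nextPack closes the current pack file (if open), then opens the next pack
// file in sequence for writing and also opens it for reading.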
func (s *storage) nextPack() error {
	debug.Println("diskpacked: nextPack")
	s.size = 0
	if err := s.closePack(); err != nil {
		return err
	}
	n := len(s.fds)
	if err := s.openForWrite(n); err != nil {
		return err
	}
	return s.openForRead(n)
}

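// openAllPacks opens all existing pack files for reading, and the last one
// for writing as well. If none exist yet, it starts the first pack file.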
func (s *storage) openAllPacks() error {
	debug.Println("diskpacked: openAllPacks")
	n := 0
	for {
		err := s.openForRead(n)
		if os.IsNotExist(err) {
			break
		}
		if err != nil {
			return err
		}
		n++
	}

	if n == 0 {
		// No pack files exist yet; start the first one.
		return s.nextPack()
	}

	// Open the last pack file for writing, too.
	return s.openForWrite(n - 1)
}

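// Close closes the index and all open pack files. It is safe to call more
// than once.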
func (s *storage) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return nil
	}
	var closeErr error
	s.closed = true
	if err := s.index.Close(); err != nil {
		log.Println("diskpacked: closing index:", err)
	}
	for _, f := range s.fds {
		err := f.Close()
		openFdsVar.Add(s.root, -1)
		if err != nil {
			closeErr = err
		}
	}
	if err := s.closePack(); err != nil && closeErr == nil {
		closeErr = err
	}
	return closeErr
}

func (s *storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
	return s.fetch(br, 0, -1)
}

func (s *storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
	if offset < 0 || length < 0 {
		return nil, blob.ErrNegativeSubFetch
	}
	rc, _, err := s.fetch(br, offset, length)
	return rc, err
}

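// fetch returns a reader for the blob br. A length of -1 means "the whole
// blob"; otherwise the reader covers length bytes starting at offset within
// the blob.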
func (s *storage) fetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
	meta, err := s.meta(br)
	if err != nil {
		return nil, 0, err
	}

	if meta.file >= len(s.fds) {
		return nil, 0, fmt.Errorf("diskpacked: attempt to fetch blob from out of range pack file %d >= %d", meta.file, len(s.fds))
	}
	rac := s.fds[meta.file]
	var rs io.ReadSeeker
	if length == -1 {
		// Normal Fetch path: read the whole blob.
		rs = io.NewSectionReader(rac, meta.offset, int64(meta.size))
	} else {
		if offset > int64(meta.size) {
			return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
		} else if offset+length > int64(meta.size) {
			length = int64(meta.size) - offset
		}
		rs = io.NewSectionReader(rac, meta.offset+offset, length)
	}
	fn := rac.Name()
	// Ensure the entries exist in the stats maps before wrapping the reader.
	readVar.Add(fn, 0)
	if v, ok := readVar.Get(fn).(*expvar.Int); ok {
		rs = readerutil.NewStatsReadSeeker(v, rs)
	}
	readTotVar.Add(s.root, 0)
	if v, ok := readTotVar.Get(s.root).(*expvar.Int); ok {
		rs = readerutil.NewStatsReadSeeker(v, rs)
	}
	rsc := struct {
		io.ReadSeeker
		io.Closer
	}{
		rs,
		types.NopCloser,
	}
	return rsc, meta.size, nil
}

func (s *storage) filename(file int) string {
	return filepath.Join(s.root, fmt.Sprintf("pack-%05d.blobs", file))
}

var removeGate = syncutil.NewGate(20)

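// RemoveBlobs deletes the given blobs from the index and, in parallel (gated
// by removeGate), invokes s.delete (defined elsewhere) for each one; blobs
// that are already gone are ignored.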
func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
	batch := s.index.BeginBatch()
	var wg syncutil.Group
	for _, br := range blobs {
		br := br
		removeGate.Start()
		batch.Delete(br.String())
		wg.Go(func() error {
			defer removeGate.Done()
			if err := s.delete(br); err != nil && !errors.Is(err, os.ErrNotExist) {
				return err
			}
			return nil
		})
	}
	err1 := wg.Err()
	err2 := s.index.CommitBatch(batch)
	if err1 != nil {
		return err1
	}
	return err2
}

var statGate = syncutil.NewGate(20)

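// StatBlobs reports the size of each requested blob that is present,
// silently skipping blobs that aren't found in the index.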
func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
		m, err := s.meta(br)
		if err == nil {
			return m.SizedRef(br), nil
		}
		if errors.Is(err, os.ErrNotExist) {
			return sb, nil
		}
		return sb, err
	})
}

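// EnumerateBlobs sends the sized refs of up to limit blobs with ref > after
// to dest, in index order.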
func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
	defer close(dest)

	t := s.index.Find(after, "")
	defer func() {
		closeErr := t.Close()
		if err == nil {
			err = closeErr
		}
	}()
	for i := 0; i < limit && t.Next(); {
		key := t.Key()
		if key <= after {
			// EnumerateBlobs' semantics are "greater than after", but
			// sorted.KeyValue.Find may also return the "after" key itself.
			continue
		}
		br, ok := blob.Parse(key)
		if !ok {
			return fmt.Errorf("diskpacked: couldn't parse index key %q", key)
		}
		m, ok := parseBlobMeta(t.Value())
		if !ok {
			return fmt.Errorf("diskpacked: couldn't parse index value %q: %q", key, t.Value())
		}
		select {
		case dest <- m.SizedRef(br):
		case <-ctx.Done():
			return ctx.Err()
		}
		i++
	}
	return nil
}

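// parseContToken parses a streaming continuation token of the form
// "<pack number> <offset>". An empty token means the start of the first
// pack file.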
func parseContToken(token string) (pack int, offset int64, err error) {
	// An empty token means: start from the very beginning.
	if token == "" {
		pack = 0
		offset = 0
		return
	}
	_, err = fmt.Sscan(token, &pack, &offset)
	return
}

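// readHeader parses a blob header of the form "[<blobref> <size>]" from br.
// It returns the number of bytes consumed (including the brackets), the raw
// blobref bytes (not yet validated), and the size as a uint32.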
func readHeader(br *bufio.Reader) (consumed int, digest []byte, size uint32, err error) {
	line, err := br.ReadSlice(']')
	if err != nil {
		return
	}
	const minSize = len("[b-c 0]")
	sp := bytes.IndexByte(line, ' ')
	size64, err := strutil.ParseUintBytes(line[sp+1:len(line)-1], 10, 32)
	if len(line) < minSize || line[0] != '[' || line[len(line)-1] != ']' || sp < 0 || err != nil {
		return 0, nil, 0, errors.New("diskpacked: invalid blob header")
	}
	return len(line), line[1:sp], uint32(size64), nil
}

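// deletedBlobRef matches the placeholder blobref written over a deleted
// blob's header (the hash name replaced by 'x's and the digest by zeros).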
var deletedBlobRef = regexp.MustCompile(`^x+-0+$`)

var _ blobserver.BlobStreamer = (*storage)(nil)

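// StreamBlobs streams blobs and continuation tokens from the pack files in
// disk order, starting at the position given by contToken (see
// parseContToken).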
func (s *storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
	defer close(dest)

	fileNum, offset, err := parseContToken(contToken)
	if err != nil {
		return errors.New("diskpacked: invalid continuation token")
	}
	debug.Printf("Continuing blob streaming from pack %s, offset %d",
		s.filename(fileNum), offset)

	fd, err := os.Open(s.filename(fileNum))
	if err != nil {
		return err
	}
	// fd changes as we advance through pack files; close whichever is
	// current when we exit.
	defer func() {
		if fd != nil {
			fd.Close()
		}
	}()

	// The continuation token refers to the exact next place we will read
	// from. Seeking past the end of a file is not an error, and neither is
	// reading from there: the read simply returns io.EOF, which is handled
	// below by moving on to the next pack file.
	_, err = fd.Seek(offset, io.SeekStart)
	if err != nil {
		return err
	}

	const ioBufSize = 256 * 1024

	// Use a buffered reader to cut down on read syscalls while scanning
	// blob headers and data.
	r := bufio.NewReaderSize(fd, ioBufSize)

	for {
		// Are we at the end of the current pack file?
		if _, err := r.Peek(1); err != nil {
			if !errors.Is(err, io.EOF) {
				return err
			}
			// EOF: move on to the next pack file, if any.
			fileNum++
			offset = 0
			fd.Close()
			fd, err = os.Open(s.filename(fileNum))
			if os.IsNotExist(err) {
				// No more pack files; we're done.
				return nil
			} else if err != nil {
				return err
			}
			r.Reset(fd)
			continue
		}

		thisOffset := offset
		consumed, digest, size, err := readHeader(r)
		if err != nil {
			return err
		}

		offset += int64(consumed)
		if deletedBlobRef.Match(digest) {
			// Skip over the data of a deleted blob.
			if _, err := io.CopyN(io.Discard, r, int64(size)); err != nil {
				return err
			}
			offset += int64(size)
			continue
		}

		ref, ok := blob.ParseBytes(digest)
		if !ok {
			return fmt.Errorf("diskpacked: invalid blobref %q", digest)
		}

		// Read the blob's data fully now; the Blob sent below serves its
		// bytes from this in-memory copy, so later reads never touch the
		// pack file or our bufio.Reader.
		data := make([]byte, size)
		if _, err := io.ReadFull(r, data); err != nil {
			return err
		}
		offset += int64(size)
		blob := blob.NewBlob(ref, size, func(context.Context) ([]byte, error) {
			return data, nil
		})
		select {
		case dest <- blobserver.BlobAndToken{
			Blob:  blob,
			Token: fmt.Sprintf("%d %d", fileNum, thisOffset),
		}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

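// ReceiveBlob buffers the blob in memory, then appends it to the current
// pack file unless the index already records it and the pack file on disk is
// long enough to actually contain it.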
func (s *storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) {
	var b bytes.Buffer
	n, err := b.ReadFrom(source)
	if err != nil {
		return
	}

	sbr = blob.SizedRef{Ref: br, Size: uint32(n)}

	// Check whether we already have this blob. Accept the upload anyway if
	// the pack file looks shorter than the recorded offset+size, i.e. the
	// previously written data appears truncated.
	if m, err := s.meta(br); err == nil {
		fi, err := os.Stat(s.filename(m.file))
		if err == nil && fi.Size() >= m.offset+int64(m.size) {
			return sbr, nil
		}
	}

	err = s.append(sbr, &b)
	return
}

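// append writes the given blob (its header followed by its data) to the
// current pack file, updates the index, and starts a new pack file if the
// current one has grown past maxFileSize. On an index error it attempts to
// truncate the pack file back to its previous size.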
func (s *storage) append(br blob.SizedRef, r io.Reader) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errors.New("diskpacked: write to closed storage")
	}

	// Remember the starting offset so the append can be undone if the
	// index update fails below.
	origOffset := s.size

	fn := s.writer.Name()
	n, err := fmt.Fprintf(s.writer, "[%v %v]", br.Ref.String(), br.Size)
	s.size += int64(n)
	writeVar.Add(fn, int64(n))
	writeTotVar.Add(s.root, int64(n))
	if err != nil {
		return err
	}

	// Sanity check: the file's current offset should match our bookkeeping.
	offset, err := s.writer.Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}
	if offset != s.size {
		return fmt.Errorf("diskpacked: seek says offset = %d, we think %d",
			offset, s.size)
	}
	offset = s.size

	n2, err := io.Copy(s.writer, r)
	s.size += n2
	writeVar.Add(fn, n2)
	writeTotVar.Add(s.root, n2)
	if err != nil {
		return err
	}
	if n2 != int64(br.Size) {
		return fmt.Errorf("diskpacked: written blob size %d didn't match size %d", n2, br.Size)
	}
	if err = s.writer.Sync(); err != nil {
		return err
	}

	packIdx := len(s.fds) - 1
	if s.size > s.maxFileSize {
		if err := s.nextPack(); err != nil {
			return err
		}
	}
	err = s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String())
	if err != nil {
		if _, seekErr := s.writer.Seek(origOffset, io.SeekStart); seekErr != nil {
			log.Printf("ERROR seeking back to the original offset: %v", seekErr)
		} else if truncErr := s.writer.Truncate(origOffset); truncErr != nil {
			log.Printf("ERROR truncating file after index error: %v", truncErr)
		} else {
			s.size = origOffset
		}
	}
	return err
}

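// meta returns the index metadata for br, or os.ErrNotExist if the blob is
// not in the index.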
func (s *storage) meta(br blob.Ref) (m blobMeta, err error) {
	ms, err := s.index.Get(br.String())
	if err != nil {
		if errors.Is(err, sorted.ErrNotFound) {
			err = os.ErrNotExist
		}
		return
	}
	m, ok := parseBlobMeta(ms)
	if !ok {
		err = fmt.Errorf("diskpacked: bad blob metadata: %q", ms)
	}
	return
}

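// blobMeta is the blob metadata stored in the index: which pack file the
// blob lives in, its byte offset within that file, and its size.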
type blobMeta struct {
	file   int
	offset int64
	size   uint32
}

func parseBlobMeta(s string) (m blobMeta, ok bool) {
	n, err := fmt.Sscan(s, &m.file, &m.offset, &m.size)
	return m, n == 3 && err == nil
}

func (m blobMeta) String() string {
	return fmt.Sprintf("%v %v %v", m.file, m.offset, m.size)
}

func (m blobMeta) SizedRef(br blob.Ref) blob.SizedRef {
	return blob.SizedRef{Ref: br, Size: m.size}
}