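/*
Package blobpacked registers the "blobpacked" blobserver storage type.

In rough terms: loose blobs are stored in a "small" storage, while the
chunks of large files are packed together into zip blobs written to a
"large" storage, and a sorted key/value "meta" index records where
each logical blob lives inside those zips (see the key formats below).
*/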
package blobpacked

import (
	"archive/zip"
	"bytes"
	"context"
	"crypto/sha1"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"perkeep.org/internal/pools"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	"perkeep.org/pkg/constants"
	"perkeep.org/pkg/env"
	"perkeep.org/pkg/schema"
	"perkeep.org/pkg/sorted"

	"go4.org/jsonconfig"
	"go4.org/strutil"
	"go4.org/syncutil"
)

// packThreshold is the minimum total size of a file's data parts
// before ReceiveBlob packs it into a zip. Files smaller than this
// stay as loose blobs in the small storage.
const packThreshold = 512 << 10 // 512 KB

// Rough overhead estimates used when budgeting how much data will fit
// in a zip before hitting maxZipBlobSize. They only need to be
// conservative approximations: writeAZip re-checks the real size and
// truncates if the estimate was too low.
var (
	// zipFixedOverhead approximates the fixed cost of the zip
	// container itself (end-of-central-directory records), plus some
	// slop.
	zipFixedOverhead = 20 +
		56 +
		22 +
		512
	// zipPerEntryOverhead approximates the cost of each file entry
	// (local file header, data descriptor, central directory record),
	// plus room for a blobref-derived file name.
	zipPerEntryOverhead = 30 +
		24 +
		22 +
		len("camlistore/sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15.dat")*3/2
)

// Meta index key prefixes. The ';' upper bounds are one byte past ':'
// and are used as exclusive limits when scanning a prefix range.
const (
	blobMetaPrefix      = "b:"
	blobMetaPrefixLimit = "b;"

	wholeMetaPrefix      = "w:"
	wholeMetaPrefixLimit = "w;"

	zipMetaPrefix      = "z:"
	zipMetaPrefixLimit = "z;"
)
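
// The value formats for these rows, as written by reindex and the
// packer and read back by parseMetaRow and parseZipMetaRow below
// (sketch; the blobref placeholders are illustrative):
//
//	b:<blobref>      -> "<size> <zip-blobref> <offset-in-zip>"
//	w:<wholeref>     -> "<whole-size> <number-of-zip-parts>"
//	w:<wholeref>:<n> -> "<zip-blobref> <offset-in-zip> <offset-in-whole> <data-size>"
//	z:<zip-blobref>  -> "<zip-size> <wholeref> <whole-size> <offset-in-whole> <data-size>"
//
// RemoveBlobs additionally writes "d:<zip-blobref>" rows with a Unix
// timestamp to mark zips whose logical blobs have been deleted.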

const (
	// zipManifestPath is the path of the JSON pack manifest file
	// written as the final entry of each zip.
	zipManifestPath = "camlistore/camlistore-pack-manifest.json"
)

// RecoveryMode controls whether, and how aggressively, blobpacked
// rebuilds its meta index from the packed zips at startup.
type RecoveryMode int

// Note: the values are fixed (not iota) because they are passed
// around externally (e.g. in instance metadata and flags).
const (
	// NoRecovery means no index recovery is attempted at startup.
	NoRecovery RecoveryMode = 0
	// FastRecovery rebuilds the index from the packed zips without
	// wiping the existing index first.
	FastRecovery RecoveryMode = 1
	// FullRecovery wipes the existing index, then rebuilds it from
	// the packed zips.
	FullRecovery RecoveryMode = 2
)

var (
	recoveryMu sync.Mutex
	recovery   = NoRecovery
)

// SetRecovery sets the recovery mode used the next time a blobpacked
// storage is constructed by newFromConfig. It should be called before
// the storage is initialized.
func SetRecovery(mode RecoveryMode) {
	recoveryMu.Lock()
	defer recoveryMu.Unlock()
	recovery = mode
}
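
// A typical (hypothetical) call site sets the mode from a flag before
// the blobserver configuration is loaded, so that newFromConfig sees
// it; flagRecovery here is an assumed flag name:
//
//	if *flagRecovery != 0 {
//		blobpacked.SetRecovery(blobpacked.RecoveryMode(*flagRecovery))
//	}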

type subFetcherStorage interface {
	blobserver.Storage
	blob.SubFetcher
}

// storage implements the "blobpacked" blobserver storage type. Loose
// blobs live in small, packed file chunks live inside large zip blobs
// in large, and the meta index records where everything is.
type storage struct {
	small blobserver.Storage
	large subFetcherStorage

	// meta is the index mapping logical blobs to their location
	// inside packed zips, keyed with the blobMetaPrefix,
	// wholeMetaPrefix, and zipMetaPrefix prefixes defined above.
	meta sorted.KeyValue

	// forceMaxZipBlobSize, if positive, overrides the default
	// maximum zip blob size (tests can set this).
	forceMaxZipBlobSize int

	// skipDelete, if set, prevents the packer from deleting loose
	// blobs from small after they have been packed.
	skipDelete bool

	packGate *syncutil.Gate

	loggerOnce sync.Once
	log        *log.Logger // nil means default
}

var (
	_ blobserver.BlobStreamer    = (*storage)(nil)
	_ blobserver.Generationer    = (*storage)(nil)
	_ blobserver.WholeRefFetcher = (*storage)(nil)
)

func (s *storage) String() string {
	return `"blobpacked" storage`
}

func (s *storage) Logf(format string, args ...interface{}) {
	s.logger().Printf(format, args...)
}

func (s *storage) logger() *log.Logger {
	s.loggerOnce.Do(s.initLogger)
	return s.log
}

func (s *storage) initLogger() {
	if s.log == nil {
		s.log = log.New(os.Stderr, "blobpacked: ", log.LstdFlags)
	}
}

func (s *storage) init() {
	s.packGate = syncutil.NewGate(10)
}

func (s *storage) maxZipBlobSize() int {
	if s.forceMaxZipBlobSize > 0 {
		return s.forceMaxZipBlobSize
	}
	return constants.MaxBlobSize
}

func init() {
	blobserver.RegisterStorageConstructor("blobpacked", blobserver.StorageConstructor(newFromConfig))
}

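// newFromConfig constructs the storage from its jsonconfig. A minimal
// sketch of the expected handler arguments (the prefix names and the
// metaIndex settings are illustrative):
//
//	{
//		"smallBlobs": "/bs-loose/",
//		"largeBlobs": "/bs-large/",
//		"metaIndex": {"type": "leveldb", "file": "/path/to/packindex.leveldb"},
//		"keepGoing": false
//	}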
func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
	var (
		smallPrefix = conf.RequiredString("smallBlobs")
		largePrefix = conf.RequiredString("largeBlobs")
		metaConf    = conf.RequiredObject("metaIndex")
		keepGoing   = conf.OptionalBool("keepGoing", false)
	)
	if err := conf.Validate(); err != nil {
		return nil, err
	}
	small, err := ld.GetStorage(smallPrefix)
	if err != nil {
		return nil, fmt.Errorf("failed to load smallBlobs at %s: %v", smallPrefix, err)
	}
	large, err := ld.GetStorage(largePrefix)
	if err != nil {
		return nil, fmt.Errorf("failed to load largeBlobs at %s: %v", largePrefix, err)
	}
	largeSubber, ok := large.(subFetcherStorage)
	if !ok {
		return nil, fmt.Errorf("largeBlobs at %q of type %T doesn't support fetching sub-ranges of blobs",
			largePrefix, large)
	}
	meta, err := sorted.NewKeyValueMaybeWipe(metaConf)
	if err != nil {
		return nil, fmt.Errorf("failed to setup blobpacked metaIndex: %v", err)
	}
	sto := &storage{
		small: small,
		large: largeSubber,
		meta:  meta,
	}
	sto.init()

	recoveryMu.Lock()
	defer recoveryMu.Unlock()
	condFatalf := func(pattern string, args ...interface{}) {
		log.Printf(pattern, args...)
		if !keepGoing {
			os.Exit(1)
		}
	}

	var newKv func() (sorted.KeyValue, error)
	switch recovery {
	case FastRecovery:
		newKv = func() (sorted.KeyValue, error) {
			return sorted.NewKeyValue(metaConf)
		}
	case FullRecovery:
		newKv = func() (sorted.KeyValue, error) {
			kv, err := sorted.NewKeyValue(metaConf)
			if err != nil {
				return nil, err
			}
			wiper, ok := kv.(sorted.Wiper)
			if !ok {
				return nil, fmt.Errorf("blobpacked meta index of type %T needs to be wiped, but does not support automatic wiping. It should be removed manually.", kv)
			}
			if err := wiper.Wipe(); err != nil {
				return nil, fmt.Errorf("blobpacked meta index of type %T could not be wiped: %v", kv, err)
			}
			return kv, nil
		}
	}
	if newKv != nil {
		// Recovery was requested; rebuild the index from the zips in large.
		log.Print("Starting recovery of blobpacked index")
		if err := meta.Close(); err != nil {
			return nil, err
		}
		if err := sto.reindex(context.TODO(), newKv); err != nil {
			return nil, err
		}
		if _, err := sto.checkLargeIntegrity(); err != nil {
			condFatalf("blobpacked: reindexed successfully, but error after validation: %v", err)
		}
		return sto, nil
	}

	// Check for a suspicious state: packed zips exist in large, but
	// the meta index knows nothing about them. Warn (or die) and
	// suggest recovery mode.
	if !sto.anyMeta() && sto.anyZipPacks() {
		if env.OnGCE() {
			condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", FastRecovery)
		}
		condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please re-start in recovery mode with -recovery=%d", FastRecovery)
	}

	if mode, err := sto.checkLargeIntegrity(); err != nil {
		if mode <= NoRecovery {
			condFatalf("%v", err)
		}
		if env.OnGCE() {
			condFatalf("Error: %v. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", err, mode)
		}
		condFatalf("Error: %v. Please re-start in recovery mode with -recovery=%d", err, mode)
	}

	return sto, nil
}

// checkLargeIntegrity verifies that the zip blobs in the large
// storage and the "z:" rows in the meta index agree: every zip in
// large has a matching row, and every row refers to an existing zip.
// On mismatch it returns the RecoveryMode that would repair the
// problem along with an error describing it. It does not verify the
// contents of the zips themselves.
func (s *storage) checkLargeIntegrity() (RecoveryMode, error) {
	inLarge := 0
	var missing []blob.Ref // in large but not in meta
	var extra []blob.Ref   // in meta but not in large
	t := s.meta.Find(zipMetaPrefix, zipMetaPrefixLimit)
	defer t.Close()
	iterate := true
	var enumFunc func(sb blob.SizedRef) error
	enumFunc = func(sb blob.SizedRef) error {
		if iterate && !t.Next() {
			// The "z:" rows are exhausted; everything else found in
			// large is missing from the index.
			missing = append(missing, sb.Ref)
			return nil
		}
		iterate = true
		wantMetaKey := zipMetaPrefix + sb.Ref.String()
		metaKey := t.Key()
		if metaKey != wantMetaKey {
			if metaKey > wantMetaKey {
				// We went past the current blob: it has no row.
				// Don't advance the iterator on the next call.
				missing = append(missing, sb.Ref)
				iterate = false
				return nil
			}
			// The row refers to a zip that large doesn't have.
			xbr, ok := blob.Parse(strings.TrimPrefix(metaKey, zipMetaPrefix))
			if !ok {
				return fmt.Errorf("bogus key in z: row: %q", metaKey)
			}
			extra = append(extra, xbr)
			// Retry the same enumerated blob against the next row.
			return enumFunc(sb)
		}
		if _, err := parseZipMetaRow(t.ValueBytes()); err != nil {
			return fmt.Errorf("error parsing row from meta: %v", err)
		}
		inLarge++
		return nil
	}
	log.Printf("blobpacked: checking integrity of packed blobs against index...")
	if err := blobserver.EnumerateAllFrom(context.Background(), s.large, "", enumFunc); err != nil {
		return FullRecovery, err
	}
	log.Printf("blobpacked: %d large blobs found in index, %d missing from index", inLarge, len(missing))
	if len(missing) > 0 {
		printSample(missing, "missing")
	}
	if len(extra) > 0 {
		printSample(extra, "extra")
		return FullRecovery, fmt.Errorf("%d large blobs in index but not actually in storage", len(extra))
	}
	if err := t.Close(); err != nil {
		return FullRecovery, fmt.Errorf("error reading or closing index: %v", err)
	}
	if len(missing) > 0 {
		return FastRecovery, fmt.Errorf("%d large blobs missing from index", len(missing))
	}
	return NoRecovery, nil
}

// printSample logs up to 10 blobrefs from fromSlice, sorted, as a
// sample of the named category ("missing" or "extra").
func printSample(fromSlice []blob.Ref, sliceName string) {
	sort.Slice(fromSlice, func(i, j int) bool { return fromSlice[i].Less(fromSlice[j]) })
	for i, br := range fromSlice {
		if i == 10 {
			break
		}
		log.Printf(" sample %v large blob: %v", sliceName, br)
	}
}

// zipMetaInfo is the information collected per zip during reindexing,
// needed to write the wholeMetaPrefix and zipMetaPrefix rows once all
// the zips making up a file (a "whole") have been seen.
type zipMetaInfo struct {
	wholePartIndex int // index of this zip among the zips making up the whole
	zipRef         blob.Ref
	zipSize        uint32
	offsetInZip    uint32
	dataSize       uint32
	wholeSize      uint64
	wholeRef       blob.Ref
}

// rowValue returns the value of the "z:" meta row for this zip, given
// offset, the position of this zip's data within the whole file.
func (zm zipMetaInfo) rowValue(offset uint64) string {
	return fmt.Sprintf("%d %v %d %d %d", zm.zipSize, zm.wholeRef, zm.wholeSize, offset, zm.dataSize)
}

// fileName returns the name of the first file entry in the zip
// zipRef, which is the (possibly partial) packed data of the original
// file. It returns a zipOpenError if the zip can't be opened, and
// os.ErrNotExist if the zip has no entries.
func (s *storage) fileName(ctx context.Context, zipRef blob.Ref) (string, error) {
	_, size, err := s.large.Fetch(ctx, zipRef)
	if err != nil {
		return "", err
	}
	zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(size))
	if err != nil {
		return "", zipOpenError{zipRef, err}
	}
	for _, f := range zr.File {
		return f.Name, nil
	}
	return "", os.ErrNotExist
}

// reindex rebuilds the meta index from the zips in the large storage.
// It calls newMeta to obtain the KeyValue to write into, and on
// success replaces s.meta with it. It does no locking, so it must not
// be called while the storage is otherwise in use.
func (s *storage) reindex(ctx context.Context, newMeta func() (sorted.KeyValue, error)) error {
	meta, err := newMeta()
	if err != nil {
		return fmt.Errorf("failed to create new blobpacked meta index: %v", err)
	}

	zipMetaByWholeRef := make(map[blob.Ref][]zipMetaInfo)

	// First a cheap pass to count the zips, so progress can be
	// reported against a total.
	packedTotal := 0
	blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error {
		packedTotal++
		return nil
	})

	var packedDone, packedSeen int
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
	if err := blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error {
		select {
		case <-t.C:
			log.Printf("blobpacked: %d / %d zip packs seen", packedSeen, packedTotal)
		default:
		}
		zipRef := sb.Ref
		zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size))
		if err != nil {
			return zipOpenError{zipRef, err}
		}
		var maniFile *zip.File
		var firstOff int64 // offset of the first file (the data) in the zip
		for i, f := range zr.File {
			if i == 0 {
				firstOff, err = f.DataOffset()
				if err != nil {
					return err
				}
			}
			if f.Name == zipManifestPath {
				maniFile = f
				break
			}
		}
		if maniFile == nil {
			return fmt.Errorf("no perkeep manifest file found in zip %v", zipRef)
		}
		maniRC, err := maniFile.Open()
		if err != nil {
			return err
		}
		defer maniRC.Close()
		var mf Manifest
		if err := json.NewDecoder(maniRC).Decode(&mf); err != nil {
			return err
		}
		if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() {
			return fmt.Errorf("incomplete blobpack manifest JSON in %v", zipRef)
		}

		bm := meta.BeginBatch()

		// Write the "b:" row for each data blob in this zip, and sum
		// their sizes for the zip's dataSize.
		var dataBytesWritten int64
		for _, bp := range mf.DataBlobs {
			bm.Set(blobMetaPrefix+bp.SizedRef.Ref.String(), fmt.Sprintf("%d %v %d", bp.SizedRef.Size, zipRef, firstOff+bp.Offset))
			dataBytesWritten += int64(bp.SizedRef.Size)
		}
		if dataBytesWritten > (1<<32 - 1) {
			return fmt.Errorf("total data blobs size in zip %v overflows uint32", zipRef)
		}
		dataSize := uint32(dataBytesWritten)

		// Write the "b:" row for each schema blob stored in this zip
		// (every camlistore/*.json entry except the manifest itself).
		for _, f := range zr.File {
			if !(strings.HasPrefix(f.Name, "camlistore/") && strings.HasSuffix(f.Name, ".json")) ||
				f.Name == zipManifestPath {
				continue
			}
			br, ok := blob.Parse(strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json"))
			if !ok {
				return fmt.Errorf("schema file in zip %v does not have blobRef as name: %v", zipRef, f.Name)
			}
			offset, err := f.DataOffset()
			if err != nil {
				return err
			}
			bm.Set(blobMetaPrefix+br.String(), fmt.Sprintf("%d %v %d", f.UncompressedSize64, zipRef, offset))
		}

		if err := meta.CommitBatch(bm); err != nil {
			return err
		}

		// Defer writing the "w:" and "z:" rows until all the zips of
		// this whole file have been seen, since they need the offsets
		// of the other parts.
		zipMetaByWholeRef[mf.WholeRef] = append(zipMetaByWholeRef[mf.WholeRef], zipMetaInfo{
			wholePartIndex: mf.WholePartIndex,
			zipRef:         zipRef,
			zipSize:        sb.Size,
			offsetInZip:    uint32(firstOff),
			dataSize:       dataSize,
			wholeSize:      uint64(mf.WholeSize),
			wholeRef:       mf.WholeRef,
		})
		packedSeen++
		return nil
	}); err != nil {
		return err
	}

	// Second pass: write the "w:" and "z:" rows, now that all the
	// parts of each whole file are known.
	foundDups := false
	packedFiles := 0
	tt := time.NewTicker(2 * time.Second)
	defer tt.Stop()
	bm := meta.BeginBatch()
	for wholeRef, zipMetas := range zipMetaByWholeRef {
		select {
		case <-tt.C:
			log.Printf("blobpacked: %d files reindexed", packedFiles)
		default:
		}
		sort.Slice(zipMetas, func(i, j int) bool { return zipMetas[i].wholePartIndex < zipMetas[j].wholePartIndex })
		hasDup := hasDups(zipMetas)
		if hasDup {
			foundDups = true
		}
		offsets := wholeOffsets(zipMetas)
		for _, z := range zipMetas {
			offset := offsets[z.wholePartIndex]
			// "w:<wholeRef>:<partIndex>" row
			bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, wholeRef, z.wholePartIndex),
				fmt.Sprintf("%s %d %d %d", z.zipRef, z.offsetInZip, offset, z.dataSize))
			// "z:<zipRef>" row
			bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, z.zipRef), z.rowValue(offset))
			packedDone++
		}
		if hasDup {
			if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug {
				printDuplicates(zipMetas)
			}
		}

		wholeBytesWritten := offsets[len(offsets)-1]
		if zipMetas[0].wholeSize != wholeBytesWritten {
			// The parts we have don't add up to the whole file's size:
			// at least one zip of this file is missing, so don't write
			// its "w:<wholeRef>" row.
			fileName, err := s.fileName(ctx, zipMetas[0].zipRef)
			if err != nil {
				return fmt.Errorf("could not get filename of file in zip %v: %v", zipMetas[0].zipRef, err)
			}
			log.Printf(
				"blobpacked: file %q (wholeRef %v) is incomplete: sum of all zips (%d bytes) does not match manifest's WholeSize (%d bytes)",
				fileName, wholeRef, wholeBytesWritten, zipMetas[0].wholeSize)
			var allParts []blob.Ref
			for _, z := range zipMetas {
				allParts = append(allParts, z.zipRef)
			}
			log.Printf("blobpacked: known parts of %v: %v", wholeRef, allParts)
			continue
		}
		bm.Set(fmt.Sprintf("%s%s", wholeMetaPrefix, wholeRef),
			fmt.Sprintf("%d %d", wholeBytesWritten, zipMetas[len(zipMetas)-1].wholePartIndex+1))
		packedFiles++
	}
	if err := meta.CommitBatch(bm); err != nil {
		return err
	}

	log.Printf("blobpacked: %d / %d zip packs successfully reindexed", packedDone, packedTotal)
	if packedFiles < len(zipMetaByWholeRef) {
		log.Printf("blobpacked: %d files reindexed, and %d incomplete file(s) found.", packedFiles, len(zipMetaByWholeRef)-packedFiles)
	} else {
		log.Printf("blobpacked: %d files reindexed.", packedFiles)
	}
	if foundDups {
		if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); !debug {
			log.Print("blobpacked: zip blobs with duplicate contents were found. Re-run with CAMLI_DEBUG=true for more detail.")
		}
	}

	// Note: blobs whose removal has been requested (the "d:" rows
	// written by RemoveBlobs) are not taken into account here, since
	// removal only updates the meta index; a rebuild therefore
	// resurrects their rows.
	s.meta = meta
	return nil
}

// hasDups reports whether zm (sorted by wholePartIndex) contains more
// than one zip for the same part index, i.e. the same contents were
// packed more than once. It panics if two such zips disagree on
// dataSize, since that would mean they are not actually duplicates.
func hasDups(zm []zipMetaInfo) bool {
	i := 0
	var dataSize uint32
	var firstDup blob.Ref
	dupFound := false
	for _, z := range zm {
		if z.wholePartIndex == i {
			firstDup = z.zipRef
			dataSize = z.dataSize
			i++
			continue
		}
		// We've already seen a zip with this part index, so z is a
		// candidate duplicate of firstDup.
		if z.dataSize != dataSize {
			panic(fmt.Sprintf("%v and %v looked like duplicates at first, but don't actually have the same dataSize. TODO: add DataBlobsOrigin checking.", firstDup, z.zipRef))
		}
		dupFound = true
	}
	return dupFound
}

// wholeOffsets returns, for zm sorted by wholePartIndex, the offset
// within the whole file at which each part starts, skipping
// duplicates. A final element holds the total number of bytes
// covered, so len(result) is the number of distinct parts plus one.
func wholeOffsets(zm []zipMetaInfo) []uint64 {
	i := 0
	var offsets []uint64
	var currentOffset uint64
	for _, z := range zm {
		if i != z.wholePartIndex {
			continue
		}
		offsets = append(offsets, currentOffset)
		currentOffset += uint64(z.dataSize)
		i++
	}
	// Append the total as a last element, so callers can also read
	// the number of bytes covered by all parts so far.
	offsets = append(offsets, currentOffset)
	return offsets
}
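
// For example (illustrative values): parts of sizes 10, 20, and 5
// bytes yield offsets [0, 10, 30, 35], the final element being the
// total bytes covered.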

// printDuplicates logs, for each part index that has more than one
// zip, the blobrefs of the zips sharing the same data contents.
func printDuplicates(zm []zipMetaInfo) {
	i := 0
	byPartIndex := make(map[int][]zipMetaInfo)
	for _, z := range zm {
		if i == z.wholePartIndex {
			byPartIndex[z.wholePartIndex] = []zipMetaInfo{z}
			i++
			continue
		}
		byPartIndex[z.wholePartIndex] = append(byPartIndex[z.wholePartIndex], z)
	}
	for _, zm := range byPartIndex {
		if len(zm) <= 1 {
			continue
		}
		br := make([]blob.Ref, 0, len(zm))
		for _, z := range zm {
			br = append(br, z.zipRef)
		}
		log.Printf("zip blobs with same data contents: %v", br)
	}
}

func (s *storage) anyMeta() (v bool) {
	// We only care whether at least one row exists, so enumerate and
	// stop at the first hit.
	sorted.Foreach(s.meta, func(_, _ string) error {
		v = true
		return errors.New("stop")
	})
	return
}

func (s *storage) anyZipPacks() (v bool) {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()
	dest := make(chan blob.SizedRef, 1)
	if err := s.large.EnumerateBlobs(ctx, dest, "", 1); err != nil {
		// Error ignored: this is only used to decide whether to
		// suggest recovery mode.
		return false
	}
	_, ok := <-dest
	return ok
}

func (s *storage) Close() error {
	return s.meta.Close()
}

func (s *storage) StorageGeneration() (initTime time.Time, random string, err error) {
	sgen, sok := s.small.(blobserver.Generationer)
	lgen, lok := s.large.(blobserver.Generationer)
	if !sok || !lok {
		return time.Time{}, "", blobserver.GenerationNotSupportedError("underlying storage engines don't support Generationer")
	}
	st, srand, err := sgen.StorageGeneration()
	if err != nil {
		return
	}
	lt, lrand, err := lgen.StorageGeneration()
	if err != nil {
		return
	}
	hash := sha1.New()
	io.WriteString(hash, srand)
	io.WriteString(hash, lrand)
	maxTime := func(a, b time.Time) time.Time {
		if a.After(b) {
			return a
		}
		return b
	}
	// The combined generation is the newer of the two init times plus
	// a hash of both random strings.
	return maxTime(lt, st), fmt.Sprintf("%x", hash.Sum(nil)), nil
}

func (s *storage) ResetStorageGeneration() error {
	var retErr error
	for _, st := range []blobserver.Storage{s.small, s.large} {
		if g, ok := st.(blobserver.Generationer); ok {
			if err := g.ResetStorageGeneration(); err != nil {
				retErr = err
			}
		}
	}
	return retErr
}

// meta is the result of parsing a blobMetaPrefix ("b:") row: where a
// logical blob lives inside a large zip blob, if packed.
type meta struct {
	exists   bool
	size     uint32
	largeRef blob.Ref // invalid if the blob is not packed
	largeOff uint32
}

func (m *meta) isPacked() bool { return m.largeRef.Valid() }

// getMetaRow returns the parsed "b:" row for br. A missing row is not
// an error: it returns a zero meta with exists == false.
func (s *storage) getMetaRow(br blob.Ref) (meta, error) {
	v, err := s.meta.Get(blobMetaPrefix + br.String())
	if err == sorted.ErrNotFound {
		return meta{}, nil
	}
	if err != nil {
		return meta{}, fmt.Errorf("blobpacked.getMetaRow(%v) = %v", br, err)
	}
	return parseMetaRow([]byte(v))
}

var singleSpace = []byte{' '}

// parseMetaRow parses a "b:" row value of the form
// "<size_u32> <zip-blobref> <offset_u32>".
func parseMetaRow(v []byte) (m meta, err error) {
	row := v
	sp := bytes.IndexByte(v, ' ')
	if sp < 1 || sp == len(v)-1 {
		return meta{}, fmt.Errorf("invalid metarow %q", v)
	}
	m.exists = true
	size, err := strutil.ParseUintBytes(v[:sp], 10, 32)
	if err != nil {
		return meta{}, fmt.Errorf("invalid metarow size %q", v)
	}
	m.size = uint32(size)
	v = v[sp+1:]

	// The remainder must be exactly "<zip-blobref> <offset>".
	if bytes.Count(v, singleSpace) != 1 {
		return meta{}, fmt.Errorf("invalid metarow %q: wrong number of spaces", row)
	}
	sp = bytes.IndexByte(v, ' ')
	largeRef, ok := blob.ParseBytes(v[:sp])
	if !ok {
		return meta{}, fmt.Errorf("invalid metarow %q: bad blobref %q", row, v[:sp])
	}
	m.largeRef = largeRef
	off, err := strutil.ParseUintBytes(v[sp+1:], 10, 32)
	if err != nil {
		return meta{}, fmt.Errorf("invalid metarow %q: bad offset: %v", row, err)
	}
	m.largeOff = uint32(off)
	return m, nil
}

// parseMetaRowSizeOnly parses just the leading size field of a "b:"
// row value.
func parseMetaRowSizeOnly(v []byte) (size uint32, err error) {
	sp := bytes.IndexByte(v, ' ')
	if sp < 1 || sp == len(v)-1 {
		return 0, fmt.Errorf("invalid metarow %q", v)
	}
	size64, err := strutil.ParseUintBytes(v[:sp], 10, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid metarow size %q", v)
	}
	return uint32(size64), nil
}

// parseZipMetaRow parses a "z:" row value of the form
// "<zip-size_u32> <whole-blobref> <whole-size_u64> <offset-in-whole_u64> <data-size_u32>".
func parseZipMetaRow(v []byte) (m zipMetaInfo, err error) {
	row := v
	sp := bytes.IndexByte(v, ' ')
	if sp < 1 || sp == len(v)-1 {
		return zipMetaInfo{}, fmt.Errorf("invalid z: meta row %q", row)
	}
	if bytes.Count(v, singleSpace) != 4 {
		return zipMetaInfo{}, fmt.Errorf("wrong number of spaces in z: meta row %q", row)
	}
	zipSize, err := strutil.ParseUintBytes(v[:sp], 10, 32)
	if err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid zipSize %q in z: meta row: %q", v[:sp], row)
	}
	m.zipSize = uint32(zipSize)

	v = v[sp+1:]
	sp = bytes.IndexByte(v, ' ')
	wholeRef, ok := blob.ParseBytes(v[:sp])
	if !ok {
		return zipMetaInfo{}, fmt.Errorf("invalid wholeRef %q in z: meta row: %q", v[:sp], row)
	}
	m.wholeRef = wholeRef

	v = v[sp+1:]
	sp = bytes.IndexByte(v, ' ')
	wholeSize, err := strutil.ParseUintBytes(v[:sp], 10, 64)
	if err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid wholeSize %q in z: meta row: %q", v[:sp], row)
	}
	m.wholeSize = uint64(wholeSize)

	v = v[sp+1:]
	sp = bytes.IndexByte(v, ' ')
	// The offset-in-whole field is validated but not stored, since
	// zipMetaInfo has no field for it.
	if _, err := strutil.ParseUintBytes(v[:sp], 10, 64); err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid offset %q in z: meta row: %q", v[:sp], row)
	}

	v = v[sp+1:]
	dataSize, err := strutil.ParseUintBytes(v, 10, 32)
	if err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid dataSize %q in z: meta row: %q", v, row)
	}
	m.dataSize = uint32(dataSize)

	return m, nil
}

func (s *storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
	buf := pools.BytesBuffer()
	defer pools.PutBuffer(buf)

	if _, err := io.Copy(buf, source); err != nil {
		return sb, err
	}
	size := uint32(buf.Len())
	isFile := false
	fileBlob, err := schema.BlobFromReader(br, bytes.NewReader(buf.Bytes()))
	if err == nil && fileBlob.Type() == schema.TypeFile {
		isFile = true
	}
	meta, err := s.getMetaRow(br)
	if err != nil {
		return sb, err
	}
	if meta.exists {
		sb = blob.SizedRef{Size: size, Ref: br}
	} else {
		sb, err = s.small.ReceiveBlob(ctx, br, buf)
		if err != nil {
			return sb, err
		}
	}
	if !isFile || meta.isPacked() || fileBlob.PartsSize() < packThreshold {
		return sb, nil
	}

	// A new, large enough file schema blob arrived: pack its chunks.
	s.packGate.Start()
	defer s.packGate.Done()

	// The error from packFile is intentionally ignored: the blob is
	// already safely stored in small, and packFile logs its own
	// failures.
	s.packFile(ctx, br)
	return sb, nil
}

func (s *storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
	m, err := s.getMetaRow(br)
	if err != nil {
		return nil, 0, err
	}
	if !m.exists || !m.isPacked() {
		return s.small.Fetch(ctx, br)
	}
	rc, err := s.large.SubFetch(ctx, m.largeRef, int64(m.largeOff), int64(m.size))
	if err != nil {
		return nil, 0, err
	}
	return rc, m.size, nil
}

const removeLookups = 50 // concurrent getMetaRow lookups during RemoveBlobs

func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
	// Two cases:
	//  -- unpacked blobs are removed from small directly.
	//  -- packed blobs can't be removed from their zip immediately;
	//     instead their "b:" rows are deleted and the containing zip
	//     is marked with a "d:<zipRef>" row, so that the zip can be
	//     garbage collected later (see deleteZipPack) once none of
	//     its parts are in use.
	var (
		mu       sync.Mutex
		unpacked []blob.Ref
		packed   []blob.Ref
		large    = map[blob.Ref]bool{}
	)
	var grp syncutil.Group
	delGate := syncutil.NewGate(removeLookups)
	for _, br := range blobs {
		br := br
		delGate.Start()
		grp.Go(func() error {
			defer delGate.Done()
			m, err := s.getMetaRow(br)
			if err != nil {
				return err
			}
			mu.Lock()
			defer mu.Unlock()
			if m.isPacked() {
				packed = append(packed, br)
				large[m.largeRef] = true
			} else {
				unpacked = append(unpacked, br)
			}
			return nil
		})
	}
	if err := grp.Err(); err != nil {
		return err
	}
	if len(unpacked) > 0 {
		grp.Go(func() error {
			return s.small.RemoveBlobs(ctx, unpacked)
		})
	}
	if len(packed) > 0 {
		grp.Go(func() error {
			bm := s.meta.BeginBatch()
			now := time.Now()
			for zipRef := range large {
				bm.Set("d:"+zipRef.String(), fmt.Sprint(now.Unix()))
			}
			for _, br := range packed {
				bm.Delete("b:" + br.String())
			}
			return s.meta.CommitBatch(bm)
		})
	}
	return grp.Err()
}

var statGate = syncutil.NewGate(50)

func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	var (
		trySmallMu sync.Mutex
		trySmall   []blob.Ref
	)

	err := blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
		m, err := s.getMetaRow(br)
		if err != nil {
			return sb, err
		}
		if m.exists {
			return blob.SizedRef{Ref: br, Size: m.size}, nil
		}
		// Not in the meta index; fall back to asking small below.
		trySmallMu.Lock()
		trySmall = append(trySmall, br)
		trySmallMu.Unlock()
		return sb, nil
	})
	if err != nil {
		return err
	}
	if len(trySmall) == 0 {
		return nil
	}
	return s.small.StatBlobs(ctx, trySmall, fn)
}

func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
	return blobserver.MergedEnumerate(ctx, dest, []blobserver.BlobEnumerator{
		s.small,
		enumerator{s},
	}, after, limit)
}

// enumerator enumerates the packed blobs by walking the "b:" rows of
// the meta index, so they can be merged with the loose blobs from
// small in EnumerateBlobs.
type enumerator struct {
	*storage
}

func (s enumerator) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
	defer close(dest)
	t := s.meta.Find(blobMetaPrefix+after, blobMetaPrefixLimit)
	defer func() {
		closeErr := t.Close()
		if err == nil {
			err = closeErr
		}
	}()
	n := 0
	afterb := []byte(after)
	for n < limit && t.Next() {
		key := t.KeyBytes()[len(blobMetaPrefix):]
		if n == 0 && bytes.Equal(key, afterb) {
			continue
		}
		n++
		br, ok := blob.ParseBytes(key)
		if !ok {
			return fmt.Errorf("unknown key %q in meta index", t.Key())
		}
		size, err := parseMetaRowSizeOnly(t.ValueBytes())
		if err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case dest <- blob.SizedRef{Ref: br, Size: size}:
		}
	}
	return nil
}

func (s *storage) packFile(ctx context.Context, fileRef blob.Ref) (err error) {
	s.Logf("Packing file %s ...", fileRef)
	defer func() {
		if err == nil {
			s.Logf("Packed file %s", fileRef)
		} else {
			s.Logf("Error packing file %s: %v", fileRef, err)
		}
	}()

	fr, err := schema.NewFileReader(ctx, s, fileRef)
	if err != nil {
		return err
	}
	return newPacker(s, fileRef, fr).pack(ctx)
}

func newPacker(s *storage, fileRef blob.Ref, fr *schema.FileReader) *packer {
	return &packer{
		s:            s,
		fileRef:      fileRef,
		fr:           fr,
		dataSize:     map[blob.Ref]uint32{},
		schemaBlob:   map[blob.Ref]*blob.Blob{},
		schemaParent: map[blob.Ref][]blob.Ref{},
	}
}

// packer is the state for packing one file's chunks into zero or more
// zip blobs.
type packer struct {
	s       *storage
	fileRef blob.Ref
	fr      *schema.FileReader

	wholeRef  blob.Ref
	wholeSize int64

	dataRefs []blob.Ref
	dataSize map[blob.Ref]uint32

	schemaRefs   []blob.Ref
	schemaBlob   map[blob.Ref]*blob.Blob
	schemaParent map[blob.Ref][]blob.Ref

	chunksRemain      []blob.Ref
	zips              []writtenZip
	wholeBytesWritten int64 // data bytes packed into zips so far
}

type writtenZip struct {
	blob.SizedRef
	dataRefs []blob.Ref
}

var (
	testHookSawTruncate           func(blob.Ref)
	testHookStopBeforeOverflowing func()
)

func (pk *packer) pack(ctx context.Context) error {
	if err := pk.scanChunks(ctx); err != nil {
		return err
	}

	// Don't pack the file if its whole-file content (wholeRef) has
	// already been packed, perhaps under another name. Computing the
	// wholeRef requires a full read of the file here, before the
	// chunks are read again while writing the zips.
	h := blob.NewHash()
	var err error
	pk.wholeSize, err = io.Copy(h, pk.fr)
	if err != nil {
		return err
	}
	pk.wholeRef = blob.RefFromHash(h)
	wholeKey := wholeMetaPrefix + pk.wholeRef.String()
	_, err = pk.s.meta.Get(wholeKey)
	if err == nil {
		// A nil error means a "w:" row already exists for this wholeRef.
		return fmt.Errorf("already have wholeref %v packed; not packing again", pk.wholeRef)
	} else if err != sorted.ErrNotFound {
		return err
	}

	pk.chunksRemain = pk.dataRefs
	var trunc blob.Ref
MakingZips:
	for len(pk.chunksRemain) > 0 {
		if err := pk.writeAZip(ctx, trunc); err != nil {
			if needTrunc, ok := err.(needsTruncatedAfterError); ok {
				trunc = needTrunc.Ref
				if fn := testHookSawTruncate; fn != nil {
					fn(trunc)
				}
				continue MakingZips
			}
			return err
		}
		trunc = blob.Ref{}
	}

	// Record the final "w:" row for the whole file: its size and the
	// number of zips it was split into.
	err = pk.s.meta.Set(wholeKey, fmt.Sprintf("%d %d", pk.wholeSize, len(pk.zips)))
	if err != nil {
		return fmt.Errorf("Error setting %s: %v", wholeKey, err)
	}

	return nil
}

func (pk *packer) scanChunks(ctx context.Context) error {
	schemaSeen := map[blob.Ref]bool{}
	return pk.fr.ForeachChunk(ctx, func(schemaPath []blob.Ref, p schema.BytesPart) error {
		if !p.BlobRef.Valid() {
			return errors.New("sparse files are not packed")
		}
		if p.Offset != 0 {
			// Parts that start at an offset within their underlying
			// blob would complicate packing; skip such files.
			return errors.New("file uses complicated schema. not packing")
		}
		pk.schemaParent[p.BlobRef] = append([]blob.Ref(nil), schemaPath...)
		pk.dataSize[p.BlobRef] = uint32(p.Size)
		for _, schemaRef := range schemaPath {
			if schemaSeen[schemaRef] {
				continue
			}
			schemaSeen[schemaRef] = true
			pk.schemaRefs = append(pk.schemaRefs, schemaRef)
			if b, err := blob.FromFetcher(ctx, pk.s, schemaRef); err != nil {
				return err
			} else {
				pk.schemaBlob[schemaRef] = b
			}
		}
		pk.dataRefs = append(pk.dataRefs, p.BlobRef)
		return nil
	})
}

// needsTruncatedAfterError is returned by writeAZip when its size
// estimate was too low and the written zip exceeded the maximum blob
// size. The caller retries, truncating the zip just before the given
// blob.
type needsTruncatedAfterError struct{ blob.Ref }

func (e needsTruncatedAfterError) Error() string { return "needs truncation after " + e.Ref.String() }

// check panics (with a logged stack trace) on errors that should be
// impossible in practice; writeAZip recovers the panic into an error.
func check(err error) {
	if err != nil {
		b := make([]byte, 2<<10)
		b = b[:runtime.Stack(b, false)]
		log.Printf("Unlikely error condition triggered: %v at %s", err, b)
		panic(err)
	}
}

// writeAZip writes one zip blob containing as many of pk.chunksRemain
// as fit under maxZipBlobSize, along with the schema blobs describing
// them and a JSON manifest, and records the results in the meta
// index. trunc, if valid, is a hint from a previous oversized attempt:
// stop adding data chunks just before that blob.
func (pk *packer) writeAZip(ctx context.Context, trunc blob.Ref) (err error) {
	defer func() {
		if e := recover(); e != nil {
			if v, ok := e.(error); ok && err == nil {
				err = v
			} else {
				panic(e)
			}
		}
	}()
	mf := Manifest{
		WholeRef:       pk.wholeRef,
		WholeSize:      pk.wholeSize,
		WholePartIndex: len(pk.zips),
	}
	var zbuf bytes.Buffer
	cw := &countWriter{w: &zbuf}
	zw := zip.NewWriter(cw)

	var approxSize = zipFixedOverhead // tracked separately, since zw buffers
	var dataRefsWritten []blob.Ref
	var dataBytesWritten int64
	var schemaBlobSeen = map[blob.Ref]bool{}
	var schemaBlobs []blob.Ref // to add after the main data file

	baseFileName := pk.fr.FileName()
	if strings.Contains(baseFileName, "/") || strings.Contains(baseFileName, "\\") {
		return fmt.Errorf("File schema blob %v filename had a slash in it: %q", pk.fr.SchemaBlobRef(), baseFileName)
	}
	fh := &zip.FileHeader{
		Name:   baseFileName,
		Method: zip.Store, // uncompressed
	}
	fh.SetModTime(pk.fr.ModTime())
	fh.SetMode(0644)
	fw, err := zw.CreateHeader(fh)
	check(err)
	check(zw.Flush())
	dataStart := cw.n
	approxSize += zipPerEntryOverhead // for the first entry holding the data

	zipMax := pk.s.maxZipBlobSize()
	chunks := pk.chunksRemain
	chunkWholeHash := blob.NewHash()
	for len(chunks) > 0 {
		dr := chunks[0] // the next chunk to maybe write

		if trunc.Valid() && trunc == dr {
			if approxSize == 0 {
				return errors.New("first blob is too large to pack, once you add the zip overhead")
			}
			break
		}

		schemaBlobsSave := schemaBlobs
		for _, parent := range pk.schemaParent[dr] {
			if !schemaBlobSeen[parent] {
				schemaBlobSeen[parent] = true
				schemaBlobs = append(schemaBlobs, parent)
				approxSize += int(pk.schemaBlob[parent].Size()) + zipPerEntryOverhead
			}
		}

		thisSize := pk.dataSize[dr]
		approxSize += int(thisSize)
		if approxSize+mf.approxSerializedSize() > zipMax {
			if fn := testHookStopBeforeOverflowing; fn != nil {
				fn()
			}
			schemaBlobs = schemaBlobsSave // restore it
			break
		}

		// Copy the data chunk into the zip and into the running hash
		// of all data bytes written (DataBlobsOrigin).
		rc, size, err := pk.s.Fetch(ctx, dr)
		check(err)
		if size != thisSize {
			rc.Close()
			return errors.New("unexpected size")
		}
		if n, err := io.Copy(io.MultiWriter(fw, chunkWholeHash), rc); err != nil || n != int64(size) {
			rc.Close()
			return fmt.Errorf("copy to zip = %v, %v; want %v bytes", n, err, size)
		}
		rc.Close()

		dataRefsWritten = append(dataRefsWritten, dr)
		dataBytesWritten += int64(size)
		chunks = chunks[1:]
	}
	mf.DataBlobsOrigin = blob.RefFromHash(chunkWholeHash)

	// zipBlobs is the list of all the blobs whose bytes ended up in
	// this zip (data chunks and schema blobs), with their offsets from
	// the start of the zip blob, for the "b:" meta rows.
	var zipBlobs []BlobAndPos

	var dataOffset int64
	for _, br := range dataRefsWritten {
		size := pk.dataSize[br]
		mf.DataBlobs = append(mf.DataBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataOffset})

		zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataStart + dataOffset})
		dataOffset += int64(size)
	}

	// Then the schema blobs, as camlistore/<blobref>.json entries.
	for _, br := range schemaBlobs {
		fw, err := zw.CreateHeader(&zip.FileHeader{
			Name:   "camlistore/" + br.String() + ".json",
			Method: zip.Store, // uncompressed
		})
		check(err)
		check(zw.Flush())
		b := pk.schemaBlob[br]
		zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: b.Size()}, cw.n})
		r, err := b.ReadAll(ctx)
		if err != nil {
			return err
		}
		n, err := io.Copy(fw, r)
		check(err)
		if n != int64(b.Size()) {
			return fmt.Errorf("failed to write all of schema blob %v: %d bytes, not wanted %d", br, n, b.Size())
		}
	}

	// Finally, the pack manifest.
	fw, err = zw.Create(zipManifestPath)
	check(err)
	enc, err := json.MarshalIndent(mf, "", " ")
	check(err)
	_, err = fw.Write(enc)
	check(err)
	err = zw.Close()
	check(err)

	if zbuf.Len() > zipMax {
		// We guessed wrong and overshot. Back up over the trailing
		// data chunks until we'd be under the limit, and ask the
		// caller to retry, truncated after that chunk.
		overage := zbuf.Len() - zipMax
		for i := len(dataRefsWritten) - 1; i >= 0; i-- {
			dr := dataRefsWritten[i]
			if overage <= 0 {
				return needsTruncatedAfterError{dr}
			}
			overage -= int(pk.dataSize[dr])
		}
		return errors.New("file is unpackable; first blob is too big to fit")
	}

	zipRef := blob.RefFromBytes(zbuf.Bytes())
	zipSB, err := blobserver.ReceiveNoHash(ctx, pk.s.large, zipRef, bytes.NewReader(zbuf.Bytes()))
	if err != nil {
		return err
	}

	bm := pk.s.meta.BeginBatch()
	bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, pk.wholeRef, len(pk.zips)),
		fmt.Sprintf("%s %d %d %d",
			zipRef,
			dataStart,
			pk.wholeBytesWritten,
			dataBytesWritten))
	bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, zipRef),
		fmt.Sprintf("%d %v %d %d %d",
			zipSB.Size,
			pk.wholeRef,
			pk.wholeSize,
			pk.wholeBytesWritten,
			dataBytesWritten))

	pk.wholeBytesWritten += dataBytesWritten
	pk.zips = append(pk.zips, writtenZip{
		SizedRef: zipSB,
		dataRefs: dataRefsWritten,
	})

	for _, zb := range zipBlobs {
		bm.Set(blobMetaPrefix+zb.Ref.String(), fmt.Sprintf("%d %v %d", zb.Size, zipRef, zb.Offset))
	}
	if err := pk.s.meta.CommitBatch(bm); err != nil {
		return err
	}

	// Delete the packed blobs from small, now that they live in large.
	if !pk.s.skipDelete {
		toDelete := make([]blob.Ref, 0, len(dataRefsWritten)+len(schemaBlobs))
		toDelete = append(toDelete, dataRefsWritten...)
		toDelete = append(toDelete, schemaBlobs...)
		if err := pk.s.small.RemoveBlobs(ctx, toDelete); err != nil {
			// Not fatal: the packed copies exist, so at worst this
			// wastes space in small. Just log it.
			pk.s.Logf("Error removing blobs from %s: %v", pk.s.small, err)
		}
	}

	// Drop the chunks we packed from the remaining work.
	pk.chunksRemain = pk.chunksRemain[len(dataRefsWritten):]
	return nil
}

type zipOpenError struct {
	zipRef blob.Ref
	err    error
}

func (ze zipOpenError) Error() string {
	return fmt.Sprintf("Error opening packed zip blob %v: %v", ze.zipRef, ze.err)
}

// foreachZipBlob calls fn for each blob packed within the zip blob
// zipRef: first the schema blobs (from their camlistore/*.json
// entries), then the data blobs listed in the manifest. If fn returns
// an error, enumeration stops and that error is returned.
func (s *storage) foreachZipBlob(ctx context.Context, zipRef blob.Ref, fn func(BlobAndPos) error) error {
	sb, err := blobserver.StatBlob(ctx, s.large, zipRef)
	if err != nil {
		return err
	}
	zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size))
	if err != nil {
		return zipOpenError{zipRef, err}
	}
	var maniFile *zip.File
	var firstOff int64 // offset of the first file (the data chunks)
	for i, f := range zr.File {
		if i == 0 {
			firstOff, err = f.DataOffset()
			if err != nil {
				return err
			}
		}
		if f.Name == zipManifestPath {
			maniFile = f
			break
		}
	}
	if maniFile == nil {
		return errors.New("no camlistore manifest file found in zip")
	}
	// Walk the schema blob entries.
	for _, f := range zr.File {
		if !strings.HasPrefix(f.Name, "camlistore/") || f.Name == zipManifestPath ||
			!strings.HasSuffix(f.Name, ".json") {
			continue
		}
		brStr := strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json")
		br, ok := blob.Parse(brStr)
		if ok {
			off, err := f.DataOffset()
			if err != nil {
				return err
			}
			if err := fn(BlobAndPos{
				SizedRef: blob.SizedRef{Ref: br, Size: uint32(f.UncompressedSize64)},
				Offset:   off,
			}); err != nil {
				return err
			}
		}
	}
	maniRC, err := maniFile.Open()
	if err != nil {
		return err
	}
	defer maniRC.Close()

	var mf Manifest
	if err := json.NewDecoder(maniRC).Decode(&mf); err != nil {
		return err
	}
	if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() {
		return errors.New("incomplete blobpack manifest JSON")
	}
	// Then the data blobs, adjusting their manifest offsets (relative
	// to the start of the data region) to offsets within the zip.
	for _, bap := range mf.DataBlobs {
		bap.Offset += firstOff
		if err := fn(bap); err != nil {
			return err
		}
	}
	return nil
}
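
// A minimal (hypothetical) use of foreachZipBlob, e.g. to list what a
// given pack contains:
//
//	err := s.foreachZipBlob(ctx, zipRef, func(bap BlobAndPos) error {
//		log.Printf("%v: %d bytes at offset %d", bap.Ref, bap.Size, bap.Offset)
//		return nil
//	})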

// deleteZipPack deletes the zip pack blob br from the large storage,
// but only if none of its packed blobs are still referenced by the
// meta index. It also removes the corresponding "d:" marker row.
func (s *storage) deleteZipPack(ctx context.Context, br blob.Ref) error {
	inUse, err := s.zipPartsInUse(ctx, br)
	if err != nil {
		return err
	}
	if len(inUse) > 0 {
		return fmt.Errorf("can't delete zip pack %v: %d parts in use: %v", br, len(inUse), inUse)
	}
	if err := s.large.RemoveBlobs(ctx, []blob.Ref{br}); err != nil {
		return err
	}
	return s.meta.Delete("d:" + br.String())
}

func (s *storage) zipPartsInUse(ctx context.Context, br blob.Ref) ([]blob.Ref, error) {
	var (
		mu    sync.Mutex
		inUse []blob.Ref
	)
	var grp syncutil.Group
	gate := syncutil.NewGate(20)
	err := s.foreachZipBlob(ctx, br, func(bap BlobAndPos) error {
		gate.Start()
		grp.Go(func() error {
			defer gate.Done()
			mr, err := s.getMetaRow(bap.Ref)
			if err != nil {
				return err
			}
			if mr.isPacked() {
				mu.Lock()
				inUse = append(inUse, mr.largeRef)
				mu.Unlock()
			}
			return nil
		})
		return nil
	})
	if os.IsNotExist(err) {
		// The zip itself is already gone, so nothing in it can be in
		// use.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	if err := grp.Err(); err != nil {
		return nil, err
	}
	return inUse, nil
}

// BlobAndPos is a blobref, its size, and its offset within a zip pack
// blob (or, in Manifest.DataBlobs, within the packed data region).
type BlobAndPos struct {
	blob.SizedRef
	Offset int64 `json:"offset"`
}

// Manifest is the JSON description found in the
// camlistore/camlistore-pack-manifest.json entry of each zip pack
// blob (see zipManifestPath).
type Manifest struct {
	// WholeRef is the blobref of the entire original file that this
	// zip fully or partially holds.
	WholeRef blob.Ref `json:"wholeRef"`

	// WholeSize is the size of the original file, in bytes.
	WholeSize int64 `json:"wholeSize"`

	// WholePartIndex is the index (0-based) of this zip among the
	// zips that together hold the whole file.
	WholePartIndex int `json:"wholePartIndex"`

	// DataBlobsOrigin is the blobref of the concatenated data bytes
	// stored in this zip's first file entry: the portion of the
	// original file that this zip carries.
	DataBlobsOrigin blob.Ref `json:"dataBlobsOrigin"`

	// DataBlobs lists the logical data blobs concatenated in the
	// zip's first file entry, with offsets relative to the start of
	// that entry's data (not the start of the zip).
	DataBlobs []BlobAndPos `json:"dataBlobs"`
}
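
// A sketch of what a decoded pack manifest looks like on the wire
// (blobrefs and sizes are illustrative; the "blobRef"/"size" field
// names are assumed to come from blob.SizedRef's own JSON tags):
//
//	{
//		"wholeRef": "sha1-0e64816d731a56915e8bb4ae4d0ac7485c0b84da",
//		"wholeSize": 2962227,
//		"wholePartIndex": 0,
//		"dataBlobsOrigin": "sha1-1e12a4e9ccbd1ab0d1e5e2f2a6ad3c4c2f6ee7d9",
//		"dataBlobs": [
//			{"blobRef": "sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15", "size": 65536, "offset": 0},
//			{"blobRef": "sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "size": 58124, "offset": 65536}
//		]
//	}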

// approxSerializedSize reports approximately how large this Manifest
// will be once encoded as JSON. It only needs to be a rough figure
// for the packer's size budgeting: if it's too low the packer backs
// up and retries (see needsTruncatedAfterError); if it's too high the
// zips just end up slightly less full.
func (mf *Manifest) approxSerializedSize() int {
	// Rough byte costs per manifest (204) and per DataBlobs entry
	// (119), halved.
	return (204 + len(mf.DataBlobs)*119) / 2
}

// countWriter wraps an io.Writer, counting the bytes written through
// it, so writeAZip can know the current offset within the zip.
type countWriter struct {
	w io.Writer
	n int64
}

func (cw *countWriter) Write(p []byte) (n int, err error) {
	n, err = cw.w.Write(p)
	cw.n += int64(n)
	return
}