Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep AUTHORS
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     8	     http://www.apache.org/licenses/LICENSE-2.0
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    17	/*
    18	Package blobpacked registers the "blobpacked" blobserver storage type,
    19	storing blobs initially as one physical blob per logical blob, but then
    20	rearranging little physical blobs into large contiguous blobs organized by
    21	how they'll likely be accessed. An index tracks the mapping from logical to
    22	physical blobs.
    24	Example low-level config:
    26		"/storage/": {
    27		    "handler": "storage-blobpacked",
    28		    "handlerArgs": {
    29		       "smallBlobs": "/small/",
    30		       "largeBlobs": "/large/",
    31		       "metaIndex": {
    32		          "type": "mysql",
    33		           .....
    34		       }
    35		     }
    36		}
The resulting large blobs are valid zip files. Those blobs may be up to
    39	16 MB and contain the original contiguous file (or fractions of it), as well
    40	as metadata about how the file is cut up. The zip file will have the
    41	following structure:
    43		foo.jpg       (or whatever)
    44		camlistore/sha1-beb1df0b75952c7d277905ad14de71ef7ef90c44.json (some file ref)
    45		camlistore/sha1-a0ceb10b04403c9cc1d032e07a9071db5e711c9a.json (some bytes ref)
    46		camlistore/sha1-7b4d9c8529c27d592255c6dfb17188493db96ccc.json (another bytes ref)
    47		camlistore/camlistore-pack-manifest.json
    49	The camlistore-pack-manifest.json is documented on the exported
    50	Manifest type. It looks like this:
    52		{
    53		  "wholeRef": "sha1-0e64816d731a56915e8bb4ae4d0ac7485c0b84da",
    54		  "wholeSize": 2962227200, // 2.8GB; so will require ~176-180 16MB chunks
    55		  "wholePartIndex": 17,    // 0-based
    56		  "dataBlobsOrigin": "sha1-355705cf62a56669303d2561f29e0620a676c36e",
    57		  "dataBlobs": [
    58		      {"blob": "sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15", "offset": 0, "size": 273048},
    59		      {"blob": "sha1-e242ed3bffccdf271b7fbaf34ed72d089537b42f", "offset": 273048, "size": 112783},
    60		      {"blob": "sha1-6eadeac2dade6347e87c0d24fd455feffa7069f0", "offset": 385831, ...},
    61		      {"blob": "sha1-9425cca1dde5d8b6eb70cd087db4e356da92396e", "offset": ...},
    62		      {"blob": "sha1-7709559a3c8668c57cc0a2f57c418b1cc3598049", "offset": ...},
    63		      {"blob": "sha1-f62cb5d05cfbf2a7a6c7f8339d0a4bf1dcd0ab6c", "offset": ...}
    64		  ] // raw data blobs of foo.jpg
    65		}
    67	The manifest.json ensures that if the metadata index is lost, all the
    68	data can be reconstructed from the raw zip files.
    70	The 'wholeRef' property specifies which large file that this zip is building
    71	up.  If the file is less than 15.5 MB or so (leaving room for the zip
    72	overhead and manifest size), it will probably all be in one zip and the
    73	first file in the zip will be the whole thing. Otherwise it'll be cut across
    74	multiple zip files, each no larger than 16MB. In that case, each part of the
    75	file will have a different 'wholePartIndex' number, starting at index
    76	0. Each will have the same 'wholeSize'.
    77	*/
    78	package blobpacked // import "perkeep.org/pkg/blobserver/blobpacked"
    80	// TODO: BlobStreamer using the zip manifests, for recovery.
    82	import (
    83		"archive/zip"
    84		"bytes"
    85		"context"
    86		"crypto/sha1"
    87		"encoding/json"
    88		"errors"
    89		"fmt"
    90		"io"
    91		"log"
    92		"os"
    93		"runtime"
    94		"sort"
    95		"strconv"
    96		"strings"
    97		"sync"
    98		"time"
   100		"perkeep.org/internal/pools"
   101		"perkeep.org/pkg/blob"
   102		"perkeep.org/pkg/blobserver"
   103		"perkeep.org/pkg/constants"
   104		"perkeep.org/pkg/env"
   105		"perkeep.org/pkg/schema"
   106		"perkeep.org/pkg/sorted"
   108		"go4.org/jsonconfig"
   109		"go4.org/strutil"
   110		"go4.org/syncutil"
   111	)
   113	// TODO: evaluate whether this should even be 0, to keep the schema blobs together at least.
   114	// Files under this size aren't packed.
   115	const packThreshold = 512 << 10
   117	// Overhead for zip files.
   118	// These are only variables so they can be changed by tests, but
   119	// they're effectively constant.
   120	var (
   121		zipFixedOverhead = 20 /*directory64EndLen*/ +
   122			56 /*directory64LocLen */ +
   123			22 /*directoryEndLen*/ +
   124			512 /* conservative slop space, to get us away from 16 MB zip boundary */
   125		zipPerEntryOverhead = 30 /*fileHeaderLen*/ +
   126			24 /*dataDescriptor64Len*/ +
   127			22 /*directoryEndLen*/ +
   128			len("camlistore/sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15.dat")*3/2 /*padding for larger blobrefs*/
   129	)
   131	// meta key prefixes
   132	const (
   133		blobMetaPrefix      = "b:"
   134		blobMetaPrefixLimit = "b;"
   136		wholeMetaPrefix      = "w:"
   137		wholeMetaPrefixLimit = "w;"
   139		zipMetaPrefix      = "z:"
   140		zipMetaPrefixLimit = "z;"
   141	)
   143	const (
   144		zipManifestPath = "camlistore/camlistore-pack-manifest.json"
   145	)
   147	// RecoveryMode is the mode in which the blobpacked server starts.
   148	type RecoveryMode int
   150	// Note: not using iota for these, because they're stored in GCE
   151	// instance's metadata values.
   152	const (
   153		// NoRecovery means blobpacked does not attempt to repair its index on startup.
   154		// It is the default.
   155		NoRecovery RecoveryMode = 0
   156		// FastRecovery populates the blobpacked index, without erasing any existing one.
   157		FastRecovery RecoveryMode = 1
   158		// FullRecovery erases the existing blobpacked index, then rebuilds it.
   159		FullRecovery RecoveryMode = 2
   160	)
   162	var (
   163		recoveryMu sync.Mutex
   164		recovery   = NoRecovery
   165	)
   167	// TODO(mpl): make SetRecovery a method of type storage if we ever export it.
   169	// SetRecovery sets the recovery mode for the blobpacked package.
   170	// If set to one of the modes other than NoRecovery, it means that any
   171	// blobpacked storage subsequently initialized will automatically start with
   172	// rebuilding its meta index of zip files, in accordance with the selected mode.
   173	func SetRecovery(mode RecoveryMode) {
   174		recoveryMu.Lock()
   175		defer recoveryMu.Unlock()
   176		recovery = mode
   177	}
   179	type subFetcherStorage interface {
   180		blobserver.Storage
   181		blob.SubFetcher
   182	}
// TODO(mpl): add a logf method or something to storage so we get the
// "blobpacked:" prefix automatically to log messages.
   187	type storage struct {
   188		small blobserver.Storage
   189		large subFetcherStorage
   191		// meta key -> value rows are:
   192		//
   193		// For logical blobs packed within a large blob, "b:" prefix:
   194		//   b:sha1-xxxx -> "<size> <big-blobref> <offset_u32>"
   195		//
   196		// For wholerefs: (wholeMetaPrefix)
   197		//   w:sha1-xxxx(wholeref) -> "<nbytes_total_u64> <nchunks_u32>"
   198		// Then for each big nchunk of the file:
   199		// The wholeRef and the chunk number as a key to: the blobRef of the zip
   200		// file, the position of the data within the zip, the position of the data
   201		// within the uploaded whole file, the length of data in this zip.
   202		//   w:sha1-xxxx:0 -> "<zipchunk-blobref> <offset-in-zipchunk-blobref> <offset-in-whole_u64> <length_u32>"
   203		//   w:sha1-xxxx:...
   204		//   w:sha1-xxxx:(nchunks-1)
   205		//
   206		// For zipRefs: (zipMetaPrefix)
   207		// key: blobref of the zip, prefixed by "z:"
   208		// value: size of the zip, blobref of the contents of the whole file (which may
   209		// span multiple zips, ~15.5 MB of data per zip), size of the whole file, position
   210		// in the whole file of the data (first file) in the zip, size of the data in the
   211		// zip (== size of the zip's first file).
   212		//   z:<zip-blobref> -> "<zip_size_u32> <whole_ref_from_zip_manifest> <whole_size_u64>
   213		//   <zip_data_offset_in_whole_u64> <zip_data_bytes_u32>"
   214		//
   215		// For marking that zips that have blobs (possibly all)
   216		// deleted from inside them: (deleted zip)
   217		//   d:sha1-xxxxxx -> <unix-time-of-delete>
   218		meta sorted.KeyValue
   220		// If non-zero, the maximum size of a zip blob.
   221		// It defaults to constants.MaxBlobSize.
   222		forceMaxZipBlobSize int
   224		skipDelete bool // don't delete from small after packing
   226		packGate *syncutil.Gate
   228		loggerOnce sync.Once
   229		log        *log.Logger // nil means default
   230	}
   232	var (
   233		_ blobserver.BlobStreamer    = (*storage)(nil)
   234		_ blobserver.Generationer    = (*storage)(nil)
   235		_ blobserver.WholeRefFetcher = (*storage)(nil)
   236	)
   238	func (s *storage) String() string {
   239		return fmt.Sprintf("\"blobpacked\" storage")
   240	}
   242	func (s *storage) Logf(format string, args ...interface{}) {
   243		s.logger().Printf(format, args...)
   244	}
   246	func (s *storage) logger() *log.Logger {
   247		s.loggerOnce.Do(s.initLogger)
   248		return s.log
   249	}
   251	func (s *storage) initLogger() {
   252		if s.log == nil {
   253			s.log = log.New(os.Stderr, "blobpacked: ", log.LstdFlags)
   254		}
   255	}
   257	func (s *storage) init() {
   258		s.packGate = syncutil.NewGate(10)
   259	}
   261	func (s *storage) maxZipBlobSize() int {
   262		if s.forceMaxZipBlobSize > 0 {
   263			return s.forceMaxZipBlobSize
   264		}
   265		return constants.MaxBlobSize
   266	}
   268	func init() {
   269		blobserver.RegisterStorageConstructor("blobpacked", blobserver.StorageConstructor(newFromConfig))
   270	}
   272	func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
   273		var (
   274			smallPrefix = conf.RequiredString("smallBlobs")
   275			largePrefix = conf.RequiredString("largeBlobs")
   276			metaConf    = conf.RequiredObject("metaIndex")
   277			keepGoing   = conf.OptionalBool("keepGoing", false)
   278		)
   279		if err := conf.Validate(); err != nil {
   280			return nil, err
   281		}
   282		small, err := ld.GetStorage(smallPrefix)
   283		if err != nil {
   284			return nil, fmt.Errorf("failed to load smallBlobs at %s: %v", smallPrefix, err)
   285		}
   286		large, err := ld.GetStorage(largePrefix)
   287		if err != nil {
   288			return nil, fmt.Errorf("failed to load largeBlobs at %s: %v", largePrefix, err)
   289		}
   290		largeSubber, ok := large.(subFetcherStorage)
   291		if !ok {
   292			return nil, fmt.Errorf("largeBlobs at %q of type %T doesn't support fetching sub-ranges of blobs",
   293				largePrefix, large)
   294		}
   295		meta, err := sorted.NewKeyValueMaybeWipe(metaConf)
   296		if err != nil {
   297			return nil, fmt.Errorf("failed to setup blobpacked metaIndex: %v", err)
   298		}
   299		sto := &storage{
   300			small: small,
   301			large: largeSubber,
   302			meta:  meta,
   303		}
   304		sto.init()
   306		recoveryMu.Lock()
   307		defer recoveryMu.Unlock()
   308		condFatalf := func(pattern string, args ...interface{}) {
   309			log.Printf(pattern, args...)
   310			if !keepGoing {
   311				os.Exit(1)
   312			}
   313		}
   315		var newKv func() (sorted.KeyValue, error)
   316		switch recovery {
   317		case FastRecovery:
   318			newKv = func() (sorted.KeyValue, error) {
   319				return sorted.NewKeyValue(metaConf)
   320			}
   321		case FullRecovery:
   322			newKv = func() (sorted.KeyValue, error) {
   323				kv, err := sorted.NewKeyValue(metaConf)
   324				if err != nil {
   325					return nil, err
   326				}
   327				wiper, ok := kv.(sorted.Wiper)
   328				if !ok {
   329					return nil, fmt.Errorf("blobpacked meta index of type %T needs to be wiped, but does not support automatic wiping. It should be removed manually.", kv)
   330				}
   331				if err := wiper.Wipe(); err != nil {
   332					return nil, fmt.Errorf("blobpacked meta index of type %T could not be wiped: %v", kv, err)
   333				}
   334				return kv, nil
   335			}
   336		}
   337		if newKv != nil {
   338			// i.e. we're in one of the recovery modes
   339			log.Print("Starting recovery of blobpacked index")
   340			if err := meta.Close(); err != nil {
   341				return nil, err
   342			}
   343			if err := sto.reindex(context.TODO(), newKv); err != nil {
   344				return nil, err
   345			}
   346			if _, err := sto.checkLargeIntegrity(); err != nil {
   347				condFatalf("blobpacked: reindexed successfully, but error after validation: %v", err)
   348			}
   349			return sto, nil
   350		}
   352		// Check for a weird state: zip files exist, but no metadata about them
   353		// is recorded. This is probably a corrupt state, and the user likely
   354		// wants to recover.
   355		if !sto.anyMeta() && sto.anyZipPacks() {
   356			if env.OnGCE() {
   357				// TODO(mpl): make web UI page/mode that informs about this error.
   358				condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", FastRecovery)
   359			}
   360			condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please re-start in recovery mode with -recovery=%d", FastRecovery)
   361		}
   363		if mode, err := sto.checkLargeIntegrity(); err != nil {
   364			if mode <= NoRecovery {
   365				condFatalf("%v", err)
   366			}
   367			if env.OnGCE() {
   368				// TODO(mpl): make web UI page/mode that informs about this error.
   369				condFatalf("Error: %v. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", err, mode)
   370			}
   371			condFatalf("Error: %v. Please re-start in recovery mode with -recovery=%d", err, mode)
   372		}
   374		return sto, nil
   375	}
   377	// checkLargeIntegrity verifies that all large blobs in the large storage are
   378	// indexed in meta, and vice-versa, that all rows in meta referring to a large blob
   379	// correspond to an existing large blob in the large storage. If any of the above
   380	// is not true, it returns the recovery mode that should be used to fix the
   381	// problem, as well as the error detailing the problem. It does not perform any
   382	// check about the contents of the large blobs themselves.
   383	func (s *storage) checkLargeIntegrity() (RecoveryMode, error) {
   384		inLarge := 0
   385		var missing []blob.Ref // blobs in large but not in meta
   386		var extra []blob.Ref   // blobs in meta but not in large
   387		t := s.meta.Find(zipMetaPrefix, zipMetaPrefixLimit)
   388		defer t.Close()
   389		iterate := true
   390		var enumFunc func(sb blob.SizedRef) error
   391		enumFunc = func(sb blob.SizedRef) error {
   392			if iterate && !t.Next() {
   393				// all of the yet to be enumerated are missing from meta
   394				missing = append(missing, sb.Ref)
   395				return nil
   396			}
   397			iterate = true
   398			wantMetaKey := zipMetaPrefix + sb.Ref.String()
   399			metaKey := t.Key()
   400			if metaKey != wantMetaKey {
   401				if metaKey > wantMetaKey {
   402					// zipRef missing from meta
   403					missing = append(missing, sb.Ref)
   404					iterate = false
   405					return nil
   406				}
   407				// zipRef in meta that actually does not exist in s.large.
   408				xbr, ok := blob.Parse(strings.TrimPrefix(metaKey, zipMetaPrefix))
   409				if !ok {
   410					return fmt.Errorf("bogus key in z: row: %q", metaKey)
   411				}
   412				extra = append(extra, xbr)
   413				// iterate meta once more at the same storage enumeration point
   414				return enumFunc(sb)
   415			}
   416			if _, err := parseZipMetaRow(t.ValueBytes()); err != nil {
   417				return fmt.Errorf("error parsing row from meta: %v", err)
   418			}
   419			inLarge++
   420			return nil
   421		}
   422		log.Printf("blobpacked: checking integrity of packed blobs against index...")
   423		if err := blobserver.EnumerateAllFrom(context.Background(), s.large, "", enumFunc); err != nil {
   424			return FullRecovery, err
   425		}
   426		log.Printf("blobpacked: %d large blobs found in index, %d missing from index", inLarge, len(missing))
   427		if len(missing) > 0 {
   428			printSample(missing, "missing")
   429		}
   430		if len(extra) > 0 {
   431			printSample(extra, "extra")
   432			return FullRecovery, fmt.Errorf("%d large blobs in index but not actually in storage", len(extra))
   433		}
   434		if err := t.Close(); err != nil {
   435			return FullRecovery, fmt.Errorf("error reading or closing index: %v", err)
   436		}
   437		if len(missing) > 0 {
   438			return FastRecovery, fmt.Errorf("%d large blobs missing from index", len(missing))
   439		}
   440		return NoRecovery, nil
   441	}
   443	func printSample(fromSlice []blob.Ref, sliceName string) {
   444		sort.Slice(fromSlice, func(i, j int) bool { return fromSlice[i].Less(fromSlice[j]) })
   445		for i, br := range fromSlice {
   446			if i == 10 {
   447				break
   448			}
   449			log.Printf("  sample %v large blob: %v", sliceName, br)
   450		}
   451	}
   453	// zipMetaInfo is the info needed to write the wholeMetaPrefix and
   454	// zipMetaPrefix entries when reindexing. For a given file, spread over several
   455	// zips, each zip has a corresponding zipMetaInfo. The wholeMetaPrefix and
   456	// zipMetaPrefix rows pertaining to a file can only be written once all the
   457	// zipMetaInfo have been collected and sorted, because the offset of each zip's
   458	// data is derived from the size of the other pieces that precede it in the file.
   459	type zipMetaInfo struct {
   460		wholePartIndex int      // index of that zip, 0-based
   461		zipRef         blob.Ref // ref of the zip file holding packed data blobs + other schema blobs
   462		zipSize        uint32   // size of the zipped file
   463		offsetInZip    uint32   // position of the contiguous data blobs, relative to the zip
   464		dataSize       uint32   // size of the data in the zip
   465		wholeSize      uint64   // size of the whole file that this zip is a part of
   466		wholeRef       blob.Ref // ref of the contents of the whole file
   467	}
   469	// rowValue returns the value of the "z:<zipref>" meta key row
   470	// based on the contents of zm and the provided arguments.
   471	func (zm zipMetaInfo) rowValue(offset uint64) string {
   472		return fmt.Sprintf("%d %v %d %d %d", zm.zipSize, zm.wholeRef, zm.wholeSize, offset, zm.dataSize)
   473	}
   475	// TODO(mpl): add client command to call reindex on an "offline" blobpacked. camtool packblobs -reindex maybe?
   477	// fileName returns the name of the (possibly partial) first file in zipRef
   478	// (i.e. the actual data). It returns a zipOpenError if there was any problem
   479	// reading the zip, and os.ErrNotExist if the zip could not be fetched or if
   480	// there was no file in the zip.
   481	func (s *storage) fileName(ctx context.Context, zipRef blob.Ref) (string, error) {
   482		_, size, err := s.large.Fetch(ctx, zipRef)
   483		if err != nil {
   484			return "", err
   485		}
   486		zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(size))
   487		if err != nil {
   488			return "", zipOpenError{zipRef, err}
   489		}
   490		for _, f := range zr.File {
   491			return f.Name, nil
   492		}
   493		return "", os.ErrNotExist
   494	}
   496	// reindex rebuilds the meta index for packed blobs. It calls newMeta to create
   497	// a new KeyValue on which to write the index, and replaces s.meta with it. There
   498	// is no locking whatsoever so it should not be called when the storage is already
   499	// in use. its signature might change if/when it gets exported.
   500	func (s *storage) reindex(ctx context.Context, newMeta func() (sorted.KeyValue, error)) error {
   501		meta, err := newMeta()
   502		if err != nil {
   503			return fmt.Errorf("failed to create new blobpacked meta index: %v", err)
   504		}
   506		zipMetaByWholeRef := make(map[blob.Ref][]zipMetaInfo)
   508		// first a fast full enumerate, so we can report progress afterwards
   509		packedTotal := 0
   510		blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error {
   511			packedTotal++
   512			return nil
   513		})
   515		var packedDone, packedSeen int
   516		t := time.NewTicker(10 * time.Second)
   517		defer t.Stop()
   518		if err := blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error {
   519			select {
   520			case <-t.C:
   521				log.Printf("blobpacked: %d / %d zip packs seen", packedSeen, packedTotal)
   522			default:
   523			}
   524			zipRef := sb.Ref
   525			zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size))
   526			if err != nil {
   527				return zipOpenError{zipRef, err}
   528			}
   529			var maniFile *zip.File
   530			var firstOff int64 // offset of first file (the packed data chunks)
   531			for i, f := range zr.File {
   532				if i == 0 {
   533					firstOff, err = f.DataOffset()
   534					if err != nil {
   535						return err
   536					}
   537				}
   538				if f.Name == zipManifestPath {
   539					maniFile = f
   540					break
   541				}
   542			}
   543			if maniFile == nil {
   544				return fmt.Errorf("no perkeep manifest file found in zip %v", zipRef)
   545			}
   546			maniRC, err := maniFile.Open()
   547			if err != nil {
   548				return err
   549			}
   550			defer maniRC.Close()
   551			var mf Manifest
   552			if err := json.NewDecoder(maniRC).Decode(&mf); err != nil {
   553				return err
   554			}
   555			if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() {
   556				return fmt.Errorf("incomplete blobpack manifest JSON in %v", zipRef)
   557			}
   559			bm := meta.BeginBatch()
   560			// In this loop, we write all the blobMetaPrefix entries for the
   561			// data blobs in this zip, and we also compute the dataBytesWritten, for later.
   562			var dataBytesWritten int64
   563			for _, bp := range mf.DataBlobs {
   564				bm.Set(blobMetaPrefix+bp.SizedRef.Ref.String(), fmt.Sprintf("%d %v %d", bp.SizedRef.Size, zipRef, firstOff+bp.Offset))
   565				dataBytesWritten += int64(bp.SizedRef.Size)
   566			}
   567			if dataBytesWritten > (1<<32 - 1) {
   568				return fmt.Errorf("total data blobs size in zip %v overflows uint32", zipRef)
   569			}
   570			dataSize := uint32(dataBytesWritten)
   572			// In this loop, we write all the blobMetaPrefix entries for the schema blobs in this zip
   573			for _, f := range zr.File {
   574				if !(strings.HasPrefix(f.Name, "camlistore/") && strings.HasSuffix(f.Name, ".json")) ||
   575					f.Name == zipManifestPath {
   576					continue
   577				}
   578				br, ok := blob.Parse(strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json"))
   579				if !ok {
   580					return fmt.Errorf("schema file in zip %v does not have blobRef as name: %v", zipRef, f.Name)
   581				}
   582				offset, err := f.DataOffset()
   583				if err != nil {
   584					return err
   585				}
   586				bm.Set(blobMetaPrefix+br.String(), fmt.Sprintf("%d %v %d", f.UncompressedSize64, zipRef, offset))
   587			}
   589			if err := meta.CommitBatch(bm); err != nil {
   590				return err
   591			}
   593			// record that info for later, when we got them all, so we can write the wholeMetaPrefix entries.
   594			zipMetaByWholeRef[mf.WholeRef] = append(zipMetaByWholeRef[mf.WholeRef], zipMetaInfo{
   595				wholePartIndex: mf.WholePartIndex,
   596				zipRef:         zipRef,
   597				zipSize:        sb.Size,
   598				offsetInZip:    uint32(firstOff),
   599				dataSize:       dataSize,
   600				wholeSize:      uint64(mf.WholeSize),
   601				wholeRef:       mf.WholeRef, // redundant with zipMetaByWholeRef key for now.
   602			})
   603			packedSeen++
   604			return nil
   605		}); err != nil {
   606			return err
   607		}
   609		// finally, write the wholeMetaPrefix entries
   610		foundDups := false
   611		packedFiles := 0
   612		tt := time.NewTicker(2 * time.Second)
   613		defer tt.Stop()
   614		bm := meta.BeginBatch()
   615		for wholeRef, zipMetas := range zipMetaByWholeRef {
   616			select {
   617			case <-t.C:
   618				log.Printf("blobpacked: %d files reindexed", packedFiles)
   619			default:
   620			}
   621			sort.Slice(zipMetas, func(i, j int) bool { return zipMetas[i].wholePartIndex < zipMetas[j].wholePartIndex })
   622			hasDup := hasDups(zipMetas)
   623			if hasDup {
   624				foundDups = true
   625			}
   626			offsets := wholeOffsets(zipMetas)
   627			for _, z := range zipMetas {
   628				offset := offsets[z.wholePartIndex]
   629				// write the w:row
   630				bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, wholeRef, z.wholePartIndex),
   631					fmt.Sprintf("%s %d %d %d", z.zipRef, z.offsetInZip, offset, z.dataSize))
   632				// write the z: row
   633				bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, z.zipRef), z.rowValue(offset))
   634				packedDone++
   635			}
   636			if hasDup {
   637				if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug {
   638					printDuplicates(zipMetas)
   639				}
   640			}
   642			wholeBytesWritten := offsets[len(offsets)-1]
   643			if zipMetas[0].wholeSize != wholeBytesWritten {
   644				// Any corrupted zip should have been found earlier, so this error means we're
   645				// missing at least one full zip for the whole file to be complete.
   646				fileName, err := s.fileName(ctx, zipMetas[0].zipRef)
   647				if err != nil {
   648					return fmt.Errorf("could not get filename of file in zip %v: %v", zipMetas[0].zipRef, err)
   649				}
   650				log.Printf(
   651					"blobpacked: file %q (wholeRef %v) is incomplete: sum of all zips (%d bytes) does not match manifest's WholeSize (%d bytes)",
   652					fileName, wholeRef, wholeBytesWritten, zipMetas[0].wholeSize)
   653				var allParts []blob.Ref
   654				for _, z := range zipMetas {
   655					allParts = append(allParts, z.zipRef)
   656				}
   657				log.Printf("blobpacked: known parts of %v: %v", wholeRef, allParts)
   658				// we skip writing the w: row for the full file, and we don't count the file
   659				// as complete.
   660				continue
   661			}
   662			bm.Set(fmt.Sprintf("%s%s", wholeMetaPrefix, wholeRef),
   663				fmt.Sprintf("%d %d", wholeBytesWritten, zipMetas[len(zipMetas)-1].wholePartIndex+1))
   664			packedFiles++
   665		}
   666		if err := meta.CommitBatch(bm); err != nil {
   667			return err
   668		}
   670		log.Printf("blobpacked: %d / %d zip packs successfully reindexed", packedDone, packedTotal)
   671		if packedFiles < len(zipMetaByWholeRef) {
   672			log.Printf("blobpacked: %d files reindexed, and %d incomplete file(s) found.", packedFiles, len(zipMetaByWholeRef)-packedFiles)
   673		} else {
   674			log.Printf("blobpacked: %d files reindexed.", packedFiles)
   675		}
   676		if foundDups {
   677			if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); !debug {
   678				log.Print("blobpacked: zip blobs with duplicate contents were found. Re-run with CAMLI_DEBUG=true for more detail.")
   679			}
   680		}
   682		// TODO(mpl): take into account removed blobs. I can't be done for now
   683		// (2015-01-29) because RemoveBlobs currently only updates the meta index.
   684		// So if the index was lost, all information about removals was lost too.
   686		s.meta = meta
   687		return nil
   688	}
   690	// hasDups reports whether zm contains successive zipRefs which have the same
   691	// wholePartIndex, which we assume means they have the same data contents. It
   692	// panics if that assumption seems wrong, i.e. if the data within assumed
   693	// duplicates is not the same size in all of them. zm must be sorted by
   694	// wholePartIndex.
   695	// See https://github.com/perkeep/perkeep/issues/1079
   696	func hasDups(zm []zipMetaInfo) bool {
   697		i := 0
   698		var dataSize uint32
   699		var firstDup blob.Ref
   700		dupFound := false
   701		for _, z := range zm {
   702			if z.wholePartIndex == i {
   703				firstDup = z.zipRef
   704				dataSize = z.dataSize
   705				i++
   706				continue
   707			}
   708			// we could return true right now, but we want to go through it all, to make
   709			// sure our assumption that "same part index -> duplicate" is true, using at least
   710			// the dataSize to confirm. For a better effort, we should use DataBlobsOrigin.
   711			if z.dataSize != dataSize {
   712				panic(fmt.Sprintf("%v and %v looked like duplicates at first, but don't actually have the same dataSize. TODO: add DataBlobsOrigin checking.", firstDup, z.zipRef))
   713			}
   714			dupFound = true
   715		}
   716		return dupFound
   717	}
   719	// wholeOffsets returns the offset for each part of a file f, in order, assuming
   720	// zm are all the (wholePartIndex) ordered zip parts that constitute that file. If
   721	// zm seems to contain duplicates, they are skipped. The additional last item of
   722	// the returned slice is the sum of all the parts, i.e. the whole size of f.
   723	func wholeOffsets(zm []zipMetaInfo) []uint64 {
   724		i := 0
   725		var offsets []uint64
   726		var currentOffset uint64
   727		for _, z := range zm {
   728			if i != z.wholePartIndex {
   729				continue
   730			}
   731			offsets = append(offsets, currentOffset)
   732			currentOffset += uint64(z.dataSize)
   733			i++
   734		}
   735		// add the last computed offset to the slice, as it's useful info too: it's the
   736		// size of all the data in the zip.
   737		offsets = append(offsets, currentOffset)
   738		return offsets
   739	}
   741	func printDuplicates(zm []zipMetaInfo) {
   742		i := 0
   743		byPartIndex := make(map[int][]zipMetaInfo)
   744		for _, z := range zm {
   745			if i == z.wholePartIndex {
   746				byPartIndex[z.wholePartIndex] = []zipMetaInfo{z}
   747				i++
   748				continue
   749			}
   750			byPartIndex[z.wholePartIndex] = append(byPartIndex[z.wholePartIndex], z)
   751		}
   752		for _, zm := range byPartIndex {
   753			if len(zm) <= 1 {
   754				continue
   755			}
   756			br := make([]blob.Ref, 0, len(zm))
   757			for _, z := range zm {
   758				br = append(br, z.zipRef)
   759			}
   760			log.Printf("zip blobs with same data contents: %v", br)
   761		}
   762	}
   764	func (s *storage) anyMeta() (v bool) {
   765		// TODO: we only care about getting 1 row, but the
   766		// sorted.KeyValue interface doesn't let us give it that
   767		// hint. Care?
   768		sorted.Foreach(s.meta, func(_, _ string) error {
   769			v = true
   770			return errors.New("stop")
   771		})
   772		return
   773	}
   775	func (s *storage) anyZipPacks() (v bool) {
   776		ctx, cancel := context.WithCancel(context.TODO())
   777		defer cancel()
   778		dest := make(chan blob.SizedRef, 1)
   779		if err := s.large.EnumerateBlobs(ctx, dest, "", 1); err != nil {
   780			// Not a great interface in general, but only needed
   781			// by the start-up check for now, where it doesn't
   782			// really matter.
   783			return false
   784		}
   785		_, ok := <-dest
   786		return ok
   787	}
   789	func (s *storage) Close() error {
   790		return s.meta.Close()
   791	}
   793	func (s *storage) StorageGeneration() (initTime time.Time, random string, err error) {
   794		sgen, sok := s.small.(blobserver.Generationer)
   795		lgen, lok := s.large.(blobserver.Generationer)
   796		if !sok || !lok {
   797			return time.Time{}, "", blobserver.GenerationNotSupportedError("underlying storage engines don't support Generationer")
   798		}
   799		st, srand, err := sgen.StorageGeneration()
   800		if err != nil {
   801			return
   802		}
   803		lt, lrand, err := lgen.StorageGeneration()
   804		if err != nil {
   805			return
   806		}
   807		hash := sha1.New()
   808		io.WriteString(hash, srand)
   809		io.WriteString(hash, lrand)
   810		maxTime := func(a, b time.Time) time.Time {
   811			if a.After(b) {
   812				return a
   813			}
   814			return b
   815		}
   816		return maxTime(lt, st), fmt.Sprintf("%x", hash.Sum(nil)), nil
   817	}
   819	func (s *storage) ResetStorageGeneration() error {
   820		var retErr error
   821		for _, st := range []blobserver.Storage{s.small, s.large} {
   822			if g, ok := st.(blobserver.Generationer); ok {
   823				if err := g.ResetStorageGeneration(); err != nil {
   824					retErr = err
   825				}
   826			}
   827		}
   828		return retErr
   829	}
// meta is the parsed form of one per-blob row of the meta index, as
// produced by parseMetaRow. getMetaRow returns the zero meta when the index
// has no row for the blob.
type meta struct {
	// exists is set to true by parseMetaRow; it stays false in the zero
	// meta returned for a missing row.
	exists   bool
	// size is the size of the logical blob.
	size     uint32
	largeRef blob.Ref // if invalid, then on small if exists
	// largeOff is the offset passed to SubFetch on largeRef to read the blob.
	largeOff uint32
}
// isPacked reports whether m points into a large blob, which is the case
// exactly when largeRef is valid.
func (m *meta) isPacked() bool { return m.largeRef.Valid() }
   840	// if not found, err == nil.
   841	func (s *storage) getMetaRow(br blob.Ref) (meta, error) {
   842		v, err := s.meta.Get(blobMetaPrefix + br.String())
   843		if err == sorted.ErrNotFound {
   844			return meta{}, nil
   845		}
   846		if err != nil {
   847			return meta{}, fmt.Errorf("blobpacked.getMetaRow(%v) = %v", br, err)
   848		}
   849		return parseMetaRow([]byte(v))
   850	}
// singleSpace is the one-byte separator handed to bytes.Count when the
// row parsers check how many spaces a meta row contains.
var singleSpace = []byte{' '}
   854	// parses:
   855	// "<size_u32> <big-blobref> <big-offset>"
   856	func parseMetaRow(v []byte) (m meta, err error) {
   857		row := v
   858		sp := bytes.IndexByte(v, ' ')
   859		if sp < 1 || sp == len(v)-1 {
   860			return meta{}, fmt.Errorf("invalid metarow %q", v)
   861		}
   862		m.exists = true
   863		size, err := strutil.ParseUintBytes(v[:sp], 10, 32)
   864		if err != nil {
   865			return meta{}, fmt.Errorf("invalid metarow size %q", v)
   866		}
   867		m.size = uint32(size)
   868		v = v[sp+1:]
   870		// remains: "<big-blobref> <big-offset>"
   871		if bytes.Count(v, singleSpace) != 1 {
   872			return meta{}, fmt.Errorf("invalid metarow %q: wrong number of spaces", row)
   873		}
   874		sp = bytes.IndexByte(v, ' ')
   875		largeRef, ok := blob.ParseBytes(v[:sp])
   876		if !ok {
   877			return meta{}, fmt.Errorf("invalid metarow %q: bad blobref %q", row, v[:sp])
   878		}
   879		m.largeRef = largeRef
   880		off, err := strutil.ParseUintBytes(v[sp+1:], 10, 32)
   881		if err != nil {
   882			return meta{}, fmt.Errorf("invalid metarow %q: bad offset: %v", row, err)
   883		}
   884		m.largeOff = uint32(off)
   885		return m, nil
   886	}
   888	func parseMetaRowSizeOnly(v []byte) (size uint32, err error) {
   889		sp := bytes.IndexByte(v, ' ')
   890		if sp < 1 || sp == len(v)-1 {
   891			return 0, fmt.Errorf("invalid metarow %q", v)
   892		}
   893		size64, err := strutil.ParseUintBytes(v[:sp], 10, 32)
   894		if err != nil {
   895			return 0, fmt.Errorf("invalid metarow size %q", v)
   896		}
   897		return uint32(size64), nil
   898	}
   900	// parses:
   901	// "<zip_size_u32> <whole_ref_from_zip_manifest> <whole_size_u64> <zip_data_offset_in_whole_u64> <zip_data_bytes_u32>"
   902	func parseZipMetaRow(v []byte) (m zipMetaInfo, err error) {
   903		row := v
   904		sp := bytes.IndexByte(v, ' ')
   905		if sp < 1 || sp == len(v)-1 {
   906			return zipMetaInfo{}, fmt.Errorf("invalid z: meta row %q", row)
   907		}
   908		if bytes.Count(v, singleSpace) != 4 {
   909			return zipMetaInfo{}, fmt.Errorf("wrong number of spaces in z: meta row %q", row)
   910		}
   911		zipSize, err := strutil.ParseUintBytes(v[:sp], 10, 32)
   912		if err != nil {
   913			return zipMetaInfo{}, fmt.Errorf("invalid zipSize %q in z: meta row: %q", v[:sp], row)
   914		}
   915		m.zipSize = uint32(zipSize)
   917		v = v[sp+1:]
   918		sp = bytes.IndexByte(v, ' ')
   919		wholeRef, ok := blob.ParseBytes(v[:sp])
   920		if !ok {
   921			return zipMetaInfo{}, fmt.Errorf("invalid wholeRef %q in z: meta row: %q", v[:sp], row)
   922		}
   923		m.wholeRef = wholeRef
   925		v = v[sp+1:]
   926		sp = bytes.IndexByte(v, ' ')
   927		wholeSize, err := strutil.ParseUintBytes(v[:sp], 10, 64)
   928		if err != nil {
   929			return zipMetaInfo{}, fmt.Errorf("invalid wholeSize %q in z: meta row: %q", v[:sp], row)
   930		}
   931		m.wholeSize = uint64(wholeSize)
   933		v = v[sp+1:]
   934		sp = bytes.IndexByte(v, ' ')
   935		if _, err := strutil.ParseUintBytes(v[:sp], 10, 64); err != nil {
   936			return zipMetaInfo{}, fmt.Errorf("invalid offset %q in z: meta row: %q", v[:sp], row)
   937		}
   939		v = v[sp+1:]
   940		dataSize, err := strutil.ParseUintBytes(v, 10, 32)
   941		if err != nil {
   942			return zipMetaInfo{}, fmt.Errorf("invalid dataSize %q in z: meta row: %q", v, row)
   943		}
   944		m.dataSize = uint32(dataSize)
   946		return m, nil
   947	}
   949	func (s *storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
   950		buf := pools.BytesBuffer()
   951		defer pools.PutBuffer(buf)
   953		if _, err := io.Copy(buf, source); err != nil {
   954			return sb, err
   955		}
   956		size := uint32(buf.Len())
   957		isFile := false
   958		fileBlob, err := schema.BlobFromReader(br, bytes.NewReader(buf.Bytes()))
   959		if err == nil && fileBlob.Type() == schema.TypeFile {
   960			isFile = true
   961		}
   962		meta, err := s.getMetaRow(br)
   963		if err != nil {
   964			return sb, err
   965		}
   966		if meta.exists {
   967			sb = blob.SizedRef{Size: size, Ref: br}
   968		} else {
   969			sb, err = s.small.ReceiveBlob(ctx, br, buf)
   970			if err != nil {
   971				return sb, err
   972			}
   973		}
   974		if !isFile || meta.isPacked() || fileBlob.PartsSize() < packThreshold {
   975			return sb, nil
   976		}
   978		// Pack the blob.
   979		s.packGate.Start()
   980		defer s.packGate.Done()
   981		// We ignore the return value from packFile since we can't
   982		// really recover. At least be happy that we have all the
   983		// data on 'small' already. packFile will log at least.
   984		s.packFile(ctx, br)
   985		return sb, nil
   986	}
   988	func (s *storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
   989		m, err := s.getMetaRow(br)
   990		if err != nil {
   991			return nil, 0, err
   992		}
   993		if !m.exists || !m.isPacked() {
   994			return s.small.Fetch(ctx, br)
   995		}
   996		rc, err := s.large.SubFetch(ctx, m.largeRef, int64(m.largeOff), int64(m.size))
   997		if err != nil {
   998			return nil, 0, err
   999		}
  1000		return rc, m.size, nil
  1001	}
// removeLookups is the size of the gate RemoveBlobs uses to limit how many
// meta row lookups it runs at once.
const removeLookups = 50 // arbitrary
  1005	func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
  1006		// Plan:
  1007		//  -- delete from small (if it's there)
  1008		//  -- if in big, update the meta index to note that it's there, but deleted.
  1009		//  -- fetch big's zip file (constructed from a ReaderAt that is all dummy zeros +
  1010		//     the zip's TOC only, relying on big being a SubFetcher, and keeping info in
  1011		//     the meta about the offset of the TOC+total size of each big's zip)
  1012		//  -- iterate over the zip's blobs (at some point). If all are marked deleted, actually RemoveBlob
  1013		//     on big to delete the full zip and then delete all the meta rows.
  1014		var (
  1015			mu       sync.Mutex
  1016			unpacked []blob.Ref
  1017			packed   []blob.Ref
  1018			large    = map[blob.Ref]bool{} // the large blobs that packed are in
  1019		)
  1020		var grp syncutil.Group
  1021		delGate := syncutil.NewGate(removeLookups)
  1022		for _, br := range blobs {
  1023			br := br
  1024			delGate.Start()
  1025			grp.Go(func() error {
  1026				defer delGate.Done()
  1027				m, err := s.getMetaRow(br)
  1028				if err != nil {
  1029					return err
  1030				}
  1031				mu.Lock()
  1032				defer mu.Unlock()
  1033				if m.isPacked() {
  1034					packed = append(packed, br)
  1035					large[m.largeRef] = true
  1036				} else {
  1037					unpacked = append(unpacked, br)
  1038				}
  1039				return nil
  1040			})
  1041		}
  1042		if err := grp.Err(); err != nil {
  1043			return err
  1044		}
  1045		if len(unpacked) > 0 {
  1046			grp.Go(func() error {
  1047				return s.small.RemoveBlobs(ctx, unpacked)
  1048			})
  1049		}
  1050		if len(packed) > 0 {
  1051			grp.Go(func() error {
  1052				bm := s.meta.BeginBatch()
  1053				now := time.Now()
  1054				for zipRef := range large {
  1055					bm.Set("d:"+zipRef.String(), fmt.Sprint(now.Unix()))
  1056				}
  1057				for _, br := range packed {
  1058					bm.Delete("b:" + br.String())
  1059				}
  1060				return s.meta.CommitBatch(bm)
  1061			})
  1062		}
  1063		return grp.Err()
  1064	}
// statGate is the gate StatBlobs hands to
// blobserver.StatBlobsParallelHelper. It is a package-level variable, so
// every StatBlobs call goes through the same gate.
var statGate = syncutil.NewGate(50) // arbitrary
  1068	func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
  1069		var (
  1070			trySmallMu sync.Mutex
  1071			trySmall   []blob.Ref
  1072		)
  1074		err := blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
  1075			m, err := s.getMetaRow(br)
  1076			if err != nil {
  1077				return sb, err
  1078			}
  1079			if m.exists {
  1080				return blob.SizedRef{Ref: br, Size: m.size}, nil
  1081			}
  1082			// Try it in round two against the small blobs:
  1083			trySmallMu.Lock()
  1084			trySmall = append(trySmall, br)
  1085			trySmallMu.Unlock()
  1086			return sb, nil
  1087		})
  1088		if err != nil {
  1089			return err
  1090		}
  1091		if len(trySmall) == 0 {
  1092			return nil
  1093		}
  1094		return s.small.StatBlobs(ctx, trySmall, fn)
  1095	}
  1097	func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
  1098		return blobserver.MergedEnumerate(ctx, dest, []blobserver.BlobEnumerator{
  1099			s.small,
  1100			enumerator{s},
  1101		}, after, limit)
  1102	}
// enumerator implements EnumerateBlobs.
// It wraps a *storage so that its own EnumerateBlobs method, which walks the
// per-blob rows of the meta index, can be passed to
// blobserver.MergedEnumerate as a separate source.
type enumerator struct {
	*storage
}
  1109	func (s enumerator) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
  1110		defer close(dest)
  1111		t := s.meta.Find(blobMetaPrefix+after, blobMetaPrefixLimit)
  1112		defer func() {
  1113			closeErr := t.Close()
  1114			if err == nil {
  1115				err = closeErr
  1116			}
  1117		}()
  1118		n := 0
  1119		afterb := []byte(after)
  1120		for n < limit && t.Next() {
  1121			key := t.KeyBytes()[len(blobMetaPrefix):]
  1122			if n == 0 && bytes.Equal(key, afterb) {
  1123				continue
  1124			}
  1125			n++
  1126			br, ok := blob.ParseBytes(key)
  1127			if !ok {
  1128				return fmt.Errorf("unknown key %q in meta index", t.Key())
  1129			}
  1130			size, err := parseMetaRowSizeOnly(t.ValueBytes())
  1131			if err != nil {
  1132				return err
  1133			}
  1134			select {
  1135			case <-ctx.Done():
  1136				return ctx.Err()
  1137			case dest <- blob.SizedRef{Ref: br, Size: size}:
  1138			}
  1139		}
  1140		return nil
  1141	}
  1143	func (s *storage) packFile(ctx context.Context, fileRef blob.Ref) (err error) {
  1144		s.Logf("Packing file %s ...", fileRef)
  1145		defer func() {
  1146			if err == nil {
  1147				s.Logf("Packed file %s", fileRef)
  1148			} else {
  1149				s.Logf("Error packing file %s: %v", fileRef, err)
  1150			}
  1151		}()
  1153		fr, err := schema.NewFileReader(ctx, s, fileRef)
  1154		if err != nil {
  1155			return err
  1156		}
  1157		return newPacker(s, fileRef, fr).pack(ctx)
  1158	}
  1160	func newPacker(s *storage, fileRef blob.Ref, fr *schema.FileReader) *packer {
  1161		return &packer{
  1162			s:            s,
  1163			fileRef:      fileRef,
  1164			fr:           fr,
  1165			dataSize:     map[blob.Ref]uint32{},
  1166			schemaBlob:   map[blob.Ref]*blob.Blob{},
  1167			schemaParent: map[blob.Ref][]blob.Ref{},
  1168		}
  1169	}
// A packer writes a file out
// as one or more zips in the large storage. It holds the state of packing a
// single file: scanChunks fills in the chunk and schema information, pack
// computes wholeRef and wholeSize, and writeAZip appends to zips while
// consuming chunksRemain.
type packer struct {
	s       *storage
	fileRef blob.Ref
	fr      *schema.FileReader

	// wholeRef and wholeSize are the hash and length of everything read
	// from fr; both are set by pack.
	wholeRef  blob.Ref
	wholeSize int64

	dataRefs []blob.Ref // in order
	dataSize map[blob.Ref]uint32

	schemaRefs   []blob.Ref // in order, but irrelevant
	schemaBlob   map[blob.Ref]*blob.Blob
	schemaParent map[blob.Ref][]blob.Ref // data blob -> its parent/ancestor schema blob(s), all the way up to fileRef included

	// chunksRemain holds the data chunks not yet written to a zip.
	chunksRemain      []blob.Ref
	// zips lists the zips written so far, in order.
	zips              []writtenZip
	wholeBytesWritten int64 // sum of zips.dataRefs.size
}
// writtenZip records one zip stored by writeAZip: the sized ref returned
// when the zip was received by the large storage, and the data chunks that
// were written into it.
type writtenZip struct {
	blob.SizedRef
	dataRefs []blob.Ref
}
// Hooks that are called only when non-nil; presumably set by tests only.
var (
	// testHookSawTruncate is called by pack with the truncation ref each
	// time writeAZip reports a needsTruncatedAfterError.
	testHookSawTruncate           func(blob.Ref)
	// testHookStopBeforeOverflowing is called by writeAZip when its size
	// estimate says adding the next chunk would exceed the zip size limit.
	testHookStopBeforeOverflowing func()
)
  1202	func (pk *packer) pack(ctx context.Context) error {
  1203		if err := pk.scanChunks(ctx); err != nil {
  1204			return err
  1205		}
  1207		// TODO: decide as a function of schemaRefs and dataRefs
  1208		// already in s.large whether it makes sense to still compact
  1209		// this from a savings standpoint. For now we just always do.
  1210		// Maybe we'd have knobs in the future. Ideally not.
  1212		// Don't pack a file if we already have its wholeref stored
  1213		// otherwise (perhaps under a different filename). But that
  1214		// means we have to compute its wholeref first. We assume the
  1215		// blob source will cache these lookups so it's not too
  1216		// expensive to do two passes over the input.
  1217		h := blob.NewHash()
  1218		var err error
  1219		pk.wholeSize, err = io.Copy(h, pk.fr)
  1220		if err != nil {
  1221			return err
  1222		}
  1223		pk.wholeRef = blob.RefFromHash(h)
  1224		wholeKey := wholeMetaPrefix + pk.wholeRef.String()
  1225		_, err = pk.s.meta.Get(wholeKey)
  1226		if err == nil {
  1227			// Nil error means there was some knowledge of this wholeref.
  1228			return fmt.Errorf("already have wholeref %v packed; not packing again", pk.wholeRef)
  1229		} else if err != sorted.ErrNotFound {
  1230			return err
  1231		}
  1233		pk.chunksRemain = pk.dataRefs
  1234		var trunc blob.Ref
  1235	MakingZips:
  1236		for len(pk.chunksRemain) > 0 {
  1237			if err := pk.writeAZip(ctx, trunc); err != nil {
  1238				if needTrunc, ok := err.(needsTruncatedAfterError); ok {
  1239					trunc = needTrunc.Ref
  1240					if fn := testHookSawTruncate; fn != nil {
  1241						fn(trunc)
  1242					}
  1243					continue MakingZips
  1244				}
  1245				return err
  1246			}
  1247			trunc = blob.Ref{}
  1248		}
  1250		// Record the final wholeMetaPrefix record:
  1251		err = pk.s.meta.Set(wholeKey, fmt.Sprintf("%d %d", pk.wholeSize, len(pk.zips)))
  1252		if err != nil {
  1253			return fmt.Errorf("Error setting %s: %v", wholeKey, err)
  1254		}
  1256		return nil
  1257	}
  1259	func (pk *packer) scanChunks(ctx context.Context) error {
  1260		schemaSeen := map[blob.Ref]bool{}
  1261		return pk.fr.ForeachChunk(ctx, func(schemaPath []blob.Ref, p schema.BytesPart) error {
  1262			if !p.BlobRef.Valid() {
  1263				return errors.New("sparse files are not packed")
  1264			}
  1265			if p.Offset != 0 {
  1266				// TODO: maybe care about this later, if we ever start making
  1267				// these sorts of files.
  1268				return errors.New("file uses complicated schema. not packing")
  1269			}
  1270			pk.schemaParent[p.BlobRef] = append([]blob.Ref(nil), schemaPath...) // clone it
  1271			pk.dataSize[p.BlobRef] = uint32(p.Size)
  1272			for _, schemaRef := range schemaPath {
  1273				if schemaSeen[schemaRef] {
  1274					continue
  1275				}
  1276				schemaSeen[schemaRef] = true
  1277				pk.schemaRefs = append(pk.schemaRefs, schemaRef)
  1278				if b, err := blob.FromFetcher(ctx, pk.s, schemaRef); err != nil {
  1279					return err
  1280				} else {
  1281					pk.schemaBlob[schemaRef] = b
  1282				}
  1283			}
  1284			pk.dataRefs = append(pk.dataRefs, p.BlobRef)
  1285			return nil
  1286		})
  1287	}
// needsTruncatedAfterError is returned by writeAZip if it failed in its estimation and the zip file
// was over the 16MB (or whatever) max blob size limit. In this case the caller tries again
// passing the embedded Ref back to writeAZip as its trunc hint.
type needsTruncatedAfterError struct{ blob.Ref }

// Error implements the error interface, naming the ref to truncate after.
func (e needsTruncatedAfterError) Error() string { return "needs truncation after " + e.Ref.String() }
  1295	// check should only be used for things which really shouldn't ever happen, but should
  1296	// still be checked. If there is interesting logic in the 'else', then don't use this.
  1297	func check(err error) {
  1298		if err != nil {
  1299			b := make([]byte, 2<<10)
  1300			b = b[:runtime.Stack(b, false)]
  1301			log.Printf("Unlikely error condition triggered: %v at %s", err, b)
  1302			panic(err)
  1303		}
  1304	}
  1306	// trunc is a hint about which blob to truncate after. It may be zero.
  1307	// If the returned error is of type 'needsTruncatedAfterError', then
  1308	// the zip should be attempted to be written again, but truncating the
  1309	// data after the listed blob.
  1310	func (pk *packer) writeAZip(ctx context.Context, trunc blob.Ref) (err error) {
  1311		defer func() {
  1312			if e := recover(); e != nil {
  1313				if v, ok := e.(error); ok && err == nil {
  1314					err = v
  1315				} else {
  1316					panic(e)
  1317				}
  1318			}
  1319		}()
  1320		mf := Manifest{
  1321			WholeRef:       pk.wholeRef,
  1322			WholeSize:      pk.wholeSize,
  1323			WholePartIndex: len(pk.zips),
  1324		}
  1325		var zbuf bytes.Buffer
  1326		cw := &countWriter{w: &zbuf}
  1327		zw := zip.NewWriter(cw)
  1329		var approxSize = zipFixedOverhead // can't use zbuf.Len because zw buffers
  1330		var dataRefsWritten []blob.Ref
  1331		var dataBytesWritten int64
  1332		var schemaBlobSeen = map[blob.Ref]bool{}
  1333		var schemaBlobs []blob.Ref // to add after the main file
  1335		baseFileName := pk.fr.FileName()
  1336		if strings.Contains(baseFileName, "/") || strings.Contains(baseFileName, "\\") {
  1337			return fmt.Errorf("File schema blob %v filename had a slash in it: %q", pk.fr.SchemaBlobRef(), baseFileName)
  1338		}
  1339		fh := &zip.FileHeader{
  1340			Name:   baseFileName,
  1341			Method: zip.Store, // uncompressed
  1342		}
  1343		fh.SetModTime(pk.fr.ModTime())
  1344		fh.SetMode(0644)
  1345		fw, err := zw.CreateHeader(fh)
  1346		check(err)
  1347		check(zw.Flush())
  1348		dataStart := cw.n
  1349		approxSize += zipPerEntryOverhead // for the first FileHeader w/ the data
  1351		zipMax := pk.s.maxZipBlobSize()
  1352		chunks := pk.chunksRemain
  1353		chunkWholeHash := blob.NewHash()
  1354		for len(chunks) > 0 {
  1355			dr := chunks[0] // the next chunk to maybe write
  1357			if trunc.Valid() && trunc == dr {
  1358				if approxSize == 0 {
  1359					return errors.New("first blob is too large to pack, once you add the zip overhead")
  1360				}
  1361				break
  1362			}
  1364			schemaBlobsSave := schemaBlobs
  1365			for _, parent := range pk.schemaParent[dr] {
  1366				if !schemaBlobSeen[parent] {
  1367					schemaBlobSeen[parent] = true
  1368					schemaBlobs = append(schemaBlobs, parent)
  1369					approxSize += int(pk.schemaBlob[parent].Size()) + zipPerEntryOverhead
  1370				}
  1371			}
  1373			thisSize := pk.dataSize[dr]
  1374			approxSize += int(thisSize)
  1375			if approxSize+mf.approxSerializedSize() > zipMax {
  1376				if fn := testHookStopBeforeOverflowing; fn != nil {
  1377					fn()
  1378				}
  1379				schemaBlobs = schemaBlobsSave // restore it
  1380				break
  1381			}
  1383			// Copy the data to the zip.
  1384			rc, size, err := pk.s.Fetch(ctx, dr)
  1385			check(err)
  1386			if size != thisSize {
  1387				rc.Close()
  1388				return errors.New("unexpected size")
  1389			}
  1390			if n, err := io.Copy(io.MultiWriter(fw, chunkWholeHash), rc); err != nil || n != int64(size) {
  1391				rc.Close()
  1392				return fmt.Errorf("copy to zip = %v, %v; want %v bytes", n, err, size)
  1393			}
  1394			rc.Close()
  1396			dataRefsWritten = append(dataRefsWritten, dr)
  1397			dataBytesWritten += int64(size)
  1398			chunks = chunks[1:]
  1399		}
  1400		mf.DataBlobsOrigin = blob.RefFromHash(chunkWholeHash)
  1402		// zipBlobs is where a schema or data blob is relative to the beginning
  1403		// of the zip file.
  1404		var zipBlobs []BlobAndPos
  1406		var dataOffset int64
  1407		for _, br := range dataRefsWritten {
  1408			size := pk.dataSize[br]
  1409			mf.DataBlobs = append(mf.DataBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataOffset})
  1411			zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataStart + dataOffset})
  1412			dataOffset += int64(size)
  1413		}
  1415		for _, br := range schemaBlobs {
  1416			fw, err := zw.CreateHeader(&zip.FileHeader{
  1417				Name:   "camlistore/" + br.String() + ".json",
  1418				Method: zip.Store, // uncompressed
  1419			})
  1420			check(err)
  1421			check(zw.Flush())
  1422			b := pk.schemaBlob[br]
  1423			zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: b.Size()}, cw.n})
  1424			r, err := b.ReadAll(ctx)
  1425			if err != nil {
  1426				return err
  1427			}
  1428			n, err := io.Copy(fw, r)
  1430			check(err)
  1431			if n != int64(b.Size()) {
  1432				return fmt.Errorf("failed to write all of schema blob %v: %d bytes, not wanted %d", br, n, b.Size())
  1433			}
  1434		}
  1436		// Manifest file
  1437		fw, err = zw.Create(zipManifestPath)
  1438		check(err)
  1439		enc, err := json.MarshalIndent(mf, "", "  ")
  1440		check(err)
  1441		_, err = fw.Write(enc)
  1442		check(err)
  1443		err = zw.Close()
  1444		check(err)
  1446		if zbuf.Len() > zipMax {
  1447			// We guessed wrong. Back up. Find out how many blobs we went over.
  1448			overage := zbuf.Len() - zipMax
  1449			for i := len(dataRefsWritten) - 1; i >= 0; i-- {
  1450				dr := dataRefsWritten[i]
  1451				if overage <= 0 {
  1452					return needsTruncatedAfterError{dr}
  1453				}
  1454				overage -= int(pk.dataSize[dr])
  1455			}
  1456			return errors.New("file is unpackable; first blob is too big to fit")
  1457		}
  1459		zipRef := blob.RefFromBytes(zbuf.Bytes())
  1460		zipSB, err := blobserver.ReceiveNoHash(ctx, pk.s.large, zipRef, bytes.NewReader(zbuf.Bytes()))
  1461		if err != nil {
  1462			return err
  1463		}
  1465		bm := pk.s.meta.BeginBatch()
  1466		bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, pk.wholeRef, len(pk.zips)),
  1467			fmt.Sprintf("%s %d %d %d",
  1468				zipRef,
  1469				dataStart,
  1470				pk.wholeBytesWritten,
  1471				dataBytesWritten))
  1472		bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, zipRef),
  1473			fmt.Sprintf("%d %v %d %d %d",
  1474				zipSB.Size,
  1475				pk.wholeRef,
  1476				pk.wholeSize,
  1477				pk.wholeBytesWritten,
  1478				dataBytesWritten))
  1480		pk.wholeBytesWritten += dataBytesWritten
  1481		pk.zips = append(pk.zips, writtenZip{
  1482			SizedRef: zipSB,
  1483			dataRefs: dataRefsWritten,
  1484		})
  1486		for _, zb := range zipBlobs {
  1487			bm.Set(blobMetaPrefix+zb.Ref.String(), fmt.Sprintf("%d %v %d", zb.Size, zipRef, zb.Offset))
  1488		}
  1489		if err := pk.s.meta.CommitBatch(bm); err != nil {
  1490			return err
  1491		}
  1493		// Delete from small
  1494		if !pk.s.skipDelete {
  1495			toDelete := make([]blob.Ref, 0, len(dataRefsWritten)+len(schemaBlobs))
  1496			toDelete = append(toDelete, dataRefsWritten...)
  1497			toDelete = append(toDelete, schemaBlobs...)
  1498			if err := pk.s.small.RemoveBlobs(ctx, toDelete); err != nil {
  1499				// Can't really do anything about it and doesn't really matter, so
  1500				// just log for now.
  1501				pk.s.Logf("Error removing blobs from %s: %v", pk.s.small, err)
  1502			}
  1503		}
  1505		// On success, consume the chunks we wrote from pk.chunksRemain.
  1506		pk.chunksRemain = pk.chunksRemain[len(dataRefsWritten):]
  1507		return nil
  1508	}
// zipOpenError is the error foreachZipBlob returns when zip.NewReader
// fails on a packed zip blob.
type zipOpenError struct {
	zipRef blob.Ref // the zip blob that could not be opened
	err    error    // the error from zip.NewReader
}

// Error implements the error interface, naming the zip blob and the
// underlying error.
func (ze zipOpenError) Error() string {
	return fmt.Sprintf("Error opening packed zip blob %v: %v", ze.zipRef, ze.err)
}
  1519	// foreachZipBlob calls fn for each blob in the zip pack blob
  1520	// identified by zipRef.  If fn returns a non-nil error,
  1521	// foreachZipBlob stops enumerating with that error.
  1522	func (s *storage) foreachZipBlob(ctx context.Context, zipRef blob.Ref, fn func(BlobAndPos) error) error {
  1523		sb, err := blobserver.StatBlob(ctx, s.large, zipRef)
  1524		if err != nil {
  1525			return err
  1526		}
  1527		zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size))
  1528		if err != nil {
  1529			return zipOpenError{zipRef, err}
  1530		}
  1531		var maniFile *zip.File // or nil if not found
  1532		var firstOff int64     // offset of first file (the packed data chunks)
  1533		for i, f := range zr.File {
  1534			if i == 0 {
  1535				firstOff, err = f.DataOffset()
  1536				if err != nil {
  1537					return err
  1538				}
  1539			}
  1540			if f.Name == zipManifestPath {
  1541				maniFile = f
  1542				break
  1543			}
  1544		}
  1545		if maniFile == nil {
  1546			return errors.New("no camlistore manifest file found in zip")
  1547		}
  1548		// apply fn to all the schema blobs
  1549		for _, f := range zr.File {
  1550			if !strings.HasPrefix(f.Name, "camlistore/") || f.Name == zipManifestPath ||
  1551				!strings.HasSuffix(f.Name, ".json") {
  1552				continue
  1553			}
  1554			brStr := strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json")
  1555			br, ok := blob.Parse(brStr)
  1556			if ok {
  1557				off, err := f.DataOffset()
  1558				if err != nil {
  1559					return err
  1560				}
  1561				if err := fn(BlobAndPos{
  1562					SizedRef: blob.SizedRef{Ref: br, Size: uint32(f.UncompressedSize64)},
  1563					Offset:   off,
  1564				}); err != nil {
  1565					return err
  1566				}
  1567			}
  1568		}
  1569		maniRC, err := maniFile.Open()
  1570		if err != nil {
  1571			return err
  1572		}
  1573		defer maniRC.Close()
  1575		var mf Manifest
  1576		if err := json.NewDecoder(maniRC).Decode(&mf); err != nil {
  1577			return err
  1578		}
  1579		if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() {
  1580			return errors.New("incomplete blobpack manifest JSON")
  1581		}
  1582		// apply fn to all the data blobs
  1583		for _, bap := range mf.DataBlobs {
  1584			bap.Offset += firstOff
  1585			if err := fn(bap); err != nil {
  1586				return err
  1587			}
  1588		}
  1589		return nil
  1590	}
  1592	// deleteZipPack deletes the zip pack file br, but only if that zip
  1593	// file's parts are deleted already from the meta index.
  1594	func (s *storage) deleteZipPack(ctx context.Context, br blob.Ref) error {
  1595		inUse, err := s.zipPartsInUse(ctx, br)
  1596		if err != nil {
  1597			return err
  1598		}
  1599		if len(inUse) > 0 {
  1600			return fmt.Errorf("can't delete zip pack %v: %d parts in use: %v", br, len(inUse), inUse)
  1601		}
  1602		if err := s.large.RemoveBlobs(ctx, []blob.Ref{br}); err != nil {
  1603			return err
  1604		}
  1605		return s.meta.Delete("d:" + br.String())
  1606	}
  1608	func (s *storage) zipPartsInUse(ctx context.Context, br blob.Ref) ([]blob.Ref, error) {
  1609		var (
  1610			mu    sync.Mutex
  1611			inUse []blob.Ref
  1612		)
  1613		var grp syncutil.Group
  1614		gate := syncutil.NewGate(20) // arbitrary constant
  1615		err := s.foreachZipBlob(ctx, br, func(bap BlobAndPos) error {
  1616			gate.Start()
  1617			grp.Go(func() error {
  1618				defer gate.Done()
  1619				mr, err := s.getMetaRow(bap.Ref)
  1620				if err != nil {
  1621					return err
  1622				}
  1623				if mr.isPacked() {
  1624					mu.Lock()
  1625					inUse = append(inUse, mr.largeRef)
  1626					mu.Unlock()
  1627				}
  1628				return nil
  1629			})
  1630			return nil
  1631		})
  1632		if os.IsNotExist(err) {
  1633			// An already-deleted blob from large isn't considered
  1634			// to be in-use.
  1635			return nil, nil
  1636		}
  1637		if err != nil {
  1638			return nil, err
  1639		}
  1640		if err := grp.Err(); err != nil {
  1641			return nil, err
  1642		}
  1643		return inUse, nil
  1644	}
// A BlobAndPos is a blobref, its size, and where it is located within
// a larger group of bytes.
//
// What the offset is relative to depends on the user: in Manifest.DataBlobs
// it is relative to the start of the zip's first file, while foreachZipBlob
// reports offsets relative to the start of the zip itself.
type BlobAndPos struct {
	blob.SizedRef
	// Offset is the position of the blob's first byte within the larger
	// group of bytes.
	Offset int64 `json:"offset"`
}
// Manifest is the JSON description type representing the
// "camlistore/camlistore-pack-manifest.json" file found in a blobpack
// zip file.
//
// writeAZip writes it as indented JSON; foreachZipBlob reads it back and
// rejects a manifest whose WholeRef or DataBlobsOrigin is invalid or whose
// WholeSize is zero.
type Manifest struct {
	// WholeRef is the blobref of the entire file that this zip is
	// either fully or partially describing.  For files under
	// around 16MB, the WholeRef and DataBlobsOrigin will be
	// the same.
	WholeRef blob.Ref `json:"wholeRef"`

	// WholeSize is the number of bytes in the original file being
	// cut up.
	WholeSize int64 `json:"wholeSize"`

	// WholePartIndex is the chunk number (0-based) of this zip file.
	// If a client has 'n' zip files with the same WholeRef whose
	// WholePartIndexes are contiguous (including 0) and the sum of
	// the DataBlobs equals WholeSize, the client has the entire
	// original file.
	WholePartIndex int `json:"wholePartIndex"`

	// DataBlobsOrigin is the blobref of the contents of the first
	// file in the zip pack file. This first file is the actual data,
	// or a part of it, that the rest of this zip is describing or
	// referencing.
	DataBlobsOrigin blob.Ref `json:"dataBlobsOrigin"`

	// DataBlobs describes all the logical blobs that are
	// concatenated together in the first file in the zip file.
	// The offsets are relative to the beginning of that first
	// file, not the beginning of the zip file itself.
	DataBlobs []BlobAndPos `json:"dataBlobs"`
}
  1687	// approxSerializedSize reports how big this Manifest will be
  1688	// (approximately), once encoded as JSON. This is used as a hint by
  1689	// the packer to decide when to keep trying to add blobs. If this
  1690	// number is too low, the packer backs up (at a slight performance
  1691	// cost) but is still correct. If this approximation returns too large
  1692	// of a number, it just causes multiple zip files to be created when
  1693	// the original blobs might've just barely fit.
  1694	func (mf *Manifest) approxSerializedSize() int {
  1695		// Empirically (for sha1-* blobrefs) it's 204 bytes fixed
  1696		// encoding overhead (pre-compression), and 119 bytes per
  1697		// encoded DataBlob.
  1698		// And empirically, it compresses down to 30% of its size with flate.
  1699		// So use the sha1 numbers but conseratively assume only 50% compression,
  1700		// to make up for longer sha-3 blobrefs.
  1701		return (204 + len(mf.DataBlobs)*119) / 2
  1702	}
  1704	type countWriter struct {
  1705		w io.Writer
  1706		n int64
  1707	}
  1709	func (cw *countWriter) Write(p []byte) (n int, err error) {
  1710		n, err = cw.w.Write(p)
  1711		cw.n += int64(n)
  1712		return
  1713	}
Website layout inspired by memcached.
Content by the authors.