Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package index
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"log"
    26		"math"
    27		"os"
    28		"runtime"
    29		"sort"
    30		"strconv"
    31		"strings"
    32		"sync"
    33		"time"
    34	
    35		"perkeep.org/pkg/blob"
    36		"perkeep.org/pkg/blobserver"
    37		"perkeep.org/pkg/env"
    38		"perkeep.org/pkg/schema"
    39		"perkeep.org/pkg/sorted"
    40		"perkeep.org/pkg/types/camtypes"
    41	
    42		"go4.org/jsonconfig"
    43		"go4.org/strutil"
    44		"go4.org/types"
    45	)
    46	
// init registers newFromConfig as the constructor for the "index" storage
// type with the blobserver package.
func init() {
	blobserver.RegisterStorageConstructor("index", newFromConfig)
}
    50	
// Index is an index of blobs, stored as rows in the sorted key/value store s.
// It embeds *blobserver.NoImplStorage and is asserted (below in this file) to
// satisfy both blobserver.Storage and Interface.
//
// The fields following mu are guarded by mu; use the Lock, Unlock, RLock and
// RUnlock methods to acquire it.
type Index struct {
	*blobserver.NoImplStorage
	reindex   bool // whether "reindex" was set in config (likely via perkeepd flag)
	keepGoing bool // whether "keepGoing" was set in config (likely via perkeepd flag)
	s         sorted.KeyValue

	KeyFetcher blob.Fetcher // for verifying claims

	// TODO(mpl): do not init and use deletes when we have a corpus. Since corpus has its own deletes now, they are redundant.

	// deletes is a cache to keep track of the deletion status (deleted vs undeleted)
	// of the blobs in the index. It makes for faster reads than the otherwise
	// recursive calls on the index.
	// It may be nil, in which case IsDeleted falls back to querying s directly.
	deletes *deletionCache

	corpus *Corpus // or nil, if not being kept in memory

	mu sync.RWMutex // guards following
	//mu syncdebug.RWMutexTracker  // (when debugging)

	// needs maps from a blob to the missing blobs it needs to
	// finish indexing.
	needs map[blob.Ref][]blob.Ref
	// neededBy is the inverse of needs. The keys are missing blobs
	// and the value(s) are blobs waiting to be reindexed.
	neededBy     map[blob.Ref][]blob.Ref
	readyReindex map[blob.Ref]bool // set of things ready to be re-indexed
	// reindexWg is used to make sure that we wait for all asynchronous, out
	// of order, indexing to be finished, at the end of reindexing.
	reindexWg sync.WaitGroup
	// blobSource is used for fetching blobs when indexing files and other
	// blobs types that reference other objects.
	// The only write access to blobSource should be its initialization (transition
	// from nil to non-nil), once, and protected by mu.
	blobSource blobserver.FetcherEnumerator

	hasWiped bool // whether Wipe has been called on s. So we don't redo it in Reindex() for nothing.
}
    89	
// Lock, Unlock, RLock and RUnlock expose the index's mu mutex, which guards
// the needs, neededBy, readyReindex and blobSource fields.

// Lock acquires x.mu for writing.
func (x *Index) Lock()    { x.mu.Lock() }
// Unlock releases the write lock on x.mu.
func (x *Index) Unlock()  { x.mu.Unlock() }
// RLock acquires x.mu for reading.
func (x *Index) RLock()   { x.mu.RLock() }
// RUnlock releases a read lock on x.mu.
func (x *Index) RUnlock() { x.mu.RUnlock() }
    94	
// Compile-time checks that *Index implements blobserver.Storage and Interface.
var (
	_ blobserver.Storage = (*Index)(nil)
	_ Interface          = (*Index)(nil)
)
    99	
// logf logs the formatted message through the standard logger, prefixed
// with "index: ".
func (x *Index) logf(format string, args ...interface{}) {
	log.Printf("index: "+format, args...)
}
   103	
// aboutToReindex is set to true by newFromConfig when the "reindex" config
// option is set. When true, New only creates an empty deletes cache and skips
// the schema version check and the initialization of the caches from the
// stored rows. It is never reset to false in this file.
var aboutToReindex = false
   105	
// SignerRefSet is the set of all blob Refs (of different hashes) that represent
// the same signer GPG identity. They are stored as strings for allocation reasons:
// we prefer to pay the allocation cost when updating SignerRefSets in the corpus
// rather than when reading them.
type SignerRefSet []string
   111	
// Owner identifies, through their GPG key, a signer of claims and permanodes.
// All of its methods are safe to call on a nil *Owner.
type Owner struct {
	// keyID holds the owner's GPG key IDs. Only the first element is
	// currently returned by KeyID.
	keyID []string
	// blobByKeyID maps an owner GPG ID to all its owner blobs (because different hashes).
	// refs are stored as strings for allocation reasons.
	blobByKeyID map[string]SignerRefSet
}
   120	
   121	// NewOwner returns an Owner that associates keyID with ref.
   122	func NewOwner(keyID string, ref blob.Ref) *Owner {
   123		return &Owner{
   124			keyID:       []string{keyID},
   125			blobByKeyID: map[string]SignerRefSet{keyID: SignerRefSet{ref.String()}},
   126		}
   127	}
   128	
   129	// KeyID returns the GPG key ID (e.g. 2931A67C26F5ABDA) of the owner. Its
   130	// signature might change when support for multiple GPG keys is introduced.
   131	func (o *Owner) KeyID() string {
   132		if o == nil || len(o.keyID) == 0 {
   133			return ""
   134		}
   135		return o.keyID[0]
   136	}
   137	
   138	// RefSet returns the set of refs that represent the same owner as keyID.
   139	func (o *Owner) RefSet(keyID string) SignerRefSet {
   140		if o == nil || len(o.blobByKeyID) == 0 {
   141			return nil
   142		}
   143		refs := o.blobByKeyID[keyID]
   144		if len(refs) == 0 {
   145			return nil
   146		}
   147		return refs
   148	}
   149	
   150	// BlobRef returns the currently recommended ref implementation of the owner GPG
   151	// key blob. Its signature might change when support for multiple hashes and/or
   152	// multiple GPG keys is introduced.
   153	func (o *Owner) BlobRef() blob.Ref {
   154		if o == nil || len(o.blobByKeyID) == 0 {
   155			return blob.Ref{}
   156		}
   157		refs := o.blobByKeyID[o.KeyID()]
   158		if len(refs) == 0 {
   159			return blob.Ref{}
   160		}
   161		ref, ok := blob.Parse(refs[0])
   162		if !ok {
   163			return blob.Ref{}
   164		}
   165		return ref
   166	}
   167	
   168	// TODO(mpl): I'm not sure there are any cases where we don't want the index to
   169	// have a blobSource, so maybe we should phase out InitBlobSource and integrate it
   170	// to New or something. But later.
   171	
// InitBlobSource sets the index's blob source, while holding the index's
// write lock. It panics if the blobSource is already set.
// If the index's key fetcher is nil, it is also set to the blobSource
// argument.
//
// NOTE(review): earlier documentation said this also starts the background
// out-of-order indexing loop; nothing in this function does so. Confirm where
// (or whether) that loop is started.
func (x *Index) InitBlobSource(blobSource blobserver.FetcherEnumerator) {
	x.Lock()
	defer x.Unlock()
	if x.blobSource != nil {
		panic("blobSource of Index already set")
	}
	x.blobSource = blobSource
	if x.KeyFetcher == nil {
		x.KeyFetcher = blobSource
	}
}
   187	
   188	// New returns a new index using the provided key/value storage implementation.
   189	func New(s sorted.KeyValue) (*Index, error) {
   190		idx := &Index{
   191			s:            s,
   192			needs:        make(map[blob.Ref][]blob.Ref),
   193			neededBy:     make(map[blob.Ref][]blob.Ref),
   194			readyReindex: make(map[blob.Ref]bool),
   195		}
   196		if aboutToReindex {
   197			idx.deletes = newDeletionCache()
   198			return idx, nil
   199		}
   200	
   201		schemaVersion := idx.schemaVersion()
   202		switch {
   203		case schemaVersion == 0 && idx.isEmpty():
   204			// New index.
   205			err := idx.s.Set(keySchemaVersion.name, fmt.Sprint(requiredSchemaVersion))
   206			if err != nil {
   207				return nil, fmt.Errorf("Could not write index schema version %q: %v", requiredSchemaVersion, err)
   208			}
   209		case schemaVersion != requiredSchemaVersion:
   210			tip := ""
   211			if env.IsDev() {
   212				// Good signal that we're using the devcam server, so help out
   213				// the user with a more useful tip:
   214				tip = `(For the dev server, run "devcam server --wipe" to wipe both your blobs and index)`
   215			} else {
   216				if is4To5SchemaBump(schemaVersion) {
   217					return idx, errMissingWholeRef
   218				}
   219				tip = "Run 'perkeepd --reindex' (it might take awhile, but shows status). Alternative: 'camtool dbinit' (or just delete the file for a file based index), and then 'camtool sync --all'"
   220			}
   221			return nil, fmt.Errorf("index schema version is %d; required one is %d. You need to reindex. %s",
   222				schemaVersion, requiredSchemaVersion, tip)
   223		}
   224		if err := idx.initDeletesCache(); err != nil {
   225			return nil, fmt.Errorf("Could not initialize index's deletes cache: %v", err)
   226		}
   227		if err := idx.initNeededMaps(); err != nil {
   228			return nil, fmt.Errorf("Could not initialize index's missing blob maps: %v", err)
   229		}
   230		return idx, nil
   231	}
   232	
// is4To5SchemaBump reports whether moving from schemaVersion to
// requiredSchemaVersion is exactly the version 4 to version 5 upgrade, which
// New signals with errMissingWholeRef instead of a generic reindex error.
func is4To5SchemaBump(schemaVersion int) bool {
	return schemaVersion == 4 && requiredSchemaVersion == 5
}
   236	
// errMissingWholeRef is returned by New, together with a non-nil *Index, when
// the stored schema is version 4 and version 5 is required. newFromConfig
// reacts to it by calling fixMissingWholeRef.
var errMissingWholeRef = errors.New("missing wholeRef field in fileInfo rows")
   238	
   239	// fixMissingWholeRef appends the wholeRef to all the keyFileInfo rows values. It should
   240	// only be called to upgrade a version 4 index schema to version 5.
   241	func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
   242		// We did that check from the caller, but double-check again to prevent from misuse
   243		// of that function.
   244		if x.schemaVersion() != 4 || requiredSchemaVersion != 5 {
   245			panic("fixMissingWholeRef should only be used when upgrading from v4 to v5 of the index schema")
   246		}
   247		x.logf("fixing the missing wholeRef in the fileInfo rows...")
   248		defer func() {
   249			if err != nil {
   250				x.logf("fixing the fileInfo rows failed: %v", err)
   251				return
   252			}
   253			x.logf("successfully fixed wholeRef in FileInfo rows.")
   254		}()
   255	
   256		// first build a reverted keyWholeToFileRef map, so we can get the wholeRef from the fileRef easily.
   257		fileRefToWholeRef := make(map[blob.Ref]blob.Ref)
   258		it := x.queryPrefix(keyWholeToFileRef)
   259		var keyA [3]string
   260		for it.Next() {
   261			keyPart := strutil.AppendSplitN(keyA[:0], it.Key(), "|", 3)
   262			if len(keyPart) != 3 {
   263				return fmt.Errorf("bogus keyWholeToFileRef key: got %q, wanted \"wholetofile|wholeRef|fileRef\"", it.Key())
   264			}
   265			wholeRef, ok1 := blob.Parse(keyPart[1])
   266			fileRef, ok2 := blob.Parse(keyPart[2])
   267			if !ok1 || !ok2 {
   268				return fmt.Errorf("bogus part in keyWholeToFileRef key: %q", it.Key())
   269			}
   270			fileRefToWholeRef[fileRef] = wholeRef
   271		}
   272		if err := it.Close(); err != nil {
   273			return err
   274		}
   275	
   276		var fixedEntries, missedEntries int
   277		t := time.NewTicker(5 * time.Second)
   278		defer t.Stop()
   279		// We record the mutations and set them all after the iteration because of the sqlite locking:
   280		// since BeginBatch takes a lock, and Find too, we would deadlock at queryPrefix if we
   281		// started a batch mutation before.
   282		mutations := make(map[string]string)
   283		keyPrefix := keyFileInfo.name + "|"
   284		it = x.queryPrefix(keyFileInfo)
   285		defer it.Close()
   286		var valA [3]string
   287		for it.Next() {
   288			select {
   289			case <-t.C:
   290				x.logf("recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries)
   291			default:
   292			}
   293			br, ok := blob.ParseBytes(it.KeyBytes()[len(keyPrefix):])
   294			if !ok {
   295				return fmt.Errorf("invalid blobRef %q", it.KeyBytes()[len(keyPrefix):])
   296			}
   297			wholeRef, ok := fileRefToWholeRef[br]
   298			if !ok {
   299				missedEntries++
   300				x.logf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br)
   301				continue
   302			}
   303			valPart := strutil.AppendSplitN(valA[:0], it.Value(), "|", 3)
   304			// The old format we're fixing should be: size|filename|mimetype
   305			if len(valPart) != 3 {
   306				return fmt.Errorf("bogus keyFileInfo value: got %q, wanted \"size|filename|mimetype\"", it.Value())
   307			}
   308			size_s, filename, mimetype := valPart[0], valPart[1], urld(valPart[2])
   309			if strings.Contains(mimetype, "|") {
   310				// I think this can only happen for people migrating from a commit at least as recent as
   311				// 8229c1985079681a652cb65551b4e80a10d135aa, when wholeRef was introduced to keyFileInfo
   312				// but there was no migration code yet.
   313				// For the "production" migrations between 0.8 and 0.9, the index should not have any wholeRef
   314				// in the keyFileInfo entries. So if something goes wrong and is somehow linked to that happening,
   315				// I'd like to know about it, hence the logging.
   316				x.logf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value())
   317				continue
   318			}
   319			size, err := strconv.Atoi(size_s)
   320			if err != nil {
   321				return fmt.Errorf("bogus size in keyFileInfo value %v: %v", it.Value(), err)
   322			}
   323			mutations[keyFileInfo.Key(br)] = keyFileInfo.Val(size, filename, mimetype, wholeRef)
   324			fixedEntries++
   325		}
   326		if err := it.Close(); err != nil {
   327			return err
   328		}
   329		x.logf("starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries)
   330		bm := x.s.BeginBatch()
   331		for k, v := range mutations {
   332			bm.Set(k, v)
   333		}
   334		bm.Set(keySchemaVersion.name, "5")
   335		if err := x.s.CommitBatch(bm); err != nil {
   336			return err
   337		}
   338		if missedEntries > 0 {
   339			x.logf("some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries)
   340		}
   341		return nil
   342	}
   343	
   344	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
   345		blobPrefix := config.RequiredString("blobSource")
   346		kvConfig := config.RequiredObject("storage")
   347		reindex := config.OptionalBool("reindex", false)
   348		keepGoing := config.OptionalBool("keepGoing", false)
   349	
   350		if err := config.Validate(); err != nil {
   351			return nil, err
   352		}
   353	
   354		kv, err := sorted.NewKeyValue(kvConfig)
   355		if err != nil {
   356			if _, ok := err.(sorted.NeedWipeError); !ok {
   357				return nil, err
   358			}
   359			if !reindex {
   360				return nil, err
   361			}
   362		}
   363		if reindex {
   364			aboutToReindex = true
   365			wiper, ok := kv.(sorted.Wiper)
   366			if !ok {
   367				return nil, fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", kv)
   368			}
   369			if err := wiper.Wipe(); err != nil {
   370				return nil, fmt.Errorf("error wiping index's sorted key/value type %T: %v", kv, err)
   371			}
   372			log.Printf("Index wiped.")
   373		}
   374	
   375		sto, err := ld.GetStorage(blobPrefix)
   376		if err != nil {
   377			return nil, err
   378		}
   379	
   380		ix, err := New(kv)
   381		// TODO(mpl): next time we need to do another fix, make a new error
   382		// type that lets us apply the needed fix depending on its value or
   383		// something. For now just one value/fix.
   384		if err == errMissingWholeRef {
   385			// TODO: maybe we don't want to do that automatically. Brad says
   386			// we have to think about the case on GCE/CoreOS in particular.
   387			if err := ix.fixMissingWholeRef(sto); err != nil {
   388				ix.Close()
   389				return nil, fmt.Errorf("could not fix missing wholeRef entries: %v", err)
   390			}
   391			ix, err = New(kv)
   392		}
   393		ix.keepGoing = keepGoing
   394		ix.reindex = reindex
   395		if reindex {
   396			ix.hasWiped = true
   397		}
   398		if err != nil {
   399			return nil, err
   400		}
   401		ix.InitBlobSource(sto)
   402	
   403		if !reindex {
   404			if err := ix.integrityCheck(3 * time.Second); err != nil {
   405				return nil, err
   406			}
   407		}
   408	
   409		return ix, err
   410	}
   411	
// String returns a short description of the index, including the concrete
// type of its key/value store.
func (x *Index) String() string {
	return fmt.Sprintf("Perkeep index, using key/value implementation %T", x.s)
}
   415	
   416	func (x *Index) isEmpty() bool {
   417		iter := x.s.Find("", "")
   418		hasRows := iter.Next()
   419		if err := iter.Close(); err != nil {
   420			panic(err)
   421		}
   422		return !hasRows
   423	}
   424	
   425	// reindexMaxProcs is the number of concurrent goroutines that will be used for reindexing.
   426	var reindexMaxProcs = struct {
   427		sync.RWMutex
   428		v int
   429	}{v: runtime.NumCPU()}
   430	
   431	// SetReindexMaxProcs sets the maximum number of concurrent goroutines that are
   432	// used during reindexing.
   433	func SetReindexMaxProcs(n int) {
   434		reindexMaxProcs.Lock()
   435		defer reindexMaxProcs.Unlock()
   436		reindexMaxProcs.v = n
   437	}
   438	
   439	// ReindexMaxProcs returns the maximum number of concurrent goroutines that are
   440	// used during reindexing.
   441	func ReindexMaxProcs() int {
   442		reindexMaxProcs.RLock()
   443		defer reindexMaxProcs.RUnlock()
   444		return reindexMaxProcs.v
   445	}
   446	
// WantsReindex reports whether "reindex" was set in the index's config.
func (x *Index) WantsReindex() bool   { return x.reindex }
// WantsKeepGoing reports whether "keepGoing" was set in the index's config.
func (x *Index) WantsKeepGoing() bool { return x.keepGoing }
   449	
   450	func (x *Index) Reindex() error {
   451		x.Lock()
   452		if x.blobSource == nil {
   453			x.Unlock()
   454			return errors.New("index: can't re-index: no blobSource")
   455		}
   456		x.Unlock()
   457	
   458		reindexMaxProcs.RLock()
   459		defer reindexMaxProcs.RUnlock()
   460	
   461		ctx := context.Background()
   462	
   463		if !x.hasWiped {
   464			wiper, ok := x.s.(sorted.Wiper)
   465			if !ok {
   466				return fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", x.s)
   467			}
   468			log.Printf("Wiping index storage type %T ...", x.s)
   469			if err := wiper.Wipe(); err != nil {
   470				return fmt.Errorf("error wiping index's sorted key/value type %T: %v", x.s, err)
   471			}
   472			log.Printf("Index wiped.")
   473		}
   474		log.Printf("Rebuilding index...")
   475	
   476		reindexStart, _ := blob.Parse(os.Getenv("CAMLI_REINDEX_START"))
   477	
   478		err := x.s.Set(keySchemaVersion.name, fmt.Sprintf("%d", requiredSchemaVersion))
   479		if err != nil {
   480			return err
   481		}
   482	
   483		var nerrmu sync.Mutex
   484		nerr := 0
   485	
   486		blobc := make(chan blob.Ref, 32)
   487	
   488		enumCtx := context.Background()
   489		enumErr := make(chan error, 1)
   490		go func() {
   491			defer close(blobc)
   492			donec := enumCtx.Done()
   493			var lastTick time.Time
   494			enumErr <- blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
   495				now := time.Now()
   496				if lastTick.Before(now.Add(-1 * time.Second)) {
   497					log.Printf("Reindexing at %v", sb.Ref)
   498					lastTick = now
   499				}
   500				if reindexStart.Valid() && sb.Ref.Less(reindexStart) {
   501					return nil
   502				}
   503				select {
   504				case <-donec:
   505					return ctx.Err()
   506				case blobc <- sb.Ref:
   507					return nil
   508				}
   509			})
   510		}()
   511		var wg sync.WaitGroup
   512		for i := 0; i < reindexMaxProcs.v; i++ {
   513			wg.Add(1)
   514			go func() {
   515				defer wg.Done()
   516				for br := range blobc {
   517					if err := x.indexBlob(ctx, br); err != nil {
   518						log.Printf("Error reindexing %v: %v", br, err)
   519						nerrmu.Lock()
   520						nerr++
   521						nerrmu.Unlock()
   522						// TODO: flag (or default?) to stop the EnumerateAll above once
   523						// there's any error with reindexing?
   524					}
   525				}
   526			}()
   527		}
   528		if err := <-enumErr; err != nil {
   529			return err
   530		}
   531	
   532		wg.Wait()
   533		x.reindexWg.Wait()
   534	
   535		x.RLock()
   536		readyCount := len(x.readyReindex)
   537		needed := len(x.needs)
   538		var needSample bytes.Buffer
   539		if needed > 0 {
   540			n := 0
   541		Sample:
   542			for x, needs := range x.needs {
   543				for _, need := range needs {
   544					n++
   545					if n == 10 {
   546						break Sample
   547					}
   548					if n > 1 {
   549						fmt.Fprintf(&needSample, ", ")
   550					}
   551					fmt.Fprintf(&needSample, "%v needs %v", x, need)
   552				}
   553			}
   554		}
   555		x.RUnlock()
   556		if readyCount > 0 {
   557			return fmt.Errorf("%d blobs were ready to reindex in out-of-order queue, but not yet ran", readyCount)
   558		}
   559		if needed > 0 {
   560			return fmt.Errorf("%d blobs are still needed as dependencies; a sample: %s", needed, needSample.Bytes())
   561		}
   562	
   563		nerrmu.Lock() // no need to unlock
   564		if nerr != 0 {
   565			return fmt.Errorf("%d blobs failed to re-index", nerr)
   566		}
   567		if err := x.initDeletesCache(); err != nil {
   568			return err
   569		}
   570		log.Printf("Index rebuild complete.")
   571		return nil
   572	}
   573	
   574	// integrityCheck enumerates blobs through x.blobSource during timeout, and
   575	// verifies for each of them that it has a meta row in the index. It logs a message
   576	// if any of them is not found. It only returns an error if something went wrong
   577	// during the enumeration.
   578	func (x *Index) integrityCheck(timeout time.Duration) error {
   579		t0 := time.Now()
   580		x.logf("starting integrity check...")
   581		defer func() {
   582			x.logf("integrity check done (after %v)", time.Since(t0).Round(10*time.Millisecond))
   583		}()
   584		if x.blobSource == nil {
   585			return errors.New("index: can't check sanity of index: no blobSource")
   586		}
   587	
   588		// we don't actually need seen atm, but I anticipate we'll return it at some
   589		// point, so we can display the blobs that were tested/seen/missed on the web UI.
   590		seen := make([]blob.Ref, 0)
   591		notFound := make([]blob.Ref, 0)
   592		enumCtx := context.TODO()
   593		stopTime := time.NewTimer(timeout)
   594		defer stopTime.Stop()
   595		var errEOT = errors.New("time's out")
   596		if err := blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
   597			select {
   598			case <-stopTime.C:
   599				return errEOT
   600			default:
   601			}
   602			if _, err := x.GetBlobMeta(enumCtx, sb.Ref); err != nil {
   603				if !os.IsNotExist(err) {
   604					return err
   605				}
   606				notFound = append(notFound, sb.Ref)
   607				return nil
   608			}
   609			seen = append(seen, sb.Ref)
   610			return nil
   611		}); err != nil && err != errEOT {
   612			return err
   613		}
   614		if len(notFound) > 0 {
   615			// TODO(mpl): at least on GCE, display that message and maybe more on a web UI page as well.
   616			x.logf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen))
   617		}
   618		return nil
   619	}
   620	
   621	func queryPrefixString(s sorted.KeyValue, prefix string) sorted.Iterator {
   622		if prefix == "" {
   623			return s.Find("", "")
   624		}
   625		lastByte := prefix[len(prefix)-1]
   626		if lastByte == 0xff {
   627			panic("unsupported query prefix ending in 0xff")
   628		}
   629		end := prefix[:len(prefix)-1] + string(lastByte+1)
   630		return s.Find(prefix, end)
   631	}
   632	
// queryPrefixString returns an iterator over the rows of the index's
// key/value store whose key starts with prefix.
func (x *Index) queryPrefixString(prefix string) sorted.Iterator {
	return queryPrefixString(x.s, prefix)
}

// queryPrefix returns an iterator over the rows of s whose key starts with
// the prefix that key.Prefix builds from args.
func queryPrefix(s sorted.KeyValue, key *keyType, args ...interface{}) sorted.Iterator {
	return queryPrefixString(s, key.Prefix(args...))
}

// queryPrefix returns an iterator over the rows of the index's key/value
// store whose key starts with the prefix that key.Prefix builds from args.
func (x *Index) queryPrefix(key *keyType, args ...interface{}) sorted.Iterator {
	return x.queryPrefixString(key.Prefix(args...))
}
   644	
   645	func closeIterator(it sorted.Iterator, perr *error) {
   646		err := it.Close()
   647		if err != nil && *perr == nil {
   648			*perr = err
   649		}
   650	}
   651	
   652	// schemaVersion returns the version of schema as it is found
   653	// in the currently used index. If not found, it returns 0.
   654	func (x *Index) schemaVersion() int {
   655		schemaVersionStr, err := x.s.Get(keySchemaVersion.name)
   656		if err != nil {
   657			if err == sorted.ErrNotFound {
   658				return 0
   659			}
   660			panic(fmt.Sprintf("Could not get index schema version: %v", err))
   661		}
   662		schemaVersion, err := strconv.Atoi(schemaVersionStr)
   663		if err != nil {
   664			panic(fmt.Sprintf("Bogus index schema version: %q", schemaVersionStr))
   665		}
   666		return schemaVersion
   667	}
   668	
// deletion records one delete claim made against a blob.
type deletion struct {
	deleter blob.Ref  // the delete claim
	when    time.Time // the date of that claim
}

// byDeletionDate implements sort.Interface, ordering deletions from the
// oldest to the most recent.
type byDeletionDate []deletion

func (d byDeletionDate) Len() int           { return len(d) }
func (d byDeletionDate) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d byDeletionDate) Less(i, j int) bool { return d[i].when.Before(d[j].when) }
   679	
// deletionCache keeps, for each blob targeted by delete claims, the list of
// those claims. The embedded RWMutex guards m.
type deletionCache struct {
	sync.RWMutex
	m map[blob.Ref][]deletion
}

// newDeletionCache returns an empty, ready to use, deletionCache.
func newDeletionCache() *deletionCache {
	return &deletionCache{
		m: make(map[blob.Ref][]deletion),
	}
}
   690	
   691	// initDeletesCache creates and populates the deletion status cache used by the index
   692	// for faster calls to IsDeleted and DeletedAt. It is called by New.
   693	func (x *Index) initDeletesCache() (err error) {
   694		x.deletes = newDeletionCache()
   695		it := x.queryPrefix(keyDeleted)
   696		defer closeIterator(it, &err)
   697		for it.Next() {
   698			cl, ok := kvDeleted(it.Key())
   699			if !ok {
   700				return fmt.Errorf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key())
   701			}
   702			targetDeletions := append(x.deletes.m[cl.Target],
   703				deletion{
   704					deleter: cl.BlobRef,
   705					when:    cl.Date,
   706				})
   707			sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
   708			x.deletes.m[cl.Target] = targetDeletions
   709		}
   710		return err
   711	}
   712	
   713	func kvDeleted(k string) (c camtypes.Claim, ok bool) {
   714		// TODO(bradfitz): garbage
   715		keyPart := strings.Split(k, "|")
   716		if len(keyPart) != 4 {
   717			return
   718		}
   719		if keyPart[0] != "deleted" {
   720			return
   721		}
   722		target, ok := blob.Parse(keyPart[1])
   723		if !ok {
   724			return
   725		}
   726		claimRef, ok := blob.Parse(keyPart[3])
   727		if !ok {
   728			return
   729		}
   730		date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[2]))
   731		if err != nil {
   732			return
   733		}
   734		return camtypes.Claim{
   735			BlobRef: claimRef,
   736			Target:  target,
   737			Date:    date,
   738			Type:    string(schema.DeleteClaim),
   739		}, true
   740	}
   741	
   742	// IsDeleted reports whether the provided blobref (of a permanode or
   743	// claim) should be considered deleted.
   744	func (x *Index) IsDeleted(br blob.Ref) bool {
   745		if x.deletes == nil {
   746			// We still allow the slow path, in case someone creates
   747			// their own Index without a deletes cache.
   748			return x.isDeletedNoCache(br)
   749		}
   750		x.deletes.RLock()
   751		defer x.deletes.RUnlock()
   752		return x.isDeleted(br)
   753	}
   754	
   755	// The caller must hold x.deletes.mu for read.
   756	func (x *Index) isDeleted(br blob.Ref) bool {
   757		deletes, ok := x.deletes.m[br]
   758		if !ok {
   759			return false
   760		}
   761		for _, v := range deletes {
   762			if !x.isDeleted(v.deleter) {
   763				return true
   764			}
   765		}
   766		return false
   767	}
   768	
   769	// Used when the Index has no deletes cache (x.deletes is nil).
   770	func (x *Index) isDeletedNoCache(br blob.Ref) bool {
   771		var err error
   772		it := x.queryPrefix(keyDeleted, br)
   773		for it.Next() {
   774			cl, ok := kvDeleted(it.Key())
   775			if !ok {
   776				panic(fmt.Sprintf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key()))
   777			}
   778			if !x.isDeletedNoCache(cl.BlobRef) {
   779				closeIterator(it, &err)
   780				if err != nil {
   781					// TODO: Do better?
   782					panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
   783				}
   784				return true
   785			}
   786		}
   787		closeIterator(it, &err)
   788		if err != nil {
   789			// TODO: Do better?
   790			panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
   791		}
   792		return false
   793	}
   794	
   795	// GetRecentPermanodes sends results to dest filtered by owner, limit, and
   796	// before.  A zero value for before will default to the current time.  The
   797	// results will have duplicates suppressed, with most recent permanode
   798	// returned.
   799	// Note, permanodes more recent than before will still be fetched from the
   800	// index then skipped. This means runtime scales linearly with the number of
   801	// nodes more recent than before.
   802	func (x *Index) GetRecentPermanodes(ctx context.Context, dest chan<- camtypes.RecentPermanode, owner blob.Ref, limit int, before time.Time) (err error) {
   803		defer close(dest)
   804	
   805		keyId, err := x.KeyId(ctx, owner)
   806		if err == sorted.ErrNotFound {
   807			x.logf("no recent permanodes because keyId for owner %v not found", owner)
   808			return nil
   809		}
   810		if err != nil {
   811			x.logf("error fetching keyId for owner %v: %v", owner, err)
   812			return err
   813		}
   814	
   815		sent := 0
   816		var seenPermanode dupSkipper
   817	
   818		if before.IsZero() {
   819			before = time.Now()
   820		}
   821		// TODO(bradfitz): handle before efficiently. don't use queryPrefix.
   822		it := x.queryPrefix(keyRecentPermanode, keyId)
   823		defer closeIterator(it, &err)
   824		for it.Next() {
   825			permaStr := it.Value()
   826			parts := strings.SplitN(it.Key(), "|", 4)
   827			if len(parts) != 4 {
   828				continue
   829			}
   830			mTime, _ := time.Parse(time.RFC3339, unreverseTimeString(parts[2]))
   831			permaRef, ok := blob.Parse(permaStr)
   832			if !ok {
   833				continue
   834			}
   835			if x.IsDeleted(permaRef) {
   836				continue
   837			}
   838			if seenPermanode.Dup(permaStr) {
   839				continue
   840			}
   841			// Skip entries with an mTime less than or equal to before.
   842			if !mTime.Before(before) {
   843				continue
   844			}
   845			dest <- camtypes.RecentPermanode{
   846				Permanode:   permaRef,
   847				Signer:      owner, // TODO(bradfitz): kinda. usually. for now.
   848				LastModTime: mTime,
   849			}
   850			sent++
   851			if sent == limit {
   852				break
   853			}
   854		}
   855		return nil
   856	}
   857	
   858	func (x *Index) AppendClaims(ctx context.Context, dst []camtypes.Claim, permaNode blob.Ref,
   859		signerFilter string,
   860		attrFilter string) ([]camtypes.Claim, error) {
   861		if x.corpus != nil {
   862			return x.corpus.AppendClaims(ctx, dst, permaNode, signerFilter, attrFilter)
   863		}
   864		var (
   865			err error
   866			it  sorted.Iterator
   867		)
   868		var signerRefs SignerRefSet
   869		if signerFilter != "" {
   870			signerRefs, err = x.signerRefs(ctx, signerFilter)
   871			if err != nil {
   872				return dst, err
   873			}
   874			if len(signerRefs) == 0 {
   875				return dst, nil
   876			}
   877			it = x.queryPrefix(keyPermanodeClaim, permaNode, signerFilter)
   878		} else {
   879			it = x.queryPrefix(keyPermanodeClaim, permaNode)
   880		}
   881		defer closeIterator(it, &err)
   882	
   883		// In the common case, an attribute filter is just a plain
   884		// token ("camliContent") unescaped. If so, fast path that
   885		// check to skip the row before we even split it.
   886		var mustHave string
   887		if attrFilter != "" && urle(attrFilter) == attrFilter {
   888			mustHave = attrFilter
   889		}
   890	
   891		for it.Next() {
   892			val := it.Value()
   893			if mustHave != "" && !strings.Contains(val, mustHave) {
   894				continue
   895			}
   896			cl, ok := kvClaim(it.Key(), val, blob.Parse)
   897			if !ok {
   898				continue
   899			}
   900			if x.IsDeleted(cl.BlobRef) {
   901				continue
   902			}
   903			if attrFilter != "" && cl.Attr != attrFilter {
   904				continue
   905			}
   906			// TODO(mpl): if we ever pass an Owner to AppendClaims, then we could have a
   907			// Matches method on it, that we would use here.
   908			if signerFilter != "" && !signerRefs.blobMatches(cl.Signer) {
   909				continue
   910			}
   911			dst = append(dst, cl)
   912		}
   913		return dst, nil
   914	}
   915	
   916	func kvClaim(k, v string, blobParse func(string) (blob.Ref, bool)) (c camtypes.Claim, ok bool) {
   917		const nKeyPart = 5
   918		const nValPart = 4
   919		var keya [nKeyPart]string
   920		var vala [nValPart]string
   921		keyPart := strutil.AppendSplitN(keya[:0], k, "|", -1)
   922		valPart := strutil.AppendSplitN(vala[:0], v, "|", -1)
   923		if len(keyPart) < nKeyPart || len(valPart) < nValPart {
   924			return
   925		}
   926		signerRef, ok := blobParse(valPart[3])
   927		if !ok {
   928			return
   929		}
   930		permaNode, ok := blobParse(keyPart[1])
   931		if !ok {
   932			return
   933		}
   934		claimRef, ok := blobParse(keyPart[4])
   935		if !ok {
   936			return
   937		}
   938		date, err := time.Parse(time.RFC3339, keyPart[3])
   939		if err != nil {
   940			return
   941		}
   942		return camtypes.Claim{
   943			BlobRef:   claimRef,
   944			Signer:    signerRef,
   945			Permanode: permaNode,
   946			Date:      date,
   947			Type:      urld(valPart[0]),
   948			Attr:      urld(valPart[1]),
   949			Value:     urld(valPart[2]),
   950		}, true
   951	}
   952	
   953	func (x *Index) GetBlobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
   954		if x.corpus != nil {
   955			return x.corpus.GetBlobMeta(ctx, br)
   956		}
   957		key := "meta:" + br.String()
   958		meta, err := x.s.Get(key)
   959		if err == sorted.ErrNotFound {
   960			err = os.ErrNotExist
   961		}
   962		if err != nil {
   963			return camtypes.BlobMeta{}, err
   964		}
   965		pos := strings.Index(meta, "|")
   966		if pos < 0 {
   967			panic(fmt.Sprintf("Bogus index row for key %q: got value %q", key, meta))
   968		}
   969		size, err := strconv.ParseUint(meta[:pos], 10, 32)
   970		if err != nil {
   971			return camtypes.BlobMeta{}, err
   972		}
   973		mime := meta[pos+1:]
   974		return camtypes.BlobMeta{
   975			Ref:       br,
   976			Size:      uint32(size),
   977			CamliType: camliTypeFromMIME(mime),
   978		}, nil
   979	}
   980	
   981	// HasLegacySHA1 reports whether the index has legacy SHA-1 blobs.
   982	func (x *Index) HasLegacySHA1() (ok bool, err error) {
   983		if x.corpus != nil {
   984			return x.corpus.hasLegacySHA1, err
   985		}
   986		it := x.queryPrefix(keyWholeToFileRef, "sha1-")
   987		defer closeIterator(it, &err)
   988		for it.Next() {
   989			return true, err
   990		}
   991		return false, err
   992	}
   993	
   994	func (x *Index) KeyId(ctx context.Context, signer blob.Ref) (string, error) {
   995		if x.corpus != nil {
   996			return x.corpus.KeyId(ctx, signer)
   997		}
   998		return x.s.Get("signerkeyid:" + signer.String())
   999	}
  1000	
  1001	// signerRefs returns the set of signer blobRefs matching the signer keyID. It
  1002	// does not return an error if none is found.
  1003	func (x *Index) signerRefs(ctx context.Context, keyID string) (SignerRefSet, error) {
  1004		if x.corpus != nil {
  1005			return x.corpus.signerRefs[keyID], nil
  1006		}
  1007		it := x.queryPrefixString(keySignerKeyID.name)
  1008		var err error
  1009		var refs SignerRefSet
  1010		defer closeIterator(it, &err)
  1011		prefix := keySignerKeyID.name + ":"
  1012		for it.Next() {
  1013			if it.Value() == keyID {
  1014				refs = append(refs, strings.TrimPrefix(it.Key(), prefix))
  1015			}
  1016		}
  1017		return refs, nil
  1018	}
  1019	
  1020	func (x *Index) PermanodeOfSignerAttrValue(ctx context.Context, signer blob.Ref, attr, val string) (permaNode blob.Ref, err error) {
  1021		keyId, err := x.KeyId(ctx, signer)
  1022		if err == sorted.ErrNotFound {
  1023			return blob.Ref{}, os.ErrNotExist
  1024		}
  1025		if err != nil {
  1026			return blob.Ref{}, err
  1027		}
  1028		it := x.queryPrefix(keySignerAttrValue, keyId, attr, val)
  1029		defer closeIterator(it, &err)
  1030		for it.Next() {
  1031			permaRef, ok := blob.Parse(it.Value())
  1032			if ok && !x.IsDeleted(permaRef) {
  1033				return permaRef, nil
  1034			}
  1035		}
  1036		return blob.Ref{}, os.ErrNotExist
  1037	}
  1038	
  1039	// SearchPermanodesWithAttr is just like PermanodeOfSignerAttrValue
  1040	// except we return multiple and dup-suppress.  If request.Query is
  1041	// "", it is not used in the prefix search.
  1042	func (x *Index) SearchPermanodesWithAttr(ctx context.Context, dest chan<- blob.Ref, request *camtypes.PermanodeByAttrRequest) (err error) {
  1043		defer close(dest)
  1044		if request.FuzzyMatch {
  1045			// TODO(bradfitz): remove this for now? figure out how to handle it generically?
  1046			return errors.New("TODO: SearchPermanodesWithAttr: generic indexer doesn't support FuzzyMatch on PermanodeByAttrRequest")
  1047		}
  1048		if request.Attribute == "" {
  1049			return errors.New("index: missing Attribute in SearchPermanodesWithAttr")
  1050		}
  1051		if !IsIndexedAttribute(request.Attribute) {
  1052			return fmt.Errorf("SearchPermanodesWithAttr: called with a non-indexed attribute %q", request.Attribute)
  1053		}
  1054	
  1055		keyId, err := x.KeyId(ctx, request.Signer)
  1056		if err == sorted.ErrNotFound {
  1057			return nil
  1058		}
  1059		if err != nil {
  1060			return err
  1061		}
  1062		seen := make(map[string]bool)
  1063		var it sorted.Iterator
  1064		if request.Query == "" {
  1065			it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute)
  1066		} else {
  1067			it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute, request.Query)
  1068		}
  1069		defer closeIterator(it, &err)
  1070		before := request.At
  1071		if before.IsZero() {
  1072			before = time.Now()
  1073		}
  1074		for it.Next() {
  1075			cl, ok := kvSignerAttrValue(it.Key(), it.Value())
  1076			if !ok {
  1077				continue
  1078			}
  1079			if x.IsDeleted(cl.BlobRef) {
  1080				continue
  1081			}
  1082			if x.IsDeleted(cl.Permanode) {
  1083				continue
  1084			}
  1085			if cl.Date.After(before) {
  1086				continue
  1087			}
  1088			pnstr := cl.Permanode.String()
  1089			if seen[pnstr] {
  1090				continue
  1091			}
  1092			seen[pnstr] = true
  1093	
  1094			dest <- cl.Permanode
  1095			if len(seen) == request.MaxResults {
  1096				break
  1097			}
  1098		}
  1099		return nil
  1100	}
  1101	
  1102	func kvSignerAttrValue(k, v string) (c camtypes.Claim, ok bool) {
  1103		// TODO(bradfitz): garbage
  1104		keyPart := strings.Split(k, "|")
  1105		valPart := strings.Split(v, "|")
  1106		if len(keyPart) != 6 || len(valPart) != 1 {
  1107			// TODO(mpl): use glog
  1108			log.Printf("bogus keySignerAttrValue index entry: %q = %q", k, v)
  1109			return
  1110		}
  1111		if keyPart[0] != "signerattrvalue" {
  1112			return
  1113		}
  1114		date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
  1115		if err != nil {
  1116			log.Printf("bogus time in keySignerAttrValue index entry: %q", keyPart[4])
  1117			return
  1118		}
  1119		claimRef, ok := blob.Parse(keyPart[5])
  1120		if !ok {
  1121			log.Printf("bogus claim in keySignerAttrValue index entry: %q", keyPart[5])
  1122			return
  1123		}
  1124		permaNode, ok := blob.Parse(valPart[0])
  1125		if !ok {
  1126			log.Printf("bogus permanode in keySignerAttrValue index entry: %q", valPart[0])
  1127			return
  1128		}
  1129		return camtypes.Claim{
  1130			BlobRef:   claimRef,
  1131			Permanode: permaNode,
  1132			Date:      date,
  1133			Attr:      urld(keyPart[2]),
  1134			Value:     urld(keyPart[3]),
  1135		}, true
  1136	}
  1137	
  1138	func (x *Index) PathsOfSignerTarget(ctx context.Context, signer, target blob.Ref) (paths []*camtypes.Path, err error) {
  1139		paths = []*camtypes.Path{}
  1140		keyId, err := x.KeyId(ctx, signer)
  1141		if err != nil {
  1142			if err == sorted.ErrNotFound {
  1143				err = nil
  1144			}
  1145			return
  1146		}
  1147	
  1148		mostRecent := make(map[string]*camtypes.Path)
  1149		maxClaimDates := make(map[string]time.Time)
  1150	
  1151		it := x.queryPrefix(keyPathBackward, keyId, target)
  1152		defer closeIterator(it, &err)
  1153		for it.Next() {
  1154			p, ok, active := kvPathBackward(it.Key(), it.Value())
  1155			if !ok {
  1156				continue
  1157			}
  1158			if x.IsDeleted(p.Claim) {
  1159				continue
  1160			}
  1161			if x.IsDeleted(p.Base) {
  1162				continue
  1163			}
  1164	
  1165			key := p.Base.String() + "/" + p.Suffix
  1166			if p.ClaimDate.After(maxClaimDates[key]) {
  1167				maxClaimDates[key] = p.ClaimDate
  1168				if active {
  1169					mostRecent[key] = &p
  1170				} else {
  1171					delete(mostRecent, key)
  1172				}
  1173			}
  1174		}
  1175		for _, v := range mostRecent {
  1176			paths = append(paths, v)
  1177		}
  1178		return paths, nil
  1179	}
  1180	
  1181	func kvPathBackward(k, v string) (p camtypes.Path, ok bool, active bool) {
  1182		// TODO(bradfitz): garbage
  1183		keyPart := strings.Split(k, "|")
  1184		valPart := strings.Split(v, "|")
  1185		if len(keyPart) != 4 || len(valPart) != 4 {
  1186			// TODO(mpl): use glog
  1187			log.Printf("bogus keyPathBackward index entry: %q = %q", k, v)
  1188			return
  1189		}
  1190		if keyPart[0] != "signertargetpath" {
  1191			return
  1192		}
  1193		target, ok := blob.Parse(keyPart[2])
  1194		if !ok {
  1195			log.Printf("bogus target in keyPathBackward index entry: %q", keyPart[2])
  1196			return
  1197		}
  1198		claim, ok := blob.Parse(keyPart[3])
  1199		if !ok {
  1200			log.Printf("bogus claim in keyPathBackward index entry: %q", keyPart[3])
  1201			return
  1202		}
  1203		date, err := time.Parse(time.RFC3339, valPart[0])
  1204		if err != nil {
  1205			log.Printf("bogus date in keyPathBackward index entry: %q", valPart[0])
  1206			return
  1207		}
  1208		base, ok := blob.Parse(valPart[1])
  1209		if !ok {
  1210			log.Printf("bogus base in keyPathBackward index entry: %q", valPart[1])
  1211			return
  1212		}
  1213		if valPart[2] == "Y" {
  1214			active = true
  1215		}
  1216		return camtypes.Path{
  1217			Claim:     claim,
  1218			Base:      base,
  1219			Target:    target,
  1220			ClaimDate: date,
  1221			Suffix:    urld(valPart[3]),
  1222		}, true, active
  1223	}
  1224	
  1225	func (x *Index) PathsLookup(ctx context.Context, signer, base blob.Ref, suffix string) (paths []*camtypes.Path, err error) {
  1226		paths = []*camtypes.Path{}
  1227		keyId, err := x.KeyId(ctx, signer)
  1228		if err != nil {
  1229			if err == sorted.ErrNotFound {
  1230				err = nil
  1231			}
  1232			return
  1233		}
  1234	
  1235		it := x.queryPrefix(keyPathForward, keyId, base, suffix)
  1236		defer closeIterator(it, &err)
  1237		for it.Next() {
  1238			p, ok, active := kvPathForward(it.Key(), it.Value())
  1239			if !ok {
  1240				continue
  1241			}
  1242			if x.IsDeleted(p.Claim) {
  1243				continue
  1244			}
  1245			if x.IsDeleted(p.Target) {
  1246				continue
  1247			}
  1248	
  1249			// TODO(bradfitz): investigate what's up with deleted
  1250			// forward path claims here.  Needs docs with the
  1251			// interface too, and tests.
  1252			_ = active
  1253	
  1254			paths = append(paths, &p)
  1255		}
  1256		return
  1257	}
  1258	
  1259	func kvPathForward(k, v string) (p camtypes.Path, ok bool, active bool) {
  1260		// TODO(bradfitz): garbage
  1261		keyPart := strings.Split(k, "|")
  1262		valPart := strings.Split(v, "|")
  1263		if len(keyPart) != 6 || len(valPart) != 2 {
  1264			// TODO(mpl): use glog
  1265			log.Printf("bogus keyPathForward index entry: %q = %q", k, v)
  1266			return
  1267		}
  1268		if keyPart[0] != "path" {
  1269			return
  1270		}
  1271		base, ok := blob.Parse(keyPart[2])
  1272		if !ok {
  1273			log.Printf("bogus base in keyPathForward index entry: %q", keyPart[2])
  1274			return
  1275		}
  1276		date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
  1277		if err != nil {
  1278			log.Printf("bogus date in keyPathForward index entry: %q", keyPart[4])
  1279			return
  1280		}
  1281		claim, ok := blob.Parse(keyPart[5])
  1282		if !ok {
  1283			log.Printf("bogus claim in keyPathForward index entry: %q", keyPart[5])
  1284			return
  1285		}
  1286		if valPart[0] == "Y" {
  1287			active = true
  1288		}
  1289		target, ok := blob.Parse(valPart[1])
  1290		if !ok {
  1291			log.Printf("bogus target in keyPathForward index entry: %q", valPart[1])
  1292			return
  1293		}
  1294		return camtypes.Path{
  1295			Claim:     claim,
  1296			Base:      base,
  1297			Target:    target,
  1298			ClaimDate: date,
  1299			Suffix:    urld(keyPart[3]),
  1300		}, true, active
  1301	}
  1302	
  1303	func (x *Index) PathLookup(ctx context.Context, signer, base blob.Ref, suffix string, at time.Time) (*camtypes.Path, error) {
  1304		paths, err := x.PathsLookup(ctx, signer, base, suffix)
  1305		if err != nil {
  1306			return nil, err
  1307		}
  1308		var (
  1309			newest    = int64(0)
  1310			atSeconds = int64(0)
  1311			best      *camtypes.Path
  1312		)
  1313	
  1314		if !at.IsZero() {
  1315			atSeconds = at.Unix()
  1316		}
  1317	
  1318		for _, path := range paths {
  1319			t := path.ClaimDate
  1320			secs := t.Unix()
  1321			if atSeconds != 0 && secs > atSeconds {
  1322				// Too new
  1323				continue
  1324			}
  1325			if newest > secs {
  1326				// Too old
  1327				continue
  1328			}
  1329			// Just right
  1330			newest, best = secs, path
  1331		}
  1332		if best == nil {
  1333			return nil, os.ErrNotExist
  1334		}
  1335		return best, nil
  1336	}
  1337	
  1338	func (x *Index) existingFileSchemas(wholeRef blob.Ref) (schemaRefs []blob.Ref, err error) {
  1339		it := x.queryPrefix(keyWholeToFileRef, wholeRef)
  1340		defer closeIterator(it, &err)
  1341		for it.Next() {
  1342			keyPart := strings.Split(it.Key(), "|")[1:]
  1343			if len(keyPart) < 2 {
  1344				continue
  1345			}
  1346			ref, ok := blob.Parse(keyPart[1])
  1347			if ok {
  1348				schemaRefs = append(schemaRefs, ref)
  1349			}
  1350		}
  1351		return schemaRefs, nil
  1352	}
  1353	
  1354	// WholeRefToFile maps a file contents blobRef (a "wholeRef"), to the file schemas with those contents.
  1355	type WholeRefToFile map[string][]blob.Ref
  1356	
  1357	// ExistingFileSchemas returns the file schemas for the provided file contents refs.
  1358	func (x *Index) ExistingFileSchemas(wholeRef ...blob.Ref) (WholeRefToFile, error) {
  1359		schemaRefs := make(WholeRefToFile)
  1360		for _, v := range wholeRef {
  1361			newRefs, err := x.existingFileSchemas(v)
  1362			if err != nil {
  1363				return nil, err
  1364			}
  1365			schemaRefs[v.String()] = newRefs
  1366		}
  1367		return schemaRefs, nil
  1368	}
  1369	
  1370	func (x *Index) loadKey(key string, val *string, err *error, wg *sync.WaitGroup) {
  1371		defer wg.Done()
  1372		*val, *err = x.s.Get(key)
  1373	}
  1374	
  1375	func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.FileInfo, error) {
  1376		if x.corpus != nil {
  1377			return x.corpus.GetFileInfo(ctx, fileRef)
  1378		}
  1379		ikey := "fileinfo|" + fileRef.String()
  1380		tkey := keyFileTimes.name + "|" + fileRef.String()
  1381		// TODO: switch this to use syncutil.Group
  1382		wg := new(sync.WaitGroup)
  1383		wg.Add(2)
  1384		var iv, tv string // info value, time value
  1385		var ierr, terr error
  1386		go x.loadKey(ikey, &iv, &ierr, wg)
  1387		go x.loadKey(tkey, &tv, &terr, wg)
  1388		wg.Wait()
  1389	
  1390		if ierr == sorted.ErrNotFound {
  1391			return camtypes.FileInfo{}, os.ErrNotExist
  1392		}
  1393		if ierr != nil {
  1394			return camtypes.FileInfo{}, ierr
  1395		}
  1396		valPart := strings.Split(iv, "|")
  1397		if len(valPart) < 3 {
  1398			x.logf("bogus key %q = %q", ikey, iv)
  1399			return camtypes.FileInfo{}, os.ErrNotExist
  1400		}
  1401		var wholeRef blob.Ref
  1402		if len(valPart) >= 4 {
  1403			wholeRef, _ = blob.Parse(valPart[3])
  1404		}
  1405		size, err := strconv.ParseInt(valPart[0], 10, 64)
  1406		if err != nil {
  1407			x.logf("bogus integer at position 0 in key %q = %q", ikey, iv)
  1408			return camtypes.FileInfo{}, os.ErrNotExist
  1409		}
  1410		fileName := urld(valPart[1])
  1411		fi := camtypes.FileInfo{
  1412			Size:     size,
  1413			FileName: fileName,
  1414			MIMEType: urld(valPart[2]),
  1415			WholeRef: wholeRef,
  1416		}
  1417	
  1418		if tv != "" {
  1419			times := strings.Split(urld(tv), ",")
  1420			updateFileInfoTimes(&fi, times)
  1421		}
  1422	
  1423		return fi, nil
  1424	}
  1425	
  1426	func updateFileInfoTimes(fi *camtypes.FileInfo, times []string) {
  1427		if len(times) == 0 {
  1428			return
  1429		}
  1430		fi.Time = types.ParseTime3339OrNil(times[0])
  1431		if len(times) == 2 {
  1432			fi.ModTime = types.ParseTime3339OrNil(times[1])
  1433		}
  1434	}
  1435	
  1436	// v is "width|height"
  1437	func kvImageInfo(v []byte) (ii camtypes.ImageInfo, ok bool) {
  1438		pipei := bytes.IndexByte(v, '|')
  1439		if pipei < 0 {
  1440			return
  1441		}
  1442		w, err := strutil.ParseUintBytes(v[:pipei], 10, 16)
  1443		if err != nil {
  1444			return
  1445		}
  1446		h, err := strutil.ParseUintBytes(v[pipei+1:], 10, 16)
  1447		if err != nil {
  1448			return
  1449		}
  1450		ii.Width = uint16(w)
  1451		ii.Height = uint16(h)
  1452		return ii, true
  1453	}
  1454	
  1455	func (x *Index) GetImageInfo(ctx context.Context, fileRef blob.Ref) (camtypes.ImageInfo, error) {
  1456		if x.corpus != nil {
  1457			return x.corpus.GetImageInfo(ctx, fileRef)
  1458		}
  1459		// it might be that the key does not exist because image.DecodeConfig failed earlier
  1460		// (because of unsupported JPEG features like progressive mode).
  1461		key := keyImageSize.Key(fileRef.String())
  1462		v, err := x.s.Get(key)
  1463		if err == sorted.ErrNotFound {
  1464			err = os.ErrNotExist
  1465		}
  1466		if err != nil {
  1467			return camtypes.ImageInfo{}, err
  1468		}
  1469		ii, ok := kvImageInfo([]byte(v))
  1470		if !ok {
  1471			return camtypes.ImageInfo{}, fmt.Errorf("index: bogus key %q = %q", key, v)
  1472		}
  1473		return ii, nil
  1474	}
  1475	
  1476	func (x *Index) GetMediaTags(ctx context.Context, fileRef blob.Ref) (tags map[string]string, err error) {
  1477		if x.corpus != nil {
  1478			return x.corpus.GetMediaTags(ctx, fileRef)
  1479		}
  1480		fi, err := x.GetFileInfo(ctx, fileRef)
  1481		if err != nil {
  1482			return nil, err
  1483		}
  1484		it := x.queryPrefix(keyMediaTag, fi.WholeRef.String())
  1485		defer closeIterator(it, &err)
  1486		for it.Next() {
  1487			if tags == nil {
  1488				tags = make(map[string]string)
  1489			}
  1490			tags[it.Key()] = it.Value()
  1491		}
  1492		return tags, nil
  1493	}
  1494	
  1495	func (x *Index) GetFileLocation(ctx context.Context, fileRef blob.Ref) (camtypes.Location, error) {
  1496		if x.corpus != nil {
  1497			lat, long, ok := x.corpus.FileLatLong(fileRef)
  1498			if !ok {
  1499				return camtypes.Location{}, os.ErrNotExist
  1500			}
  1501			// TODO(mpl): Brad says to move this check lower, in corpus func and/or when building corpus from index rows.
  1502			if math.IsNaN(long) || math.IsNaN(lat) {
  1503				return camtypes.Location{}, fmt.Errorf("latitude or longitude in corpus for %v is NaN. Reindex to fix it", fileRef)
  1504			}
  1505			return camtypes.Location{Latitude: lat, Longitude: long}, nil
  1506		}
  1507		fi, err := x.GetFileInfo(ctx, fileRef)
  1508		if err != nil {
  1509			return camtypes.Location{}, err
  1510		}
  1511		it := x.queryPrefixString(keyEXIFGPS.Key(fi.WholeRef.String()))
  1512		defer closeIterator(it, &err)
  1513		if !it.Next() {
  1514			return camtypes.Location{}, os.ErrNotExist
  1515		}
  1516	
  1517		var lat, long float64
  1518		key, v := it.Key(), it.Value()
  1519		pipe := strings.Index(v, "|")
  1520		if pipe < 0 {
  1521			return camtypes.Location{}, fmt.Errorf("index: bogus key %q = %q", key, v)
  1522		}
  1523		lat, err = strconv.ParseFloat(v[:pipe], 64)
  1524		if err != nil {
  1525			return camtypes.Location{}, fmt.Errorf("index: bogus value at position 0 in key %q = %q", key, v)
  1526		}
  1527		long, err = strconv.ParseFloat(v[pipe+1:], 64)
  1528		if err != nil {
  1529			return camtypes.Location{}, fmt.Errorf("index: bogus value at position 1 in key %q = %q", key, v)
  1530		}
  1531		if math.IsNaN(long) || math.IsNaN(lat) {
  1532			return camtypes.Location{}, fmt.Errorf("latitude or longitude in index for %v is NaN. Reindex to fix it", fileRef)
  1533		}
  1534		return camtypes.Location{Latitude: lat, Longitude: long}, nil
  1535	}
  1536	
  1537	func (x *Index) EdgesTo(ref blob.Ref, opts *camtypes.EdgesToOpts) (edges []*camtypes.Edge, err error) {
  1538		it := x.queryPrefix(keyEdgeBackward, ref)
  1539		defer closeIterator(it, &err)
  1540		permanodeParents := make(map[string]*camtypes.Edge)
  1541		for it.Next() {
  1542			edge, ok := kvEdgeBackward(it.Key(), it.Value())
  1543			if !ok {
  1544				continue
  1545			}
  1546			if x.IsDeleted(edge.From) {
  1547				continue
  1548			}
  1549			if x.IsDeleted(edge.BlobRef) {
  1550				continue
  1551			}
  1552			edge.To = ref
  1553			if edge.FromType == schema.TypePermanode {
  1554				permanodeParents[edge.From.String()] = edge
  1555			} else {
  1556				edges = append(edges, edge)
  1557			}
  1558		}
  1559		for _, e := range permanodeParents {
  1560			edges = append(edges, e)
  1561		}
  1562		return edges, nil
  1563	}
  1564	
  1565	func kvEdgeBackward(k, v string) (edge *camtypes.Edge, ok bool) {
  1566		// TODO(bradfitz): garbage
  1567		keyPart := strings.Split(k, "|")
  1568		valPart := strings.Split(v, "|")
  1569		if len(keyPart) != 4 || len(valPart) != 2 {
  1570			// TODO(mpl): use glog
  1571			log.Printf("bogus keyEdgeBackward index entry: %q = %q", k, v)
  1572			return
  1573		}
  1574		if keyPart[0] != "edgeback" {
  1575			return
  1576		}
  1577		parentRef, ok := blob.Parse(keyPart[2])
  1578		if !ok {
  1579			log.Printf("bogus parent in keyEdgeBackward index entry: %q", keyPart[2])
  1580			return
  1581		}
  1582		blobRef, ok := blob.Parse(keyPart[3])
  1583		if !ok {
  1584			log.Printf("bogus blobref in keyEdgeBackward index entry: %q", keyPart[3])
  1585			return
  1586		}
  1587		return &camtypes.Edge{
  1588			From:      parentRef,
  1589			FromType:  schema.CamliType(valPart[0]),
  1590			FromTitle: valPart[1],
  1591			BlobRef:   blobRef,
  1592		}, true
  1593	}
  1594	
  1595	// GetDirMembers sends on dest the children of the static directory dir.
  1596	func (x *Index) GetDirMembers(ctx context.Context, dir blob.Ref, dest chan<- blob.Ref, limit int) (err error) {
  1597		defer close(dest)
  1598	
  1599		sent := 0
  1600		if x.corpus != nil {
  1601			children, err := x.corpus.GetDirChildren(ctx, dir)
  1602			if err != nil {
  1603				return err
  1604			}
  1605			for child := range children {
  1606				dest <- child
  1607				sent++
  1608				if sent == limit {
  1609					break
  1610				}
  1611			}
  1612			return nil
  1613		}
  1614	
  1615		it := x.queryPrefix(keyStaticDirChild, dir.String())
  1616		defer closeIterator(it, &err)
  1617		for it.Next() {
  1618			keyPart := strings.Split(it.Key(), "|")
  1619			if len(keyPart) != 3 {
  1620				return fmt.Errorf("index: bogus key keyStaticDirChild = %q", it.Key())
  1621			}
  1622	
  1623			child, ok := blob.Parse(keyPart[2])
  1624			if !ok {
  1625				continue
  1626			}
  1627			dest <- child
  1628			sent++
  1629			if sent == limit {
  1630				break
  1631			}
  1632		}
  1633		return nil
  1634	}
  1635	
  1636	func kvBlobMeta(k, v string) (bm camtypes.BlobMeta, ok bool) {
  1637		refStr := k[len("meta:"):]
  1638		br, ok := blob.Parse(refStr)
  1639		if !ok {
  1640			return
  1641		}
  1642		pipe := strings.Index(v, "|")
  1643		if pipe < 0 {
  1644			return
  1645		}
  1646		size, err := strconv.ParseUint(v[:pipe], 10, 32)
  1647		if err != nil {
  1648			return
  1649		}
  1650		return camtypes.BlobMeta{
  1651			Ref:       br,
  1652			Size:      uint32(size),
  1653			CamliType: camliTypeFromMIME(v[pipe+1:]),
  1654		}, true
  1655	}
  1656	
  1657	func kvBlobMeta_bytes(k, v []byte) (bm camtypes.BlobMeta, ok bool) {
  1658		ref := k[len("meta:"):]
  1659		br, ok := blob.ParseBytes(ref)
  1660		if !ok {
  1661			return
  1662		}
  1663		pipe := bytes.IndexByte(v, '|')
  1664		if pipe < 0 {
  1665			return
  1666		}
  1667		size, err := strutil.ParseUintBytes(v[:pipe], 10, 32)
  1668		if err != nil {
  1669			return
  1670		}
  1671		return camtypes.BlobMeta{
  1672			Ref:       br,
  1673			Size:      uint32(size),
  1674			CamliType: camliTypeFromMIME_bytes(v[pipe+1:]),
  1675		}, true
  1676	}
  1677	
  1678	func enumerateBlobMeta(s sorted.KeyValue, cb func(camtypes.BlobMeta) error) (err error) {
  1679		it := queryPrefixString(s, "meta:")
  1680		defer closeIterator(it, &err)
  1681		for it.Next() {
  1682			bm, ok := kvBlobMeta(it.Key(), it.Value())
  1683			if !ok {
  1684				continue
  1685			}
  1686			if err := cb(bm); err != nil {
  1687				return err
  1688			}
  1689		}
  1690		return nil
  1691	}
  1692	
  1693	var errStopIteration = errors.New("stop iteration") // local error, doesn't escape this package
  1694	
  1695	// EnumerateBlobMeta calls fn for all known meta blobs.
  1696	// If fn returns false, iteration stops and an nil error is returned.
  1697	// If ctx becomes done, iteration stops and ctx.Err() is returned.
  1698	func (x *Index) EnumerateBlobMeta(ctx context.Context, fn func(camtypes.BlobMeta) bool) error {
  1699		if x.corpus != nil {
  1700			var err error
  1701			var n int
  1702			done := ctx.Done()
  1703			x.corpus.EnumerateBlobMeta(func(m camtypes.BlobMeta) bool {
  1704				// Every so often, check context.
  1705				n++
  1706				if n%256 == 0 {
  1707					select {
  1708					case <-done:
  1709						err = ctx.Err()
  1710						return false
  1711					default:
  1712	
  1713					}
  1714				}
  1715				return fn(m)
  1716			})
  1717			return err
  1718		}
  1719		done := ctx.Done()
  1720		err := enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {
  1721			select {
  1722			case <-done:
  1723				return ctx.Err()
  1724			default:
  1725				if !fn(bm) {
  1726					return errStopIteration
  1727				}
  1728				return nil
  1729			}
  1730		})
  1731		if err == errStopIteration {
  1732			err = nil
  1733		}
  1734		return err
  1735	}
  1736	
  1737	// Storage returns the index's underlying Storage implementation.
  1738	func (x *Index) Storage() sorted.KeyValue { return x.s }
  1739	
  1740	// Close closes the underlying sorted.KeyValue, if the storage has a Close method.
  1741	// The return value is the return value of the underlying Close, or
  1742	// nil otherwise.
  1743	func (x *Index) Close() error {
  1744		if cl, ok := x.s.(io.Closer); ok {
  1745			return cl.Close()
  1746		}
  1747		return nil
  1748	}
  1749	
  1750	// initNeededMaps initializes x.needs and x.neededBy on start-up.
  1751	func (x *Index) initNeededMaps() (err error) {
  1752		x.deletes = newDeletionCache()
  1753		it := x.queryPrefix(keyMissing)
  1754		defer closeIterator(it, &err)
  1755		for it.Next() {
  1756			key := it.KeyBytes()
  1757			pair := key[len("missing|"):]
  1758			pipe := bytes.IndexByte(pair, '|')
  1759			if pipe < 0 {
  1760				return fmt.Errorf("Bogus missing key %q", key)
  1761			}
  1762			have, ok1 := blob.ParseBytes(pair[:pipe])
  1763			missing, ok2 := blob.ParseBytes(pair[pipe+1:])
  1764			if !ok1 || !ok2 {
  1765				return fmt.Errorf("Bogus missing key %q", key)
  1766			}
  1767			x.noteNeededMemory(have, missing)
  1768		}
  1769		return
  1770	}
  1771	
  1772	func (x *Index) noteNeeded(have, missing blob.Ref) error {
  1773		if err := x.s.Set(keyMissing.Key(have, missing), "1"); err != nil {
  1774			return err
  1775		}
  1776		x.noteNeededMemory(have, missing)
  1777		return nil
  1778	}
  1779	
  1780	func (x *Index) noteNeededMemory(have, missing blob.Ref) {
  1781		x.needs[have] = append(x.needs[have], missing)
  1782		x.neededBy[missing] = append(x.neededBy[missing], have)
  1783	}
  1784	
  1785	const camliTypeMIMEPrefix = "application/json; camliType="
  1786	
  1787	var camliTypeMIMEPrefixBytes = []byte(camliTypeMIMEPrefix)
  1788	
  1789	// "application/json; camliType=file" => "file"
  1790	// "image/gif" => ""
  1791	func camliTypeFromMIME(mime string) schema.CamliType {
  1792		if v := strings.TrimPrefix(mime, camliTypeMIMEPrefix); v != mime {
  1793			return schema.CamliType(v)
  1794		}
  1795		return ""
  1796	}
  1797	
  1798	func camliTypeFromMIME_bytes(mime []byte) schema.CamliType {
  1799		if v := bytes.TrimPrefix(mime, camliTypeMIMEPrefixBytes); len(v) != len(mime) {
  1800			return schema.CamliType(strutil.StringFromBytes(v))
  1801		}
  1802		return ""
  1803	}
  1804	
  1805	// TODO(bradfitz): rename this? This is really about signer-attr-value
  1806	// (PermanodeOfSignerAttrValue), and not about indexed attributes in general.
  1807	func IsIndexedAttribute(attr string) bool {
  1808		switch attr {
  1809		case "camliRoot", "camliImportRoot", "tag", "title":
  1810			return true
  1811		}
  1812		return false
  1813	}
  1814	
  1815	// IsBlobReferenceAttribute returns whether attr is an attribute whose
  1816	// value is a blob reference (e.g. camliMember) and thus something the
  1817	// indexers should keep inverted indexes on for parent/child-type
  1818	// relationships.
  1819	func IsBlobReferenceAttribute(attr string) bool {
  1820		switch attr {
  1821		case "camliMember":
  1822			return true
  1823		}
  1824		return false
  1825	}
  1826	
  1827	func IsFulltextAttribute(attr string) bool {
  1828		switch attr {
  1829		case "tag", "title":
  1830			return true
  1831		}
  1832		return false
  1833	}
Website layout inspired by memcached.
Content by the authors.