Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package index
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"log"
    26		"math"
    27		"os"
    28		"runtime"
    29		"sort"
    30		"strconv"
    31		"strings"
    32		"sync"
    33		"time"
    34	
    35		"perkeep.org/pkg/blob"
    36		"perkeep.org/pkg/blobserver"
    37		"perkeep.org/pkg/env"
    38		"perkeep.org/pkg/schema"
    39		"perkeep.org/pkg/sorted"
    40		"perkeep.org/pkg/types/camtypes"
    41	
    42		"go4.org/jsonconfig"
    43		"go4.org/strutil"
    44		"go4.org/types"
    45	)
    46	
// init registers newFromConfig with the blobserver package as the constructor
// for the "index" storage type.
func init() {
	blobserver.RegisterStorageConstructor("index", newFromConfig)
}
    50	
// Index holds the state of an index stored in a sorted.KeyValue, together with
// the in-memory caches and bookkeeping used while indexing and reindexing.
type Index struct {
	*blobserver.NoImplStorage
	reindex   bool // whether "reindex" was set in config (likely via perkeepd flag)
	keepGoing bool // whether "keepGoing" was set in config (likely via perkeepd flag)
	s         sorted.KeyValue // the key/value storage the index rows are read from and written to

	KeyFetcher blob.Fetcher // for verifying claims

	// TODO(mpl): do not init and use deletes when we have a corpus. Since corpus has its own deletes now, they are redundant.

	// deletes is a cache to keep track of the deletion status (deleted vs undeleted)
	// of the blobs in the index. It makes for faster reads than the otherwise
	// recursive calls on the index.
	deletes *deletionCache

	corpus *Corpus // or nil, if not being kept in memory

	mu sync.RWMutex // guards following
	//mu syncdebug.RWMutexTracker  // (when debugging)

	// needs maps from a blob to the missing blobs it needs to
	// finish indexing.
	needs map[blob.Ref][]blob.Ref
	// neededBy is the inverse of needs. The keys are missing blobs
	// and the value(s) are blobs waiting to be reindexed.
	neededBy     map[blob.Ref][]blob.Ref
	readyReindex map[blob.Ref]bool // set of things ready to be re-indexed
	// reindexWg is used to make sure that we wait for all asynchronous, out
	// of order, indexing to be finished, at the end of reindexing.
	reindexWg sync.WaitGroup
	// blobSource is used for fetching blobs when indexing files and other
	// blobs types that reference other objects.
	// The only write access to blobSource should be its initialization (transition
	// from nil to non-nil), once, and protected by mu.
	blobSource blobserver.FetcherEnumerator

	hasWiped bool // whether Wipe has been called on s. So we don't redo it in Reindex() for nothing.
}
    89	
    90	func (x *Index) Lock()    { x.mu.Lock() }
    91	func (x *Index) Unlock()  { x.mu.Unlock() }
    92	func (x *Index) RLock()   { x.mu.RLock() }
    93	func (x *Index) RUnlock() { x.mu.RUnlock() }
    94	
// Compile-time checks that *Index satisfies blobserver.Storage and Interface.
var (
	_ blobserver.Storage = (*Index)(nil)
	_ Interface          = (*Index)(nil)
)
    99	
   100	func (x *Index) logf(format string, args ...interface{}) {
   101		log.Printf("index: "+format, args...)
   102	}
   103	
// aboutToReindex is set to true by newFromConfig when the "reindex" option is
// enabled. When it is set, New skips the schema version check and the loading
// of the deletes cache and missing-blob maps, and only installs an empty
// deletes cache.
var aboutToReindex = false
   105	
// SignerRefSet is the set of all blob refs (of different hashes) that represent
// the same signer GPG identity. The refs are stored as strings for allocation
// reasons: we favor allocating when updating SignerRefSets in the corpus over
// allocating when reading them.
type SignerRefSet []string
   111	
// Owner identifies, through its GPG key, a signer of claims and permanodes.
type Owner struct {
	// keyID holds the owner's GPG key IDs. KeyID returns the first one.
	keyID []string
	// blobByKeyID maps an owner GPG ID to all its owner blobs (because different hashes).
	// refs are stored as strings for allocation reasons.
	blobByKeyID map[string]SignerRefSet
}
   120	
   121	// NewOwner returns an Owner that associates keyID with ref.
   122	func NewOwner(keyID string, ref blob.Ref) *Owner {
   123		return &Owner{
   124			keyID:       []string{keyID},
   125			blobByKeyID: map[string]SignerRefSet{keyID: {ref.String()}},
   126		}
   127	}
   128	
   129	// KeyID returns the GPG key ID (e.g. 2931A67C26F5ABDA) of the owner. Its
   130	// signature might change when support for multiple GPG keys is introduced.
   131	func (o *Owner) KeyID() string {
   132		if o == nil || len(o.keyID) == 0 {
   133			return ""
   134		}
   135		return o.keyID[0]
   136	}
   137	
   138	// RefSet returns the set of refs that represent the same owner as keyID.
   139	func (o *Owner) RefSet(keyID string) SignerRefSet {
   140		if o == nil || len(o.blobByKeyID) == 0 {
   141			return nil
   142		}
   143		refs := o.blobByKeyID[keyID]
   144		if len(refs) == 0 {
   145			return nil
   146		}
   147		return refs
   148	}
   149	
   150	// BlobRef returns the currently recommended ref implementation of the owner GPG
   151	// key blob. Its signature might change when support for multiple hashes and/or
   152	// multiple GPG keys is introduced.
   153	func (o *Owner) BlobRef() blob.Ref {
   154		if o == nil || len(o.blobByKeyID) == 0 {
   155			return blob.Ref{}
   156		}
   157		refs := o.blobByKeyID[o.KeyID()]
   158		if len(refs) == 0 {
   159			return blob.Ref{}
   160		}
   161		ref, ok := blob.Parse(refs[0])
   162		if !ok {
   163			return blob.Ref{}
   164		}
   165		return ref
   166	}
   167	
   168	// TODO(mpl): I'm not sure there are any cases where we don't want the index to
   169	// have a blobSource, so maybe we should phase out InitBlobSource and integrate it
   170	// to New or something. But later.
   171	
   172	// InitBlobSource sets the index's blob source and starts the background
   173	// out-of-order indexing loop. It panics if the blobSource is already set.
   174	// If the index's key fetcher is nil, it is also set to the blobSource
   175	// argument.
   176	func (x *Index) InitBlobSource(blobSource blobserver.FetcherEnumerator) {
   177		x.Lock()
   178		defer x.Unlock()
   179		if x.blobSource != nil {
   180			panic("blobSource of Index already set")
   181		}
   182		x.blobSource = blobSource
   183		if x.KeyFetcher == nil {
   184			x.KeyFetcher = blobSource
   185		}
   186	}
   187	
   188	// New returns a new index using the provided key/value storage implementation.
   189	func New(s sorted.KeyValue) (*Index, error) {
   190		idx := &Index{
   191			s:            s,
   192			needs:        make(map[blob.Ref][]blob.Ref),
   193			neededBy:     make(map[blob.Ref][]blob.Ref),
   194			readyReindex: make(map[blob.Ref]bool),
   195		}
   196		if aboutToReindex {
   197			idx.deletes = newDeletionCache()
   198			return idx, nil
   199		}
   200	
   201		schemaVersion := idx.schemaVersion()
   202		switch {
   203		case schemaVersion == 0 && idx.isEmpty():
   204			// New index.
   205			err := idx.s.Set(keySchemaVersion.name, fmt.Sprint(requiredSchemaVersion))
   206			if err != nil {
   207				return nil, fmt.Errorf("Could not write index schema version %q: %w", requiredSchemaVersion, err)
   208			}
   209		case schemaVersion != requiredSchemaVersion:
   210			tip := ""
   211			if env.IsDev() {
   212				// Good signal that we're using the devcam server, so help out
   213				// the user with a more useful tip:
   214				tip = `(For the dev server, run "devcam server --wipe" to wipe both your blobs and index)`
   215			} else {
   216				if is4To5SchemaBump(schemaVersion) {
   217					return idx, errMissingWholeRef
   218				}
   219				tip = "Run 'perkeepd --reindex' (it might take awhile, but shows status). Alternative: 'camtool dbinit' (or just delete the file for a file based index), and then 'camtool sync --all'"
   220			}
   221			return nil, fmt.Errorf("index schema version is %d; required one is %d. You need to reindex. %s",
   222				schemaVersion, requiredSchemaVersion, tip)
   223		}
   224		if err := idx.initDeletesCache(); err != nil {
   225			return nil, fmt.Errorf("Could not initialize index's deletes cache: %w", err)
   226		}
   227		if err := idx.initNeededMaps(); err != nil {
   228			return nil, fmt.Errorf("Could not initialize index's missing blob maps: %w", err)
   229		}
   230		return idx, nil
   231	}
   232	
   233	func is4To5SchemaBump(schemaVersion int) bool {
   234		return schemaVersion == 4 && requiredSchemaVersion == 5
   235	}
   236	
// errMissingWholeRef is returned by New, along with a usable index, when the
// stored schema is version 4 while version 5 is required. newFromConfig reacts
// to it by calling fixMissingWholeRef.
var errMissingWholeRef = errors.New("missing wholeRef field in fileInfo rows")
   238	
   239	// fixMissingWholeRef appends the wholeRef to all the keyFileInfo rows values. It should
   240	// only be called to upgrade a version 4 index schema to version 5.
   241	func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
   242		// We did that check from the caller, but double-check again to prevent from misuse
   243		// of that function.
   244		if x.schemaVersion() != 4 || requiredSchemaVersion != 5 {
   245			panic("fixMissingWholeRef should only be used when upgrading from v4 to v5 of the index schema")
   246		}
   247		x.logf("fixing the missing wholeRef in the fileInfo rows...")
   248		defer func() {
   249			if err != nil {
   250				x.logf("fixing the fileInfo rows failed: %v", err)
   251				return
   252			}
   253			x.logf("successfully fixed wholeRef in FileInfo rows.")
   254		}()
   255	
   256		// first build a reverted keyWholeToFileRef map, so we can get the wholeRef from the fileRef easily.
   257		fileRefToWholeRef := make(map[blob.Ref]blob.Ref)
   258		it := x.queryPrefix(keyWholeToFileRef)
   259		var keyA [3]string
   260		for it.Next() {
   261			keyPart := strutil.AppendSplitN(keyA[:0], it.Key(), "|", 3)
   262			if len(keyPart) != 3 {
   263				return fmt.Errorf("bogus keyWholeToFileRef key: got %q, wanted \"wholetofile|wholeRef|fileRef\"", it.Key())
   264			}
   265			wholeRef, ok1 := blob.Parse(keyPart[1])
   266			fileRef, ok2 := blob.Parse(keyPart[2])
   267			if !ok1 || !ok2 {
   268				return fmt.Errorf("bogus part in keyWholeToFileRef key: %q", it.Key())
   269			}
   270			fileRefToWholeRef[fileRef] = wholeRef
   271		}
   272		if err := it.Close(); err != nil {
   273			return err
   274		}
   275	
   276		var fixedEntries, missedEntries int
   277		t := time.NewTicker(5 * time.Second)
   278		defer t.Stop()
   279		// We record the mutations and set them all after the iteration because of the sqlite locking:
   280		// since BeginBatch takes a lock, and Find too, we would deadlock at queryPrefix if we
   281		// started a batch mutation before.
   282		mutations := make(map[string]string)
   283		keyPrefix := keyFileInfo.name + "|"
   284		it = x.queryPrefix(keyFileInfo)
   285		defer it.Close()
   286		var valA [3]string
   287		for it.Next() {
   288			select {
   289			case <-t.C:
   290				x.logf("recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries)
   291			default:
   292			}
   293			br, ok := blob.ParseBytes(it.KeyBytes()[len(keyPrefix):])
   294			if !ok {
   295				return fmt.Errorf("invalid blobRef %q", it.KeyBytes()[len(keyPrefix):])
   296			}
   297			wholeRef, ok := fileRefToWholeRef[br]
   298			if !ok {
   299				missedEntries++
   300				x.logf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br)
   301				continue
   302			}
   303			valPart := strutil.AppendSplitN(valA[:0], it.Value(), "|", 3)
   304			// The old format we're fixing should be: size|filename|mimetype
   305			if len(valPart) != 3 {
   306				return fmt.Errorf("bogus keyFileInfo value: got %q, wanted \"size|filename|mimetype\"", it.Value())
   307			}
   308			size_s, filename, mimetype := valPart[0], valPart[1], urld(valPart[2])
   309			if strings.Contains(mimetype, "|") {
   310				// I think this can only happen for people migrating from a commit at least as recent as
   311				// 8229c1985079681a652cb65551b4e80a10d135aa, when wholeRef was introduced to keyFileInfo
   312				// but there was no migration code yet.
   313				// For the "production" migrations between 0.8 and 0.9, the index should not have any wholeRef
   314				// in the keyFileInfo entries. So if something goes wrong and is somehow linked to that happening,
   315				// I'd like to know about it, hence the logging.
   316				x.logf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value())
   317				continue
   318			}
   319			size, err := strconv.Atoi(size_s)
   320			if err != nil {
   321				return fmt.Errorf("bogus size in keyFileInfo value %v: %w", it.Value(), err)
   322			}
   323			mutations[keyFileInfo.Key(br)] = keyFileInfo.Val(size, filename, mimetype, wholeRef)
   324			fixedEntries++
   325		}
   326		if err := it.Close(); err != nil {
   327			return err
   328		}
   329		x.logf("starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries)
   330		bm := x.s.BeginBatch()
   331		for k, v := range mutations {
   332			bm.Set(k, v)
   333		}
   334		bm.Set(keySchemaVersion.name, "5")
   335		if err := x.s.CommitBatch(bm); err != nil {
   336			return err
   337		}
   338		if missedEntries > 0 {
   339			x.logf("some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries)
   340		}
   341		return nil
   342	}
   343	
   344	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
   345		blobPrefix := config.RequiredString("blobSource")
   346		kvConfig := config.RequiredObject("storage")
   347		reindex := config.OptionalBool("reindex", false)
   348		keepGoing := config.OptionalBool("keepGoing", false)
   349	
   350		if err := config.Validate(); err != nil {
   351			return nil, err
   352		}
   353	
   354		kv, err := sorted.NewKeyValue(kvConfig)
   355		if err != nil {
   356			var nwe sorted.NeedWipeError
   357			if !errors.As(err, &nwe) {
   358				return nil, err
   359			}
   360			if !reindex {
   361				return nil, err
   362			}
   363		}
   364		if reindex {
   365			aboutToReindex = true
   366			wiper, ok := kv.(sorted.Wiper)
   367			if !ok {
   368				return nil, fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", kv)
   369			}
   370			if err := wiper.Wipe(); err != nil {
   371				return nil, fmt.Errorf("error wiping index's sorted key/value type %T: %w", kv, err)
   372			}
   373			log.Printf("Index wiped.")
   374		}
   375	
   376		sto, err := ld.GetStorage(blobPrefix)
   377		if err != nil {
   378			return nil, err
   379		}
   380	
   381		ix, err := New(kv)
   382		// TODO(mpl): next time we need to do another fix, make a new error
   383		// type that lets us apply the needed fix depending on its value or
   384		// something. For now just one value/fix.
   385		if errors.Is(err, errMissingWholeRef) {
   386			// TODO: maybe we don't want to do that automatically. Brad says
   387			// we have to think about the case on GCE/CoreOS in particular.
   388			if err := ix.fixMissingWholeRef(sto); err != nil {
   389				ix.Close()
   390				return nil, fmt.Errorf("could not fix missing wholeRef entries: %w", err)
   391			}
   392			ix, err = New(kv)
   393		}
   394		ix.keepGoing = keepGoing
   395		ix.reindex = reindex
   396		if reindex {
   397			ix.hasWiped = true
   398		}
   399		if err != nil {
   400			return nil, err
   401		}
   402		ix.InitBlobSource(sto)
   403	
   404		if !reindex {
   405			if err := ix.integrityCheck(3 * time.Second); err != nil {
   406				return nil, err
   407			}
   408		}
   409	
   410		return ix, err
   411	}
   412	
   413	func (x *Index) String() string {
   414		return fmt.Sprintf("Perkeep index, using key/value implementation %T", x.s)
   415	}
   416	
   417	func (x *Index) isEmpty() bool {
   418		iter := x.s.Find("", "")
   419		hasRows := iter.Next()
   420		if err := iter.Close(); err != nil {
   421			panic(err)
   422		}
   423		return !hasRows
   424	}
   425	
   426	// reindexMaxProcs is the number of concurrent goroutines that will be used for reindexing.
   427	var reindexMaxProcs = struct {
   428		sync.RWMutex
   429		v int
   430	}{v: runtime.NumCPU()}
   431	
   432	// SetReindexMaxProcs sets the maximum number of concurrent goroutines that are
   433	// used during reindexing.
   434	func SetReindexMaxProcs(n int) {
   435		reindexMaxProcs.Lock()
   436		defer reindexMaxProcs.Unlock()
   437		reindexMaxProcs.v = n
   438	}
   439	
   440	// ReindexMaxProcs returns the maximum number of concurrent goroutines that are
   441	// used during reindexing.
   442	func ReindexMaxProcs() int {
   443		reindexMaxProcs.RLock()
   444		defer reindexMaxProcs.RUnlock()
   445		return reindexMaxProcs.v
   446	}
   447	
   448	func (x *Index) WantsReindex() bool   { return x.reindex }
   449	func (x *Index) WantsKeepGoing() bool { return x.keepGoing }
   450	
   451	func (x *Index) Reindex() error {
   452		x.Lock()
   453		if x.blobSource == nil {
   454			x.Unlock()
   455			return errors.New("index: can't re-index: no blobSource")
   456		}
   457		x.Unlock()
   458	
   459		reindexMaxProcs.RLock()
   460		defer reindexMaxProcs.RUnlock()
   461	
   462		ctx := context.Background()
   463	
   464		if !x.hasWiped {
   465			wiper, ok := x.s.(sorted.Wiper)
   466			if !ok {
   467				return fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", x.s)
   468			}
   469			log.Printf("Wiping index storage type %T ...", x.s)
   470			if err := wiper.Wipe(); err != nil {
   471				return fmt.Errorf("error wiping index's sorted key/value type %T: %w", x.s, err)
   472			}
   473			log.Printf("Index wiped.")
   474		}
   475		log.Printf("Rebuilding index...")
   476	
   477		reindexStart, _ := blob.Parse(os.Getenv("CAMLI_REINDEX_START"))
   478	
   479		err := x.s.Set(keySchemaVersion.name, fmt.Sprintf("%d", requiredSchemaVersion))
   480		if err != nil {
   481			return err
   482		}
   483	
   484		var nerrmu sync.Mutex
   485		nerr := 0
   486	
   487		blobc := make(chan blob.Ref, 32)
   488	
   489		enumCtx := context.Background()
   490		enumErr := make(chan error, 1)
   491		go func() {
   492			defer close(blobc)
   493			donec := enumCtx.Done()
   494			var lastTick time.Time
   495			enumErr <- blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
   496				now := time.Now()
   497				if lastTick.Before(now.Add(-1 * time.Second)) {
   498					log.Printf("Reindexing at %v", sb.Ref)
   499					lastTick = now
   500				}
   501				if reindexStart.Valid() && sb.Ref.Less(reindexStart) {
   502					return nil
   503				}
   504				select {
   505				case <-donec:
   506					return ctx.Err()
   507				case blobc <- sb.Ref:
   508					return nil
   509				}
   510			})
   511		}()
   512		var wg sync.WaitGroup
   513		for i := 0; i < reindexMaxProcs.v; i++ {
   514			wg.Add(1)
   515			go func() {
   516				defer wg.Done()
   517				for br := range blobc {
   518					if err := x.indexBlob(ctx, br); err != nil {
   519						log.Printf("Error reindexing %v: %v", br, err)
   520						nerrmu.Lock()
   521						nerr++
   522						nerrmu.Unlock()
   523						// TODO: flag (or default?) to stop the EnumerateAll above once
   524						// there's any error with reindexing?
   525					}
   526				}
   527			}()
   528		}
   529		if err := <-enumErr; err != nil {
   530			return err
   531		}
   532	
   533		wg.Wait()
   534		x.reindexWg.Wait()
   535	
   536		x.RLock()
   537		readyCount := len(x.readyReindex)
   538		needed := len(x.needs)
   539		var needSample bytes.Buffer
   540		if needed > 0 {
   541			n := 0
   542		Sample:
   543			for x, needs := range x.needs {
   544				for _, need := range needs {
   545					n++
   546					if n == 10 {
   547						break Sample
   548					}
   549					if n > 1 {
   550						fmt.Fprintf(&needSample, ", ")
   551					}
   552					fmt.Fprintf(&needSample, "%v needs %v", x, need)
   553				}
   554			}
   555		}
   556		x.RUnlock()
   557		if readyCount > 0 {
   558			return fmt.Errorf("%d blobs were ready to reindex in out-of-order queue, but not yet ran", readyCount)
   559		}
   560		if needed > 0 {
   561			return fmt.Errorf("%d blobs are still needed as dependencies; a sample: %s", needed, needSample.Bytes())
   562		}
   563	
   564		nerrmu.Lock() // no need to unlock
   565		if nerr != 0 {
   566			return fmt.Errorf("%d blobs failed to re-index", nerr)
   567		}
   568		if err := x.initDeletesCache(); err != nil {
   569			return err
   570		}
   571		log.Printf("Index rebuild complete.")
   572		return nil
   573	}
   574	
   575	// integrityCheck enumerates blobs through x.blobSource during timeout, and
   576	// verifies for each of them that it has a meta row in the index. It logs a message
   577	// if any of them is not found. It only returns an error if something went wrong
   578	// during the enumeration.
   579	func (x *Index) integrityCheck(timeout time.Duration) error {
   580		t0 := time.Now()
   581		x.logf("starting integrity check...")
   582		defer func() {
   583			x.logf("integrity check done (after %v)", time.Since(t0).Round(10*time.Millisecond))
   584		}()
   585		if x.blobSource == nil {
   586			return errors.New("index: can't check sanity of index: no blobSource")
   587		}
   588	
   589		// we don't actually need seen atm, but I anticipate we'll return it at some
   590		// point, so we can display the blobs that were tested/seen/missed on the web UI.
   591		seen := make([]blob.Ref, 0)
   592		notFound := make([]blob.Ref, 0)
   593		enumCtx := context.TODO()
   594		stopTime := time.NewTimer(timeout)
   595		defer stopTime.Stop()
   596		var errEOT = errors.New("time's out")
   597		if err := blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
   598			select {
   599			case <-stopTime.C:
   600				return errEOT
   601			default:
   602			}
   603			if _, err := x.GetBlobMeta(enumCtx, sb.Ref); err != nil {
   604				if !os.IsNotExist(err) {
   605					return err
   606				}
   607				notFound = append(notFound, sb.Ref)
   608				return nil
   609			}
   610			seen = append(seen, sb.Ref)
   611			return nil
   612		}); err != nil && err != errEOT {
   613			return err
   614		}
   615		if len(notFound) > 0 {
   616			// TODO(mpl): at least on GCE, display that message and maybe more on a web UI page as well.
   617			x.logf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen))
   618		}
   619		return nil
   620	}
   621	
   622	func queryPrefixString(s sorted.KeyValue, prefix string) sorted.Iterator {
   623		if prefix == "" {
   624			return s.Find("", "")
   625		}
   626		lastByte := prefix[len(prefix)-1]
   627		if lastByte == 0xff {
   628			panic("unsupported query prefix ending in 0xff")
   629		}
   630		end := prefix[:len(prefix)-1] + string(lastByte+1)
   631		return s.Find(prefix, end)
   632	}
   633	
   634	func (x *Index) queryPrefixString(prefix string) sorted.Iterator {
   635		return queryPrefixString(x.s, prefix)
   636	}
   637	
   638	func queryPrefix(s sorted.KeyValue, key *keyType, args ...interface{}) sorted.Iterator {
   639		return queryPrefixString(s, key.Prefix(args...))
   640	}
   641	
   642	func (x *Index) queryPrefix(key *keyType, args ...interface{}) sorted.Iterator {
   643		return x.queryPrefixString(key.Prefix(args...))
   644	}
   645	
   646	func closeIterator(it sorted.Iterator, perr *error) {
   647		err := it.Close()
   648		if err != nil && *perr == nil {
   649			*perr = err
   650		}
   651	}
   652	
   653	// schemaVersion returns the version of schema as it is found
   654	// in the currently used index. If not found, it returns 0.
   655	func (x *Index) schemaVersion() int {
   656		schemaVersionStr, err := x.s.Get(keySchemaVersion.name)
   657		if err != nil {
   658			if errors.Is(err, sorted.ErrNotFound) {
   659				return 0
   660			}
   661			panic(fmt.Sprintf("Could not get index schema version: %v", err))
   662		}
   663		schemaVersion, err := strconv.Atoi(schemaVersionStr)
   664		if err != nil {
   665			panic(fmt.Sprintf("Bogus index schema version: %q", schemaVersionStr))
   666		}
   667		return schemaVersion
   668	}
   669	
// deletion is one entry of the deletes cache: a claim that deletes a blob.
type deletion struct {
	deleter blob.Ref  // the ref of the delete claim
	when    time.Time // the date of the delete claim
}
   674	
   675	type byDeletionDate []deletion
   676	
   677	func (d byDeletionDate) Len() int           { return len(d) }
   678	func (d byDeletionDate) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
   679	func (d byDeletionDate) Less(i, j int) bool { return d[i].when.Before(d[j].when) }
   680	
// deletionCache is the in-memory cache of the deletions recorded in the index.
// The embedded RWMutex is taken for reading by IsDeleted.
type deletionCache struct {
	sync.RWMutex
	// m maps a blob to the delete claims that target it.
	m map[blob.Ref][]deletion
}
   685	
   686	func newDeletionCache() *deletionCache {
   687		return &deletionCache{
   688			m: make(map[blob.Ref][]deletion),
   689		}
   690	}
   691	
   692	// initDeletesCache creates and populates the deletion status cache used by the index
   693	// for faster calls to IsDeleted and DeletedAt. It is called by New.
   694	func (x *Index) initDeletesCache() (err error) {
   695		x.deletes = newDeletionCache()
   696		it := x.queryPrefix(keyDeleted)
   697		defer closeIterator(it, &err)
   698		for it.Next() {
   699			cl, ok := kvDeleted(it.Key())
   700			if !ok {
   701				return fmt.Errorf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key())
   702			}
   703			targetDeletions := append(x.deletes.m[cl.Target],
   704				deletion{
   705					deleter: cl.BlobRef,
   706					when:    cl.Date,
   707				})
   708			sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
   709			x.deletes.m[cl.Target] = targetDeletions
   710		}
   711		return err
   712	}
   713	
   714	func kvDeleted(k string) (c camtypes.Claim, ok bool) {
   715		// TODO(bradfitz): garbage
   716		keyPart := strings.Split(k, "|")
   717		if len(keyPart) != 4 {
   718			return
   719		}
   720		if keyPart[0] != "deleted" {
   721			return
   722		}
   723		target, ok := blob.Parse(keyPart[1])
   724		if !ok {
   725			return
   726		}
   727		claimRef, ok := blob.Parse(keyPart[3])
   728		if !ok {
   729			return
   730		}
   731		date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[2]))
   732		if err != nil {
   733			return
   734		}
   735		return camtypes.Claim{
   736			BlobRef: claimRef,
   737			Target:  target,
   738			Date:    date,
   739			Type:    string(schema.DeleteClaim),
   740		}, true
   741	}
   742	
   743	// IsDeleted reports whether the provided blobref (of a permanode or
   744	// claim) should be considered deleted.
   745	func (x *Index) IsDeleted(br blob.Ref) bool {
   746		if x.deletes == nil {
   747			// We still allow the slow path, in case someone creates
   748			// their own Index without a deletes cache.
   749			return x.isDeletedNoCache(br)
   750		}
   751		x.deletes.RLock()
   752		defer x.deletes.RUnlock()
   753		return x.isDeleted(br)
   754	}
   755	
   756	// The caller must hold x.deletes.mu for read.
   757	func (x *Index) isDeleted(br blob.Ref) bool {
   758		deletes, ok := x.deletes.m[br]
   759		if !ok {
   760			return false
   761		}
   762		for _, v := range deletes {
   763			if !x.isDeleted(v.deleter) {
   764				return true
   765			}
   766		}
   767		return false
   768	}
   769	
   770	// Used when the Index has no deletes cache (x.deletes is nil).
   771	func (x *Index) isDeletedNoCache(br blob.Ref) bool {
   772		var err error
   773		it := x.queryPrefix(keyDeleted, br)
   774		for it.Next() {
   775			cl, ok := kvDeleted(it.Key())
   776			if !ok {
   777				panic(fmt.Sprintf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key()))
   778			}
   779			if !x.isDeletedNoCache(cl.BlobRef) {
   780				closeIterator(it, &err)
   781				if err != nil {
   782					// TODO: Do better?
   783					panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
   784				}
   785				return true
   786			}
   787		}
   788		closeIterator(it, &err)
   789		if err != nil {
   790			// TODO: Do better?
   791			panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
   792		}
   793		return false
   794	}
   795	
   796	// GetRecentPermanodes sends results to dest filtered by owner, limit, and
   797	// before.  A zero value for before will default to the current time.  The
   798	// results will have duplicates suppressed, with most recent permanode
   799	// returned.
   800	// Note, permanodes more recent than before will still be fetched from the
   801	// index then skipped. This means runtime scales linearly with the number of
   802	// nodes more recent than before.
   803	func (x *Index) GetRecentPermanodes(ctx context.Context, dest chan<- camtypes.RecentPermanode, owner blob.Ref, limit int, before time.Time) (err error) {
   804		defer close(dest)
   805	
   806		keyId, err := x.KeyId(ctx, owner)
   807		if errors.Is(err, sorted.ErrNotFound) {
   808			x.logf("no recent permanodes because keyId for owner %v not found", owner)
   809			return nil
   810		}
   811		if err != nil {
   812			x.logf("error fetching keyId for owner %v: %v", owner, err)
   813			return err
   814		}
   815	
   816		sent := 0
   817		var seenPermanode dupSkipper
   818	
   819		if before.IsZero() {
   820			before = time.Now()
   821		}
   822		// TODO(bradfitz): handle before efficiently. don't use queryPrefix.
   823		it := x.queryPrefix(keyRecentPermanode, keyId)
   824		defer closeIterator(it, &err)
   825		for it.Next() {
   826			permaStr := it.Value()
   827			parts := strings.SplitN(it.Key(), "|", 4)
   828			if len(parts) != 4 {
   829				continue
   830			}
   831			mTime, _ := time.Parse(time.RFC3339, unreverseTimeString(parts[2]))
   832			permaRef, ok := blob.Parse(permaStr)
   833			if !ok {
   834				continue
   835			}
   836			if x.IsDeleted(permaRef) {
   837				continue
   838			}
   839			if seenPermanode.Dup(permaStr) {
   840				continue
   841			}
   842			// Skip entries with an mTime less than or equal to before.
   843			if !mTime.Before(before) {
   844				continue
   845			}
   846			dest <- camtypes.RecentPermanode{
   847				Permanode:   permaRef,
   848				Signer:      owner, // TODO(bradfitz): kinda. usually. for now.
   849				LastModTime: mTime,
   850			}
   851			sent++
   852			if sent == limit {
   853				break
   854			}
   855		}
   856		return nil
   857	}
   858	
   859	func (x *Index) AppendClaims(ctx context.Context, dst []camtypes.Claim, permaNode blob.Ref,
   860		signerFilter string,
   861		attrFilter string) ([]camtypes.Claim, error) {
   862		if x.corpus != nil {
   863			return x.corpus.AppendClaims(ctx, dst, permaNode, signerFilter, attrFilter)
   864		}
   865		var (
   866			err error
   867			it  sorted.Iterator
   868		)
   869		var signerRefs SignerRefSet
   870		if signerFilter != "" {
   871			signerRefs, err = x.signerRefs(ctx, signerFilter)
   872			if err != nil {
   873				return dst, err
   874			}
   875			if len(signerRefs) == 0 {
   876				return dst, nil
   877			}
   878			it = x.queryPrefix(keyPermanodeClaim, permaNode, signerFilter)
   879		} else {
   880			it = x.queryPrefix(keyPermanodeClaim, permaNode)
   881		}
   882		defer closeIterator(it, &err)
   883	
   884		// In the common case, an attribute filter is just a plain
   885		// token ("camliContent") unescaped. If so, fast path that
   886		// check to skip the row before we even split it.
   887		var mustHave string
   888		if attrFilter != "" && urle(attrFilter) == attrFilter {
   889			mustHave = attrFilter
   890		}
   891	
   892		for it.Next() {
   893			val := it.Value()
   894			if mustHave != "" && !strings.Contains(val, mustHave) {
   895				continue
   896			}
   897			cl, ok := kvClaim(it.Key(), val, blob.Parse)
   898			if !ok {
   899				continue
   900			}
   901			if x.IsDeleted(cl.BlobRef) {
   902				continue
   903			}
   904			if attrFilter != "" && cl.Attr != attrFilter {
   905				continue
   906			}
   907			// TODO(mpl): if we ever pass an Owner to AppendClaims, then we could have a
   908			// Matches method on it, that we would use here.
   909			if signerFilter != "" && !signerRefs.blobMatches(cl.Signer) {
   910				continue
   911			}
   912			dst = append(dst, cl)
   913		}
   914		return dst, nil
   915	}
   916	
   917	func kvClaim(k, v string, blobParse func(string) (blob.Ref, bool)) (c camtypes.Claim, ok bool) {
   918		const nKeyPart = 5
   919		const nValPart = 4
   920		var keya [nKeyPart]string
   921		var vala [nValPart]string
   922		keyPart := strutil.AppendSplitN(keya[:0], k, "|", -1)
   923		valPart := strutil.AppendSplitN(vala[:0], v, "|", -1)
   924		if len(keyPart) < nKeyPart || len(valPart) < nValPart {
   925			return
   926		}
   927		signerRef, ok := blobParse(valPart[3])
   928		if !ok {
   929			return
   930		}
   931		permaNode, ok := blobParse(keyPart[1])
   932		if !ok {
   933			return
   934		}
   935		claimRef, ok := blobParse(keyPart[4])
   936		if !ok {
   937			return
   938		}
   939		date, err := time.Parse(time.RFC3339, keyPart[3])
   940		if err != nil {
   941			return
   942		}
   943		return camtypes.Claim{
   944			BlobRef:   claimRef,
   945			Signer:    signerRef,
   946			Permanode: permaNode,
   947			Date:      date,
   948			Type:      urld(valPart[0]),
   949			Attr:      urld(valPart[1]),
   950			Value:     urld(valPart[2]),
   951		}, true
   952	}
   953	
   954	func (x *Index) GetBlobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
   955		if x.corpus != nil {
   956			return x.corpus.GetBlobMeta(ctx, br)
   957		}
   958		key := "meta:" + br.String()
   959		meta, err := x.s.Get(key)
   960		if errors.Is(err, sorted.ErrNotFound) {
   961			err = os.ErrNotExist
   962		}
   963		if err != nil {
   964			return camtypes.BlobMeta{}, err
   965		}
   966		pos := strings.Index(meta, "|")
   967		if pos < 0 {
   968			panic(fmt.Sprintf("Bogus index row for key %q: got value %q", key, meta))
   969		}
   970		size, err := strconv.ParseUint(meta[:pos], 10, 32)
   971		if err != nil {
   972			return camtypes.BlobMeta{}, err
   973		}
   974		mime := meta[pos+1:]
   975		return camtypes.BlobMeta{
   976			Ref:       br,
   977			Size:      uint32(size),
   978			CamliType: camliTypeFromMIME(mime),
   979		}, nil
   980	}
   981	
   982	// HasLegacySHA1 reports whether the index has legacy SHA-1 blobs.
   983	func (x *Index) HasLegacySHA1() (ok bool, err error) {
   984		if x.corpus != nil {
   985			return x.corpus.hasLegacySHA1, err
   986		}
   987		it := x.queryPrefix(keyWholeToFileRef, "sha1-")
   988		defer closeIterator(it, &err)
   989		for it.Next() {
   990			return true, err
   991		}
   992		return false, err
   993	}
   994	
   995	func (x *Index) KeyId(ctx context.Context, signer blob.Ref) (string, error) {
   996		if x.corpus != nil {
   997			return x.corpus.KeyId(ctx, signer)
   998		}
   999		return x.s.Get("signerkeyid:" + signer.String())
  1000	}
  1001	
  1002	// signerRefs returns the set of signer blobRefs matching the signer keyID. It
  1003	// does not return an error if none is found.
  1004	func (x *Index) signerRefs(ctx context.Context, keyID string) (SignerRefSet, error) {
  1005		if x.corpus != nil {
  1006			return x.corpus.signerRefs[keyID], nil
  1007		}
  1008		it := x.queryPrefixString(keySignerKeyID.name)
  1009		var err error
  1010		var refs SignerRefSet
  1011		defer closeIterator(it, &err)
  1012		prefix := keySignerKeyID.name + ":"
  1013		for it.Next() {
  1014			if it.Value() == keyID {
  1015				refs = append(refs, strings.TrimPrefix(it.Key(), prefix))
  1016			}
  1017		}
  1018		return refs, nil
  1019	}
  1020	
  1021	func (x *Index) PermanodeOfSignerAttrValue(ctx context.Context, signer blob.Ref, attr, val string) (permaNode blob.Ref, err error) {
  1022		keyId, err := x.KeyId(ctx, signer)
  1023		if errors.Is(err, sorted.ErrNotFound) {
  1024			return blob.Ref{}, os.ErrNotExist
  1025		}
  1026		if err != nil {
  1027			return blob.Ref{}, err
  1028		}
  1029		it := x.queryPrefix(keySignerAttrValue, keyId, attr, val)
  1030		defer closeIterator(it, &err)
  1031		for it.Next() {
  1032			permaRef, ok := blob.Parse(it.Value())
  1033			if ok && !x.IsDeleted(permaRef) {
  1034				return permaRef, nil
  1035			}
  1036		}
  1037		return blob.Ref{}, os.ErrNotExist
  1038	}
  1039	
  1040	// SearchPermanodesWithAttr is just like PermanodeOfSignerAttrValue
  1041	// except we return multiple and dup-suppress.  If request.Query is
  1042	// "", it is not used in the prefix search.
  1043	func (x *Index) SearchPermanodesWithAttr(ctx context.Context, dest chan<- blob.Ref, request *camtypes.PermanodeByAttrRequest) (err error) {
  1044		defer close(dest)
  1045		if request.FuzzyMatch {
  1046			// TODO(bradfitz): remove this for now? figure out how to handle it generically?
  1047			return errors.New("TODO: SearchPermanodesWithAttr: generic indexer doesn't support FuzzyMatch on PermanodeByAttrRequest")
  1048		}
  1049		if request.Attribute == "" {
  1050			return errors.New("index: missing Attribute in SearchPermanodesWithAttr")
  1051		}
  1052		if !IsIndexedAttribute(request.Attribute) {
  1053			return fmt.Errorf("SearchPermanodesWithAttr: called with a non-indexed attribute %q", request.Attribute)
  1054		}
  1055	
  1056		keyId, err := x.KeyId(ctx, request.Signer)
  1057		if errors.Is(err, sorted.ErrNotFound) {
  1058			return nil
  1059		}
  1060		if err != nil {
  1061			return err
  1062		}
  1063		seen := make(map[string]bool)
  1064		var it sorted.Iterator
  1065		if request.Query == "" {
  1066			it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute)
  1067		} else {
  1068			it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute, request.Query)
  1069		}
  1070		defer closeIterator(it, &err)
  1071		before := request.At
  1072		if before.IsZero() {
  1073			before = time.Now()
  1074		}
  1075		for it.Next() {
  1076			cl, ok := kvSignerAttrValue(it.Key(), it.Value())
  1077			if !ok {
  1078				continue
  1079			}
  1080			if x.IsDeleted(cl.BlobRef) {
  1081				continue
  1082			}
  1083			if x.IsDeleted(cl.Permanode) {
  1084				continue
  1085			}
  1086			if cl.Date.After(before) {
  1087				continue
  1088			}
  1089			pnstr := cl.Permanode.String()
  1090			if seen[pnstr] {
  1091				continue
  1092			}
  1093			seen[pnstr] = true
  1094	
  1095			dest <- cl.Permanode
  1096			if len(seen) == request.MaxResults {
  1097				break
  1098			}
  1099		}
  1100		return nil
  1101	}
  1102	
  1103	func kvSignerAttrValue(k, v string) (c camtypes.Claim, ok bool) {
  1104		// TODO(bradfitz): garbage
  1105		keyPart := strings.Split(k, "|")
  1106		valPart := strings.Split(v, "|")
  1107		if len(keyPart) != 6 || len(valPart) != 1 {
  1108			// TODO(mpl): use glog
  1109			log.Printf("bogus keySignerAttrValue index entry: %q = %q", k, v)
  1110			return
  1111		}
  1112		if keyPart[0] != "signerattrvalue" {
  1113			return
  1114		}
  1115		date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
  1116		if err != nil {
  1117			log.Printf("bogus time in keySignerAttrValue index entry: %q", keyPart[4])
  1118			return
  1119		}
  1120		claimRef, ok := blob.Parse(keyPart[5])
  1121		if !ok {
  1122			log.Printf("bogus claim in keySignerAttrValue index entry: %q", keyPart[5])
  1123			return
  1124		}
  1125		permaNode, ok := blob.Parse(valPart[0])
  1126		if !ok {
  1127			log.Printf("bogus permanode in keySignerAttrValue index entry: %q", valPart[0])
  1128			return
  1129		}
  1130		return camtypes.Claim{
  1131			BlobRef:   claimRef,
  1132			Permanode: permaNode,
  1133			Date:      date,
  1134			Attr:      urld(keyPart[2]),
  1135			Value:     urld(keyPart[3]),
  1136		}, true
  1137	}
  1138	
  1139	func (x *Index) PathsOfSignerTarget(ctx context.Context, signer, target blob.Ref) (paths []*camtypes.Path, err error) {
  1140		paths = []*camtypes.Path{}
  1141		keyId, err := x.KeyId(ctx, signer)
  1142		if err != nil {
  1143			if errors.Is(err, sorted.ErrNotFound) {
  1144				err = nil
  1145			}
  1146			return
  1147		}
  1148	
  1149		mostRecent := make(map[string]*camtypes.Path)
  1150		maxClaimDates := make(map[string]time.Time)
  1151	
  1152		it := x.queryPrefix(keyPathBackward, keyId, target)
  1153		defer closeIterator(it, &err)
  1154		for it.Next() {
  1155			p, ok, active := kvPathBackward(it.Key(), it.Value())
  1156			if !ok {
  1157				continue
  1158			}
  1159			if x.IsDeleted(p.Claim) {
  1160				continue
  1161			}
  1162			if x.IsDeleted(p.Base) {
  1163				continue
  1164			}
  1165	
  1166			key := p.Base.String() + "/" + p.Suffix
  1167			if p.ClaimDate.After(maxClaimDates[key]) {
  1168				maxClaimDates[key] = p.ClaimDate
  1169				if active {
  1170					mostRecent[key] = &p
  1171				} else {
  1172					delete(mostRecent, key)
  1173				}
  1174			}
  1175		}
  1176		for _, v := range mostRecent {
  1177			paths = append(paths, v)
  1178		}
  1179		return paths, nil
  1180	}
  1181	
  1182	func kvPathBackward(k, v string) (p camtypes.Path, ok bool, active bool) {
  1183		// TODO(bradfitz): garbage
  1184		keyPart := strings.Split(k, "|")
  1185		valPart := strings.Split(v, "|")
  1186		if len(keyPart) != 4 || len(valPart) != 4 {
  1187			// TODO(mpl): use glog
  1188			log.Printf("bogus keyPathBackward index entry: %q = %q", k, v)
  1189			return
  1190		}
  1191		if keyPart[0] != "signertargetpath" {
  1192			return
  1193		}
  1194		target, ok := blob.Parse(keyPart[2])
  1195		if !ok {
  1196			log.Printf("bogus target in keyPathBackward index entry: %q", keyPart[2])
  1197			return
  1198		}
  1199		claim, ok := blob.Parse(keyPart[3])
  1200		if !ok {
  1201			log.Printf("bogus claim in keyPathBackward index entry: %q", keyPart[3])
  1202			return
  1203		}
  1204		date, err := time.Parse(time.RFC3339, valPart[0])
  1205		if err != nil {
  1206			log.Printf("bogus date in keyPathBackward index entry: %q", valPart[0])
  1207			return
  1208		}
  1209		base, ok := blob.Parse(valPart[1])
  1210		if !ok {
  1211			log.Printf("bogus base in keyPathBackward index entry: %q", valPart[1])
  1212			return
  1213		}
  1214		if valPart[2] == "Y" {
  1215			active = true
  1216		}
  1217		return camtypes.Path{
  1218			Claim:     claim,
  1219			Base:      base,
  1220			Target:    target,
  1221			ClaimDate: date,
  1222			Suffix:    urld(valPart[3]),
  1223		}, true, active
  1224	}
  1225	
  1226	func (x *Index) PathsLookup(ctx context.Context, signer, base blob.Ref, suffix string) (paths []*camtypes.Path, err error) {
  1227		paths = []*camtypes.Path{}
  1228		keyId, err := x.KeyId(ctx, signer)
  1229		if err != nil {
  1230			if errors.Is(err, sorted.ErrNotFound) {
  1231				err = nil
  1232			}
  1233			return
  1234		}
  1235	
  1236		it := x.queryPrefix(keyPathForward, keyId, base, suffix)
  1237		defer closeIterator(it, &err)
  1238		for it.Next() {
  1239			p, ok, active := kvPathForward(it.Key(), it.Value())
  1240			if !ok {
  1241				continue
  1242			}
  1243			if x.IsDeleted(p.Claim) {
  1244				continue
  1245			}
  1246			if x.IsDeleted(p.Target) {
  1247				continue
  1248			}
  1249	
  1250			// TODO(bradfitz): investigate what's up with deleted
  1251			// forward path claims here.  Needs docs with the
  1252			// interface too, and tests.
  1253			_ = active
  1254	
  1255			paths = append(paths, &p)
  1256		}
  1257		return
  1258	}
  1259	
  1260	func kvPathForward(k, v string) (p camtypes.Path, ok bool, active bool) {
  1261		// TODO(bradfitz): garbage
  1262		keyPart := strings.Split(k, "|")
  1263		valPart := strings.Split(v, "|")
  1264		if len(keyPart) != 6 || len(valPart) != 2 {
  1265			// TODO(mpl): use glog
  1266			log.Printf("bogus keyPathForward index entry: %q = %q", k, v)
  1267			return
  1268		}
  1269		if keyPart[0] != "path" {
  1270			return
  1271		}
  1272		base, ok := blob.Parse(keyPart[2])
  1273		if !ok {
  1274			log.Printf("bogus base in keyPathForward index entry: %q", keyPart[2])
  1275			return
  1276		}
  1277		date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
  1278		if err != nil {
  1279			log.Printf("bogus date in keyPathForward index entry: %q", keyPart[4])
  1280			return
  1281		}
  1282		claim, ok := blob.Parse(keyPart[5])
  1283		if !ok {
  1284			log.Printf("bogus claim in keyPathForward index entry: %q", keyPart[5])
  1285			return
  1286		}
  1287		if valPart[0] == "Y" {
  1288			active = true
  1289		}
  1290		target, ok := blob.Parse(valPart[1])
  1291		if !ok {
  1292			log.Printf("bogus target in keyPathForward index entry: %q", valPart[1])
  1293			return
  1294		}
  1295		return camtypes.Path{
  1296			Claim:     claim,
  1297			Base:      base,
  1298			Target:    target,
  1299			ClaimDate: date,
  1300			Suffix:    urld(keyPart[3]),
  1301		}, true, active
  1302	}
  1303	
  1304	func (x *Index) PathLookup(ctx context.Context, signer, base blob.Ref, suffix string, at time.Time) (*camtypes.Path, error) {
  1305		paths, err := x.PathsLookup(ctx, signer, base, suffix)
  1306		if err != nil {
  1307			return nil, err
  1308		}
  1309		var (
  1310			newest    = int64(0)
  1311			atSeconds = int64(0)
  1312			best      *camtypes.Path
  1313		)
  1314	
  1315		if !at.IsZero() {
  1316			atSeconds = at.Unix()
  1317		}
  1318	
  1319		for _, path := range paths {
  1320			t := path.ClaimDate
  1321			secs := t.Unix()
  1322			if atSeconds != 0 && secs > atSeconds {
  1323				// Too new
  1324				continue
  1325			}
  1326			if newest > secs {
  1327				// Too old
  1328				continue
  1329			}
  1330			// Just right
  1331			newest, best = secs, path
  1332		}
  1333		if best == nil {
  1334			return nil, os.ErrNotExist
  1335		}
  1336		return best, nil
  1337	}
  1338	
  1339	func (x *Index) existingFileSchemas(wholeRef blob.Ref) (schemaRefs []blob.Ref, err error) {
  1340		it := x.queryPrefix(keyWholeToFileRef, wholeRef)
  1341		defer closeIterator(it, &err)
  1342		for it.Next() {
  1343			keyPart := strings.Split(it.Key(), "|")[1:]
  1344			if len(keyPart) < 2 {
  1345				continue
  1346			}
  1347			ref, ok := blob.Parse(keyPart[1])
  1348			if ok {
  1349				schemaRefs = append(schemaRefs, ref)
  1350			}
  1351		}
  1352		return schemaRefs, nil
  1353	}
  1354	
// WholeRefToFile maps a file contents blobRef (a "wholeRef"), in its string
// form, to the file schemas with those contents.
type WholeRefToFile map[string][]blob.Ref
  1357	
  1358	// ExistingFileSchemas returns the file schemas for the provided file contents refs.
  1359	func (x *Index) ExistingFileSchemas(wholeRef ...blob.Ref) (WholeRefToFile, error) {
  1360		schemaRefs := make(WholeRefToFile)
  1361		for _, v := range wholeRef {
  1362			newRefs, err := x.existingFileSchemas(v)
  1363			if err != nil {
  1364				return nil, err
  1365			}
  1366			schemaRefs[v.String()] = newRefs
  1367		}
  1368		return schemaRefs, nil
  1369	}
  1370	
  1371	func (x *Index) loadKey(key string, val *string, err *error, wg *sync.WaitGroup) {
  1372		defer wg.Done()
  1373		*val, *err = x.s.Get(key)
  1374	}
  1375	
  1376	func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.FileInfo, error) {
  1377		if x.corpus != nil {
  1378			return x.corpus.GetFileInfo(ctx, fileRef)
  1379		}
  1380		ikey := "fileinfo|" + fileRef.String()
  1381		tkey := keyFileTimes.name + "|" + fileRef.String()
  1382		// TODO: switch this to use syncutil.Group
  1383		wg := new(sync.WaitGroup)
  1384		wg.Add(2)
  1385		var iv, tv string // info value, time value
  1386		var ierr, terr error
  1387		go x.loadKey(ikey, &iv, &ierr, wg)
  1388		go x.loadKey(tkey, &tv, &terr, wg)
  1389		wg.Wait()
  1390	
  1391		if errors.Is(ierr, sorted.ErrNotFound) {
  1392			return camtypes.FileInfo{}, os.ErrNotExist
  1393		}
  1394		if ierr != nil {
  1395			return camtypes.FileInfo{}, ierr
  1396		}
  1397		valPart := strings.Split(iv, "|")
  1398		if len(valPart) < 3 {
  1399			x.logf("bogus key %q = %q", ikey, iv)
  1400			return camtypes.FileInfo{}, os.ErrNotExist
  1401		}
  1402		var wholeRef blob.Ref
  1403		if len(valPart) >= 4 {
  1404			wholeRef, _ = blob.Parse(valPart[3])
  1405		}
  1406		size, err := strconv.ParseInt(valPart[0], 10, 64)
  1407		if err != nil {
  1408			x.logf("bogus integer at position 0 in key %q = %q", ikey, iv)
  1409			return camtypes.FileInfo{}, os.ErrNotExist
  1410		}
  1411		fileName := urld(valPart[1])
  1412		fi := camtypes.FileInfo{
  1413			Size:     size,
  1414			FileName: fileName,
  1415			MIMEType: urld(valPart[2]),
  1416			WholeRef: wholeRef,
  1417		}
  1418	
  1419		if tv != "" {
  1420			times := strings.Split(urld(tv), ",")
  1421			updateFileInfoTimes(&fi, times)
  1422		}
  1423	
  1424		return fi, nil
  1425	}
  1426	
  1427	func updateFileInfoTimes(fi *camtypes.FileInfo, times []string) {
  1428		if len(times) == 0 {
  1429			return
  1430		}
  1431		fi.Time = types.ParseTime3339OrNil(times[0])
  1432		if len(times) == 2 {
  1433			fi.ModTime = types.ParseTime3339OrNil(times[1])
  1434		}
  1435	}
  1436	
  1437	// v is "width|height"
  1438	func kvImageInfo(v []byte) (ii camtypes.ImageInfo, ok bool) {
  1439		pipei := bytes.IndexByte(v, '|')
  1440		if pipei < 0 {
  1441			return
  1442		}
  1443		w, err := strutil.ParseUintBytes(v[:pipei], 10, 16)
  1444		if err != nil {
  1445			return
  1446		}
  1447		h, err := strutil.ParseUintBytes(v[pipei+1:], 10, 16)
  1448		if err != nil {
  1449			return
  1450		}
  1451		ii.Width = uint16(w)
  1452		ii.Height = uint16(h)
  1453		return ii, true
  1454	}
  1455	
  1456	func (x *Index) GetImageInfo(ctx context.Context, fileRef blob.Ref) (camtypes.ImageInfo, error) {
  1457		if x.corpus != nil {
  1458			return x.corpus.GetImageInfo(ctx, fileRef)
  1459		}
  1460		// it might be that the key does not exist because image.DecodeConfig failed earlier
  1461		// (because of unsupported JPEG features like progressive mode).
  1462		key := keyImageSize.Key(fileRef.String())
  1463		v, err := x.s.Get(key)
  1464		if errors.Is(err, sorted.ErrNotFound) {
  1465			err = os.ErrNotExist
  1466		}
  1467		if err != nil {
  1468			return camtypes.ImageInfo{}, err
  1469		}
  1470		ii, ok := kvImageInfo([]byte(v))
  1471		if !ok {
  1472			return camtypes.ImageInfo{}, fmt.Errorf("index: bogus key %q = %q", key, v)
  1473		}
  1474		return ii, nil
  1475	}
  1476	
  1477	func (x *Index) GetMediaTags(ctx context.Context, fileRef blob.Ref) (tags map[string]string, err error) {
  1478		if x.corpus != nil {
  1479			return x.corpus.GetMediaTags(ctx, fileRef)
  1480		}
  1481		fi, err := x.GetFileInfo(ctx, fileRef)
  1482		if err != nil {
  1483			return nil, err
  1484		}
  1485		it := x.queryPrefix(keyMediaTag, fi.WholeRef.String())
  1486		defer closeIterator(it, &err)
  1487		for it.Next() {
  1488			if tags == nil {
  1489				tags = make(map[string]string)
  1490			}
  1491			tags[it.Key()] = it.Value()
  1492		}
  1493		return tags, nil
  1494	}
  1495	
  1496	func (x *Index) GetFileLocation(ctx context.Context, fileRef blob.Ref) (camtypes.Location, error) {
  1497		if x.corpus != nil {
  1498			lat, long, ok := x.corpus.FileLatLong(fileRef)
  1499			if !ok {
  1500				return camtypes.Location{}, os.ErrNotExist
  1501			}
  1502			// TODO(mpl): Brad says to move this check lower, in corpus func and/or when building corpus from index rows.
  1503			if math.IsNaN(long) || math.IsNaN(lat) {
  1504				return camtypes.Location{}, fmt.Errorf("latitude or longitude in corpus for %v is NaN. Reindex to fix it", fileRef)
  1505			}
  1506			return camtypes.Location{Latitude: lat, Longitude: long}, nil
  1507		}
  1508		fi, err := x.GetFileInfo(ctx, fileRef)
  1509		if err != nil {
  1510			return camtypes.Location{}, err
  1511		}
  1512		it := x.queryPrefixString(keyEXIFGPS.Key(fi.WholeRef.String()))
  1513		defer closeIterator(it, &err)
  1514		if !it.Next() {
  1515			return camtypes.Location{}, os.ErrNotExist
  1516		}
  1517	
  1518		var lat, long float64
  1519		key, v := it.Key(), it.Value()
  1520		pipe := strings.Index(v, "|")
  1521		if pipe < 0 {
  1522			return camtypes.Location{}, fmt.Errorf("index: bogus key %q = %q", key, v)
  1523		}
  1524		lat, err = strconv.ParseFloat(v[:pipe], 64)
  1525		if err != nil {
  1526			return camtypes.Location{}, fmt.Errorf("index: bogus value at position 0 in key %q = %q", key, v)
  1527		}
  1528		long, err = strconv.ParseFloat(v[pipe+1:], 64)
  1529		if err != nil {
  1530			return camtypes.Location{}, fmt.Errorf("index: bogus value at position 1 in key %q = %q", key, v)
  1531		}
  1532		if math.IsNaN(long) || math.IsNaN(lat) {
  1533			return camtypes.Location{}, fmt.Errorf("latitude or longitude in index for %v is NaN. Reindex to fix it", fileRef)
  1534		}
  1535		return camtypes.Location{Latitude: lat, Longitude: long}, nil
  1536	}
  1537	
  1538	func (x *Index) EdgesTo(ref blob.Ref, opts *camtypes.EdgesToOpts) (edges []*camtypes.Edge, err error) {
  1539		it := x.queryPrefix(keyEdgeBackward, ref)
  1540		defer closeIterator(it, &err)
  1541		permanodeParents := make(map[string]*camtypes.Edge)
  1542		for it.Next() {
  1543			edge, ok := kvEdgeBackward(it.Key(), it.Value())
  1544			if !ok {
  1545				continue
  1546			}
  1547			if x.IsDeleted(edge.From) {
  1548				continue
  1549			}
  1550			if x.IsDeleted(edge.BlobRef) {
  1551				continue
  1552			}
  1553			edge.To = ref
  1554			if edge.FromType == schema.TypePermanode {
  1555				permanodeParents[edge.From.String()] = edge
  1556			} else {
  1557				edges = append(edges, edge)
  1558			}
  1559		}
  1560		for _, e := range permanodeParents {
  1561			edges = append(edges, e)
  1562		}
  1563		return edges, nil
  1564	}
  1565	
  1566	func kvEdgeBackward(k, v string) (edge *camtypes.Edge, ok bool) {
  1567		// TODO(bradfitz): garbage
  1568		keyPart := strings.Split(k, "|")
  1569		valPart := strings.Split(v, "|")
  1570		if len(keyPart) != 4 || len(valPart) != 2 {
  1571			// TODO(mpl): use glog
  1572			log.Printf("bogus keyEdgeBackward index entry: %q = %q", k, v)
  1573			return
  1574		}
  1575		if keyPart[0] != "edgeback" {
  1576			return
  1577		}
  1578		parentRef, ok := blob.Parse(keyPart[2])
  1579		if !ok {
  1580			log.Printf("bogus parent in keyEdgeBackward index entry: %q", keyPart[2])
  1581			return
  1582		}
  1583		blobRef, ok := blob.Parse(keyPart[3])
  1584		if !ok {
  1585			log.Printf("bogus blobref in keyEdgeBackward index entry: %q", keyPart[3])
  1586			return
  1587		}
  1588		return &camtypes.Edge{
  1589			From:      parentRef,
  1590			FromType:  schema.CamliType(valPart[0]),
  1591			FromTitle: valPart[1],
  1592			BlobRef:   blobRef,
  1593		}, true
  1594	}
  1595	
  1596	// GetDirMembers sends on dest the children of the static directory dir.
  1597	func (x *Index) GetDirMembers(ctx context.Context, dir blob.Ref, dest chan<- blob.Ref, limit int) (err error) {
  1598		defer close(dest)
  1599	
  1600		sent := 0
  1601		if x.corpus != nil {
  1602			children, err := x.corpus.GetDirChildren(ctx, dir)
  1603			if err != nil {
  1604				return err
  1605			}
  1606			for child := range children {
  1607				dest <- child
  1608				sent++
  1609				if sent == limit {
  1610					break
  1611				}
  1612			}
  1613			return nil
  1614		}
  1615	
  1616		it := x.queryPrefix(keyStaticDirChild, dir.String())
  1617		defer closeIterator(it, &err)
  1618		for it.Next() {
  1619			keyPart := strings.Split(it.Key(), "|")
  1620			if len(keyPart) != 3 {
  1621				return fmt.Errorf("index: bogus key keyStaticDirChild = %q", it.Key())
  1622			}
  1623	
  1624			child, ok := blob.Parse(keyPart[2])
  1625			if !ok {
  1626				continue
  1627			}
  1628			dest <- child
  1629			sent++
  1630			if sent == limit {
  1631				break
  1632			}
  1633		}
  1634		return nil
  1635	}
  1636	
  1637	func kvBlobMeta(k, v string) (bm camtypes.BlobMeta, ok bool) {
  1638		refStr := k[len("meta:"):]
  1639		br, ok := blob.Parse(refStr)
  1640		if !ok {
  1641			return
  1642		}
  1643		pipe := strings.Index(v, "|")
  1644		if pipe < 0 {
  1645			return
  1646		}
  1647		size, err := strconv.ParseUint(v[:pipe], 10, 32)
  1648		if err != nil {
  1649			return
  1650		}
  1651		return camtypes.BlobMeta{
  1652			Ref:       br,
  1653			Size:      uint32(size),
  1654			CamliType: camliTypeFromMIME(v[pipe+1:]),
  1655		}, true
  1656	}
  1657	
  1658	func kvBlobMeta_bytes(k, v []byte) (bm camtypes.BlobMeta, ok bool) {
  1659		ref := k[len("meta:"):]
  1660		br, ok := blob.ParseBytes(ref)
  1661		if !ok {
  1662			return
  1663		}
  1664		pipe := bytes.IndexByte(v, '|')
  1665		if pipe < 0 {
  1666			return
  1667		}
  1668		size, err := strutil.ParseUintBytes(v[:pipe], 10, 32)
  1669		if err != nil {
  1670			return
  1671		}
  1672		return camtypes.BlobMeta{
  1673			Ref:       br,
  1674			Size:      uint32(size),
  1675			CamliType: camliTypeFromMIME_bytes(v[pipe+1:]),
  1676		}, true
  1677	}
  1678	
  1679	func enumerateBlobMeta(s sorted.KeyValue, cb func(camtypes.BlobMeta) error) (err error) {
  1680		it := queryPrefixString(s, "meta:")
  1681		defer closeIterator(it, &err)
  1682		for it.Next() {
  1683			bm, ok := kvBlobMeta(it.Key(), it.Value())
  1684			if !ok {
  1685				continue
  1686			}
  1687			if err := cb(bm); err != nil {
  1688				return err
  1689			}
  1690		}
  1691		return nil
  1692	}
  1693	
// errStopIteration is returned by an enumeration callback to end the
// iteration early without it counting as a failure.
var errStopIteration = errors.New("stop iteration") // local error, doesn't escape this package
  1695	
  1696	// EnumerateBlobMeta calls fn for all known meta blobs.
  1697	// If fn returns false, iteration stops and an nil error is returned.
  1698	// If ctx becomes done, iteration stops and ctx.Err() is returned.
  1699	func (x *Index) EnumerateBlobMeta(ctx context.Context, fn func(camtypes.BlobMeta) bool) error {
  1700		if x.corpus != nil {
  1701			var err error
  1702			var n int
  1703			done := ctx.Done()
  1704			x.corpus.EnumerateBlobMeta(func(m camtypes.BlobMeta) bool {
  1705				// Every so often, check context.
  1706				n++
  1707				if n%256 == 0 {
  1708					select {
  1709					case <-done:
  1710						err = ctx.Err()
  1711						return false
  1712					default:
  1713	
  1714					}
  1715				}
  1716				return fn(m)
  1717			})
  1718			return err
  1719		}
  1720		done := ctx.Done()
  1721		err := enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {
  1722			select {
  1723			case <-done:
  1724				return ctx.Err()
  1725			default:
  1726				if !fn(bm) {
  1727					return errStopIteration
  1728				}
  1729				return nil
  1730			}
  1731		})
  1732		if errors.Is(err, errStopIteration) {
  1733			err = nil
  1734		}
  1735		return err
  1736	}
  1737	
  1738	// Storage returns the index's underlying Storage implementation.
  1739	func (x *Index) Storage() sorted.KeyValue { return x.s }
  1740	
  1741	// Close closes the underlying sorted.KeyValue, if the storage has a Close method.
  1742	// The return value is the return value of the underlying Close, or
  1743	// nil otherwise.
  1744	func (x *Index) Close() error {
  1745		if cl, ok := x.s.(io.Closer); ok {
  1746			return cl.Close()
  1747		}
  1748		return nil
  1749	}
  1750	
  1751	// initNeededMaps initializes x.needs and x.neededBy on start-up.
  1752	func (x *Index) initNeededMaps() (err error) {
  1753		x.deletes = newDeletionCache()
  1754		it := x.queryPrefix(keyMissing)
  1755		defer closeIterator(it, &err)
  1756		for it.Next() {
  1757			key := it.KeyBytes()
  1758			pair := key[len("missing|"):]
  1759			pipe := bytes.IndexByte(pair, '|')
  1760			if pipe < 0 {
  1761				return fmt.Errorf("Bogus missing key %q", key)
  1762			}
  1763			have, ok1 := blob.ParseBytes(pair[:pipe])
  1764			missing, ok2 := blob.ParseBytes(pair[pipe+1:])
  1765			if !ok1 || !ok2 {
  1766				return fmt.Errorf("Bogus missing key %q", key)
  1767			}
  1768			x.noteNeededMemory(have, missing)
  1769		}
  1770		return
  1771	}
  1772	
  1773	func (x *Index) noteNeeded(have, missing blob.Ref) error {
  1774		if err := x.s.Set(keyMissing.Key(have, missing), "1"); err != nil {
  1775			return err
  1776		}
  1777		x.noteNeededMemory(have, missing)
  1778		return nil
  1779	}
  1780	
  1781	func (x *Index) noteNeededMemory(have, missing blob.Ref) {
  1782		x.needs[have] = append(x.needs[have], missing)
  1783		x.neededBy[missing] = append(x.neededBy[missing], have)
  1784	}
  1785	
// camliTypeMIMEPrefix is the prefix that precedes the camliType in the MIME
// string of a schema blob, as in "application/json; camliType=file".
const camliTypeMIMEPrefix = "application/json; camliType="

// camliTypeMIMEPrefixBytes is camliTypeMIMEPrefix as a byte slice, for
// comparisons against []byte input.
var camliTypeMIMEPrefixBytes = []byte(camliTypeMIMEPrefix)
  1789	
  1790	// "application/json; camliType=file" => "file"
  1791	// "image/gif" => ""
  1792	func camliTypeFromMIME(mime string) schema.CamliType {
  1793		if v := strings.TrimPrefix(mime, camliTypeMIMEPrefix); v != mime {
  1794			return schema.CamliType(v)
  1795		}
  1796		return ""
  1797	}
  1798	
  1799	func camliTypeFromMIME_bytes(mime []byte) schema.CamliType {
  1800		if v := bytes.TrimPrefix(mime, camliTypeMIMEPrefixBytes); len(v) != len(mime) {
  1801			return schema.CamliType(strutil.StringFromBytes(v))
  1802		}
  1803		return ""
  1804	}
  1805	
  1806	// TODO(bradfitz): rename this? This is really about signer-attr-value
  1807	// (PermanodeOfSignerAttrValue), and not about indexed attributes in general.
  1808	func IsIndexedAttribute(attr string) bool {
  1809		switch attr {
  1810		case "camliRoot", "camliImportRoot", "tag", "title":
  1811			return true
  1812		}
  1813		return false
  1814	}
  1815	
  1816	// IsBlobReferenceAttribute returns whether attr is an attribute whose
  1817	// value is a blob reference (e.g. camliMember) and thus something the
  1818	// indexers should keep inverted indexes on for parent/child-type
  1819	// relationships.
  1820	func IsBlobReferenceAttribute(attr string) bool {
  1821		switch attr {
  1822		case "camliMember":
  1823			return true
  1824		}
  1825		return false
  1826	}
  1827	
  1828	func IsFulltextAttribute(attr string) bool {
  1829		switch attr {
  1830		case "tag", "title":
  1831			return true
  1832		}
  1833		return false
  1834	}
Website layout inspired by memcached.
Content by the authors.