Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package index
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"log"
    25		"os"
    26		"runtime"
    27		"sort"
    28		"strconv"
    29		"strings"
    30		"sync"
    31		"time"
    32	
    33		"perkeep.org/internal/osutil"
    34		"perkeep.org/pkg/blob"
    35		"perkeep.org/pkg/schema"
    36		"perkeep.org/pkg/schema/nodeattr"
    37		"perkeep.org/pkg/sorted"
    38		"perkeep.org/pkg/types/camtypes"
    39	
    40		"go4.org/strutil"
    41		"go4.org/syncutil"
    42	)
    43	
// Corpus is an in-memory summary of all of a user's blobs' metadata.
//
// A Corpus is not safe for concurrent use. Callers should use Lock or RLock
// on the parent index instead.
type Corpus struct {
	// building is true at start while scanning all rows in the
	// index.  While building, certain invariants (like things
	// being sorted) can be temporarily violated and fixed at the
	// end of scan.
	building bool

	// hasLegacySHA1 reports whether some SHA-1 blobs are indexed. It is set while
	// building the corpus from the initial index scan.
	hasLegacySHA1 bool

	// gen is incremented on every blob received.
	// It's used as a query cache invalidator.
	gen int64

	strs      map[string]string   // interned strings
	brOfStr   map[string]blob.Ref // blob.Parse fast path
	brInterns int64               // blob.Ref -> blob.Ref, via br method

	// blobs maps each known blobref to its metadata; sumBlobBytes is the
	// running total of the sizes of all blobs merged into blobs.
	blobs        map[blob.Ref]*camtypes.BlobMeta
	sumBlobBytes int64

	// camBlobs maps from camliType ("file") to blobref to the meta.
	// The value is the same one in blobs.
	camBlobs map[schema.CamliType]map[blob.Ref]*camtypes.BlobMeta

	// TODO: add GoLLRB to vendor; keep sorted BlobMeta

	// keyId maps a signer blobref to that signer's GPG ID.
	keyId signerFromBlobrefMap

	// signerRefs maps a signer GPG ID to all its signer blobs (because different hashes).
	signerRefs   map[string]SignerRefSet
	files        map[blob.Ref]camtypes.FileInfo // keyed by file or directory schema blob
	permanodes   map[blob.Ref]*PermanodeMeta
	imageInfo    map[blob.Ref]camtypes.ImageInfo // keyed by fileref (not wholeref)
	fileWholeRef map[blob.Ref]blob.Ref           // fileref -> its wholeref (TODO: multi-valued?)
	gps          map[blob.Ref]latLong            // wholeRef -> GPS coordinates
	// dirChildren maps a directory to its (direct) children (static-set entries).
	dirChildren map[blob.Ref]map[blob.Ref]struct{}
	// fileParents maps a file or directory to its (direct) parents.
	fileParents map[blob.Ref]map[blob.Ref]struct{}

	// Lack of edge tracking implementation is issue #707
	// (https://github.com/perkeep/perkeep/issues/707)

	// claimBack allows hopping backwards from a Claim's Value
	// when the Value is a blobref.  It allows, for example,
	// finding the parents of camliMember claims.  If a permanode
	// parent set A has camliMembers B and C, it allows finding
	// A from either B or C.
	// The slice is not sorted.
	claimBack map[blob.Ref][]*camtypes.Claim

	// TODO: use deletedCache instead?
	deletedBy map[blob.Ref]blob.Ref // key is deleted by value
	// deletes tracks deletions of claims and permanodes. The key is
	// the blobref of a claim or permanode. The values, sorted newest first,
	// contain the blobref of the claim responsible for the deletion, as well
	// as the date when that deletion happened.
	deletes map[blob.Ref][]deletion

	mediaTags map[blob.Ref]map[string]string // wholeref -> "album" -> "foo"

	permanodesByTime    *lazySortedPermanodes // cache of permanodes sorted by creation time.
	permanodesByModtime *lazySortedPermanodes // cache of permanodes sorted by modtime.

	// permanodesSetByNodeType maps from a camliNodeType attribute
	// value to the set of permanodes that ever had that
	// value. The bool is always true.
	permanodesSetByNodeType map[string]map[blob.Ref]bool

	// ss is a scratch string slice, reused by the row-merging methods
	// to split row values without reallocating.
	ss []string
}
   121	
   122	func (c *Corpus) logf(format string, args ...interface{}) {
   123		log.Printf("index/corpus: "+format, args...)
   124	}
   125	
   126	// blobMatches reports whether br is in the set.
   127	func (srs SignerRefSet) blobMatches(br blob.Ref) bool {
   128		for _, v := range srs {
   129			if br.EqualString(v) {
   130				return true
   131			}
   132		}
   133		return false
   134	}
   135	
// signerFromBlobrefMap maps a signer blobRef to the signer's GPG ID (e.g.
// 2931A67C26F5ABDA). It is needed because the signer on a claim is represented by
// its blobRef, but the same signer could have created claims with different hashes
// (e.g. with sha1 and with sha224), so these claims would look as if created by
// different signers (because different blobRefs). The map thus allows the
// algorithms to rely on the unique GPG ID of a signer instead of the different
// blobRef representations of it. Its value is usually the corpus keyId.
type signerFromBlobrefMap map[blob.Ref]string
   144	
// latLong is a latitude/longitude pair, used as the value type of
// Corpus.gps.
type latLong struct {
	lat, long float64
}
   148	
   149	// IsDeleted reports whether the provided blobref (of a permanode or claim) should be considered deleted.
   150	func (c *Corpus) IsDeleted(br blob.Ref) bool {
   151		for _, v := range c.deletes[br] {
   152			if !c.IsDeleted(v.deleter) {
   153				return true
   154			}
   155		}
   156		return false
   157	}
   158	
// PermanodeMeta holds the claims made on a permanode, together with the
// attribute values cached from those claims.
type PermanodeMeta struct {
	Claims []*camtypes.Claim // sorted by camtypes.ClaimsByDate

	attr attrValues // attributes from all signers

	// signer maps a signer's GPG ID (e.g. 2931A67C26F5ABDA) to the attrs for this
	// signer.
	signer map[string]attrValues
}
   168	
// attrValues maps an attribute name to its list of values.
type attrValues map[string][]string
   170	
   171	// cacheAttrClaim applies attribute changes from cl.
   172	func (m attrValues) cacheAttrClaim(cl *camtypes.Claim) {
   173		switch cl.Type {
   174		case string(schema.SetAttributeClaim):
   175			m[cl.Attr] = []string{cl.Value}
   176		case string(schema.AddAttributeClaim):
   177			m[cl.Attr] = append(m[cl.Attr], cl.Value)
   178		case string(schema.DelAttributeClaim):
   179			if cl.Value == "" {
   180				delete(m, cl.Attr)
   181			} else {
   182				a, i := m[cl.Attr], 0
   183				for _, v := range a {
   184					if v != cl.Value {
   185						a[i] = v
   186						i++
   187					}
   188				}
   189				m[cl.Attr] = a[:i]
   190			}
   191		}
   192	}
   193	
   194	// restoreInvariants sorts claims by date and
   195	// recalculates latest attributes.
   196	func (pm *PermanodeMeta) restoreInvariants(signers signerFromBlobrefMap) error {
   197		sort.Sort(camtypes.ClaimPtrsByDate(pm.Claims))
   198		pm.attr = make(attrValues)
   199		pm.signer = make(map[string]attrValues)
   200		for _, cl := range pm.Claims {
   201			if err := pm.appendAttrClaim(cl, signers); err != nil {
   202				return err
   203			}
   204		}
   205		return nil
   206	}
   207	
   208	// fixupLastClaim fixes invariants on the assumption
   209	// that the all but the last element in Claims are sorted by date
   210	// and the last element is the only one not yet included in Attrs.
   211	func (pm *PermanodeMeta) fixupLastClaim(signers signerFromBlobrefMap) error {
   212		if pm.attr != nil {
   213			n := len(pm.Claims)
   214			if n < 2 || camtypes.ClaimPtrsByDate(pm.Claims).Less(n-2, n-1) {
   215				// already sorted, update Attrs from new Claim
   216				return pm.appendAttrClaim(pm.Claims[n-1], signers)
   217			}
   218		}
   219		return pm.restoreInvariants(signers)
   220	}
   221	
   222	// appendAttrClaim stores permanode attributes
   223	// from cl in pm.attr and pm.signer[signerID[cl.Signer]].
   224	// The caller of appendAttrClaim is responsible for calling
   225	// it with claims sorted in camtypes.ClaimPtrsByDate order.
   226	func (pm *PermanodeMeta) appendAttrClaim(cl *camtypes.Claim, signers signerFromBlobrefMap) error {
   227		signer, ok := signers[cl.Signer]
   228		if !ok {
   229			return fmt.Errorf("claim %v has unknown signer %q", cl.BlobRef, cl.Signer)
   230		}
   231		sc, ok := pm.signer[signer]
   232		if !ok {
   233			// Optimize for the case where cl.Signer of all claims are the same.
   234			// Instead of having two identical attrValues copies in
   235			// pm.attr and pm.signer[cl.Signer],
   236			// use a single attrValues
   237			// until there is at least a second signer.
   238			switch len(pm.signer) {
   239			case 0:
   240				// Set up signer cache to reference
   241				// the existing attrValues.
   242				pm.attr.cacheAttrClaim(cl)
   243				pm.signer[signer] = pm.attr
   244				return nil
   245	
   246			case 1:
   247				// pm.signer has exactly one other signer,
   248				// and its attrValues entry references pm.attr.
   249				// Make a copy of pm.attr
   250				// for this other signer now.
   251				m := make(attrValues)
   252				for a, v := range pm.attr {
   253					xv := make([]string, len(v))
   254					copy(xv, v)
   255					m[a] = xv
   256				}
   257	
   258				for sig := range pm.signer {
   259					pm.signer[sig] = m
   260					break
   261				}
   262			}
   263			sc = make(attrValues)
   264			pm.signer[signer] = sc
   265		}
   266	
   267		pm.attr.cacheAttrClaim(cl)
   268	
   269		// Cache claim in sc only if sc != pm.attr.
   270		if len(pm.signer) > 1 {
   271			sc.cacheAttrClaim(cl)
   272		}
   273		return nil
   274	}
   275	
   276	// valuesAtSigner returns an attrValues to query permanode attr values at the
   277	// given time for the signerFilter, which is the GPG ID of a signer (e.g. 2931A67C26F5ABDA).
   278	// It returns (nil, true) if signerFilter is not empty but pm has no
   279	// attributes for it (including if signerFilter is unknown).
   280	// It returns ok == true if v represents attrValues valid for the specified
   281	// parameters.
   282	// It returns (nil, false) if neither pm.attr nor pm.signer should be used for
   283	// the given time, because e.g. some claims are more recent than this time. In
   284	// which case, the caller should resort to querying another source, such as pm.Claims.
   285	// The returned map must not be changed by the caller.
   286	func (pm *PermanodeMeta) valuesAtSigner(at time.Time,
   287		signerFilter string) (v attrValues, ok bool) {
   288	
   289		if pm.attr == nil {
   290			return nil, false
   291		}
   292	
   293		var m attrValues
   294		if signerFilter != "" {
   295			m = pm.signer[signerFilter]
   296			if m == nil {
   297				return nil, true
   298			}
   299		} else {
   300			m = pm.attr
   301		}
   302		if at.IsZero() {
   303			return m, true
   304		}
   305		if n := len(pm.Claims); n == 0 || !pm.Claims[n-1].Date.After(at) {
   306			return m, true
   307		}
   308		return nil, false
   309	}
   310	
   311	func newCorpus() *Corpus {
   312		c := &Corpus{
   313			blobs:                   make(map[blob.Ref]*camtypes.BlobMeta),
   314			camBlobs:                make(map[schema.CamliType]map[blob.Ref]*camtypes.BlobMeta),
   315			files:                   make(map[blob.Ref]camtypes.FileInfo),
   316			permanodes:              make(map[blob.Ref]*PermanodeMeta),
   317			imageInfo:               make(map[blob.Ref]camtypes.ImageInfo),
   318			deletedBy:               make(map[blob.Ref]blob.Ref),
   319			keyId:                   make(map[blob.Ref]string),
   320			signerRefs:              make(map[string]SignerRefSet),
   321			brOfStr:                 make(map[string]blob.Ref),
   322			fileWholeRef:            make(map[blob.Ref]blob.Ref),
   323			gps:                     make(map[blob.Ref]latLong),
   324			mediaTags:               make(map[blob.Ref]map[string]string),
   325			deletes:                 make(map[blob.Ref][]deletion),
   326			claimBack:               make(map[blob.Ref][]*camtypes.Claim),
   327			permanodesSetByNodeType: make(map[string]map[blob.Ref]bool),
   328			dirChildren:             make(map[blob.Ref]map[blob.Ref]struct{}),
   329			fileParents:             make(map[blob.Ref]map[blob.Ref]struct{}),
   330		}
   331		c.permanodesByModtime = &lazySortedPermanodes{
   332			c:      c,
   333			pnTime: c.PermanodeModtime,
   334		}
   335		c.permanodesByTime = &lazySortedPermanodes{
   336			c:      c,
   337			pnTime: c.PermanodeAnyTime,
   338		}
   339		return c
   340	}
   341	
   342	func NewCorpusFromStorage(s sorted.KeyValue) (*Corpus, error) {
   343		if s == nil {
   344			return nil, errors.New("storage is nil")
   345		}
   346		c := newCorpus()
   347		return c, c.scanFromStorage(s)
   348	}
   349	
   350	func (x *Index) KeepInMemory() (*Corpus, error) {
   351		var err error
   352		x.corpus, err = NewCorpusFromStorage(x.s)
   353		return x.corpus, err
   354	}
   355	
   356	// PreventStorageAccessForTesting causes any access to the index's underlying
   357	// Storage interface to panic.
   358	func (x *Index) PreventStorageAccessForTesting() {
   359		x.s = crashStorage{}
   360	}
   361	
// crashStorage is a sorted.KeyValue whose Get and Find methods panic.
// It is installed by PreventStorageAccessForTesting.
// NOTE(review): it is created there as crashStorage{}, leaving the embedded
// interface nil, so the other KeyValue methods presumably fail as well if
// called; confirm.
type crashStorage struct {
	sorted.KeyValue
}
   365	
   366	func (crashStorage) Get(key string) (string, error) {
   367		panic(fmt.Sprintf("unexpected KeyValue.Get(%q) called", key))
   368	}
   369	
   370	func (crashStorage) Find(start, end string) sorted.Iterator {
   371		panic(fmt.Sprintf("unexpected KeyValue.Find(%q, %q) called", start, end))
   372	}
   373	
   374	// *********** Updating the corpus
   375	
// corpusMergeFunc maps a row key type (the key name without its trailing
// separator) to the Corpus method that merges such a row into the corpus.
// A nil entry marks a known key type that the corpus does not load.
var corpusMergeFunc = map[string]func(c *Corpus, k, v []byte) error{
	"have":                 nil, // redundant with "meta"
	"recpn":                nil, // unneeded.
	"meta":                 (*Corpus).mergeMetaRow,
	keySignerKeyID.name:    (*Corpus).mergeSignerKeyIdRow,
	"claim":                (*Corpus).mergeClaimRow,
	"fileinfo":             (*Corpus).mergeFileInfoRow,
	keyFileTimes.name:      (*Corpus).mergeFileTimesRow,
	"imagesize":            (*Corpus).mergeImageSizeRow,
	"wholetofile":          (*Corpus).mergeWholeToFileRow,
	"exifgps":              (*Corpus).mergeEXIFGPSRow,
	"exiftag":              nil, // not using any for now
	"signerattrvalue":      nil, // ignoring for now
	"mediatag":             (*Corpus).mergeMediaTag,
	keyStaticDirChild.name: (*Corpus).mergeStaticDirChildRow,
}
   392	
   393	func memstats() *runtime.MemStats {
   394		ms := new(runtime.MemStats)
   395		runtime.GC()
   396		runtime.ReadMemStats(ms)
   397		return ms
   398	}
   399	
// logCorpusStats controls whether loading the corpus logs its progress
// along with memory and CPU usage statistics.
var logCorpusStats = true // set to false in tests
   401	
// slurpPrefixes lists the row key prefixes (key type plus separator) that
// are loaded into the corpus when it is built from storage.
var slurpPrefixes = []string{
	"meta:", // must be first
	keySignerKeyID.name + ":",

	// the first two above are loaded serially first for dependency reasons, whereas
	// the others below are loaded concurrently afterwards.
	"claim|",
	"fileinfo|",
	keyFileTimes.name + "|",
	"imagesize|",
	"wholetofile|",
	"exifgps|",
	"mediatag|",
	keyStaticDirChild.name + "|",
}
   417	
// slurpedKeyType is the set of key types (without trailing punctuation)
// that we slurp to memory at start. It is filled from slurpPrefixes by init.
var slurpedKeyType = make(map[string]bool)
   420	
   421	func init() {
   422		for _, prefix := range slurpPrefixes {
   423			slurpedKeyType[typeOfKey(prefix)] = true
   424		}
   425	}
   426	
   427	func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
   428		c.building = true
   429	
   430		var ms0 *runtime.MemStats
   431		if logCorpusStats {
   432			ms0 = memstats()
   433			c.logf("loading into memory...")
   434			c.logf("loading into memory... (1/%d: meta rows)", len(slurpPrefixes))
   435		}
   436	
   437		scanmu := new(sync.Mutex)
   438	
   439		// We do the "meta" rows first, before the prefixes below, because it
   440		// populates the blobs map (used for blobref interning) and the camBlobs
   441		// map (used for hinting the size of other maps)
   442		if err := c.scanPrefix(scanmu, s, "meta:"); err != nil {
   443			return err
   444		}
   445	
   446		// we do the keyIDs first, because they're necessary to properly merge claims
   447		if err := c.scanPrefix(scanmu, s, keySignerKeyID.name+":"); err != nil {
   448			return err
   449		}
   450	
   451		c.files = make(map[blob.Ref]camtypes.FileInfo, len(c.camBlobs[schema.TypeFile]))
   452		c.permanodes = make(map[blob.Ref]*PermanodeMeta, len(c.camBlobs[schema.TypePermanode]))
   453		cpu0 := osutil.CPUUsage()
   454	
   455		var grp syncutil.Group
   456		for i, prefix := range slurpPrefixes[2:] {
   457			if logCorpusStats {
   458				c.logf("loading into memory... (%d/%d: prefix %q)", i+2, len(slurpPrefixes),
   459					prefix[:len(prefix)-1])
   460			}
   461			prefix := prefix
   462			grp.Go(func() error { return c.scanPrefix(scanmu, s, prefix) })
   463		}
   464		if err := grp.Err(); err != nil {
   465			return err
   466		}
   467	
   468		// Post-load optimizations and restoration of invariants.
   469		for _, pm := range c.permanodes {
   470			// Restore invariants violated during building:
   471			if err := pm.restoreInvariants(c.keyId); err != nil {
   472				return err
   473			}
   474	
   475			// And intern some stuff.
   476			for _, cl := range pm.Claims {
   477				cl.BlobRef = c.br(cl.BlobRef)
   478				cl.Signer = c.br(cl.Signer)
   479				cl.Permanode = c.br(cl.Permanode)
   480				cl.Target = c.br(cl.Target)
   481			}
   482		}
   483		c.brOfStr = nil // drop this now.
   484		c.building = false
   485		// log.V(1).Printf("interned blob.Ref = %d", c.brInterns)
   486	
   487		if err := c.initDeletes(s); err != nil {
   488			return fmt.Errorf("Could not populate the corpus deletes: %v", err)
   489		}
   490	
   491		if logCorpusStats {
   492			cpu := osutil.CPUUsage() - cpu0
   493			ms1 := memstats()
   494			memUsed := ms1.Alloc - ms0.Alloc
   495			if ms1.Alloc < ms0.Alloc {
   496				memUsed = 0
   497			}
   498			c.logf("stats: %.3f MiB mem: %d blobs (%.3f GiB) (%d schema (%d permanode, %d file (%d image), ...)",
   499				float64(memUsed)/(1<<20),
   500				len(c.blobs),
   501				float64(c.sumBlobBytes)/(1<<30),
   502				c.numSchemaBlobs(),
   503				len(c.permanodes),
   504				len(c.files),
   505				len(c.imageInfo))
   506			c.logf("scanning CPU usage: %v", cpu)
   507		}
   508	
   509		return nil
   510	}
   511	
   512	// initDeletes populates the corpus deletes from the delete entries in s.
   513	func (c *Corpus) initDeletes(s sorted.KeyValue) (err error) {
   514		it := queryPrefix(s, keyDeleted)
   515		defer closeIterator(it, &err)
   516		for it.Next() {
   517			cl, ok := kvDeleted(it.Key())
   518			if !ok {
   519				return fmt.Errorf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key())
   520			}
   521			targetDeletions := append(c.deletes[cl.Target],
   522				deletion{
   523					deleter: cl.BlobRef,
   524					when:    cl.Date,
   525				})
   526			sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
   527			c.deletes[cl.Target] = targetDeletions
   528		}
   529		return err
   530	}
   531	
   532	func (c *Corpus) numSchemaBlobs() (n int64) {
   533		for _, m := range c.camBlobs {
   534			n += int64(len(m))
   535		}
   536		return
   537	}
   538	
   539	func (c *Corpus) scanPrefix(mu *sync.Mutex, s sorted.KeyValue, prefix string) (err error) {
   540		typeKey := typeOfKey(prefix)
   541		fn, ok := corpusMergeFunc[typeKey]
   542		if !ok {
   543			panic("No registered merge func for prefix " + prefix)
   544		}
   545	
   546		n, t0 := 0, time.Now()
   547		it := queryPrefixString(s, prefix)
   548		defer closeIterator(it, &err)
   549		for it.Next() {
   550			n++
   551			if n == 1 {
   552				mu.Lock()
   553				defer mu.Unlock()
   554			}
   555			if typeKey == keySignerKeyID.name {
   556				signerBlobRef, ok := blob.Parse(strings.TrimPrefix(it.Key(), keySignerKeyID.name+":"))
   557				if !ok {
   558					c.logf("WARNING: bogus signer blob in %v row: %q", keySignerKeyID.name, it.Key())
   559					continue
   560				}
   561				if err := c.addKeyID(&mutationMap{
   562					signerBlobRef: signerBlobRef,
   563					signerID:      it.Value(),
   564				}); err != nil {
   565					return err
   566				}
   567			} else {
   568				if err := fn(c, it.KeyBytes(), it.ValueBytes()); err != nil {
   569					return err
   570				}
   571			}
   572		}
   573		if logCorpusStats {
   574			d := time.Since(t0)
   575			c.logf("loaded prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d)
   576		}
   577		return nil
   578	}
   579	
   580	func (c *Corpus) addKeyID(mm *mutationMap) error {
   581		if mm.signerID == "" || !mm.signerBlobRef.Valid() {
   582			return nil
   583		}
   584		id, ok := c.keyId[mm.signerBlobRef]
   585		// only add it if we don't already have it, to save on allocs.
   586		if ok {
   587			if id != mm.signerID {
   588				return fmt.Errorf("GPG ID mismatch for signer %q: refusing to overwrite %v with %v", mm.signerBlobRef, id, mm.signerID)
   589			}
   590			return nil
   591		}
   592		c.signerRefs[mm.signerID] = append(c.signerRefs[mm.signerID], mm.signerBlobRef.String())
   593		return c.mergeSignerKeyIdRow([]byte("signerkeyid:"+mm.signerBlobRef.String()), []byte(mm.signerID))
   594	}
   595	
   596	func (c *Corpus) addBlob(ctx context.Context, br blob.Ref, mm *mutationMap) error {
   597		if _, dup := c.blobs[br]; dup {
   598			return nil
   599		}
   600		c.gen++
   601		// make sure keySignerKeyID is done first before the actual mutations, even
   602		// though it's also going to be done in the loop below.
   603		if err := c.addKeyID(mm); err != nil {
   604			return err
   605		}
   606		for k, v := range mm.kv {
   607			kt := typeOfKey(k)
   608			if kt == keySignerKeyID.name {
   609				// because we already took care of it in addKeyID
   610				continue
   611			}
   612			if !slurpedKeyType[kt] {
   613				continue
   614			}
   615			if err := corpusMergeFunc[kt](c, []byte(k), []byte(v)); err != nil {
   616				return err
   617			}
   618		}
   619		for _, cl := range mm.deletes {
   620			if err := c.updateDeletes(cl); err != nil {
   621				return fmt.Errorf("Could not update the deletes cache after deletion from %v: %v", cl, err)
   622			}
   623		}
   624		return nil
   625	}
   626	
   627	// updateDeletes updates the corpus deletes with the delete claim deleteClaim.
   628	// deleteClaim is trusted to be a valid delete Claim.
   629	func (c *Corpus) updateDeletes(deleteClaim schema.Claim) error {
   630		target := c.br(deleteClaim.Target())
   631		deleter := deleteClaim.Blob()
   632		when, err := deleter.ClaimDate()
   633		if err != nil {
   634			return fmt.Errorf("Could not get date of delete claim %v: %v", deleteClaim, err)
   635		}
   636		del := deletion{
   637			deleter: c.br(deleter.BlobRef()),
   638			when:    when,
   639		}
   640		for _, v := range c.deletes[target] {
   641			if v == del {
   642				return nil
   643			}
   644		}
   645		targetDeletions := append(c.deletes[target], del)
   646		sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
   647		c.deletes[target] = targetDeletions
   648		return nil
   649	}
   650	
   651	func (c *Corpus) mergeMetaRow(k, v []byte) error {
   652		bm, ok := kvBlobMeta_bytes(k, v)
   653		if !ok {
   654			return fmt.Errorf("bogus meta row: %q -> %q", k, v)
   655		}
   656		return c.mergeBlobMeta(bm)
   657	}
   658	
   659	func (c *Corpus) mergeBlobMeta(bm camtypes.BlobMeta) error {
   660		if _, dup := c.blobs[bm.Ref]; dup {
   661			panic("dup blob seen")
   662		}
   663		bm.CamliType = schema.CamliType((c.str(string(bm.CamliType))))
   664	
   665		c.blobs[bm.Ref] = &bm
   666		c.sumBlobBytes += int64(bm.Size)
   667		if bm.CamliType != "" {
   668			m, ok := c.camBlobs[bm.CamliType]
   669			if !ok {
   670				m = make(map[blob.Ref]*camtypes.BlobMeta)
   671				c.camBlobs[bm.CamliType] = m
   672			}
   673			m[bm.Ref] = &bm
   674		}
   675		return nil
   676	}
   677	
   678	func (c *Corpus) mergeSignerKeyIdRow(k, v []byte) error {
   679		br, ok := blob.ParseBytes(k[len("signerkeyid:"):])
   680		if !ok {
   681			return fmt.Errorf("bogus signerid row: %q -> %q", k, v)
   682		}
   683		c.keyId[br] = string(v)
   684		return nil
   685	}
   686	
   687	func (c *Corpus) mergeClaimRow(k, v []byte) error {
   688		// TODO: update kvClaim to take []byte instead of string
   689		cl, ok := kvClaim(string(k), string(v), c.blobParse)
   690		if !ok || !cl.Permanode.Valid() {
   691			return fmt.Errorf("bogus claim row: %q -> %q", k, v)
   692		}
   693		cl.Type = c.str(cl.Type)
   694		cl.Attr = c.str(cl.Attr)
   695		cl.Value = c.str(cl.Value) // less likely to intern, but some (tags) do
   696	
   697		pn := c.br(cl.Permanode)
   698		pm, ok := c.permanodes[pn]
   699		if !ok {
   700			pm = new(PermanodeMeta)
   701			c.permanodes[pn] = pm
   702		}
   703		pm.Claims = append(pm.Claims, &cl)
   704		if !c.building {
   705			// Unless we're still starting up (at which we sort at
   706			// the end instead), keep claims sorted and attrs in sync.
   707			if err := pm.fixupLastClaim(c.keyId); err != nil {
   708				return err
   709			}
   710		}
   711	
   712		if vbr, ok := blob.Parse(cl.Value); ok {
   713			c.claimBack[vbr] = append(c.claimBack[vbr], &cl)
   714		}
   715		if cl.Attr == "camliNodeType" {
   716			set := c.permanodesSetByNodeType[cl.Value]
   717			if set == nil {
   718				set = make(map[blob.Ref]bool)
   719				c.permanodesSetByNodeType[cl.Value] = set
   720			}
   721			set[pn] = true
   722		}
   723		return nil
   724	}
   725	
   726	func (c *Corpus) mergeFileInfoRow(k, v []byte) error {
   727		// fileinfo|sha1-579f7f246bd420d486ddeb0dadbb256cfaf8bf6b" "5|some-stuff.txt|"
   728		pipe := bytes.IndexByte(k, '|')
   729		if pipe < 0 {
   730			return fmt.Errorf("unexpected fileinfo key %q", k)
   731		}
   732		br, ok := blob.ParseBytes(k[pipe+1:])
   733		if !ok {
   734			return fmt.Errorf("unexpected fileinfo blobref in key %q", k)
   735		}
   736	
   737		// TODO: could at least use strutil.ParseUintBytes to not stringify and retain
   738		// the length bytes of v.
   739		c.ss = strutil.AppendSplitN(c.ss[:0], string(v), "|", 4)
   740		if len(c.ss) != 3 && len(c.ss) != 4 {
   741			return fmt.Errorf("unexpected fileinfo value %q", v)
   742		}
   743		size, err := strconv.ParseInt(c.ss[0], 10, 64)
   744		if err != nil {
   745			return fmt.Errorf("unexpected fileinfo value %q", v)
   746		}
   747		var wholeRef blob.Ref
   748		if len(c.ss) == 4 && c.ss[3] != "" { // checking for "" because of special files such as symlinks.
   749			var ok bool
   750			wholeRef, ok = blob.Parse(urld(c.ss[3]))
   751			if !ok {
   752				return fmt.Errorf("invalid wholeRef blobref in value %q for fileinfo key %q", v, k)
   753			}
   754		}
   755		c.mutateFileInfo(br, func(fi *camtypes.FileInfo) {
   756			fi.Size = size
   757			fi.FileName = c.str(urld(c.ss[1]))
   758			fi.MIMEType = c.str(urld(c.ss[2]))
   759			fi.WholeRef = wholeRef
   760		})
   761		return nil
   762	}
   763	
   764	func (c *Corpus) mergeStaticDirChildRow(k, v []byte) error {
   765		// dirchild|sha1-dir|sha1-child" "1"
   766		// strip the key name
   767		sk := k[len(keyStaticDirChild.name)+1:]
   768		pipe := bytes.IndexByte(sk, '|')
   769		if pipe < 0 {
   770			return fmt.Errorf("invalid dirchild key %q, missing second pipe", k)
   771		}
   772		parent, ok := blob.ParseBytes(sk[:pipe])
   773		if !ok {
   774			return fmt.Errorf("invalid dirchild parent blobref in key %q", k)
   775		}
   776		child, ok := blob.ParseBytes(sk[pipe+1:])
   777		if !ok {
   778			return fmt.Errorf("invalid dirchild child blobref in key %q", k)
   779		}
   780		parent = c.br(parent)
   781		child = c.br(child)
   782		children, ok := c.dirChildren[parent]
   783		if !ok {
   784			children = make(map[blob.Ref]struct{})
   785		}
   786		children[child] = struct{}{}
   787		c.dirChildren[parent] = children
   788		parents, ok := c.fileParents[child]
   789		if !ok {
   790			parents = make(map[blob.Ref]struct{})
   791		}
   792		parents[parent] = struct{}{}
   793		c.fileParents[child] = parents
   794		return nil
   795	}
   796	
   797	func (c *Corpus) mergeFileTimesRow(k, v []byte) error {
   798		if len(v) == 0 {
   799			return nil
   800		}
   801		// "filetimes|sha1-579f7f246bd420d486ddeb0dadbb256cfaf8bf6b" "1970-01-01T00%3A02%3A03Z"
   802		pipe := bytes.IndexByte(k, '|')
   803		if pipe < 0 {
   804			return fmt.Errorf("unexpected fileinfo key %q", k)
   805		}
   806		br, ok := blob.ParseBytes(k[pipe+1:])
   807		if !ok {
   808			return fmt.Errorf("unexpected filetimes blobref in key %q", k)
   809		}
   810		c.ss = strutil.AppendSplitN(c.ss[:0], urld(string(v)), ",", -1)
   811		times := c.ss
   812		c.mutateFileInfo(br, func(fi *camtypes.FileInfo) {
   813			updateFileInfoTimes(fi, times)
   814		})
   815		return nil
   816	}
   817	
   818	func (c *Corpus) mutateFileInfo(br blob.Ref, fn func(*camtypes.FileInfo)) {
   819		br = c.br(br)
   820		fi := c.files[br] // use zero value if not present
   821		fn(&fi)
   822		c.files[br] = fi
   823	}
   824	
   825	func (c *Corpus) mergeImageSizeRow(k, v []byte) error {
   826		br, okk := blob.ParseBytes(k[len("imagesize|"):])
   827		ii, okv := kvImageInfo(v)
   828		if !okk || !okv {
   829			return fmt.Errorf("bogus row %q = %q", k, v)
   830		}
   831		br = c.br(br)
   832		c.imageInfo[br] = ii
   833		return nil
   834	}
   835	
// sha1Prefix is the prefix of a SHA-1 blobref in its byte form.
var sha1Prefix = []byte("sha1-")
   837	
   838	// "wholetofile|sha1-17b53c7c3e664d3613dfdce50ef1f2a09e8f04b5|sha1-fb88f3eab3acfcf3cfc8cd77ae4366f6f975d227" -> "1"
   839	func (c *Corpus) mergeWholeToFileRow(k, v []byte) error {
   840		pair := k[len("wholetofile|"):]
   841		pipe := bytes.IndexByte(pair, '|')
   842		if pipe < 0 {
   843			return fmt.Errorf("bogus row %q = %q", k, v)
   844		}
   845		wholeRef, ok1 := blob.ParseBytes(pair[:pipe])
   846		fileRef, ok2 := blob.ParseBytes(pair[pipe+1:])
   847		if !ok1 || !ok2 {
   848			return fmt.Errorf("bogus row %q = %q", k, v)
   849		}
   850		c.fileWholeRef[fileRef] = wholeRef
   851		if c.building && !c.hasLegacySHA1 {
   852			if bytes.HasPrefix(pair, sha1Prefix) {
   853				c.hasLegacySHA1 = true
   854			}
   855		}
   856		return nil
   857	}
   858	
   859	// "mediatag|sha1-2b219be9d9691b4f8090e7ee2690098097f59566|album" = "Some+Album+Name"
   860	func (c *Corpus) mergeMediaTag(k, v []byte) error {
   861		f := strings.Split(string(k), "|")
   862		if len(f) != 3 {
   863			return fmt.Errorf("unexpected key %q", k)
   864		}
   865		wholeRef, ok := blob.Parse(f[1])
   866		if !ok {
   867			return fmt.Errorf("failed to parse wholeref from key %q", k)
   868		}
   869		tm, ok := c.mediaTags[wholeRef]
   870		if !ok {
   871			tm = make(map[string]string)
   872			c.mediaTags[wholeRef] = tm
   873		}
   874		tm[c.str(f[2])] = c.str(urld(string(v)))
   875		return nil
   876	}
   877	
   878	// "exifgps|sha1-17b53c7c3e664d3613dfdce50ef1f2a09e8f04b5" -> "-122.39897155555556|37.61952208333334"
   879	func (c *Corpus) mergeEXIFGPSRow(k, v []byte) error {
   880		wholeRef, ok := blob.ParseBytes(k[len("exifgps|"):])
   881		pipe := bytes.IndexByte(v, '|')
   882		if pipe < 0 || !ok {
   883			return fmt.Errorf("bogus row %q = %q", k, v)
   884		}
   885		lat, err := strconv.ParseFloat(string(v[:pipe]), 64)
   886		long, err1 := strconv.ParseFloat(string(v[pipe+1:]), 64)
   887		if err != nil || err1 != nil {
   888			if err != nil {
   889				log.Printf("index: bogus latitude in value of row %q = %q", k, v)
   890			} else {
   891				log.Printf("index: bogus longitude in value of row %q = %q", k, v)
   892			}
   893			return nil
   894		}
   895		c.gps[wholeRef] = latLong{lat, long}
   896		return nil
   897	}
   898	
// useBlobParseCache enables the blob.Parse fast path cache (see blobParse),
// which reduces CPU (via reduced GC from new garbage), but increases memory
// usage, even though it shouldn't.  The GC should fully discard the brOfStr
// map (which we nil out at the end of parsing), but the Go GC doesn't
// seem to clear it all.
// TODO: investigate / file bugs.
const useBlobParseCache = false
   906	
   907	func (c *Corpus) blobParse(v string) (br blob.Ref, ok bool) {
   908		if useBlobParseCache {
   909			br, ok = c.brOfStr[v]
   910			if ok {
   911				return
   912			}
   913		}
   914		return blob.Parse(v)
   915	}
   916	
   917	// str returns s, interned.
   918	func (c *Corpus) str(s string) string {
   919		if s == "" {
   920			return ""
   921		}
   922		if s, ok := c.strs[s]; ok {
   923			return s
   924		}
   925		if c.strs == nil {
   926			c.strs = make(map[string]string)
   927		}
   928		c.strs[s] = s
   929		return s
   930	}
   931	
   932	// br returns br, interned.
   933	func (c *Corpus) br(br blob.Ref) blob.Ref {
   934		if bm, ok := c.blobs[br]; ok {
   935			c.brInterns++
   936			return bm.Ref
   937		}
   938		return br
   939	}
   940	
   941	// *********** Reading from the corpus
   942	
   943	// EnumerateCamliBlobs calls fn for all known meta blobs.
   944	//
   945	// If camType is not empty, it specifies a filter for which meta blob
   946	// types to call fn for. If empty, all are emitted.
   947	//
   948	// If fn returns false, iteration ends.
   949	func (c *Corpus) EnumerateCamliBlobs(camType schema.CamliType, fn func(camtypes.BlobMeta) bool) {
   950		if camType != "" {
   951			for _, bm := range c.camBlobs[camType] {
   952				if !fn(*bm) {
   953					return
   954				}
   955			}
   956			return
   957		}
   958		for _, m := range c.camBlobs {
   959			for _, bm := range m {
   960				if !fn(*bm) {
   961					return
   962				}
   963			}
   964		}
   965	}
   966	
   967	// EnumerateBlobMeta calls fn for all known meta blobs in an undefined
   968	// order.
   969	// If fn returns false, iteration ends.
   970	func (c *Corpus) EnumerateBlobMeta(fn func(camtypes.BlobMeta) bool) {
   971		for _, bm := range c.blobs {
   972			if !fn(*bm) {
   973				return
   974			}
   975		}
   976	}
   977	
// pnAndTime is a value type pairing a permanode blobref with the time it
// is sorted on.
// It's used by EnumeratePermanodesLastModified and EnumeratePermanodesCreated.
type pnAndTime struct {
	pn blob.Ref  // the permanode
	t  time.Time // the time pn is ordered by (see lazySortedPermanodes.pnTime)
}
   984	
   985	type byPermanodeTime []pnAndTime
   986	
   987	func (s byPermanodeTime) Len() int      { return len(s) }
   988	func (s byPermanodeTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
   989	func (s byPermanodeTime) Less(i, j int) bool {
   990		if s[i].t.Equal(s[j].t) {
   991			return s[i].pn.Less(s[j].pn)
   992		}
   993		return s[i].t.Before(s[j].t)
   994	}
   995	
// lazySortedPermanodes lazily builds and caches the list of the corpus's
// permanodes ordered by the time pnTime reports for them. The caches are
// rebuilt on demand by sorted once the corpus gen differs from ofGen.
type lazySortedPermanodes struct {
	c      *Corpus
	pnTime func(blob.Ref) (time.Time, bool) // returns permanode's time (if any) to sort on

	mu                  sync.Mutex  // guards sortedCache, sortedCacheReversed and ofGen
	sortedCache         []pnAndTime // nil if invalidated
	sortedCacheReversed []pnAndTime // nil if invalidated
	ofGen               int64       // the Corpus.gen from which the caches were built
}
  1005	
  1006	func reversedCopy(original []pnAndTime) []pnAndTime {
  1007		l := len(original)
  1008		reversed := make([]pnAndTime, l)
  1009		for k, v := range original {
  1010			reversed[l-1-k] = v
  1011		}
  1012		return reversed
  1013	}
  1014	
  1015	func (lsp *lazySortedPermanodes) sorted(reverse bool) []pnAndTime {
  1016		lsp.mu.Lock()
  1017		defer lsp.mu.Unlock()
  1018		if lsp.ofGen == lsp.c.gen {
  1019			// corpus hasn't changed -> caches are still valid, if they exist.
  1020			if reverse {
  1021				if lsp.sortedCacheReversed != nil {
  1022					return lsp.sortedCacheReversed
  1023				}
  1024				if lsp.sortedCache != nil {
  1025					// using sortedCache to quickly build sortedCacheReversed
  1026					lsp.sortedCacheReversed = reversedCopy(lsp.sortedCache)
  1027					return lsp.sortedCacheReversed
  1028				}
  1029			}
  1030			if !reverse {
  1031				if lsp.sortedCache != nil {
  1032					return lsp.sortedCache
  1033				}
  1034				if lsp.sortedCacheReversed != nil {
  1035					// using sortedCacheReversed to quickly build sortedCache
  1036					lsp.sortedCache = reversedCopy(lsp.sortedCacheReversed)
  1037					return lsp.sortedCache
  1038				}
  1039			}
  1040		}
  1041		// invalidate the caches
  1042		lsp.sortedCache = nil
  1043		lsp.sortedCacheReversed = nil
  1044		pns := make([]pnAndTime, 0, len(lsp.c.permanodes))
  1045		for pn := range lsp.c.permanodes {
  1046			if lsp.c.IsDeleted(pn) {
  1047				continue
  1048			}
  1049			if pt, ok := lsp.pnTime(pn); ok {
  1050				pns = append(pns, pnAndTime{pn, pt})
  1051			}
  1052		}
  1053		// and rebuild one of them
  1054		if reverse {
  1055			sort.Sort(sort.Reverse(byPermanodeTime(pns)))
  1056			lsp.sortedCacheReversed = pns
  1057		} else {
  1058			sort.Sort(byPermanodeTime(pns))
  1059			lsp.sortedCache = pns
  1060		}
  1061		lsp.ofGen = lsp.c.gen
  1062		return pns
  1063	}
  1064	
  1065	func (c *Corpus) enumeratePermanodes(fn func(camtypes.BlobMeta) bool, pns []pnAndTime) {
  1066		for _, cand := range pns {
  1067			bm := c.blobs[cand.pn]
  1068			if bm == nil {
  1069				continue
  1070			}
  1071			if !fn(*bm) {
  1072				return
  1073			}
  1074		}
  1075	}
  1076	
  1077	// EnumeratePermanodesLastModified calls fn for all permanodes, sorted by most recently modified first.
  1078	// Iteration ends prematurely if fn returns false.
  1079	func (c *Corpus) EnumeratePermanodesLastModified(fn func(camtypes.BlobMeta) bool) {
  1080		c.enumeratePermanodes(fn, c.permanodesByModtime.sorted(true))
  1081	}
  1082	
  1083	// EnumeratePermanodesCreated calls fn for all permanodes.
  1084	// They are sorted using the contents creation date if any, the permanode modtime
  1085	// otherwise, and in the order specified by newestFirst.
  1086	// Iteration ends prematurely if fn returns false.
  1087	func (c *Corpus) EnumeratePermanodesCreated(fn func(camtypes.BlobMeta) bool, newestFirst bool) {
  1088		c.enumeratePermanodes(fn, c.permanodesByTime.sorted(newestFirst))
  1089	}
  1090	
  1091	// EnumerateSingleBlob calls fn with br's BlobMeta if br exists in the corpus.
  1092	func (c *Corpus) EnumerateSingleBlob(fn func(camtypes.BlobMeta) bool, br blob.Ref) {
  1093		if bm := c.blobs[br]; bm != nil {
  1094			fn(*bm)
  1095		}
  1096	}
  1097	
  1098	// EnumeratePermanodesByNodeTypes enumerates over all permanodes that might
  1099	// have one of the provided camliNodeType values, calling fn for each. If fn returns false,
  1100	// enumeration ends.
  1101	func (c *Corpus) EnumeratePermanodesByNodeTypes(fn func(camtypes.BlobMeta) bool, camliNodeTypes []string) {
  1102		for _, t := range camliNodeTypes {
  1103			set := c.permanodesSetByNodeType[t]
  1104			for br := range set {
  1105				if bm := c.blobs[br]; bm != nil {
  1106					if !fn(*bm) {
  1107						return
  1108					}
  1109				}
  1110			}
  1111		}
  1112	}
  1113	
  1114	func (c *Corpus) GetBlobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
  1115		bm, ok := c.blobs[br]
  1116		if !ok {
  1117			return camtypes.BlobMeta{}, os.ErrNotExist
  1118		}
  1119		return *bm, nil
  1120	}
  1121	
  1122	func (c *Corpus) KeyId(ctx context.Context, signer blob.Ref) (string, error) {
  1123		if v, ok := c.keyId[signer]; ok {
  1124			return v, nil
  1125		}
  1126		return "", sorted.ErrNotFound
  1127	}
  1128	
  1129	func (c *Corpus) pnTimeAttr(pn blob.Ref, attr string) (t time.Time, ok bool) {
  1130		if v := c.PermanodeAttrValue(pn, attr, time.Time{}, ""); v != "" {
  1131			if t, err := time.Parse(time.RFC3339, v); err == nil {
  1132				return t, true
  1133			}
  1134		}
  1135		return
  1136	}
  1137	
  1138	// PermanodeTime returns the time of the content in permanode.
  1139	func (c *Corpus) PermanodeTime(pn blob.Ref) (t time.Time, ok bool) {
  1140		// TODO(bradfitz): keep this time property cached on the permanode / files
  1141		// TODO(bradfitz): finish implementing all these
  1142	
  1143		// Priorities:
  1144		// -- Permanode explicit "camliTime" property
  1145		// -- EXIF GPS time
  1146		// -- Exif camera time - this one is actually already in the FileInfo,
  1147		// because we use schema.FileTime (which returns the EXIF time, if available)
  1148		// to index the time when receiving a file.
  1149		// -- File time
  1150		// -- File modtime
  1151		// -- camliContent claim set time
  1152	
  1153		if t, ok = c.pnTimeAttr(pn, nodeattr.PaymentDueDate); ok {
  1154			return
  1155		}
  1156		if t, ok = c.pnTimeAttr(pn, nodeattr.StartDate); ok {
  1157			return
  1158		}
  1159		if t, ok = c.pnTimeAttr(pn, nodeattr.DateCreated); ok {
  1160			return
  1161		}
  1162		var fi camtypes.FileInfo
  1163		ccRef, ccTime, ok := c.pnCamliContent(pn)
  1164		if ok {
  1165			fi, _ = c.files[ccRef]
  1166		}
  1167		if fi.Time != nil {
  1168			return time.Time(*fi.Time), true
  1169		}
  1170	
  1171		if t, ok = c.pnTimeAttr(pn, nodeattr.DatePublished); ok {
  1172			return
  1173		}
  1174		if t, ok = c.pnTimeAttr(pn, nodeattr.DateModified); ok {
  1175			return
  1176		}
  1177		if fi.ModTime != nil {
  1178			return time.Time(*fi.ModTime), true
  1179		}
  1180		if ok {
  1181			return ccTime, true
  1182		}
  1183		return time.Time{}, false
  1184	}
  1185	
  1186	// PermanodeAnyTime returns the time that best qualifies the permanode.
  1187	// It tries content-specific times first, the permanode modtime otherwise.
  1188	func (c *Corpus) PermanodeAnyTime(pn blob.Ref) (t time.Time, ok bool) {
  1189		if t, ok := c.PermanodeTime(pn); ok {
  1190			return t, ok
  1191		}
  1192		return c.PermanodeModtime(pn)
  1193	}
  1194	
  1195	func (c *Corpus) pnCamliContent(pn blob.Ref) (cc blob.Ref, t time.Time, ok bool) {
  1196		// TODO(bradfitz): keep this property cached
  1197		pm, ok := c.permanodes[pn]
  1198		if !ok {
  1199			return
  1200		}
  1201		for _, cl := range pm.Claims {
  1202			if cl.Attr != "camliContent" {
  1203				continue
  1204			}
  1205			// TODO: pass down the 'PermanodeConstraint.At' parameter, and then do: if cl.Date.After(at) { continue }
  1206			switch cl.Type {
  1207			case string(schema.DelAttributeClaim):
  1208				cc = blob.Ref{}
  1209				t = time.Time{}
  1210			case string(schema.SetAttributeClaim):
  1211				cc = blob.ParseOrZero(cl.Value)
  1212				t = cl.Date
  1213			}
  1214		}
  1215		return cc, t, cc.Valid()
  1216	
  1217	}
  1218	
  1219	// PermanodeModtime returns the latest modification time of the given
  1220	// permanode.
  1221	//
  1222	// The ok value is true only if the permanode is known and has any
  1223	// non-deleted claims. A deleted claim is ignored and neither its
  1224	// claim date nor the date of the delete claim affect the modtime of
  1225	// the permanode.
  1226	func (c *Corpus) PermanodeModtime(pn blob.Ref) (t time.Time, ok bool) {
  1227		pm, ok := c.permanodes[pn]
  1228		if !ok {
  1229			return
  1230		}
  1231	
  1232		// Note: We intentionally don't try to derive any information
  1233		// (except the owner, elsewhere) from the permanode blob
  1234		// itself. Even though the permanode blob sometimes has the
  1235		// GPG signature time, we intentionally ignore it.
  1236		for _, cl := range pm.Claims {
  1237			if c.IsDeleted(cl.BlobRef) {
  1238				continue
  1239			}
  1240			if cl.Date.After(t) {
  1241				t = cl.Date
  1242			}
  1243		}
  1244		return t, !t.IsZero()
  1245	}
  1246	
  1247	// PermanodeAttrValue returns a single-valued attribute or "".
  1248	// signerFilter, if set, should be the GPG ID of a signer
  1249	// (e.g. 2931A67C26F5ABDA).
  1250	func (c *Corpus) PermanodeAttrValue(permaNode blob.Ref,
  1251		attr string,
  1252		at time.Time,
  1253		signerFilter string) string {
  1254		pm, ok := c.permanodes[permaNode]
  1255		if !ok {
  1256			return ""
  1257		}
  1258		var signerRefs SignerRefSet
  1259		if signerFilter != "" {
  1260			signerRefs, ok = c.signerRefs[signerFilter]
  1261			if !ok {
  1262				return ""
  1263			}
  1264		}
  1265	
  1266		if values, ok := pm.valuesAtSigner(at, signerFilter); ok {
  1267			v := values[attr]
  1268			if len(v) == 0 {
  1269				return ""
  1270			}
  1271			return v[0]
  1272		}
  1273	
  1274		return claimPtrsAttrValue(pm.Claims, attr, at, signerRefs)
  1275	}
  1276	
  1277	// permanodeAttrsOrClaims returns the best available source
  1278	// to query attr values of permaNode at the given time
  1279	// for the signerID, which is either:
  1280	// a. m that represents attr values for the parameters, or
  1281	// b. all claims of the permanode.
  1282	// Only one of m or claims will be non-nil.
  1283	//
  1284	// (m, nil) is returned if m represents attrValues
  1285	// valid for the specified parameters.
  1286	//
  1287	// (nil, claims) is returned if
  1288	// no cached attribute map is valid for the given time,
  1289	// because e.g. some claims are more recent than this time. In which
  1290	// case the caller should resort to query claims directly.
  1291	//
  1292	// (nil, nil) is returned if the permaNode does not exist,
  1293	// or permaNode exists and signerID is valid,
  1294	// but permaNode has no attributes for it.
  1295	//
  1296	// The returned values must not be changed by the caller.
  1297	func (c *Corpus) permanodeAttrsOrClaims(permaNode blob.Ref,
  1298		at time.Time, signerID string) (m map[string][]string, claims []*camtypes.Claim) {
  1299	
  1300		pm, ok := c.permanodes[permaNode]
  1301		if !ok {
  1302			return nil, nil
  1303		}
  1304	
  1305		m, ok = pm.valuesAtSigner(at, signerID)
  1306		if ok {
  1307			return m, nil
  1308		}
  1309		return nil, pm.Claims
  1310	}
  1311	
  1312	// AppendPermanodeAttrValues appends to dst all the values for the attribute
  1313	// attr set on permaNode.
  1314	// signerFilter, if set, should be the GPG ID of a signer (e.g. 2931A67C26F5ABDA).
  1315	// dst must start with length 0 (laziness, mostly)
  1316	func (c *Corpus) AppendPermanodeAttrValues(dst []string,
  1317		permaNode blob.Ref,
  1318		attr string,
  1319		at time.Time,
  1320		signerFilter string) []string {
  1321		if len(dst) > 0 {
  1322			panic("len(dst) must be 0")
  1323		}
  1324		pm, ok := c.permanodes[permaNode]
  1325		if !ok {
  1326			return dst
  1327		}
  1328		var signerRefs SignerRefSet
  1329		if signerFilter != "" {
  1330			signerRefs, ok = c.signerRefs[signerFilter]
  1331			if !ok {
  1332				return dst
  1333			}
  1334		}
  1335		if values, ok := pm.valuesAtSigner(at, signerFilter); ok {
  1336			return append(dst, values[attr]...)
  1337		}
  1338		if at.IsZero() {
  1339			at = time.Now()
  1340		}
  1341		for _, cl := range pm.Claims {
  1342			if cl.Attr != attr || cl.Date.After(at) {
  1343				continue
  1344			}
  1345			if len(signerRefs) > 0 && !signerRefs.blobMatches(cl.Signer) {
  1346				continue
  1347			}
  1348			switch cl.Type {
  1349			case string(schema.DelAttributeClaim):
  1350				if cl.Value == "" {
  1351					dst = dst[:0] // delete all
  1352				} else {
  1353					for i := 0; i < len(dst); i++ {
  1354						v := dst[i]
  1355						if v == cl.Value {
  1356							copy(dst[i:], dst[i+1:])
  1357							dst = dst[:len(dst)-1]
  1358							i--
  1359						}
  1360					}
  1361				}
  1362			case string(schema.SetAttributeClaim):
  1363				dst = append(dst[:0], cl.Value)
  1364			case string(schema.AddAttributeClaim):
  1365				dst = append(dst, cl.Value)
  1366			}
  1367		}
  1368		return dst
  1369	}
  1370	
  1371	func (c *Corpus) AppendClaims(ctx context.Context, dst []camtypes.Claim, permaNode blob.Ref,
  1372		signerFilter string,
  1373		attrFilter string) ([]camtypes.Claim, error) {
  1374		pm, ok := c.permanodes[permaNode]
  1375		if !ok {
  1376			return nil, nil
  1377		}
  1378	
  1379		var signerRefs SignerRefSet
  1380		if signerFilter != "" {
  1381			signerRefs, ok = c.signerRefs[signerFilter]
  1382			if !ok {
  1383				return dst, nil
  1384			}
  1385		}
  1386	
  1387		for _, cl := range pm.Claims {
  1388			if c.IsDeleted(cl.BlobRef) {
  1389				continue
  1390			}
  1391	
  1392			if len(signerRefs) > 0 && !signerRefs.blobMatches(cl.Signer) {
  1393				continue
  1394			}
  1395	
  1396			if attrFilter != "" && cl.Attr != attrFilter {
  1397				continue
  1398			}
  1399			dst = append(dst, *cl)
  1400		}
  1401		return dst, nil
  1402	}
  1403	
  1404	func (c *Corpus) GetFileInfo(ctx context.Context, fileRef blob.Ref) (fi camtypes.FileInfo, err error) {
  1405		fi, ok := c.files[fileRef]
  1406		if !ok {
  1407			err = os.ErrNotExist
  1408		}
  1409		return
  1410	}
  1411	
  1412	// GetDirChildren returns the direct children (static-set entries) of the directory dirRef.
  1413	// It only returns an error if dirRef does not exist.
  1414	func (c *Corpus) GetDirChildren(ctx context.Context, dirRef blob.Ref) (map[blob.Ref]struct{}, error) {
  1415		children, ok := c.dirChildren[dirRef]
  1416		if !ok {
  1417			if _, ok := c.files[dirRef]; !ok {
  1418				return nil, os.ErrNotExist
  1419			}
  1420			return nil, nil
  1421		}
  1422		return children, nil
  1423	}
  1424	
  1425	// GetParentDirs returns the direct parents (directories) of the file or directory childRef.
  1426	// It only returns an error if childRef does not exist.
  1427	func (c *Corpus) GetParentDirs(ctx context.Context, childRef blob.Ref) (map[blob.Ref]struct{}, error) {
  1428		parents, ok := c.fileParents[childRef]
  1429		if !ok {
  1430			if _, ok := c.files[childRef]; !ok {
  1431				return nil, os.ErrNotExist
  1432			}
  1433			return nil, nil
  1434		}
  1435		return parents, nil
  1436	}
  1437	
  1438	func (c *Corpus) GetImageInfo(ctx context.Context, fileRef blob.Ref) (ii camtypes.ImageInfo, err error) {
  1439		ii, ok := c.imageInfo[fileRef]
  1440		if !ok {
  1441			err = os.ErrNotExist
  1442		}
  1443		return
  1444	}
  1445	
  1446	func (c *Corpus) GetMediaTags(ctx context.Context, fileRef blob.Ref) (map[string]string, error) {
  1447		wholeRef, ok := c.fileWholeRef[fileRef]
  1448		if !ok {
  1449			return nil, os.ErrNotExist
  1450		}
  1451		tags, ok := c.mediaTags[wholeRef]
  1452		if !ok {
  1453			return nil, os.ErrNotExist
  1454		}
  1455		return tags, nil
  1456	}
  1457	
  1458	func (c *Corpus) GetWholeRef(ctx context.Context, fileRef blob.Ref) (wholeRef blob.Ref, ok bool) {
  1459		wholeRef, ok = c.fileWholeRef[fileRef]
  1460		return
  1461	}
  1462	
  1463	func (c *Corpus) FileLatLong(fileRef blob.Ref) (lat, long float64, ok bool) {
  1464		wholeRef, ok := c.fileWholeRef[fileRef]
  1465		if !ok {
  1466			return
  1467		}
  1468		ll, ok := c.gps[wholeRef]
  1469		if !ok {
  1470			return
  1471		}
  1472		return ll.lat, ll.long, true
  1473	}
  1474	
  1475	// ForeachClaim calls fn for each claim of permaNode.
  1476	// If at is zero, all claims are yielded.
  1477	// If at is non-zero, claims after that point are skipped.
  1478	// If fn returns false, iteration ends.
  1479	// Iteration is in an undefined order.
  1480	func (c *Corpus) ForeachClaim(permaNode blob.Ref, at time.Time, fn func(*camtypes.Claim) bool) {
  1481		pm, ok := c.permanodes[permaNode]
  1482		if !ok {
  1483			return
  1484		}
  1485		for _, cl := range pm.Claims {
  1486			if !at.IsZero() && cl.Date.After(at) {
  1487				continue
  1488			}
  1489			if !fn(cl) {
  1490				return
  1491			}
  1492		}
  1493	}
  1494	
  1495	// ForeachClaimBack calls fn for each claim with a value referencing br.
  1496	// If at is zero, all claims are yielded.
  1497	// If at is non-zero, claims after that point are skipped.
  1498	// If fn returns false, iteration ends.
  1499	// Iteration is in an undefined order.
  1500	func (c *Corpus) ForeachClaimBack(value blob.Ref, at time.Time, fn func(*camtypes.Claim) bool) {
  1501		for _, cl := range c.claimBack[value] {
  1502			if !at.IsZero() && cl.Date.After(at) {
  1503				continue
  1504			}
  1505			if !fn(cl) {
  1506				return
  1507			}
  1508		}
  1509	}
  1510	
  1511	// PermanodeHasAttrValue reports whether the permanode pn at
  1512	// time at (zero means now) has the given attribute with the given
  1513	// value. If the attribute is multi-valued, any may match.
  1514	func (c *Corpus) PermanodeHasAttrValue(pn blob.Ref, at time.Time, attr, val string) bool {
  1515		pm, ok := c.permanodes[pn]
  1516		if !ok {
  1517			return false
  1518		}
  1519		if values, ok := pm.valuesAtSigner(at, ""); ok {
  1520			for _, v := range values[attr] {
  1521				if v == val {
  1522					return true
  1523				}
  1524			}
  1525			return false
  1526		}
  1527		if at.IsZero() {
  1528			at = time.Now()
  1529		}
  1530		ret := false
  1531		for _, cl := range pm.Claims {
  1532			if cl.Attr != attr {
  1533				continue
  1534			}
  1535			if cl.Date.After(at) {
  1536				break
  1537			}
  1538			switch cl.Type {
  1539			case string(schema.DelAttributeClaim):
  1540				if cl.Value == "" || cl.Value == val {
  1541					ret = false
  1542				}
  1543			case string(schema.SetAttributeClaim):
  1544				ret = (cl.Value == val)
  1545			case string(schema.AddAttributeClaim):
  1546				if cl.Value == val {
  1547					ret = true
  1548				}
  1549			}
  1550		}
  1551		return ret
  1552	}
  1553	
// SetVerboseCorpusLogging controls the verbosity of corpus setup by
// setting logCorpusStats to v. Verbose logging is on by default; tests use
// this function to turn it off.
func SetVerboseCorpusLogging(v bool) {
	logCorpusStats = v
}
Website layout inspired by memcached.
Content by the authors.