1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package index
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "log"
25 "os"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "time"
32
33 "perkeep.org/internal/osutil"
34 "perkeep.org/pkg/blob"
35 "perkeep.org/pkg/schema"
36 "perkeep.org/pkg/schema/nodeattr"
37 "perkeep.org/pkg/sorted"
38 "perkeep.org/pkg/types/camtypes"
39
40 "go4.org/strutil"
41 "go4.org/syncutil"
42 )
43
44
45
46
47
// Corpus is an in-memory copy of the index rows that are needed to answer
// queries without going back to the underlying sorted.KeyValue storage.
// It is populated in bulk by scanFromStorage and incrementally by addBlob.
type Corpus struct {
	// building is true while scanFromStorage is doing the initial load.
	// While set, mergeClaimRow skips the per-claim fixup (invariants are
	// restored once at the end of the scan) and mergeWholeToFileRow
	// looks for legacy SHA-1 whole refs.
	building bool

	// hasLegacySHA1 is set when a "wholetofile" row whose whole ref
	// starts with "sha1-" is seen during the initial load.
	hasLegacySHA1 bool

	// gen is incremented by addBlob for every new blob. The lazily
	// sorted permanode caches compare against it to detect staleness.
	gen int64

	// strs is the string interning table used by str.
	strs map[string]string
	// brOfStr is consulted by blobParse when useBlobParseCache is
	// enabled; scanFromStorage sets it to nil once the load is done.
	brOfStr map[string]blob.Ref
	// brInterns counts how many times br found an already-known ref.
	brInterns int64

	// blobs holds the metadata of every known blob.
	blobs map[blob.Ref]*camtypes.BlobMeta
	// sumBlobBytes is the sum of the sizes of all blobs in blobs.
	sumBlobBytes int64

	// camBlobs maps from camliType to blobref to the meta. The
	// *BlobMeta values are the same pointers stored in blobs.
	camBlobs map[schema.CamliType]map[blob.Ref]*camtypes.BlobMeta

	// keyId maps a signer blobref to its key ID.
	keyId signerFromBlobrefMap

	// signerRefs maps a key ID to the (stringified) signer blobrefs
	// that were registered with that ID.
	signerRefs map[string]SignerRefSet
	// files maps a file blobref to its file info.
	files map[blob.Ref]camtypes.FileInfo
	// permanodes maps a permanode blobref to its claims and cached
	// attribute values.
	permanodes map[blob.Ref]*PermanodeMeta
	// imageInfo maps a file blobref to its image info.
	imageInfo map[blob.Ref]camtypes.ImageInfo
	// fileWholeRef maps a file blobref to its whole ref.
	fileWholeRef map[blob.Ref]blob.Ref
	// gps maps a whole ref to the EXIF GPS coordinates.
	gps map[blob.Ref]latLong

	// dirChildren maps a parent ref to the set of its children refs.
	dirChildren map[blob.Ref]map[blob.Ref]struct{}

	// fileParents maps a child ref to the set of its parent refs.
	fileParents map[blob.Ref]map[blob.Ref]struct{}

	// claimBack indexes claims by the blobref found in their value,
	// for claims whose value parses as a blobref.
	claimBack map[blob.Ref][]*camtypes.Claim

	// deletedBy is allocated by newCorpus; it is not otherwise
	// referenced in the visible part of this file.
	deletedBy map[blob.Ref]blob.Ref

	// deletes maps a deleted blobref to the deletions targeting it.
	// Each slice is re-sorted with sort.Reverse(byDeletionDate) after
	// every insertion (presumably newest first — confirm against
	// byDeletionDate).
	deletes map[blob.Ref][]deletion

	// mediaTags maps a whole ref to its tag name -> tag value map.
	mediaTags map[blob.Ref]map[string]string

	// permanodesByTime caches permanodes sorted by PermanodeAnyTime.
	permanodesByTime *lazySortedPermanodes
	// permanodesByModtime caches permanodes sorted by PermanodeModtime.
	permanodesByModtime *lazySortedPermanodes

	// permanodesSetByNodeType maps a camliNodeType value to the set of
	// permanodes that ever had a claim with that value. Entries are
	// only added (see mergeClaimRow), never removed.
	permanodesSetByNodeType map[string]map[blob.Ref]bool

	// ss is a scratch slice reused by the row merge functions when
	// splitting values.
	ss []string
}
121
122 func (c *Corpus) logf(format string, args ...interface{}) {
123 log.Printf("index/corpus: "+format, args...)
124 }
125
126
127 func (srs SignerRefSet) blobMatches(br blob.Ref) bool {
128 for _, v := range srs {
129 if br.EqualString(v) {
130 return true
131 }
132 }
133 return false
134 }
135
136
137
138
139
140
141
142
// signerFromBlobrefMap maps a signer blobref to the key ID string stored
// for it (see mergeSignerKeyIdRow).
type signerFromBlobrefMap map[blob.Ref]string
144
// latLong is a latitude/longitude pair, as parsed from an "exifgps" row by
// mergeEXIFGPSRow.
type latLong struct {
	lat, long float64
}
148
149
150 func (c *Corpus) IsDeleted(br blob.Ref) bool {
151 for _, v := range c.deletes[br] {
152 if !c.IsDeleted(v.deleter) {
153 return true
154 }
155 }
156 return false
157 }
158
// PermanodeMeta holds the claims made on a permanode, plus caches of the
// attribute values that result from applying those claims.
type PermanodeMeta struct {
	// Claims are the permanode's claims. restoreInvariants sorts them
	// with camtypes.ClaimPtrsByDate.
	Claims []*camtypes.Claim

	// attr caches the attribute values obtained by applying all of
	// Claims, regardless of signer. It is nil until restoreInvariants
	// has run.
	attr attrValues

	// signer caches attribute values per signer key ID. While only one
	// signer has been seen, its entry is the very same map as attr;
	// appendAttrClaim splits them when a second signer appears.
	signer map[string]attrValues
}
168
// attrValues maps an attribute name to its current list of values.
type attrValues map[string][]string
170
171
172 func (m attrValues) cacheAttrClaim(cl *camtypes.Claim) {
173 switch cl.Type {
174 case string(schema.SetAttributeClaim):
175 m[cl.Attr] = []string{cl.Value}
176 case string(schema.AddAttributeClaim):
177 m[cl.Attr] = append(m[cl.Attr], cl.Value)
178 case string(schema.DelAttributeClaim):
179 if cl.Value == "" {
180 delete(m, cl.Attr)
181 } else {
182 a, i := m[cl.Attr], 0
183 for _, v := range a {
184 if v != cl.Value {
185 a[i] = v
186 i++
187 }
188 }
189 m[cl.Attr] = a[:i]
190 }
191 }
192 }
193
194
195
196 func (pm *PermanodeMeta) restoreInvariants(signers signerFromBlobrefMap) error {
197 sort.Sort(camtypes.ClaimPtrsByDate(pm.Claims))
198 pm.attr = make(attrValues)
199 pm.signer = make(map[string]attrValues)
200 for _, cl := range pm.Claims {
201 if err := pm.appendAttrClaim(cl, signers); err != nil {
202 return err
203 }
204 }
205 return nil
206 }
207
208
209
210
211 func (pm *PermanodeMeta) fixupLastClaim(signers signerFromBlobrefMap) error {
212 if pm.attr != nil {
213 n := len(pm.Claims)
214 if n < 2 || camtypes.ClaimPtrsByDate(pm.Claims).Less(n-2, n-1) {
215
216 return pm.appendAttrClaim(pm.Claims[n-1], signers)
217 }
218 }
219 return pm.restoreInvariants(signers)
220 }
221
222
223
224
225
// appendAttrClaim applies cl to the cached attribute values: always to the
// signer-agnostic pm.attr, and to the per-signer map of cl's signer.
//
// signers is used to translate cl.Signer (a blobref) into a key ID; an
// error is returned if the signer is unknown. pm.attr and pm.signer must
// already be allocated (see restoreInvariants).
func (pm *PermanodeMeta) appendAttrClaim(cl *camtypes.Claim, signers signerFromBlobrefMap) error {
	signer, ok := signers[cl.Signer]
	if !ok {
		return fmt.Errorf("claim %v has unknown signer %q", cl.BlobRef, cl.Signer)
	}
	sc, ok := pm.signer[signer]
	if !ok {
		// First claim from this signer. What to do depends on how
		// many other signers have been seen so far.
		switch len(pm.signer) {
		case 0:
			// Very first signer: its values are by definition the
			// same as pm.attr, so share the map instead of
			// duplicating it.
			pm.attr.cacheAttrClaim(cl)
			pm.signer[signer] = pm.attr
			return nil

		case 1:
			// A second signer appears. The single existing signer
			// was sharing pm.attr, so give it its own deep copy
			// before pm.attr starts mixing in the new signer's
			// claims.
			m := make(attrValues)
			for a, v := range pm.attr {
				xv := make([]string, len(v))
				copy(xv, v)
				m[a] = xv
			}

			// pm.signer has exactly one entry here; point it at
			// the copy.
			for sig := range pm.signer {
				pm.signer[sig] = m
				break
			}
		}
		sc = make(attrValues)
		pm.signer[signer] = sc
	}

	pm.attr.cacheAttrClaim(cl)

	// With a single signer, sc is pm.attr itself and the claim has
	// already been applied above; applying it twice would be wrong.
	if len(pm.signer) > 1 {
		sc.cacheAttrClaim(cl)
	}
	return nil
}
275
276
277
278
279
280
281
282
283
284
285
286 func (pm *PermanodeMeta) valuesAtSigner(at time.Time,
287 signerFilter string) (v attrValues, ok bool) {
288
289 if pm.attr == nil {
290 return nil, false
291 }
292
293 var m attrValues
294 if signerFilter != "" {
295 m = pm.signer[signerFilter]
296 if m == nil {
297 return nil, true
298 }
299 } else {
300 m = pm.attr
301 }
302 if at.IsZero() {
303 return m, true
304 }
305 if n := len(pm.Claims); n == 0 || !pm.Claims[n-1].Date.After(at) {
306 return m, true
307 }
308 return nil, false
309 }
310
311 func newCorpus() *Corpus {
312 c := &Corpus{
313 blobs: make(map[blob.Ref]*camtypes.BlobMeta),
314 camBlobs: make(map[schema.CamliType]map[blob.Ref]*camtypes.BlobMeta),
315 files: make(map[blob.Ref]camtypes.FileInfo),
316 permanodes: make(map[blob.Ref]*PermanodeMeta),
317 imageInfo: make(map[blob.Ref]camtypes.ImageInfo),
318 deletedBy: make(map[blob.Ref]blob.Ref),
319 keyId: make(map[blob.Ref]string),
320 signerRefs: make(map[string]SignerRefSet),
321 brOfStr: make(map[string]blob.Ref),
322 fileWholeRef: make(map[blob.Ref]blob.Ref),
323 gps: make(map[blob.Ref]latLong),
324 mediaTags: make(map[blob.Ref]map[string]string),
325 deletes: make(map[blob.Ref][]deletion),
326 claimBack: make(map[blob.Ref][]*camtypes.Claim),
327 permanodesSetByNodeType: make(map[string]map[blob.Ref]bool),
328 dirChildren: make(map[blob.Ref]map[blob.Ref]struct{}),
329 fileParents: make(map[blob.Ref]map[blob.Ref]struct{}),
330 }
331 c.permanodesByModtime = &lazySortedPermanodes{
332 c: c,
333 pnTime: c.PermanodeModtime,
334 }
335 c.permanodesByTime = &lazySortedPermanodes{
336 c: c,
337 pnTime: c.PermanodeAnyTime,
338 }
339 return c
340 }
341
342 func NewCorpusFromStorage(s sorted.KeyValue) (*Corpus, error) {
343 if s == nil {
344 return nil, errors.New("storage is nil")
345 }
346 c := newCorpus()
347 return c, c.scanFromStorage(s)
348 }
349
350 func (x *Index) KeepInMemory() (*Corpus, error) {
351 var err error
352 x.corpus, err = NewCorpusFromStorage(x.s)
353 return x.corpus, err
354 }
355
356
357
// PreventStorageAccessForTesting replaces the index's storage with a
// crashStorage, whose Get and Find methods panic. As the name says, it is
// meant for tests that want to prove that queries are answered without
// touching the underlying storage.
func (x *Index) PreventStorageAccessForTesting() {
	x.s = crashStorage{}
}
361
362 type crashStorage struct {
363 sorted.KeyValue
364 }
365
366 func (crashStorage) Get(key string) (string, error) {
367 panic(fmt.Sprintf("unexpected KeyValue.Get(%q) called", key))
368 }
369
370 func (crashStorage) Find(start, end string) sorted.Iterator {
371 panic(fmt.Sprintf("unexpected KeyValue.Find(%q, %q) called", start, end))
372 }
373
374
375
// corpusMergeFunc maps an index key type (as returned by typeOfKey) to the
// Corpus method that merges a row of that type into the corpus.
//
// A nil value marks a key type that is known but deliberately not kept in
// memory. scanPrefix panics for a key type that is missing from this map
// altogether, and addBlob only dispatches to key types listed in
// slurpPrefixes, so nil entries must never appear in slurpPrefixes.
var corpusMergeFunc = map[string]func(c *Corpus, k, v []byte) error{
	"have":                 nil,
	"recpn":                nil,
	"meta":                 (*Corpus).mergeMetaRow,
	keySignerKeyID.name:    (*Corpus).mergeSignerKeyIdRow,
	"claim":                (*Corpus).mergeClaimRow,
	"fileinfo":             (*Corpus).mergeFileInfoRow,
	keyFileTimes.name:      (*Corpus).mergeFileTimesRow,
	"imagesize":            (*Corpus).mergeImageSizeRow,
	"wholetofile":          (*Corpus).mergeWholeToFileRow,
	"exifgps":              (*Corpus).mergeEXIFGPSRow,
	"exiftag":              nil,
	"signerattrvalue":      nil,
	"mediatag":             (*Corpus).mergeMediaTag,
	keyStaticDirChild.name: (*Corpus).mergeStaticDirChildRow,
}
392
// memstats forces a garbage collection and then returns a fresh snapshot
// of the runtime's memory statistics.
func memstats() *runtime.MemStats {
	var stats runtime.MemStats
	runtime.GC()
	runtime.ReadMemStats(&stats)
	return &stats
}
399
// logCorpusStats controls whether scanFromStorage and scanPrefix log
// progress and statistics. It is changed with SetVerboseCorpusLogging.
var logCorpusStats = true
401
// slurpPrefixes lists the index key prefixes that are loaded into the
// corpus. Order matters for the first two entries: scanFromStorage scans
// them sequentially, by name, before anything else.
var slurpPrefixes = []string{
	"meta:",
	keySignerKeyID.name + ":",

	// Everything from here on (slurpPrefixes[2:]) is handed by
	// scanFromStorage to a syncutil.Group, one goroutine per prefix.
	"claim|",
	"fileinfo|",
	keyFileTimes.name + "|",
	"imagesize|",
	"wholetofile|",
	"exifgps|",
	"mediatag|",
	keyStaticDirChild.name + "|",
}
417
418
419 var slurpedKeyType = make(map[string]bool)
420
421 func init() {
422 for _, prefix := range slurpPrefixes {
423 slurpedKeyType[typeOfKey(prefix)] = true
424 }
425 }
426
427 func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
428 c.building = true
429
430 var ms0 *runtime.MemStats
431 if logCorpusStats {
432 ms0 = memstats()
433 c.logf("loading into memory...")
434 c.logf("loading into memory... (1/%d: meta rows)", len(slurpPrefixes))
435 }
436
437 scanmu := new(sync.Mutex)
438
439
440
441
442 if err := c.scanPrefix(scanmu, s, "meta:"); err != nil {
443 return err
444 }
445
446
447 if err := c.scanPrefix(scanmu, s, keySignerKeyID.name+":"); err != nil {
448 return err
449 }
450
451 c.files = make(map[blob.Ref]camtypes.FileInfo, len(c.camBlobs[schema.TypeFile]))
452 c.permanodes = make(map[blob.Ref]*PermanodeMeta, len(c.camBlobs[schema.TypePermanode]))
453 cpu0 := osutil.CPUUsage()
454
455 var grp syncutil.Group
456 for i, prefix := range slurpPrefixes[2:] {
457 if logCorpusStats {
458 c.logf("loading into memory... (%d/%d: prefix %q)", i+2, len(slurpPrefixes),
459 prefix[:len(prefix)-1])
460 }
461 prefix := prefix
462 grp.Go(func() error { return c.scanPrefix(scanmu, s, prefix) })
463 }
464 if err := grp.Err(); err != nil {
465 return err
466 }
467
468
469 for _, pm := range c.permanodes {
470
471 if err := pm.restoreInvariants(c.keyId); err != nil {
472 return err
473 }
474
475
476 for _, cl := range pm.Claims {
477 cl.BlobRef = c.br(cl.BlobRef)
478 cl.Signer = c.br(cl.Signer)
479 cl.Permanode = c.br(cl.Permanode)
480 cl.Target = c.br(cl.Target)
481 }
482 }
483 c.brOfStr = nil
484 c.building = false
485
486
487 if err := c.initDeletes(s); err != nil {
488 return fmt.Errorf("Could not populate the corpus deletes: %v", err)
489 }
490
491 if logCorpusStats {
492 cpu := osutil.CPUUsage() - cpu0
493 ms1 := memstats()
494 memUsed := ms1.Alloc - ms0.Alloc
495 if ms1.Alloc < ms0.Alloc {
496 memUsed = 0
497 }
498 c.logf("stats: %.3f MiB mem: %d blobs (%.3f GiB) (%d schema (%d permanode, %d file (%d image), ...)",
499 float64(memUsed)/(1<<20),
500 len(c.blobs),
501 float64(c.sumBlobBytes)/(1<<30),
502 c.numSchemaBlobs(),
503 len(c.permanodes),
504 len(c.files),
505 len(c.imageInfo))
506 c.logf("scanning CPU usage: %v", cpu)
507 }
508
509 return nil
510 }
511
512
513 func (c *Corpus) initDeletes(s sorted.KeyValue) (err error) {
514 it := queryPrefix(s, keyDeleted)
515 defer closeIterator(it, &err)
516 for it.Next() {
517 cl, ok := kvDeleted(it.Key())
518 if !ok {
519 return fmt.Errorf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key())
520 }
521 targetDeletions := append(c.deletes[cl.Target],
522 deletion{
523 deleter: cl.BlobRef,
524 when: cl.Date,
525 })
526 sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
527 c.deletes[cl.Target] = targetDeletions
528 }
529 return err
530 }
531
532 func (c *Corpus) numSchemaBlobs() (n int64) {
533 for _, m := range c.camBlobs {
534 n += int64(len(m))
535 }
536 return
537 }
538
539 func (c *Corpus) scanPrefix(mu *sync.Mutex, s sorted.KeyValue, prefix string) (err error) {
540 typeKey := typeOfKey(prefix)
541 fn, ok := corpusMergeFunc[typeKey]
542 if !ok {
543 panic("No registered merge func for prefix " + prefix)
544 }
545
546 n, t0 := 0, time.Now()
547 it := queryPrefixString(s, prefix)
548 defer closeIterator(it, &err)
549 for it.Next() {
550 n++
551 if n == 1 {
552 mu.Lock()
553 defer mu.Unlock()
554 }
555 if typeKey == keySignerKeyID.name {
556 signerBlobRef, ok := blob.Parse(strings.TrimPrefix(it.Key(), keySignerKeyID.name+":"))
557 if !ok {
558 c.logf("WARNING: bogus signer blob in %v row: %q", keySignerKeyID.name, it.Key())
559 continue
560 }
561 if err := c.addKeyID(&mutationMap{
562 signerBlobRef: signerBlobRef,
563 signerID: it.Value(),
564 }); err != nil {
565 return err
566 }
567 } else {
568 if err := fn(c, it.KeyBytes(), it.ValueBytes()); err != nil {
569 return err
570 }
571 }
572 }
573 if logCorpusStats {
574 d := time.Since(t0)
575 c.logf("loaded prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d)
576 }
577 return nil
578 }
579
580 func (c *Corpus) addKeyID(mm *mutationMap) error {
581 if mm.signerID == "" || !mm.signerBlobRef.Valid() {
582 return nil
583 }
584 id, ok := c.keyId[mm.signerBlobRef]
585
586 if ok {
587 if id != mm.signerID {
588 return fmt.Errorf("GPG ID mismatch for signer %q: refusing to overwrite %v with %v", mm.signerBlobRef, id, mm.signerID)
589 }
590 return nil
591 }
592 c.signerRefs[mm.signerID] = append(c.signerRefs[mm.signerID], mm.signerBlobRef.String())
593 return c.mergeSignerKeyIdRow([]byte("signerkeyid:"+mm.signerBlobRef.String()), []byte(mm.signerID))
594 }
595
596 func (c *Corpus) addBlob(ctx context.Context, br blob.Ref, mm *mutationMap) error {
597 if _, dup := c.blobs[br]; dup {
598 return nil
599 }
600 c.gen++
601
602
603 if err := c.addKeyID(mm); err != nil {
604 return err
605 }
606 for k, v := range mm.kv {
607 kt := typeOfKey(k)
608 if kt == keySignerKeyID.name {
609
610 continue
611 }
612 if !slurpedKeyType[kt] {
613 continue
614 }
615 if err := corpusMergeFunc[kt](c, []byte(k), []byte(v)); err != nil {
616 return err
617 }
618 }
619 for _, cl := range mm.deletes {
620 if err := c.updateDeletes(cl); err != nil {
621 return fmt.Errorf("Could not update the deletes cache after deletion from %v: %v", cl, err)
622 }
623 }
624 return nil
625 }
626
627
628
629 func (c *Corpus) updateDeletes(deleteClaim schema.Claim) error {
630 target := c.br(deleteClaim.Target())
631 deleter := deleteClaim.Blob()
632 when, err := deleter.ClaimDate()
633 if err != nil {
634 return fmt.Errorf("Could not get date of delete claim %v: %v", deleteClaim, err)
635 }
636 del := deletion{
637 deleter: c.br(deleter.BlobRef()),
638 when: when,
639 }
640 for _, v := range c.deletes[target] {
641 if v == del {
642 return nil
643 }
644 }
645 targetDeletions := append(c.deletes[target], del)
646 sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
647 c.deletes[target] = targetDeletions
648 return nil
649 }
650
651 func (c *Corpus) mergeMetaRow(k, v []byte) error {
652 bm, ok := kvBlobMeta_bytes(k, v)
653 if !ok {
654 return fmt.Errorf("bogus meta row: %q -> %q", k, v)
655 }
656 return c.mergeBlobMeta(bm)
657 }
658
659 func (c *Corpus) mergeBlobMeta(bm camtypes.BlobMeta) error {
660 if _, dup := c.blobs[bm.Ref]; dup {
661 panic("dup blob seen")
662 }
663 bm.CamliType = schema.CamliType((c.str(string(bm.CamliType))))
664
665 c.blobs[bm.Ref] = &bm
666 c.sumBlobBytes += int64(bm.Size)
667 if bm.CamliType != "" {
668 m, ok := c.camBlobs[bm.CamliType]
669 if !ok {
670 m = make(map[blob.Ref]*camtypes.BlobMeta)
671 c.camBlobs[bm.CamliType] = m
672 }
673 m[bm.Ref] = &bm
674 }
675 return nil
676 }
677
678 func (c *Corpus) mergeSignerKeyIdRow(k, v []byte) error {
679 br, ok := blob.ParseBytes(k[len("signerkeyid:"):])
680 if !ok {
681 return fmt.Errorf("bogus signerid row: %q -> %q", k, v)
682 }
683 c.keyId[br] = string(v)
684 return nil
685 }
686
687 func (c *Corpus) mergeClaimRow(k, v []byte) error {
688
689 cl, ok := kvClaim(string(k), string(v), c.blobParse)
690 if !ok || !cl.Permanode.Valid() {
691 return fmt.Errorf("bogus claim row: %q -> %q", k, v)
692 }
693 cl.Type = c.str(cl.Type)
694 cl.Attr = c.str(cl.Attr)
695 cl.Value = c.str(cl.Value)
696
697 pn := c.br(cl.Permanode)
698 pm, ok := c.permanodes[pn]
699 if !ok {
700 pm = new(PermanodeMeta)
701 c.permanodes[pn] = pm
702 }
703 pm.Claims = append(pm.Claims, &cl)
704 if !c.building {
705
706
707 if err := pm.fixupLastClaim(c.keyId); err != nil {
708 return err
709 }
710 }
711
712 if vbr, ok := blob.Parse(cl.Value); ok {
713 c.claimBack[vbr] = append(c.claimBack[vbr], &cl)
714 }
715 if cl.Attr == "camliNodeType" {
716 set := c.permanodesSetByNodeType[cl.Value]
717 if set == nil {
718 set = make(map[blob.Ref]bool)
719 c.permanodesSetByNodeType[cl.Value] = set
720 }
721 set[pn] = true
722 }
723 return nil
724 }
725
726 func (c *Corpus) mergeFileInfoRow(k, v []byte) error {
727
728 pipe := bytes.IndexByte(k, '|')
729 if pipe < 0 {
730 return fmt.Errorf("unexpected fileinfo key %q", k)
731 }
732 br, ok := blob.ParseBytes(k[pipe+1:])
733 if !ok {
734 return fmt.Errorf("unexpected fileinfo blobref in key %q", k)
735 }
736
737
738
739 c.ss = strutil.AppendSplitN(c.ss[:0], string(v), "|", 4)
740 if len(c.ss) != 3 && len(c.ss) != 4 {
741 return fmt.Errorf("unexpected fileinfo value %q", v)
742 }
743 size, err := strconv.ParseInt(c.ss[0], 10, 64)
744 if err != nil {
745 return fmt.Errorf("unexpected fileinfo value %q", v)
746 }
747 var wholeRef blob.Ref
748 if len(c.ss) == 4 && c.ss[3] != "" {
749 var ok bool
750 wholeRef, ok = blob.Parse(urld(c.ss[3]))
751 if !ok {
752 return fmt.Errorf("invalid wholeRef blobref in value %q for fileinfo key %q", v, k)
753 }
754 }
755 c.mutateFileInfo(br, func(fi *camtypes.FileInfo) {
756 fi.Size = size
757 fi.FileName = c.str(urld(c.ss[1]))
758 fi.MIMEType = c.str(urld(c.ss[2]))
759 fi.WholeRef = wholeRef
760 })
761 return nil
762 }
763
764 func (c *Corpus) mergeStaticDirChildRow(k, v []byte) error {
765
766
767 sk := k[len(keyStaticDirChild.name)+1:]
768 pipe := bytes.IndexByte(sk, '|')
769 if pipe < 0 {
770 return fmt.Errorf("invalid dirchild key %q, missing second pipe", k)
771 }
772 parent, ok := blob.ParseBytes(sk[:pipe])
773 if !ok {
774 return fmt.Errorf("invalid dirchild parent blobref in key %q", k)
775 }
776 child, ok := blob.ParseBytes(sk[pipe+1:])
777 if !ok {
778 return fmt.Errorf("invalid dirchild child blobref in key %q", k)
779 }
780 parent = c.br(parent)
781 child = c.br(child)
782 children, ok := c.dirChildren[parent]
783 if !ok {
784 children = make(map[blob.Ref]struct{})
785 }
786 children[child] = struct{}{}
787 c.dirChildren[parent] = children
788 parents, ok := c.fileParents[child]
789 if !ok {
790 parents = make(map[blob.Ref]struct{})
791 }
792 parents[parent] = struct{}{}
793 c.fileParents[child] = parents
794 return nil
795 }
796
797 func (c *Corpus) mergeFileTimesRow(k, v []byte) error {
798 if len(v) == 0 {
799 return nil
800 }
801
802 pipe := bytes.IndexByte(k, '|')
803 if pipe < 0 {
804 return fmt.Errorf("unexpected fileinfo key %q", k)
805 }
806 br, ok := blob.ParseBytes(k[pipe+1:])
807 if !ok {
808 return fmt.Errorf("unexpected filetimes blobref in key %q", k)
809 }
810 c.ss = strutil.AppendSplitN(c.ss[:0], urld(string(v)), ",", -1)
811 times := c.ss
812 c.mutateFileInfo(br, func(fi *camtypes.FileInfo) {
813 updateFileInfoTimes(fi, times)
814 })
815 return nil
816 }
817
818 func (c *Corpus) mutateFileInfo(br blob.Ref, fn func(*camtypes.FileInfo)) {
819 br = c.br(br)
820 fi := c.files[br]
821 fn(&fi)
822 c.files[br] = fi
823 }
824
825 func (c *Corpus) mergeImageSizeRow(k, v []byte) error {
826 br, okk := blob.ParseBytes(k[len("imagesize|"):])
827 ii, okv := kvImageInfo(v)
828 if !okk || !okv {
829 return fmt.Errorf("bogus row %q = %q", k, v)
830 }
831 br = c.br(br)
832 c.imageInfo[br] = ii
833 return nil
834 }
835
836 var sha1Prefix = []byte("sha1-")
837
838
839 func (c *Corpus) mergeWholeToFileRow(k, v []byte) error {
840 pair := k[len("wholetofile|"):]
841 pipe := bytes.IndexByte(pair, '|')
842 if pipe < 0 {
843 return fmt.Errorf("bogus row %q = %q", k, v)
844 }
845 wholeRef, ok1 := blob.ParseBytes(pair[:pipe])
846 fileRef, ok2 := blob.ParseBytes(pair[pipe+1:])
847 if !ok1 || !ok2 {
848 return fmt.Errorf("bogus row %q = %q", k, v)
849 }
850 c.fileWholeRef[fileRef] = wholeRef
851 if c.building && !c.hasLegacySHA1 {
852 if bytes.HasPrefix(pair, sha1Prefix) {
853 c.hasLegacySHA1 = true
854 }
855 }
856 return nil
857 }
858
859
860 func (c *Corpus) mergeMediaTag(k, v []byte) error {
861 f := strings.Split(string(k), "|")
862 if len(f) != 3 {
863 return fmt.Errorf("unexpected key %q", k)
864 }
865 wholeRef, ok := blob.Parse(f[1])
866 if !ok {
867 return fmt.Errorf("failed to parse wholeref from key %q", k)
868 }
869 tm, ok := c.mediaTags[wholeRef]
870 if !ok {
871 tm = make(map[string]string)
872 c.mediaTags[wholeRef] = tm
873 }
874 tm[c.str(f[2])] = c.str(urld(string(v)))
875 return nil
876 }
877
878
879 func (c *Corpus) mergeEXIFGPSRow(k, v []byte) error {
880 wholeRef, ok := blob.ParseBytes(k[len("exifgps|"):])
881 pipe := bytes.IndexByte(v, '|')
882 if pipe < 0 || !ok {
883 return fmt.Errorf("bogus row %q = %q", k, v)
884 }
885 lat, err := strconv.ParseFloat(string(v[:pipe]), 64)
886 long, err1 := strconv.ParseFloat(string(v[pipe+1:]), 64)
887 if err != nil || err1 != nil {
888 if err != nil {
889 log.Printf("index: bogus latitude in value of row %q = %q", k, v)
890 } else {
891 log.Printf("index: bogus longitude in value of row %q = %q", k, v)
892 }
893 return nil
894 }
895 c.gps[wholeRef] = latLong{lat, long}
896 return nil
897 }
898
899
900
901
902
903
904
905 const useBlobParseCache = false
906
907 func (c *Corpus) blobParse(v string) (br blob.Ref, ok bool) {
908 if useBlobParseCache {
909 br, ok = c.brOfStr[v]
910 if ok {
911 return
912 }
913 }
914 return blob.Parse(v)
915 }
916
917
918 func (c *Corpus) str(s string) string {
919 if s == "" {
920 return ""
921 }
922 if s, ok := c.strs[s]; ok {
923 return s
924 }
925 if c.strs == nil {
926 c.strs = make(map[string]string)
927 }
928 c.strs[s] = s
929 return s
930 }
931
932
933 func (c *Corpus) br(br blob.Ref) blob.Ref {
934 if bm, ok := c.blobs[br]; ok {
935 c.brInterns++
936 return bm.Ref
937 }
938 return br
939 }
940
941
942
943
944
945
946
947
948
949 func (c *Corpus) EnumerateCamliBlobs(camType schema.CamliType, fn func(camtypes.BlobMeta) bool) {
950 if camType != "" {
951 for _, bm := range c.camBlobs[camType] {
952 if !fn(*bm) {
953 return
954 }
955 }
956 return
957 }
958 for _, m := range c.camBlobs {
959 for _, bm := range m {
960 if !fn(*bm) {
961 return
962 }
963 }
964 }
965 }
966
967
968
969
970 func (c *Corpus) EnumerateBlobMeta(fn func(camtypes.BlobMeta) bool) {
971 for _, bm := range c.blobs {
972 if !fn(*bm) {
973 return
974 }
975 }
976 }
977
978
979
// pnAndTime pairs a permanode with the time it is sorted by.
type pnAndTime struct {
	pn blob.Ref  // the permanode
	t  time.Time // the time returned for pn by the sort's pnTime function
}
984
985 type byPermanodeTime []pnAndTime
986
987 func (s byPermanodeTime) Len() int { return len(s) }
988 func (s byPermanodeTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
989 func (s byPermanodeTime) Less(i, j int) bool {
990 if s[i].t.Equal(s[j].t) {
991 return s[i].pn.Less(s[j].pn)
992 }
993 return s[i].t.Before(s[j].t)
994 }
995
// lazySortedPermanodes caches the corpus's non-deleted permanodes sorted
// by a given time function. The sort is only (re)computed on demand, by
// the sorted method, when the corpus generation has changed.
type lazySortedPermanodes struct {
	c      *Corpus                          // the corpus being sorted
	pnTime func(blob.Ref) (time.Time, bool) // yields the sort time of a permanode; false excludes it

	mu                  sync.Mutex  // held by sorted for its whole duration
	sortedCache         []pnAndTime // ascending order; nil if not computed
	sortedCacheReversed []pnAndTime // descending order; nil if not computed
	ofGen               int64       // the corpus generation the caches were built for
}
1005
1006 func reversedCopy(original []pnAndTime) []pnAndTime {
1007 l := len(original)
1008 reversed := make([]pnAndTime, l)
1009 for k, v := range original {
1010 reversed[l-1-k] = v
1011 }
1012 return reversed
1013 }
1014
1015 func (lsp *lazySortedPermanodes) sorted(reverse bool) []pnAndTime {
1016 lsp.mu.Lock()
1017 defer lsp.mu.Unlock()
1018 if lsp.ofGen == lsp.c.gen {
1019
1020 if reverse {
1021 if lsp.sortedCacheReversed != nil {
1022 return lsp.sortedCacheReversed
1023 }
1024 if lsp.sortedCache != nil {
1025
1026 lsp.sortedCacheReversed = reversedCopy(lsp.sortedCache)
1027 return lsp.sortedCacheReversed
1028 }
1029 }
1030 if !reverse {
1031 if lsp.sortedCache != nil {
1032 return lsp.sortedCache
1033 }
1034 if lsp.sortedCacheReversed != nil {
1035
1036 lsp.sortedCache = reversedCopy(lsp.sortedCacheReversed)
1037 return lsp.sortedCache
1038 }
1039 }
1040 }
1041
1042 lsp.sortedCache = nil
1043 lsp.sortedCacheReversed = nil
1044 pns := make([]pnAndTime, 0, len(lsp.c.permanodes))
1045 for pn := range lsp.c.permanodes {
1046 if lsp.c.IsDeleted(pn) {
1047 continue
1048 }
1049 if pt, ok := lsp.pnTime(pn); ok {
1050 pns = append(pns, pnAndTime{pn, pt})
1051 }
1052 }
1053
1054 if reverse {
1055 sort.Sort(sort.Reverse(byPermanodeTime(pns)))
1056 lsp.sortedCacheReversed = pns
1057 } else {
1058 sort.Sort(byPermanodeTime(pns))
1059 lsp.sortedCache = pns
1060 }
1061 lsp.ofGen = lsp.c.gen
1062 return pns
1063 }
1064
1065 func (c *Corpus) enumeratePermanodes(fn func(camtypes.BlobMeta) bool, pns []pnAndTime) {
1066 for _, cand := range pns {
1067 bm := c.blobs[cand.pn]
1068 if bm == nil {
1069 continue
1070 }
1071 if !fn(*bm) {
1072 return
1073 }
1074 }
1075 }
1076
1077
1078
1079 func (c *Corpus) EnumeratePermanodesLastModified(fn func(camtypes.BlobMeta) bool) {
1080 c.enumeratePermanodes(fn, c.permanodesByModtime.sorted(true))
1081 }
1082
1083
1084
1085
1086
1087 func (c *Corpus) EnumeratePermanodesCreated(fn func(camtypes.BlobMeta) bool, newestFirst bool) {
1088 c.enumeratePermanodes(fn, c.permanodesByTime.sorted(newestFirst))
1089 }
1090
1091
1092 func (c *Corpus) EnumerateSingleBlob(fn func(camtypes.BlobMeta) bool, br blob.Ref) {
1093 if bm := c.blobs[br]; bm != nil {
1094 fn(*bm)
1095 }
1096 }
1097
1098
1099
1100
1101 func (c *Corpus) EnumeratePermanodesByNodeTypes(fn func(camtypes.BlobMeta) bool, camliNodeTypes []string) {
1102 for _, t := range camliNodeTypes {
1103 set := c.permanodesSetByNodeType[t]
1104 for br := range set {
1105 if bm := c.blobs[br]; bm != nil {
1106 if !fn(*bm) {
1107 return
1108 }
1109 }
1110 }
1111 }
1112 }
1113
1114 func (c *Corpus) GetBlobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
1115 bm, ok := c.blobs[br]
1116 if !ok {
1117 return camtypes.BlobMeta{}, os.ErrNotExist
1118 }
1119 return *bm, nil
1120 }
1121
1122 func (c *Corpus) KeyId(ctx context.Context, signer blob.Ref) (string, error) {
1123 if v, ok := c.keyId[signer]; ok {
1124 return v, nil
1125 }
1126 return "", sorted.ErrNotFound
1127 }
1128
1129 func (c *Corpus) pnTimeAttr(pn blob.Ref, attr string) (t time.Time, ok bool) {
1130 if v := c.PermanodeAttrValue(pn, attr, time.Time{}, ""); v != "" {
1131 if t, err := time.Parse(time.RFC3339, v); err == nil {
1132 return t, true
1133 }
1134 }
1135 return
1136 }
1137
1138
1139 func (c *Corpus) PermanodeTime(pn blob.Ref) (t time.Time, ok bool) {
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153 if t, ok = c.pnTimeAttr(pn, nodeattr.PaymentDueDate); ok {
1154 return
1155 }
1156 if t, ok = c.pnTimeAttr(pn, nodeattr.StartDate); ok {
1157 return
1158 }
1159 if t, ok = c.pnTimeAttr(pn, nodeattr.DateCreated); ok {
1160 return
1161 }
1162 var fi camtypes.FileInfo
1163 ccRef, ccTime, ok := c.pnCamliContent(pn)
1164 if ok {
1165 fi, _ = c.files[ccRef]
1166 }
1167 if fi.Time != nil {
1168 return time.Time(*fi.Time), true
1169 }
1170
1171 if t, ok = c.pnTimeAttr(pn, nodeattr.DatePublished); ok {
1172 return
1173 }
1174 if t, ok = c.pnTimeAttr(pn, nodeattr.DateModified); ok {
1175 return
1176 }
1177 if fi.ModTime != nil {
1178 return time.Time(*fi.ModTime), true
1179 }
1180 if ok {
1181 return ccTime, true
1182 }
1183 return time.Time{}, false
1184 }
1185
1186
1187
1188 func (c *Corpus) PermanodeAnyTime(pn blob.Ref) (t time.Time, ok bool) {
1189 if t, ok := c.PermanodeTime(pn); ok {
1190 return t, ok
1191 }
1192 return c.PermanodeModtime(pn)
1193 }
1194
1195 func (c *Corpus) pnCamliContent(pn blob.Ref) (cc blob.Ref, t time.Time, ok bool) {
1196
1197 pm, ok := c.permanodes[pn]
1198 if !ok {
1199 return
1200 }
1201 for _, cl := range pm.Claims {
1202 if cl.Attr != "camliContent" {
1203 continue
1204 }
1205
1206 switch cl.Type {
1207 case string(schema.DelAttributeClaim):
1208 cc = blob.Ref{}
1209 t = time.Time{}
1210 case string(schema.SetAttributeClaim):
1211 cc = blob.ParseOrZero(cl.Value)
1212 t = cl.Date
1213 }
1214 }
1215 return cc, t, cc.Valid()
1216
1217 }
1218
1219
1220
1221
1222
1223
1224
1225
1226 func (c *Corpus) PermanodeModtime(pn blob.Ref) (t time.Time, ok bool) {
1227 pm, ok := c.permanodes[pn]
1228 if !ok {
1229 return
1230 }
1231
1232
1233
1234
1235
1236 for _, cl := range pm.Claims {
1237 if c.IsDeleted(cl.BlobRef) {
1238 continue
1239 }
1240 if cl.Date.After(t) {
1241 t = cl.Date
1242 }
1243 }
1244 return t, !t.IsZero()
1245 }
1246
1247
1248
1249
1250 func (c *Corpus) PermanodeAttrValue(permaNode blob.Ref,
1251 attr string,
1252 at time.Time,
1253 signerFilter string) string {
1254 pm, ok := c.permanodes[permaNode]
1255 if !ok {
1256 return ""
1257 }
1258 var signerRefs SignerRefSet
1259 if signerFilter != "" {
1260 signerRefs, ok = c.signerRefs[signerFilter]
1261 if !ok {
1262 return ""
1263 }
1264 }
1265
1266 if values, ok := pm.valuesAtSigner(at, signerFilter); ok {
1267 v := values[attr]
1268 if len(v) == 0 {
1269 return ""
1270 }
1271 return v[0]
1272 }
1273
1274 return claimPtrsAttrValue(pm.Claims, attr, at, signerRefs)
1275 }
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297 func (c *Corpus) permanodeAttrsOrClaims(permaNode blob.Ref,
1298 at time.Time, signerID string) (m map[string][]string, claims []*camtypes.Claim) {
1299
1300 pm, ok := c.permanodes[permaNode]
1301 if !ok {
1302 return nil, nil
1303 }
1304
1305 m, ok = pm.valuesAtSigner(at, signerID)
1306 if ok {
1307 return m, nil
1308 }
1309 return nil, pm.Claims
1310 }
1311
1312
1313
1314
1315
1316 func (c *Corpus) AppendPermanodeAttrValues(dst []string,
1317 permaNode blob.Ref,
1318 attr string,
1319 at time.Time,
1320 signerFilter string) []string {
1321 if len(dst) > 0 {
1322 panic("len(dst) must be 0")
1323 }
1324 pm, ok := c.permanodes[permaNode]
1325 if !ok {
1326 return dst
1327 }
1328 var signerRefs SignerRefSet
1329 if signerFilter != "" {
1330 signerRefs, ok = c.signerRefs[signerFilter]
1331 if !ok {
1332 return dst
1333 }
1334 }
1335 if values, ok := pm.valuesAtSigner(at, signerFilter); ok {
1336 return append(dst, values[attr]...)
1337 }
1338 if at.IsZero() {
1339 at = time.Now()
1340 }
1341 for _, cl := range pm.Claims {
1342 if cl.Attr != attr || cl.Date.After(at) {
1343 continue
1344 }
1345 if len(signerRefs) > 0 && !signerRefs.blobMatches(cl.Signer) {
1346 continue
1347 }
1348 switch cl.Type {
1349 case string(schema.DelAttributeClaim):
1350 if cl.Value == "" {
1351 dst = dst[:0]
1352 } else {
1353 for i := 0; i < len(dst); i++ {
1354 v := dst[i]
1355 if v == cl.Value {
1356 copy(dst[i:], dst[i+1:])
1357 dst = dst[:len(dst)-1]
1358 i--
1359 }
1360 }
1361 }
1362 case string(schema.SetAttributeClaim):
1363 dst = append(dst[:0], cl.Value)
1364 case string(schema.AddAttributeClaim):
1365 dst = append(dst, cl.Value)
1366 }
1367 }
1368 return dst
1369 }
1370
1371 func (c *Corpus) AppendClaims(ctx context.Context, dst []camtypes.Claim, permaNode blob.Ref,
1372 signerFilter string,
1373 attrFilter string) ([]camtypes.Claim, error) {
1374 pm, ok := c.permanodes[permaNode]
1375 if !ok {
1376 return nil, nil
1377 }
1378
1379 var signerRefs SignerRefSet
1380 if signerFilter != "" {
1381 signerRefs, ok = c.signerRefs[signerFilter]
1382 if !ok {
1383 return dst, nil
1384 }
1385 }
1386
1387 for _, cl := range pm.Claims {
1388 if c.IsDeleted(cl.BlobRef) {
1389 continue
1390 }
1391
1392 if len(signerRefs) > 0 && !signerRefs.blobMatches(cl.Signer) {
1393 continue
1394 }
1395
1396 if attrFilter != "" && cl.Attr != attrFilter {
1397 continue
1398 }
1399 dst = append(dst, *cl)
1400 }
1401 return dst, nil
1402 }
1403
1404 func (c *Corpus) GetFileInfo(ctx context.Context, fileRef blob.Ref) (fi camtypes.FileInfo, err error) {
1405 fi, ok := c.files[fileRef]
1406 if !ok {
1407 err = os.ErrNotExist
1408 }
1409 return
1410 }
1411
1412
1413
1414 func (c *Corpus) GetDirChildren(ctx context.Context, dirRef blob.Ref) (map[blob.Ref]struct{}, error) {
1415 children, ok := c.dirChildren[dirRef]
1416 if !ok {
1417 if _, ok := c.files[dirRef]; !ok {
1418 return nil, os.ErrNotExist
1419 }
1420 return nil, nil
1421 }
1422 return children, nil
1423 }
1424
1425
1426
1427 func (c *Corpus) GetParentDirs(ctx context.Context, childRef blob.Ref) (map[blob.Ref]struct{}, error) {
1428 parents, ok := c.fileParents[childRef]
1429 if !ok {
1430 if _, ok := c.files[childRef]; !ok {
1431 return nil, os.ErrNotExist
1432 }
1433 return nil, nil
1434 }
1435 return parents, nil
1436 }
1437
1438 func (c *Corpus) GetImageInfo(ctx context.Context, fileRef blob.Ref) (ii camtypes.ImageInfo, err error) {
1439 ii, ok := c.imageInfo[fileRef]
1440 if !ok {
1441 err = os.ErrNotExist
1442 }
1443 return
1444 }
1445
1446 func (c *Corpus) GetMediaTags(ctx context.Context, fileRef blob.Ref) (map[string]string, error) {
1447 wholeRef, ok := c.fileWholeRef[fileRef]
1448 if !ok {
1449 return nil, os.ErrNotExist
1450 }
1451 tags, ok := c.mediaTags[wholeRef]
1452 if !ok {
1453 return nil, os.ErrNotExist
1454 }
1455 return tags, nil
1456 }
1457
1458 func (c *Corpus) GetWholeRef(ctx context.Context, fileRef blob.Ref) (wholeRef blob.Ref, ok bool) {
1459 wholeRef, ok = c.fileWholeRef[fileRef]
1460 return
1461 }
1462
1463 func (c *Corpus) FileLatLong(fileRef blob.Ref) (lat, long float64, ok bool) {
1464 wholeRef, ok := c.fileWholeRef[fileRef]
1465 if !ok {
1466 return
1467 }
1468 ll, ok := c.gps[wholeRef]
1469 if !ok {
1470 return
1471 }
1472 return ll.lat, ll.long, true
1473 }
1474
1475
1476
1477
1478
1479
1480 func (c *Corpus) ForeachClaim(permaNode blob.Ref, at time.Time, fn func(*camtypes.Claim) bool) {
1481 pm, ok := c.permanodes[permaNode]
1482 if !ok {
1483 return
1484 }
1485 for _, cl := range pm.Claims {
1486 if !at.IsZero() && cl.Date.After(at) {
1487 continue
1488 }
1489 if !fn(cl) {
1490 return
1491 }
1492 }
1493 }
1494
1495
1496
1497
1498
1499
1500 func (c *Corpus) ForeachClaimBack(value blob.Ref, at time.Time, fn func(*camtypes.Claim) bool) {
1501 for _, cl := range c.claimBack[value] {
1502 if !at.IsZero() && cl.Date.After(at) {
1503 continue
1504 }
1505 if !fn(cl) {
1506 return
1507 }
1508 }
1509 }
1510
1511
1512
1513
1514 func (c *Corpus) PermanodeHasAttrValue(pn blob.Ref, at time.Time, attr, val string) bool {
1515 pm, ok := c.permanodes[pn]
1516 if !ok {
1517 return false
1518 }
1519 if values, ok := pm.valuesAtSigner(at, ""); ok {
1520 for _, v := range values[attr] {
1521 if v == val {
1522 return true
1523 }
1524 }
1525 return false
1526 }
1527 if at.IsZero() {
1528 at = time.Now()
1529 }
1530 ret := false
1531 for _, cl := range pm.Claims {
1532 if cl.Attr != attr {
1533 continue
1534 }
1535 if cl.Date.After(at) {
1536 break
1537 }
1538 switch cl.Type {
1539 case string(schema.DelAttributeClaim):
1540 if cl.Value == "" || cl.Value == val {
1541 ret = false
1542 }
1543 case string(schema.SetAttributeClaim):
1544 ret = (cl.Value == val)
1545 case string(schema.AddAttributeClaim):
1546 if cl.Value == val {
1547 ret = true
1548 }
1549 }
1550 }
1551 return ret
1552 }
1553
1554
1555
// SetVerboseCorpusLogging controls whether loading a corpus logs progress
// and statistics (see logCorpusStats). The flag is a plain package-level
// variable written without synchronization, so set it before any corpus is
// being loaded.
func SetVerboseCorpusLogging(v bool) {
	logCorpusStats = v
}