package index

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	"perkeep.org/pkg/env"
	"perkeep.org/pkg/schema"
	"perkeep.org/pkg/sorted"
	"perkeep.org/pkg/types/camtypes"

	"go4.org/jsonconfig"
	"go4.org/strutil"
	"go4.org/types"
)

func init() {
	blobserver.RegisterStorageConstructor("index", newFromConfig)
}

type Index struct {
	*blobserver.NoImplStorage
	reindex   bool // whether "reindex" was set in the config
	keepGoing bool // whether "keepGoing" was set in the config
	s         sorted.KeyValue

	KeyFetcher blob.Fetcher // for fetching public keys

	// deletes is a cache of the delete claims found in the index, so
	// that IsDeleted does not have to query the underlying storage on
	// every call.
	deletes *deletionCache

	corpus *Corpus // or nil, if the index is not being kept in memory

	mu sync.RWMutex // guards the fields below

	// needs maps from a blob to the missing blobs it needs before it
	// can be indexed.
	needs map[blob.Ref][]blob.Ref
	// neededBy is the inverse of needs: it maps from a missing blob to
	// the blobs waiting on it to be indexed.
	neededBy     map[blob.Ref][]blob.Ref
	readyReindex map[blob.Ref]bool // set of blobs ready to be reindexed out of order

	// reindexWg waits for all asynchronous, out-of-order indexing to be
	// finished at the end of reindexing.
	reindexWg sync.WaitGroup

	// blobSource is used to fetch blobs when indexing files and other
	// blobs that reference other blobs. It is set once, by
	// InitBlobSource, and must not change afterwards.
	blobSource blobserver.FetcherEnumerator

	hasWiped bool // whether the underlying sorted.KeyValue has already been wiped
}

func (x *Index) Lock()    { x.mu.Lock() }
func (x *Index) Unlock()  { x.mu.Unlock() }
func (x *Index) RLock()   { x.mu.RLock() }
func (x *Index) RUnlock() { x.mu.RUnlock() }

var (
	_ blobserver.Storage = (*Index)(nil)
	_ Interface          = (*Index)(nil)
)

func (x *Index) logf(format string, args ...interface{}) {
	log.Printf("index: "+format, args...)
}

// aboutToReindex is set to true by newFromConfig when a reindex has been
// requested, so that New skips the schema version check and the cache
// initializations for the index that is about to be wiped and rebuilt.
var aboutToReindex = false

// SignerRefSet is the set of all blob refs (for possibly different hash
// functions) that represent the same signer identity.
type SignerRefSet []string

// Owner identifies the owner of a Perkeep instance: their GPG key ID(s)
// and the blob refs of the corresponding signer (public key) blobs.
type Owner struct {
	keyID []string

	// blobByKeyID maps a GPG key ID to the signer blob refs known for
	// that key.
	blobByKeyID map[string]SignerRefSet
}

// NewOwner returns an Owner whose primary key ID is keyID, and whose
// signer blob ref for that key is ref.
func NewOwner(keyID string, ref blob.Ref) *Owner {
	return &Owner{
		keyID:       []string{keyID},
		blobByKeyID: map[string]SignerRefSet{keyID: SignerRefSet{ref.String()}},
	}
}

// KeyID returns the primary GPG key ID of the owner, or the empty string
// if o is nil or has no key.
func (o *Owner) KeyID() string {
	if o == nil || len(o.keyID) == 0 {
		return ""
	}
	return o.keyID[0]
}

// RefSet returns the signer blob refs known for the given GPG key ID, or
// nil if there are none.
func (o *Owner) RefSet(keyID string) SignerRefSet {
	if o == nil || len(o.blobByKeyID) == 0 {
		return nil
	}
	refs := o.blobByKeyID[keyID]
	if len(refs) == 0 {
		return nil
	}
	return refs
}

// BlobRef returns the blob ref of the signer blob for the owner's primary
// key ID, or the zero blob.Ref if it is not known.
func (o *Owner) BlobRef() blob.Ref {
	if o == nil || len(o.blobByKeyID) == 0 {
		return blob.Ref{}
	}
	refs := o.blobByKeyID[o.KeyID()]
	if len(refs) == 0 {
		return blob.Ref{}
	}
	ref, ok := blob.Parse(refs[0])
	if !ok {
		return blob.Ref{}
	}
	return ref
}
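
// A minimal usage sketch of Owner (hypothetical key ID and blobref, for
// illustration only):
//
//	ref, _ := blob.Parse("sha224-cafe...")     // signer public key blob
//	owner := NewOwner("2931A67C26F5ABDA", ref)
//	owner.KeyID()                    // "2931A67C26F5ABDA"
//	owner.RefSet("2931A67C26F5ABDA") // SignerRefSet{ref.String()}
//	owner.BlobRef()                  // ref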

// InitBlobSource sets the index's blob source, which is used to fetch
// blobs that the blobs being indexed refer to. If no KeyFetcher has been
// set yet, the blob source is used for that too. It must be called
// exactly once before the index is used; it panics if the blob source was
// already set.
func (x *Index) InitBlobSource(blobSource blobserver.FetcherEnumerator) {
	x.Lock()
	defer x.Unlock()
	if x.blobSource != nil {
		panic("blobSource of Index already set")
	}
	x.blobSource = blobSource
	if x.KeyFetcher == nil {
		x.KeyFetcher = blobSource
	}
}

// New returns a new index using the given key/value storage implementation.
func New(s sorted.KeyValue) (*Index, error) {
	idx := &Index{
		s:            s,
		needs:        make(map[blob.Ref][]blob.Ref),
		neededBy:     make(map[blob.Ref][]blob.Ref),
		readyReindex: make(map[blob.Ref]bool),
	}
	if aboutToReindex {
		idx.deletes = newDeletionCache()
		return idx, nil
	}

	schemaVersion := idx.schemaVersion()
	switch {
	case schemaVersion == 0 && idx.isEmpty():
		// New, empty index: stamp it with the current schema version.
		err := idx.s.Set(keySchemaVersion.name, fmt.Sprint(requiredSchemaVersion))
		if err != nil {
			return nil, fmt.Errorf("Could not write index schema version %q: %v", requiredSchemaVersion, err)
		}
	case schemaVersion != requiredSchemaVersion:
		tip := ""
		if env.IsDev() {
			// In the dev environment, starting over from scratch is
			// usually the easiest way out.
			tip = `(For the dev server, run "devcam server --wipe" to wipe both your blobs and index)`
		} else {
			if is4To5SchemaBump(schemaVersion) {
				return idx, errMissingWholeRef
			}
			tip = "Run 'perkeepd --reindex' (it might take a while, but it shows status). Alternative: 'camtool dbinit' (or just delete the file for a file-based index), and then 'camtool sync --all'"
		}
		return nil, fmt.Errorf("index schema version is %d; required one is %d. You need to reindex. %s",
			schemaVersion, requiredSchemaVersion, tip)
	}
	if err := idx.initDeletesCache(); err != nil {
		return nil, fmt.Errorf("Could not initialize index's deletes cache: %v", err)
	}
	if err := idx.initNeededMaps(); err != nil {
		return nil, fmt.Errorf("Could not initialize index's missing blob maps: %v", err)
	}
	return idx, nil
}

func is4To5SchemaBump(schemaVersion int) bool {
	return schemaVersion == 4 && requiredSchemaVersion == 5
}

var errMissingWholeRef = errors.New("missing wholeRef field in fileInfo rows")

// fixMissingWholeRef appends the missing wholeRef to all the keyFileInfo
// rows values. It is only meant to be used when upgrading a version 4
// index schema to version 5.
func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
	// The caller already checked the version, but verify again to
	// prevent misuse of this function.
	if x.schemaVersion() != 4 || requiredSchemaVersion != 5 {
		panic("fixMissingWholeRef should only be used when upgrading from v4 to v5 of the index schema")
	}
	x.logf("fixing the missing wholeRef in the fileInfo rows...")
	defer func() {
		if err != nil {
			x.logf("fixing the fileInfo rows failed: %v", err)
			return
		}
		x.logf("successfully fixed wholeRef in FileInfo rows.")
	}()

	// Gather the wholeRef of every fileRef from the keyWholeToFileRef rows.
	fileRefToWholeRef := make(map[blob.Ref]blob.Ref)
	it := x.queryPrefix(keyWholeToFileRef)
	var keyA [3]string
	for it.Next() {
		keyPart := strutil.AppendSplitN(keyA[:0], it.Key(), "|", 3)
		if len(keyPart) != 3 {
			return fmt.Errorf("bogus keyWholeToFileRef key: got %q, wanted \"wholetofile|wholeRef|fileRef\"", it.Key())
		}
		wholeRef, ok1 := blob.Parse(keyPart[1])
		fileRef, ok2 := blob.Parse(keyPart[2])
		if !ok1 || !ok2 {
			return fmt.Errorf("bogus part in keyWholeToFileRef key: %q", it.Key())
		}
		fileRefToWholeRef[fileRef] = wholeRef
	}
	if err := it.Close(); err != nil {
		return err
	}

	var fixedEntries, missedEntries int
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()

	// Record the mutations and only apply them at the end, because we
	// cannot modify the rows while we iterate over them.
	mutations := make(map[string]string)
	keyPrefix := keyFileInfo.name + "|"
	it = x.queryPrefix(keyFileInfo)
	defer it.Close()
	var valA [3]string
	for it.Next() {
		select {
		case <-t.C:
			x.logf("recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries)
		default:
		}
		br, ok := blob.ParseBytes(it.KeyBytes()[len(keyPrefix):])
		if !ok {
			return fmt.Errorf("invalid blobRef %q", it.KeyBytes()[len(keyPrefix):])
		}
		wholeRef, ok := fileRefToWholeRef[br]
		if !ok {
			missedEntries++
			x.logf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br)
			continue
		}
		valPart := strutil.AppendSplitN(valA[:0], it.Value(), "|", 3)
		// A keyFileInfo value is "size|filename|mimetype", with
		// "|wholeRef" appended once the row has been fixed.
		if len(valPart) != 3 {
			return fmt.Errorf("bogus keyFileInfo value: got %q, wanted \"size|filename|mimetype\"", it.Value())
		}
		size_s, filename, mimetype := valPart[0], valPart[1], urld(valPart[2])
		if strings.Contains(mimetype, "|") {
			// Since the value was only split into 3 parts, a "|" in
			// the mimetype part means a wholeRef has already been
			// appended to this row (e.g. by a previous, interrupted
			// run), so there is nothing to fix here.
			x.logf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value())
			continue
		}
		size, err := strconv.Atoi(size_s)
		if err != nil {
			return fmt.Errorf("bogus size in keyFileInfo value %v: %v", it.Value(), err)
		}
		mutations[keyFileInfo.Key(br)] = keyFileInfo.Val(size, filename, mimetype, wholeRef)
		fixedEntries++
	}
	if err := it.Close(); err != nil {
		return err
	}
	x.logf("starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries)
	bm := x.s.BeginBatch()
	for k, v := range mutations {
		bm.Set(k, v)
	}
	bm.Set(keySchemaVersion.name, "5")
	if err := x.s.CommitBatch(bm); err != nil {
		return err
	}
	if missedEntries > 0 {
		x.logf("some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries)
	}
	return nil
}

func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
	blobPrefix := config.RequiredString("blobSource")
	kvConfig := config.RequiredObject("storage")
	reindex := config.OptionalBool("reindex", false)
	keepGoing := config.OptionalBool("keepGoing", false)

	if err := config.Validate(); err != nil {
		return nil, err
	}

	kv, err := sorted.NewKeyValue(kvConfig)
	if err != nil {
		if _, ok := err.(sorted.NeedWipeError); !ok {
			return nil, err
		}
		if !reindex {
			return nil, err
		}
	}
	if reindex {
		aboutToReindex = true
		wiper, ok := kv.(sorted.Wiper)
		if !ok {
			return nil, fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", kv)
		}
		if err := wiper.Wipe(); err != nil {
			return nil, fmt.Errorf("error wiping index's sorted key/value type %T: %v", kv, err)
		}
		log.Printf("Index wiped.")
	}

	sto, err := ld.GetStorage(blobPrefix)
	if err != nil {
		return nil, err
	}

	ix, err := New(kv)
	// New can return errMissingWholeRef when the index schema is at
	// version 4 and only the wholeRef in the fileInfo rows is missing,
	// in which case the rows are fixed in place instead of requiring a
	// full reindex.
	if err == errMissingWholeRef {
		if err := ix.fixMissingWholeRef(sto); err != nil {
			ix.Close()
			return nil, fmt.Errorf("could not fix missing wholeRef entries: %v", err)
		}
		ix, err = New(kv)
	}
	if err != nil {
		return nil, err
	}
	ix.keepGoing = keepGoing
	ix.reindex = reindex
	if reindex {
		ix.hasWiped = true
	}
	ix.InitBlobSource(sto)

	if !reindex {
		if err := ix.integrityCheck(3 * time.Second); err != nil {
			return nil, err
		}
	}

	return ix, err
}

func (x *Index) String() string {
	return fmt.Sprintf("Perkeep index, using key/value implementation %T", x.s)
}

func (x *Index) isEmpty() bool {
	iter := x.s.Find("", "")
	hasRows := iter.Next()
	if err := iter.Close(); err != nil {
		panic(err)
	}
	return !hasRows
}

// reindexMaxProcs is the maximum number of concurrent goroutines used
// when reindexing.
var reindexMaxProcs = struct {
	sync.RWMutex
	v int
}{v: runtime.NumCPU()}

// SetReindexMaxProcs sets the maximum number of concurrent goroutines
// used when reindexing.
func SetReindexMaxProcs(n int) {
	reindexMaxProcs.Lock()
	defer reindexMaxProcs.Unlock()
	reindexMaxProcs.v = n
}

// ReindexMaxProcs returns the maximum number of concurrent goroutines
// used when reindexing.
func ReindexMaxProcs() int {
	reindexMaxProcs.RLock()
	defer reindexMaxProcs.RUnlock()
	return reindexMaxProcs.v
}

func (x *Index) WantsReindex() bool   { return x.reindex }
func (x *Index) WantsKeepGoing() bool { return x.keepGoing }

// Reindex wipes the index storage (unless it was already wiped) and
// rebuilds all the index rows by enumerating and indexing every blob from
// the index's blob source.
func (x *Index) Reindex() error {
	x.Lock()
	if x.blobSource == nil {
		x.Unlock()
		return errors.New("index: can't re-index: no blobSource")
	}
	x.Unlock()

	reindexMaxProcs.RLock()
	defer reindexMaxProcs.RUnlock()

	ctx := context.Background()

	if !x.hasWiped {
		wiper, ok := x.s.(sorted.Wiper)
		if !ok {
			return fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", x.s)
		}
		log.Printf("Wiping index storage type %T ...", x.s)
		if err := wiper.Wipe(); err != nil {
			return fmt.Errorf("error wiping index's sorted key/value type %T: %v", x.s, err)
		}
		log.Printf("Index wiped.")
	}
	log.Printf("Rebuilding index...")

	reindexStart, _ := blob.Parse(os.Getenv("CAMLI_REINDEX_START"))

	err := x.s.Set(keySchemaVersion.name, fmt.Sprintf("%d", requiredSchemaVersion))
	if err != nil {
		return err
	}

	var nerrmu sync.Mutex
	nerr := 0

	blobc := make(chan blob.Ref, 32)

	enumCtx := context.Background()
	enumErr := make(chan error, 1)
	go func() {
		defer close(blobc)
		donec := enumCtx.Done()
		var lastTick time.Time
		enumErr <- blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
			now := time.Now()
			if lastTick.Before(now.Add(-1 * time.Second)) {
				log.Printf("Reindexing at %v", sb.Ref)
				lastTick = now
			}
			if reindexStart.Valid() && sb.Ref.Less(reindexStart) {
				return nil
			}
			select {
			case <-donec:
				return ctx.Err()
			case blobc <- sb.Ref:
				return nil
			}
		})
	}()
	var wg sync.WaitGroup
	for i := 0; i < reindexMaxProcs.v; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for br := range blobc {
				if err := x.indexBlob(ctx, br); err != nil {
					log.Printf("Error reindexing %v: %v", br, err)
					nerrmu.Lock()
					nerr++
					nerrmu.Unlock()
					// Keep going: individual failures are only
					// counted, and reported once the rebuild is done.
				}
			}
		}()
	}
	if err := <-enumErr; err != nil {
		return err
	}

	wg.Wait()
	x.reindexWg.Wait()

	x.RLock()
	readyCount := len(x.readyReindex)
	needed := len(x.needs)
	var needSample bytes.Buffer
	if needed > 0 {
		n := 0
	Sample:
		for x, needs := range x.needs {
			for _, need := range needs {
				n++
				if n == 10 {
					break Sample
				}
				if n > 1 {
					fmt.Fprintf(&needSample, ", ")
				}
				fmt.Fprintf(&needSample, "%v needs %v", x, need)
			}
		}
	}
	x.RUnlock()
	if readyCount > 0 {
		return fmt.Errorf("%d blobs were ready to reindex in the out-of-order queue, but were never run", readyCount)
	}
	if needed > 0 {
		return fmt.Errorf("%d blobs are still needed as dependencies; a sample: %s", needed, needSample.Bytes())
	}

	nerrmu.Lock()
	if nerr != 0 {
		return fmt.Errorf("%d blobs failed to re-index", nerr)
	}
	if err := x.initDeletesCache(); err != nil {
		return err
	}
	log.Printf("Index rebuild complete.")
	return nil
}

// integrityCheck enumerates blobs from x.blobSource for at most timeout,
// and verifies that each of them has a meta row in the index. It logs a
// warning if some blobs are not found, and only returns an error if the
// enumeration itself fails.
func (x *Index) integrityCheck(timeout time.Duration) error {
	t0 := time.Now()
	x.logf("starting integrity check...")
	defer func() {
		x.logf("integrity check done (after %v)", time.Since(t0).Round(10*time.Millisecond))
	}()
	if x.blobSource == nil {
		return errors.New("index: can't check sanity of index: no blobSource")
	}

	// seen and notFound respectively track the enumerated blobs that
	// do, and do not, have a meta row in the index.
	seen := make([]blob.Ref, 0)
	notFound := make([]blob.Ref, 0)
	enumCtx := context.TODO()
	stopTime := time.NewTimer(timeout)
	defer stopTime.Stop()
	var errEOT = errors.New("timed out")
	if err := blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
		select {
		case <-stopTime.C:
			return errEOT
		default:
		}
		if _, err := x.GetBlobMeta(enumCtx, sb.Ref); err != nil {
			if !os.IsNotExist(err) {
				return err
			}
			notFound = append(notFound, sb.Ref)
			return nil
		}
		seen = append(seen, sb.Ref)
		return nil
	}); err != nil && err != errEOT {
		return err
	}
	if len(notFound) > 0 {
		x.logf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen))
	}
	return nil
}

// queryPrefixString returns an iterator over all the keys of s that start
// with prefix. The end of the scanned range is computed by incrementing
// the last byte of the prefix.
func queryPrefixString(s sorted.KeyValue, prefix string) sorted.Iterator {
	if prefix == "" {
		return s.Find("", "")
	}
	lastByte := prefix[len(prefix)-1]
	if lastByte == 0xff {
		panic("unsupported query prefix ending in 0xff")
	}
	end := prefix[:len(prefix)-1] + string(lastByte+1)
	return s.Find(prefix, end)
}
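
// For illustration: a prefix of "meta:" makes queryPrefixString scan the
// half-open range ["meta:", "meta;"), since ';' is the byte immediately after
// ':'. Every key starting with "meta:" sorts within that range, and no key
// with a different prefix does.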

func (x *Index) queryPrefixString(prefix string) sorted.Iterator {
	return queryPrefixString(x.s, prefix)
}

func queryPrefix(s sorted.KeyValue, key *keyType, args ...interface{}) sorted.Iterator {
	return queryPrefixString(s, key.Prefix(args...))
}

func (x *Index) queryPrefix(key *keyType, args ...interface{}) sorted.Iterator {
	return x.queryPrefixString(key.Prefix(args...))
}

// closeIterator closes it and, if the caller's error is still nil, records
// any Close error in *perr. It is meant to be deferred by functions with a
// named error return.
func closeIterator(it sorted.Iterator, perr *error) {
	err := it.Close()
	if err != nil && *perr == nil {
		*perr = err
	}
}

// schemaVersion returns the index schema version stored in the underlying
// sorted.KeyValue, or 0 if none has been recorded yet.
func (x *Index) schemaVersion() int {
	schemaVersionStr, err := x.s.Get(keySchemaVersion.name)
	if err != nil {
		if err == sorted.ErrNotFound {
			return 0
		}
		panic(fmt.Sprintf("Could not get index schema version: %v", err))
	}
	schemaVersion, err := strconv.Atoi(schemaVersionStr)
	if err != nil {
		panic(fmt.Sprintf("Bogus index schema version: %q", schemaVersionStr))
	}
	return schemaVersion
}

type deletion struct {
	deleter blob.Ref
	when    time.Time
}

type byDeletionDate []deletion

func (d byDeletionDate) Len() int           { return len(d) }
func (d byDeletionDate) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d byDeletionDate) Less(i, j int) bool { return d[i].when.Before(d[j].when) }

type deletionCache struct {
	sync.RWMutex
	m map[blob.Ref][]deletion
}

func newDeletionCache() *deletionCache {
	return &deletionCache{
		m: make(map[blob.Ref][]deletion),
	}
}

// initDeletesCache creates and populates the cache of deletion claims, so
// that IsDeleted does not need to query the index on every call. It is
// called by New, and at the end of Reindex.
func (x *Index) initDeletesCache() (err error) {
	x.deletes = newDeletionCache()
	it := x.queryPrefix(keyDeleted)
	defer closeIterator(it, &err)
	for it.Next() {
		cl, ok := kvDeleted(it.Key())
		if !ok {
			return fmt.Errorf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key())
		}
		targetDeletions := append(x.deletes.m[cl.Target],
			deletion{
				deleter: cl.BlobRef,
				when:    cl.Date,
			})
		sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
		x.deletes.m[cl.Target] = targetDeletions
	}
	return err
}

func kvDeleted(k string) (c camtypes.Claim, ok bool) {
	// k is of the form "deleted|<deleted blobref>|<reverse claimdate>|<deleter claimref>"
	keyPart := strings.Split(k, "|")
	if len(keyPart) != 4 {
		return
	}
	if keyPart[0] != "deleted" {
		return
	}
	target, ok := blob.Parse(keyPart[1])
	if !ok {
		return
	}
	claimRef, ok := blob.Parse(keyPart[3])
	if !ok {
		return
	}
	date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[2]))
	if err != nil {
		return
	}
	return camtypes.Claim{
		BlobRef: claimRef,
		Target:  target,
		Date:    date,
		Type:    string(schema.DeleteClaim),
	}, true
}

// IsDeleted reports whether the given blobref (of a permanode or claim)
// should be considered deleted, i.e. whether it is the target of at least
// one delete claim that has not itself been deleted.
func (x *Index) IsDeleted(br blob.Ref) bool {
	if x.deletes == nil {
		// We don't have a deletes cache (e.g. the Index was built
		// without New), so hit the index directly.
		return x.isDeletedNoCache(br)
	}
	x.deletes.RLock()
	defer x.deletes.RUnlock()
	return x.isDeleted(br)
}

// isDeleted is the cache-based implementation of IsDeleted. The caller
// must hold x.deletes' read lock.
func (x *Index) isDeleted(br blob.Ref) bool {
	deletes, ok := x.deletes.m[br]
	if !ok {
		return false
	}
	for _, v := range deletes {
		if !x.isDeleted(v.deleter) {
			return true
		}
	}
	return false
}
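
// For illustration, with hypothetical refs: if claim D1 deletes permanode P,
// then isDeleted(P) is true. If a later claim D2 deletes D1 (an "undelete"),
// isDeleted(D1) becomes true, D1 no longer counts against P, and isDeleted(P)
// is false again, unless some other live delete claim still targets P.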

// isDeletedNoCache is the implementation of IsDeleted used when the index
// has no deletes cache; it queries the keyDeleted rows directly.
func (x *Index) isDeletedNoCache(br blob.Ref) bool {
	var err error
	it := x.queryPrefix(keyDeleted, br)
	for it.Next() {
		cl, ok := kvDeleted(it.Key())
		if !ok {
			panic(fmt.Sprintf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key()))
		}
		if !x.isDeletedNoCache(cl.BlobRef) {
			closeIterator(it, &err)
			if err != nil {
				// There is no error return here; not much we can
				// do other than panic.
				panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
			}
			return true
		}
	}
	closeIterator(it, &err)
	if err != nil {
		// Ditto: no error return available.
		panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
	}
	return false
}

// GetRecentPermanodes sends on dest the permanodes signed by owner that
// were most recently modified before the given time, newest first, up to
// limit results. A zero before means "now". Deleted permanodes and
// duplicates are skipped, and dest is closed before returning.
func (x *Index) GetRecentPermanodes(ctx context.Context, dest chan<- camtypes.RecentPermanode, owner blob.Ref, limit int, before time.Time) (err error) {
	defer close(dest)

	keyId, err := x.KeyId(ctx, owner)
	if err == sorted.ErrNotFound {
		x.logf("no recent permanodes because keyId for owner %v not found", owner)
		return nil
	}
	if err != nil {
		x.logf("error fetching keyId for owner %v: %v", owner, err)
		return err
	}

	sent := 0
	var seenPermanode dupSkipper

	if before.IsZero() {
		before = time.Now()
	}

	it := x.queryPrefix(keyRecentPermanode, keyId)
	defer closeIterator(it, &err)
	for it.Next() {
		permaStr := it.Value()
		parts := strings.SplitN(it.Key(), "|", 4)
		if len(parts) != 4 {
			continue
		}
		mTime, _ := time.Parse(time.RFC3339, unreverseTimeString(parts[2]))
		permaRef, ok := blob.Parse(permaStr)
		if !ok {
			continue
		}
		if x.IsDeleted(permaRef) {
			continue
		}
		if seenPermanode.Dup(permaStr) {
			continue
		}
		// Only consider permanodes modified strictly before the cutoff.
		if !mTime.Before(before) {
			continue
		}
		dest <- camtypes.RecentPermanode{
			Permanode:   permaRef,
			Signer:      owner,
			LastModTime: mTime,
		}
		sent++
		if sent == limit {
			break
		}
	}
	return nil
}

func (x *Index) AppendClaims(ctx context.Context, dst []camtypes.Claim, permaNode blob.Ref,
	signerFilter string,
	attrFilter string) ([]camtypes.Claim, error) {
	if x.corpus != nil {
		return x.corpus.AppendClaims(ctx, dst, permaNode, signerFilter, attrFilter)
	}
	var (
		err error
		it  sorted.Iterator
	)
	var signerRefs SignerRefSet
	if signerFilter != "" {
		signerRefs, err = x.signerRefs(ctx, signerFilter)
		if err != nil {
			return dst, err
		}
		if len(signerRefs) == 0 {
			return dst, nil
		}
		it = x.queryPrefix(keyPermanodeClaim, permaNode, signerFilter)
	} else {
		it = x.queryPrefix(keyPermanodeClaim, permaNode)
	}
	defer closeIterator(it, &err)

	// As a cheap pre-filter: when the attribute filter needs no URL
	// escaping, rows whose raw value doesn't even contain it can be
	// skipped before the more expensive parse below.
	var mustHave string
	if attrFilter != "" && urle(attrFilter) == attrFilter {
		mustHave = attrFilter
	}

	for it.Next() {
		val := it.Value()
		if mustHave != "" && !strings.Contains(val, mustHave) {
			continue
		}
		cl, ok := kvClaim(it.Key(), val, blob.Parse)
		if !ok {
			continue
		}
		if x.IsDeleted(cl.BlobRef) {
			continue
		}
		if attrFilter != "" && cl.Attr != attrFilter {
			continue
		}
		// The claim may have been signed with any of the signer's
		// refs (one per hash function), so check against the whole set.
		if signerFilter != "" && !signerRefs.blobMatches(cl.Signer) {
			continue
		}
		dst = append(dst, cl)
	}
	return dst, nil
}

func kvClaim(k, v string, blobParse func(string) (blob.Ref, bool)) (c camtypes.Claim, ok bool) {
	const nKeyPart = 5
	const nValPart = 4
	var keya [nKeyPart]string
	var vala [nValPart]string
	keyPart := strutil.AppendSplitN(keya[:0], k, "|", -1)
	valPart := strutil.AppendSplitN(vala[:0], v, "|", -1)
	if len(keyPart) < nKeyPart || len(valPart) < nValPart {
		return
	}
	signerRef, ok := blobParse(valPart[3])
	if !ok {
		return
	}
	permaNode, ok := blobParse(keyPart[1])
	if !ok {
		return
	}
	claimRef, ok := blobParse(keyPart[4])
	if !ok {
		return
	}
	date, err := time.Parse(time.RFC3339, keyPart[3])
	if err != nil {
		return
	}
	return camtypes.Claim{
		BlobRef:   claimRef,
		Signer:    signerRef,
		Permanode: permaNode,
		Date:      date,
		Type:      urld(valPart[0]),
		Attr:      urld(valPart[1]),
		Value:     urld(valPart[2]),
	}, true
}

func (x *Index) GetBlobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
	if x.corpus != nil {
		return x.corpus.GetBlobMeta(ctx, br)
	}
	key := "meta:" + br.String()
	meta, err := x.s.Get(key)
	if err == sorted.ErrNotFound {
		err = os.ErrNotExist
	}
	if err != nil {
		return camtypes.BlobMeta{}, err
	}
	pos := strings.Index(meta, "|")
	if pos < 0 {
		panic(fmt.Sprintf("Bogus index row for key %q: got value %q", key, meta))
	}
	size, err := strconv.ParseUint(meta[:pos], 10, 32)
	if err != nil {
		return camtypes.BlobMeta{}, err
	}
	mime := meta[pos+1:]
	return camtypes.BlobMeta{
		Ref:       br,
		Size:      uint32(size),
		CamliType: camliTypeFromMIME(mime),
	}, nil
}
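
// For illustration (value layout as parsed above, with a hypothetical ref):
// a "meta:<blobref>" row stores "<size>|<mimetype>", e.g.
//
//	meta:sha224-cafe... = "4162|application/json; camliType=permanode"
//
// which GetBlobMeta reports as Size 4162 and CamliType "permanode".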

// HasLegacySHA1 reports whether the index has (legacy) rows for files
// whose wholeRef is a SHA-1.
func (x *Index) HasLegacySHA1() (ok bool, err error) {
	if x.corpus != nil {
		return x.corpus.hasLegacySHA1, err
	}
	it := x.queryPrefix(keyWholeToFileRef, "sha1-")
	defer closeIterator(it, &err)
	for it.Next() {
		return true, err
	}
	return false, err
}

// KeyId returns the GPG key ID (e.g. 2931A67C26F5ABDA) of the signer with
// the given blobref. It returns sorted.ErrNotFound if the blobref is not
// that of a known signer.
func (x *Index) KeyId(ctx context.Context, signer blob.Ref) (string, error) {
	if x.corpus != nil {
		return x.corpus.KeyId(ctx, signer)
	}
	return x.s.Get("signerkeyid:" + signer.String())
}

// signerRefs returns the set of signer blobrefs that are indexed for the
// given GPG key ID.
func (x *Index) signerRefs(ctx context.Context, keyID string) (SignerRefSet, error) {
	if x.corpus != nil {
		return x.corpus.signerRefs[keyID], nil
	}
	it := x.queryPrefixString(keySignerKeyID.name)
	var err error
	var refs SignerRefSet
	defer closeIterator(it, &err)
	prefix := keySignerKeyID.name + ":"
	for it.Next() {
		if it.Value() == keyID {
			refs = append(refs, strings.TrimPrefix(it.Key(), prefix))
		}
	}
	return refs, nil
}

// PermanodeOfSignerAttrValue returns the first found permanode, not
// deleted, that was set by the given signer to have attr equal to val. It
// returns os.ErrNotExist if none is found.
func (x *Index) PermanodeOfSignerAttrValue(ctx context.Context, signer blob.Ref, attr, val string) (permaNode blob.Ref, err error) {
	keyId, err := x.KeyId(ctx, signer)
	if err == sorted.ErrNotFound {
		return blob.Ref{}, os.ErrNotExist
	}
	if err != nil {
		return blob.Ref{}, err
	}
	it := x.queryPrefix(keySignerAttrValue, keyId, attr, val)
	defer closeIterator(it, &err)
	for it.Next() {
		permaRef, ok := blob.Parse(it.Value())
		if ok && !x.IsDeleted(permaRef) {
			return permaRef, nil
		}
	}
	return blob.Ref{}, os.ErrNotExist
}

// SearchPermanodesWithAttr sends on dest the refs of the permanodes, signed
// by request.Signer, that have request.Attribute set to request.Query (or
// to any value when request.Query is empty), up to request.MaxResults
// results. Only indexed attributes are supported, and dest is closed
// before returning.
func (x *Index) SearchPermanodesWithAttr(ctx context.Context, dest chan<- blob.Ref, request *camtypes.PermanodeByAttrRequest) (err error) {
	defer close(dest)
	if request.FuzzyMatch {
		return errors.New("TODO: SearchPermanodesWithAttr: generic indexer doesn't support FuzzyMatch on PermanodeByAttrRequest")
	}
	if request.Attribute == "" {
		return errors.New("index: missing Attribute in SearchPermanodesWithAttr")
	}
	if !IsIndexedAttribute(request.Attribute) {
		return fmt.Errorf("SearchPermanodesWithAttr: called with a non-indexed attribute %q", request.Attribute)
	}

	keyId, err := x.KeyId(ctx, request.Signer)
	if err == sorted.ErrNotFound {
		return nil
	}
	if err != nil {
		return err
	}
	seen := make(map[string]bool)
	var it sorted.Iterator
	if request.Query == "" {
		it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute)
	} else {
		it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute, request.Query)
	}
	defer closeIterator(it, &err)
	before := request.At
	if before.IsZero() {
		before = time.Now()
	}
	for it.Next() {
		cl, ok := kvSignerAttrValue(it.Key(), it.Value())
		if !ok {
			continue
		}
		if x.IsDeleted(cl.BlobRef) {
			continue
		}
		if x.IsDeleted(cl.Permanode) {
			continue
		}
		if cl.Date.After(before) {
			continue
		}
		pnstr := cl.Permanode.String()
		if seen[pnstr] {
			continue
		}
		seen[pnstr] = true

		dest <- cl.Permanode
		if len(seen) == request.MaxResults {
			break
		}
	}
	return nil
}

func kvSignerAttrValue(k, v string) (c camtypes.Claim, ok bool) {
	// k: "signerattrvalue|<keyid>|<attr>|<value>|<reverse claimdate>|<claimref>"
	// v: "<permanode>"
	keyPart := strings.Split(k, "|")
	valPart := strings.Split(v, "|")
	if len(keyPart) != 6 || len(valPart) != 1 {
		log.Printf("bogus keySignerAttrValue index entry: %q = %q", k, v)
		return
	}
	if keyPart[0] != "signerattrvalue" {
		return
	}
	date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
	if err != nil {
		log.Printf("bogus time in keySignerAttrValue index entry: %q", keyPart[4])
		return
	}
	claimRef, ok := blob.Parse(keyPart[5])
	if !ok {
		log.Printf("bogus claim in keySignerAttrValue index entry: %q", keyPart[5])
		return
	}
	permaNode, ok := blob.Parse(valPart[0])
	if !ok {
		log.Printf("bogus permanode in keySignerAttrValue index entry: %q", valPart[0])
		return
	}
	return camtypes.Claim{
		BlobRef:   claimRef,
		Permanode: permaNode,
		Date:      date,
		Attr:      urld(keyPart[2]),
		Value:     urld(keyPart[3]),
	}, true
}

func (x *Index) PathsOfSignerTarget(ctx context.Context, signer, target blob.Ref) (paths []*camtypes.Path, err error) {
	paths = []*camtypes.Path{}
	keyId, err := x.KeyId(ctx, signer)
	if err != nil {
		if err == sorted.ErrNotFound {
			err = nil
		}
		return
	}

	mostRecent := make(map[string]*camtypes.Path)
	maxClaimDates := make(map[string]time.Time)

	it := x.queryPrefix(keyPathBackward, keyId, target)
	defer closeIterator(it, &err)
	for it.Next() {
		p, ok, active := kvPathBackward(it.Key(), it.Value())
		if !ok {
			continue
		}
		if x.IsDeleted(p.Claim) {
			continue
		}
		if x.IsDeleted(p.Base) {
			continue
		}

		key := p.Base.String() + "/" + p.Suffix
		if p.ClaimDate.After(maxClaimDates[key]) {
			maxClaimDates[key] = p.ClaimDate
			if active {
				mostRecent[key] = &p
			} else {
				delete(mostRecent, key)
			}
		}
	}
	for _, v := range mostRecent {
		paths = append(paths, v)
	}
	return paths, nil
}

func kvPathBackward(k, v string) (p camtypes.Path, ok bool, active bool) {
	// k: "signertargetpath|<signer keyid>|<target>|<claimref>"
	// v: "<claimdate>|<base>|<active 'Y'/'N'>|<urlencoded suffix>"
	keyPart := strings.Split(k, "|")
	valPart := strings.Split(v, "|")
	if len(keyPart) != 4 || len(valPart) != 4 {
		log.Printf("bogus keyPathBackward index entry: %q = %q", k, v)
		return
	}
	if keyPart[0] != "signertargetpath" {
		return
	}
	target, ok := blob.Parse(keyPart[2])
	if !ok {
		log.Printf("bogus target in keyPathBackward index entry: %q", keyPart[2])
		return
	}
	claim, ok := blob.Parse(keyPart[3])
	if !ok {
		log.Printf("bogus claim in keyPathBackward index entry: %q", keyPart[3])
		return
	}
	date, err := time.Parse(time.RFC3339, valPart[0])
	if err != nil {
		log.Printf("bogus date in keyPathBackward index entry: %q", valPart[0])
		return
	}
	base, ok := blob.Parse(valPart[1])
	if !ok {
		log.Printf("bogus base in keyPathBackward index entry: %q", valPart[1])
		return
	}
	if valPart[2] == "Y" {
		active = true
	}
	return camtypes.Path{
		Claim:     claim,
		Base:      base,
		Target:    target,
		ClaimDate: date,
		Suffix:    urld(valPart[3]),
	}, true, active
}

func (x *Index) PathsLookup(ctx context.Context, signer, base blob.Ref, suffix string) (paths []*camtypes.Path, err error) {
	paths = []*camtypes.Path{}
	keyId, err := x.KeyId(ctx, signer)
	if err != nil {
		if err == sorted.ErrNotFound {
			err = nil
		}
		return
	}

	it := x.queryPrefix(keyPathForward, keyId, base, suffix)
	defer closeIterator(it, &err)
	for it.Next() {
		p, ok, active := kvPathForward(it.Key(), it.Value())
		if !ok {
			continue
		}
		if x.IsDeleted(p.Claim) {
			continue
		}
		if x.IsDeleted(p.Target) {
			continue
		}

		// TODO: the 'active' flag parsed from the row is currently
		// ignored here; callers such as PathLookup pick the most
		// recent path by claim date instead.
		_ = active

		paths = append(paths, &p)
	}
	return
}

func kvPathForward(k, v string) (p camtypes.Path, ok bool, active bool) {
	// k: "path|<signer keyid>|<base>|<urlencoded suffix>|<reverse claimdate>|<claimref>"
	// v: "<active 'Y'/'N'>|<target>"
	keyPart := strings.Split(k, "|")
	valPart := strings.Split(v, "|")
	if len(keyPart) != 6 || len(valPart) != 2 {
		log.Printf("bogus keyPathForward index entry: %q = %q", k, v)
		return
	}
	if keyPart[0] != "path" {
		return
	}
	base, ok := blob.Parse(keyPart[2])
	if !ok {
		log.Printf("bogus base in keyPathForward index entry: %q", keyPart[2])
		return
	}
	date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
	if err != nil {
		log.Printf("bogus date in keyPathForward index entry: %q", keyPart[4])
		return
	}
	claim, ok := blob.Parse(keyPart[5])
	if !ok {
		log.Printf("bogus claim in keyPathForward index entry: %q", keyPart[5])
		return
	}
	if valPart[0] == "Y" {
		active = true
	}
	target, ok := blob.Parse(valPart[1])
	if !ok {
		log.Printf("bogus target in keyPathForward index entry: %q", valPart[1])
		return
	}
	return camtypes.Path{
		Claim:     claim,
		Base:      base,
		Target:    target,
		ClaimDate: date,
		Suffix:    urld(keyPart[3]),
	}, true, active
}

func (x *Index) PathLookup(ctx context.Context, signer, base blob.Ref, suffix string, at time.Time) (*camtypes.Path, error) {
	paths, err := x.PathsLookup(ctx, signer, base, suffix)
	if err != nil {
		return nil, err
	}
	var (
		newest    = int64(0)
		atSeconds = int64(0)
		best      *camtypes.Path
	)

	if !at.IsZero() {
		atSeconds = at.Unix()
	}

	for _, path := range paths {
		t := path.ClaimDate
		secs := t.Unix()
		if atSeconds != 0 && secs > atSeconds {
			// Too new.
			continue
		}
		if newest > secs {
			// Older than the best match found so far.
			continue
		}

		newest, best = secs, path
	}
	if best == nil {
		return nil, os.ErrNotExist
	}
	return best, nil
}

func (x *Index) existingFileSchemas(wholeRef blob.Ref) (schemaRefs []blob.Ref, err error) {
	it := x.queryPrefix(keyWholeToFileRef, wholeRef)
	defer closeIterator(it, &err)
	for it.Next() {
		keyPart := strings.Split(it.Key(), "|")[1:]
		if len(keyPart) < 2 {
			continue
		}
		ref, ok := blob.Parse(keyPart[1])
		if ok {
			schemaRefs = append(schemaRefs, ref)
		}
	}
	return schemaRefs, nil
}

// WholeRefToFile maps a wholeRef (of a file's contents) to the file schema
// blobs whose contents hash to that wholeRef.
type WholeRefToFile map[string][]blob.Ref

// ExistingFileSchemas returns the file schema blobs known to the index for
// each of the given wholeRefs.
func (x *Index) ExistingFileSchemas(wholeRef ...blob.Ref) (WholeRefToFile, error) {
	schemaRefs := make(WholeRefToFile)
	for _, v := range wholeRef {
		newRefs, err := x.existingFileSchemas(v)
		if err != nil {
			return nil, err
		}
		schemaRefs[v.String()] = newRefs
	}
	return schemaRefs, nil
}

func (x *Index) loadKey(key string, val *string, err *error, wg *sync.WaitGroup) {
	defer wg.Done()
	*val, *err = x.s.Get(key)
}

func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.FileInfo, error) {
	if x.corpus != nil {
		return x.corpus.GetFileInfo(ctx, fileRef)
	}
	ikey := "fileinfo|" + fileRef.String()
	tkey := keyFileTimes.name + "|" + fileRef.String()

	wg := new(sync.WaitGroup)
	wg.Add(2)
	var iv, tv string
	var ierr, terr error
	go x.loadKey(ikey, &iv, &ierr, wg)
	go x.loadKey(tkey, &tv, &terr, wg)
	wg.Wait()

	if ierr == sorted.ErrNotFound {
		return camtypes.FileInfo{}, os.ErrNotExist
	}
	if ierr != nil {
		return camtypes.FileInfo{}, ierr
	}
	valPart := strings.Split(iv, "|")
	if len(valPart) < 3 {
		x.logf("bogus key %q = %q", ikey, iv)
		return camtypes.FileInfo{}, os.ErrNotExist
	}
	var wholeRef blob.Ref
	if len(valPart) >= 4 {
		wholeRef, _ = blob.Parse(valPart[3])
	}
	size, err := strconv.ParseInt(valPart[0], 10, 64)
	if err != nil {
		x.logf("bogus integer at position 0 in key %q = %q", ikey, iv)
		return camtypes.FileInfo{}, os.ErrNotExist
	}
	fileName := urld(valPart[1])
	fi := camtypes.FileInfo{
		Size:     size,
		FileName: fileName,
		MIMEType: urld(valPart[2]),
		WholeRef: wholeRef,
	}

	if tv != "" {
		times := strings.Split(urld(tv), ",")
		updateFileInfoTimes(&fi, times)
	}

	return fi, nil
}

func updateFileInfoTimes(fi *camtypes.FileInfo, times []string) {
	if len(times) == 0 {
		return
	}
	fi.Time = types.ParseTime3339OrNil(times[0])
	if len(times) == 2 {
		fi.ModTime = types.ParseTime3339OrNil(times[1])
	}
}
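
// For illustration: the keyFileTimes value is a comma-separated list of up to
// two RFC 3339 times, e.g. "2016-04-01T15:04:05Z,2016-04-02T10:00:00Z". The
// first entry becomes fi.Time and the optional second entry becomes
// fi.ModTime.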

// kvImageInfo parses the keyImageSize value v, which is "<width>|<height>".
func kvImageInfo(v []byte) (ii camtypes.ImageInfo, ok bool) {
	pipei := bytes.IndexByte(v, '|')
	if pipei < 0 {
		return
	}
	w, err := strutil.ParseUintBytes(v[:pipei], 10, 16)
	if err != nil {
		return
	}
	h, err := strutil.ParseUintBytes(v[pipei+1:], 10, 16)
	if err != nil {
		return
	}
	ii.Width = uint16(w)
	ii.Height = uint16(h)
	return ii, true
}

func (x *Index) GetImageInfo(ctx context.Context, fileRef blob.Ref) (camtypes.ImageInfo, error) {
	if x.corpus != nil {
		return x.corpus.GetImageInfo(ctx, fileRef)
	}
	// The row may be missing if the image's dimensions could not be
	// decoded at indexing time.
	key := keyImageSize.Key(fileRef.String())
	v, err := x.s.Get(key)
	if err == sorted.ErrNotFound {
		err = os.ErrNotExist
	}
	if err != nil {
		return camtypes.ImageInfo{}, err
	}
	ii, ok := kvImageInfo([]byte(v))
	if !ok {
		return camtypes.ImageInfo{}, fmt.Errorf("index: bogus key %q = %q", key, v)
	}
	return ii, nil
}

func (x *Index) GetMediaTags(ctx context.Context, fileRef blob.Ref) (tags map[string]string, err error) {
	if x.corpus != nil {
		return x.corpus.GetMediaTags(ctx, fileRef)
	}
	fi, err := x.GetFileInfo(ctx, fileRef)
	if err != nil {
		return nil, err
	}
	it := x.queryPrefix(keyMediaTag, fi.WholeRef.String())
	defer closeIterator(it, &err)
	for it.Next() {
		if tags == nil {
			tags = make(map[string]string)
		}
		tags[it.Key()] = it.Value()
	}
	return tags, nil
}

func (x *Index) GetFileLocation(ctx context.Context, fileRef blob.Ref) (camtypes.Location, error) {
	if x.corpus != nil {
		lat, long, ok := x.corpus.FileLatLong(fileRef)
		if !ok {
			return camtypes.Location{}, os.ErrNotExist
		}
		// Guard against NaN values that may have been recorded by
		// buggy or older indexing code.
		if math.IsNaN(long) || math.IsNaN(lat) {
			return camtypes.Location{}, fmt.Errorf("latitude or longitude in corpus for %v is NaN. Reindex to fix it", fileRef)
		}
		return camtypes.Location{Latitude: lat, Longitude: long}, nil
	}
	fi, err := x.GetFileInfo(ctx, fileRef)
	if err != nil {
		return camtypes.Location{}, err
	}
	it := x.queryPrefixString(keyEXIFGPS.Key(fi.WholeRef.String()))
	defer closeIterator(it, &err)
	if !it.Next() {
		return camtypes.Location{}, os.ErrNotExist
	}

	var lat, long float64
	key, v := it.Key(), it.Value()
	pipe := strings.Index(v, "|")
	if pipe < 0 {
		return camtypes.Location{}, fmt.Errorf("index: bogus key %q = %q", key, v)
	}
	lat, err = strconv.ParseFloat(v[:pipe], 64)
	if err != nil {
		return camtypes.Location{}, fmt.Errorf("index: bogus value at position 0 in key %q = %q", key, v)
	}
	long, err = strconv.ParseFloat(v[pipe+1:], 64)
	if err != nil {
		return camtypes.Location{}, fmt.Errorf("index: bogus value at position 1 in key %q = %q", key, v)
	}
	if math.IsNaN(long) || math.IsNaN(lat) {
		return camtypes.Location{}, fmt.Errorf("latitude or longitude in index for %v is NaN. Reindex to fix it", fileRef)
	}
	return camtypes.Location{Latitude: lat, Longitude: long}, nil
}

func (x *Index) EdgesTo(ref blob.Ref, opts *camtypes.EdgesToOpts) (edges []*camtypes.Edge, err error) {
	it := x.queryPrefix(keyEdgeBackward, ref)
	defer closeIterator(it, &err)
	permanodeParents := make(map[string]*camtypes.Edge)
	for it.Next() {
		edge, ok := kvEdgeBackward(it.Key(), it.Value())
		if !ok {
			continue
		}
		if x.IsDeleted(edge.From) {
			continue
		}
		if x.IsDeleted(edge.BlobRef) {
			continue
		}
		edge.To = ref
		if edge.FromType == schema.TypePermanode {
			permanodeParents[edge.From.String()] = edge
		} else {
			edges = append(edges, edge)
		}
	}
	for _, e := range permanodeParents {
		edges = append(edges, e)
	}
	return edges, nil
}

func kvEdgeBackward(k, v string) (edge *camtypes.Edge, ok bool) {
	// k: "edgeback|<to (the ref pointed to)>|<from (parent)>|<blobref of the schema blob holding the edge>"
	// v: "<fromType>|<fromTitle>"
	keyPart := strings.Split(k, "|")
	valPart := strings.Split(v, "|")
	if len(keyPart) != 4 || len(valPart) != 2 {
		log.Printf("bogus keyEdgeBackward index entry: %q = %q", k, v)
		return
	}
	if keyPart[0] != "edgeback" {
		return
	}
	parentRef, ok := blob.Parse(keyPart[2])
	if !ok {
		log.Printf("bogus parent in keyEdgeBackward index entry: %q", keyPart[2])
		return
	}
	blobRef, ok := blob.Parse(keyPart[3])
	if !ok {
		log.Printf("bogus blobref in keyEdgeBackward index entry: %q", keyPart[3])
		return
	}
	return &camtypes.Edge{
		From:      parentRef,
		FromType:  schema.CamliType(valPart[0]),
		FromTitle: valPart[1],
		BlobRef:   blobRef,
	}, true
}

// GetDirMembers sends on dest the children of the static directory dir,
// up to limit results. dest is closed before returning.
func (x *Index) GetDirMembers(ctx context.Context, dir blob.Ref, dest chan<- blob.Ref, limit int) (err error) {
	defer close(dest)

	sent := 0
	if x.corpus != nil {
		children, err := x.corpus.GetDirChildren(ctx, dir)
		if err != nil {
			return err
		}
		for child := range children {
			dest <- child
			sent++
			if sent == limit {
				break
			}
		}
		return nil
	}

	it := x.queryPrefix(keyStaticDirChild, dir.String())
	defer closeIterator(it, &err)
	for it.Next() {
		keyPart := strings.Split(it.Key(), "|")
		if len(keyPart) != 3 {
			return fmt.Errorf("index: bogus key keyStaticDirChild = %q", it.Key())
		}

		// keyPart[1] is the directory; keyPart[2] is the child.
		child, ok := blob.Parse(keyPart[2])
		if !ok {
			continue
		}
		dest <- child
		sent++
		if sent == limit {
			break
		}
	}
	return nil
}

func kvBlobMeta(k, v string) (bm camtypes.BlobMeta, ok bool) {
	refStr := k[len("meta:"):]
	br, ok := blob.Parse(refStr)
	if !ok {
		return
	}
	pipe := strings.Index(v, "|")
	if pipe < 0 {
		return
	}
	size, err := strconv.ParseUint(v[:pipe], 10, 32)
	if err != nil {
		return
	}
	return camtypes.BlobMeta{
		Ref:       br,
		Size:      uint32(size),
		CamliType: camliTypeFromMIME(v[pipe+1:]),
	}, true
}

func kvBlobMeta_bytes(k, v []byte) (bm camtypes.BlobMeta, ok bool) {
	ref := k[len("meta:"):]
	br, ok := blob.ParseBytes(ref)
	if !ok {
		return
	}
	pipe := bytes.IndexByte(v, '|')
	if pipe < 0 {
		return
	}
	size, err := strutil.ParseUintBytes(v[:pipe], 10, 32)
	if err != nil {
		return
	}
	return camtypes.BlobMeta{
		Ref:       br,
		Size:      uint32(size),
		CamliType: camliTypeFromMIME_bytes(v[pipe+1:]),
	}, true
}

func enumerateBlobMeta(s sorted.KeyValue, cb func(camtypes.BlobMeta) error) (err error) {
	it := queryPrefixString(s, "meta:")
	defer closeIterator(it, &err)
	for it.Next() {
		bm, ok := kvBlobMeta(it.Key(), it.Value())
		if !ok {
			continue
		}
		if err := cb(bm); err != nil {
			return err
		}
	}
	return nil
}

var errStopIteration = errors.New("stop iteration")

// EnumerateBlobMeta calls fn for each blob known to the index. If fn
// returns false, iteration stops and EnumerateBlobMeta returns nil. It
// also stops when the context is canceled, returning ctx.Err().
func (x *Index) EnumerateBlobMeta(ctx context.Context, fn func(camtypes.BlobMeta) bool) error {
	if x.corpus != nil {
		var err error
		var n int
		done := ctx.Done()
		x.corpus.EnumerateBlobMeta(func(m camtypes.BlobMeta) bool {
			// Only check for cancellation periodically, to keep the
			// per-blob cost low.
			n++
			if n%256 == 0 {
				select {
				case <-done:
					err = ctx.Err()
					return false
				default:
				}
			}
			return fn(m)
		})
		return err
	}
	done := ctx.Done()
	err := enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {
		select {
		case <-done:
			return ctx.Err()
		default:
			if !fn(bm) {
				return errStopIteration
			}
			return nil
		}
	})
	if err == errStopIteration {
		err = nil
	}
	return err
}

// Storage returns the index's underlying sorted.KeyValue.
func (x *Index) Storage() sorted.KeyValue { return x.s }

// Close closes the underlying sorted.KeyValue, if it implements io.Closer.
func (x *Index) Close() error {
	if cl, ok := x.s.(io.Closer); ok {
		return cl.Close()
	}
	return nil
}

// initNeededMaps populates x.needs and x.neededBy from the index's
// "missing|<have>|<missing>" rows. It is called by New.
func (x *Index) initNeededMaps() (err error) {
	it := x.queryPrefix(keyMissing)
	defer closeIterator(it, &err)
	for it.Next() {
		key := it.KeyBytes()
		pair := key[len("missing|"):]
		pipe := bytes.IndexByte(pair, '|')
		if pipe < 0 {
			return fmt.Errorf("Bogus missing key %q", key)
		}
		have, ok1 := blob.ParseBytes(pair[:pipe])
		missing, ok2 := blob.ParseBytes(pair[pipe+1:])
		if !ok1 || !ok2 {
			return fmt.Errorf("Bogus missing key %q", key)
		}
		x.noteNeededMemory(have, missing)
	}
	return
}

// noteNeeded records, both in the index and in memory, that the blob have
// cannot be fully indexed until the blob missing has been indexed.
func (x *Index) noteNeeded(have, missing blob.Ref) error {
	if err := x.s.Set(keyMissing.Key(have, missing), "1"); err != nil {
		return err
	}
	x.noteNeededMemory(have, missing)
	return nil
}

// noteNeededMemory records the have/missing dependency in the in-memory
// maps only.
func (x *Index) noteNeededMemory(have, missing blob.Ref) {
	x.needs[have] = append(x.needs[have], missing)
	x.neededBy[missing] = append(x.neededBy[missing], have)
}
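
// For illustration, with hypothetical refs: if blob A (say, a file schema)
// references blob B and B is not indexed yet, noteNeeded(A, B) writes a
// "missing|A|B" row and records needs[A] = [B] and neededBy[B] = [A], so the
// indexing of A can be finished once B arrives.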

const camliTypeMIMEPrefix = "application/json; camliType="

var camliTypeMIMEPrefixBytes = []byte(camliTypeMIMEPrefix)

// camliTypeFromMIME returns the schema blob type encoded in a MIME type of
// the form "application/json; camliType=<type>", or the empty string if
// mime is not of that form.
func camliTypeFromMIME(mime string) schema.CamliType {
	if v := strings.TrimPrefix(mime, camliTypeMIMEPrefix); v != mime {
		return schema.CamliType(v)
	}
	return ""
}
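
// For example, camliTypeFromMIME("application/json; camliType=file") returns
// "file", while a plain "image/jpeg" yields the empty string.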

func camliTypeFromMIME_bytes(mime []byte) schema.CamliType {
	if v := bytes.TrimPrefix(mime, camliTypeMIMEPrefixBytes); len(v) != len(mime) {
		return schema.CamliType(strutil.StringFromBytes(v))
	}
	return ""
}

// IsIndexedAttribute reports whether the named permanode attribute is
// indexed by value, and is thus searchable with SearchPermanodesWithAttr
// and PermanodeOfSignerAttrValue.
func IsIndexedAttribute(attr string) bool {
	switch attr {
	case "camliRoot", "camliImportRoot", "tag", "title":
		return true
	}
	return false
}

// IsBlobReferenceAttribute reports whether the named permanode attribute
// has a blob reference as its value (e.g. camliMember), so that indexers
// can maintain reverse (parent/child) indexes for it.
func IsBlobReferenceAttribute(attr string) bool {
	switch attr {
	case "camliMember":
		return true
	}
	return false
}

// IsFulltextAttribute reports whether the named permanode attribute is
// indexed for full-text search.
func IsFulltextAttribute(attr string) bool {
	switch attr {
	case "tag", "title":
		return true
	}
	return false
}