1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package index
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "log"
26 "math"
27 "os"
28 "runtime"
29 "sort"
30 "strconv"
31 "strings"
32 "sync"
33 "time"
34
35 "perkeep.org/pkg/blob"
36 "perkeep.org/pkg/blobserver"
37 "perkeep.org/pkg/env"
38 "perkeep.org/pkg/schema"
39 "perkeep.org/pkg/sorted"
40 "perkeep.org/pkg/types/camtypes"
41
42 "go4.org/jsonconfig"
43 "go4.org/strutil"
44 "go4.org/types"
45 )
46
// init registers newFromConfig as the constructor for the "index" storage
// type, so server configurations can instantiate an *Index.
func init() {
	blobserver.RegisterStorageConstructor("index", newFromConfig)
}
50
// Index is a blobserver.Storage that records metadata about blobs in a
// sorted.KeyValue store and answers queries (claims, paths, deletions, file
// info, ...) from it, or from an in-memory corpus when one is loaded.
type Index struct {
	*blobserver.NoImplStorage
	// reindex is set from the "reindex" config option; see WantsReindex.
	reindex bool
	// keepGoing is set from the "keepGoing" config option; see WantsKeepGoing.
	keepGoing bool
	// s is the sorted key/value store holding all index rows.
	s sorted.KeyValue

	// KeyFetcher is presumably used to fetch signers' public keys. When it
	// is nil, InitBlobSource sets it to the blob source.
	KeyFetcher blob.Fetcher

	// deletes caches, per deleted blob, the delete claims targeting it. When
	// nil, IsDeleted reads the keyDeleted rows from s instead.
	deletes *deletionCache

	// corpus, when non-nil, answers many queries in place of s.
	corpus *Corpus

	// mu backs Lock/Unlock/RLock/RUnlock. In this file it is held when
	// setting blobSource and when reading needs and readyReindex.
	mu sync.RWMutex

	// needs maps a blob to blobs it depends on that are not indexed yet.
	// NOTE(review): inferred from Reindex's error messages; confirm.
	needs map[blob.Ref][]blob.Ref

	// neededBy is presumably the reverse of needs; confirm.
	neededBy map[blob.Ref][]blob.Ref
	// readyReindex holds blobs queued for out-of-order reindexing
	// (per Reindex's error message; confirm).
	readyReindex map[blob.Ref]bool

	// reindexWg is waited on by Reindex once its own workers are done.
	reindexWg sync.WaitGroup

	// blobSource is where blobs are enumerated and fetched from for
	// reindexing and integrity checks; set once by InitBlobSource.
	blobSource blobserver.FetcherEnumerator

	// hasWiped records that the storage was already wiped at construction
	// time (reindex mode), so Reindex does not wipe it again.
	hasWiped bool
}
89
// Lock acquires the index's write lock.
func (x *Index) Lock() { x.mu.Lock() }

// Unlock releases the index's write lock.
func (x *Index) Unlock() { x.mu.Unlock() }

// RLock acquires the index's read lock.
func (x *Index) RLock() { x.mu.RLock() }

// RUnlock releases the index's read lock.
func (x *Index) RUnlock() { x.mu.RUnlock() }

// Compile-time checks that *Index implements the expected interfaces.
var (
	_ blobserver.Storage = (*Index)(nil)
	_ Interface          = (*Index)(nil)
)
99
100 func (x *Index) logf(format string, args ...interface{}) {
101 log.Printf("index: "+format, args...)
102 }
103
104 var aboutToReindex = false
105
106
107
108
109
// SignerRefSet is a list of signer blobrefs, as strings. signerRefs builds
// one from all signers recorded with the same GPG key ID.
type SignerRefSet []string
111
112
113
// Owner identifies the owner of an index by GPG key ID(s), together with the
// signer blobrefs known for each key ID.
type Owner struct {
	// keyID holds the owner's key IDs; KeyID returns the first one.
	keyID []string

	// blobByKeyID maps each key ID to its signer blobrefs (as strings).
	blobByKeyID map[string]SignerRefSet
}
120
121
122 func NewOwner(keyID string, ref blob.Ref) *Owner {
123 return &Owner{
124 keyID: []string{keyID},
125 blobByKeyID: map[string]SignerRefSet{keyID: {ref.String()}},
126 }
127 }
128
129
130
131 func (o *Owner) KeyID() string {
132 if o == nil || len(o.keyID) == 0 {
133 return ""
134 }
135 return o.keyID[0]
136 }
137
138
139 func (o *Owner) RefSet(keyID string) SignerRefSet {
140 if o == nil || len(o.blobByKeyID) == 0 {
141 return nil
142 }
143 refs := o.blobByKeyID[keyID]
144 if len(refs) == 0 {
145 return nil
146 }
147 return refs
148 }
149
150
151
152
153 func (o *Owner) BlobRef() blob.Ref {
154 if o == nil || len(o.blobByKeyID) == 0 {
155 return blob.Ref{}
156 }
157 refs := o.blobByKeyID[o.KeyID()]
158 if len(refs) == 0 {
159 return blob.Ref{}
160 }
161 ref, ok := blob.Parse(refs[0])
162 if !ok {
163 return blob.Ref{}
164 }
165 return ref
166 }
167
168
169
170
171
172
173
174
175
176 func (x *Index) InitBlobSource(blobSource blobserver.FetcherEnumerator) {
177 x.Lock()
178 defer x.Unlock()
179 if x.blobSource != nil {
180 panic("blobSource of Index already set")
181 }
182 x.blobSource = blobSource
183 if x.KeyFetcher == nil {
184 x.KeyFetcher = blobSource
185 }
186 }
187
188
189 func New(s sorted.KeyValue) (*Index, error) {
190 idx := &Index{
191 s: s,
192 needs: make(map[blob.Ref][]blob.Ref),
193 neededBy: make(map[blob.Ref][]blob.Ref),
194 readyReindex: make(map[blob.Ref]bool),
195 }
196 if aboutToReindex {
197 idx.deletes = newDeletionCache()
198 return idx, nil
199 }
200
201 schemaVersion := idx.schemaVersion()
202 switch {
203 case schemaVersion == 0 && idx.isEmpty():
204
205 err := idx.s.Set(keySchemaVersion.name, fmt.Sprint(requiredSchemaVersion))
206 if err != nil {
207 return nil, fmt.Errorf("Could not write index schema version %q: %w", requiredSchemaVersion, err)
208 }
209 case schemaVersion != requiredSchemaVersion:
210 tip := ""
211 if env.IsDev() {
212
213
214 tip = `(For the dev server, run "devcam server --wipe" to wipe both your blobs and index)`
215 } else {
216 if is4To5SchemaBump(schemaVersion) {
217 return idx, errMissingWholeRef
218 }
219 tip = "Run 'perkeepd --reindex' (it might take awhile, but shows status). Alternative: 'camtool dbinit' (or just delete the file for a file based index), and then 'camtool sync --all'"
220 }
221 return nil, fmt.Errorf("index schema version is %d; required one is %d. You need to reindex. %s",
222 schemaVersion, requiredSchemaVersion, tip)
223 }
224 if err := idx.initDeletesCache(); err != nil {
225 return nil, fmt.Errorf("Could not initialize index's deletes cache: %w", err)
226 }
227 if err := idx.initNeededMaps(); err != nil {
228 return nil, fmt.Errorf("Could not initialize index's missing blob maps: %w", err)
229 }
230 return idx, nil
231 }
232
233 func is4To5SchemaBump(schemaVersion int) bool {
234 return schemaVersion == 4 && requiredSchemaVersion == 5
235 }
236
237 var errMissingWholeRef = errors.New("missing wholeRef field in fileInfo rows")
238
239
240
241 func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
242
243
244 if x.schemaVersion() != 4 || requiredSchemaVersion != 5 {
245 panic("fixMissingWholeRef should only be used when upgrading from v4 to v5 of the index schema")
246 }
247 x.logf("fixing the missing wholeRef in the fileInfo rows...")
248 defer func() {
249 if err != nil {
250 x.logf("fixing the fileInfo rows failed: %v", err)
251 return
252 }
253 x.logf("successfully fixed wholeRef in FileInfo rows.")
254 }()
255
256
257 fileRefToWholeRef := make(map[blob.Ref]blob.Ref)
258 it := x.queryPrefix(keyWholeToFileRef)
259 var keyA [3]string
260 for it.Next() {
261 keyPart := strutil.AppendSplitN(keyA[:0], it.Key(), "|", 3)
262 if len(keyPart) != 3 {
263 return fmt.Errorf("bogus keyWholeToFileRef key: got %q, wanted \"wholetofile|wholeRef|fileRef\"", it.Key())
264 }
265 wholeRef, ok1 := blob.Parse(keyPart[1])
266 fileRef, ok2 := blob.Parse(keyPart[2])
267 if !ok1 || !ok2 {
268 return fmt.Errorf("bogus part in keyWholeToFileRef key: %q", it.Key())
269 }
270 fileRefToWholeRef[fileRef] = wholeRef
271 }
272 if err := it.Close(); err != nil {
273 return err
274 }
275
276 var fixedEntries, missedEntries int
277 t := time.NewTicker(5 * time.Second)
278 defer t.Stop()
279
280
281
282 mutations := make(map[string]string)
283 keyPrefix := keyFileInfo.name + "|"
284 it = x.queryPrefix(keyFileInfo)
285 defer it.Close()
286 var valA [3]string
287 for it.Next() {
288 select {
289 case <-t.C:
290 x.logf("recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries)
291 default:
292 }
293 br, ok := blob.ParseBytes(it.KeyBytes()[len(keyPrefix):])
294 if !ok {
295 return fmt.Errorf("invalid blobRef %q", it.KeyBytes()[len(keyPrefix):])
296 }
297 wholeRef, ok := fileRefToWholeRef[br]
298 if !ok {
299 missedEntries++
300 x.logf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br)
301 continue
302 }
303 valPart := strutil.AppendSplitN(valA[:0], it.Value(), "|", 3)
304
305 if len(valPart) != 3 {
306 return fmt.Errorf("bogus keyFileInfo value: got %q, wanted \"size|filename|mimetype\"", it.Value())
307 }
308 size_s, filename, mimetype := valPart[0], valPart[1], urld(valPart[2])
309 if strings.Contains(mimetype, "|") {
310
311
312
313
314
315
316 x.logf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value())
317 continue
318 }
319 size, err := strconv.Atoi(size_s)
320 if err != nil {
321 return fmt.Errorf("bogus size in keyFileInfo value %v: %w", it.Value(), err)
322 }
323 mutations[keyFileInfo.Key(br)] = keyFileInfo.Val(size, filename, mimetype, wholeRef)
324 fixedEntries++
325 }
326 if err := it.Close(); err != nil {
327 return err
328 }
329 x.logf("starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries)
330 bm := x.s.BeginBatch()
331 for k, v := range mutations {
332 bm.Set(k, v)
333 }
334 bm.Set(keySchemaVersion.name, "5")
335 if err := x.s.CommitBatch(bm); err != nil {
336 return err
337 }
338 if missedEntries > 0 {
339 x.logf("some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries)
340 }
341 return nil
342 }
343
344 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
345 blobPrefix := config.RequiredString("blobSource")
346 kvConfig := config.RequiredObject("storage")
347 reindex := config.OptionalBool("reindex", false)
348 keepGoing := config.OptionalBool("keepGoing", false)
349
350 if err := config.Validate(); err != nil {
351 return nil, err
352 }
353
354 kv, err := sorted.NewKeyValue(kvConfig)
355 if err != nil {
356 var nwe sorted.NeedWipeError
357 if !errors.As(err, &nwe) {
358 return nil, err
359 }
360 if !reindex {
361 return nil, err
362 }
363 }
364 if reindex {
365 aboutToReindex = true
366 wiper, ok := kv.(sorted.Wiper)
367 if !ok {
368 return nil, fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", kv)
369 }
370 if err := wiper.Wipe(); err != nil {
371 return nil, fmt.Errorf("error wiping index's sorted key/value type %T: %w", kv, err)
372 }
373 log.Printf("Index wiped.")
374 }
375
376 sto, err := ld.GetStorage(blobPrefix)
377 if err != nil {
378 return nil, err
379 }
380
381 ix, err := New(kv)
382
383
384
385 if errors.Is(err, errMissingWholeRef) {
386
387
388 if err := ix.fixMissingWholeRef(sto); err != nil {
389 ix.Close()
390 return nil, fmt.Errorf("could not fix missing wholeRef entries: %w", err)
391 }
392 ix, err = New(kv)
393 }
394 ix.keepGoing = keepGoing
395 ix.reindex = reindex
396 if reindex {
397 ix.hasWiped = true
398 }
399 if err != nil {
400 return nil, err
401 }
402 ix.InitBlobSource(sto)
403
404 if !reindex {
405 if err := ix.integrityCheck(3 * time.Second); err != nil {
406 return nil, err
407 }
408 }
409
410 return ix, err
411 }
412
413 func (x *Index) String() string {
414 return fmt.Sprintf("Perkeep index, using key/value implementation %T", x.s)
415 }
416
417 func (x *Index) isEmpty() bool {
418 iter := x.s.Find("", "")
419 hasRows := iter.Next()
420 if err := iter.Close(); err != nil {
421 panic(err)
422 }
423 return !hasRows
424 }
425
426
// reindexMaxProcs is the maximum number of goroutines Reindex uses to index
// blobs concurrently. It defaults to the number of CPUs, and v is guarded by
// the embedded RWMutex.
var reindexMaxProcs = struct {
	sync.RWMutex
	v int
}{v: runtime.NumCPU()}

// SetReindexMaxProcs sets the maximum number of concurrent goroutines used
// when reindexing.
func SetReindexMaxProcs(n int) {
	reindexMaxProcs.Lock()
	reindexMaxProcs.v = n
	reindexMaxProcs.Unlock()
}

// ReindexMaxProcs returns the maximum number of concurrent goroutines used
// when reindexing.
func ReindexMaxProcs() int {
	reindexMaxProcs.RLock()
	n := reindexMaxProcs.v
	reindexMaxProcs.RUnlock()
	return n
}
447
// WantsReindex reports whether the index was configured with "reindex", in
// which case its storage was wiped at construction and needs rebuilding.
func (x *Index) WantsReindex() bool { return x.reindex }

// WantsKeepGoing reports whether the index was configured with "keepGoing".
func (x *Index) WantsKeepGoing() bool { return x.keepGoing }
450
451 func (x *Index) Reindex() error {
452 x.Lock()
453 if x.blobSource == nil {
454 x.Unlock()
455 return errors.New("index: can't re-index: no blobSource")
456 }
457 x.Unlock()
458
459 reindexMaxProcs.RLock()
460 defer reindexMaxProcs.RUnlock()
461
462 ctx := context.Background()
463
464 if !x.hasWiped {
465 wiper, ok := x.s.(sorted.Wiper)
466 if !ok {
467 return fmt.Errorf("index's storage type %T doesn't support sorted.Wiper", x.s)
468 }
469 log.Printf("Wiping index storage type %T ...", x.s)
470 if err := wiper.Wipe(); err != nil {
471 return fmt.Errorf("error wiping index's sorted key/value type %T: %w", x.s, err)
472 }
473 log.Printf("Index wiped.")
474 }
475 log.Printf("Rebuilding index...")
476
477 reindexStart, _ := blob.Parse(os.Getenv("CAMLI_REINDEX_START"))
478
479 err := x.s.Set(keySchemaVersion.name, fmt.Sprintf("%d", requiredSchemaVersion))
480 if err != nil {
481 return err
482 }
483
484 var nerrmu sync.Mutex
485 nerr := 0
486
487 blobc := make(chan blob.Ref, 32)
488
489 enumCtx := context.Background()
490 enumErr := make(chan error, 1)
491 go func() {
492 defer close(blobc)
493 donec := enumCtx.Done()
494 var lastTick time.Time
495 enumErr <- blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
496 now := time.Now()
497 if lastTick.Before(now.Add(-1 * time.Second)) {
498 log.Printf("Reindexing at %v", sb.Ref)
499 lastTick = now
500 }
501 if reindexStart.Valid() && sb.Ref.Less(reindexStart) {
502 return nil
503 }
504 select {
505 case <-donec:
506 return ctx.Err()
507 case blobc <- sb.Ref:
508 return nil
509 }
510 })
511 }()
512 var wg sync.WaitGroup
513 for i := 0; i < reindexMaxProcs.v; i++ {
514 wg.Add(1)
515 go func() {
516 defer wg.Done()
517 for br := range blobc {
518 if err := x.indexBlob(ctx, br); err != nil {
519 log.Printf("Error reindexing %v: %v", br, err)
520 nerrmu.Lock()
521 nerr++
522 nerrmu.Unlock()
523
524
525 }
526 }
527 }()
528 }
529 if err := <-enumErr; err != nil {
530 return err
531 }
532
533 wg.Wait()
534 x.reindexWg.Wait()
535
536 x.RLock()
537 readyCount := len(x.readyReindex)
538 needed := len(x.needs)
539 var needSample bytes.Buffer
540 if needed > 0 {
541 n := 0
542 Sample:
543 for x, needs := range x.needs {
544 for _, need := range needs {
545 n++
546 if n == 10 {
547 break Sample
548 }
549 if n > 1 {
550 fmt.Fprintf(&needSample, ", ")
551 }
552 fmt.Fprintf(&needSample, "%v needs %v", x, need)
553 }
554 }
555 }
556 x.RUnlock()
557 if readyCount > 0 {
558 return fmt.Errorf("%d blobs were ready to reindex in out-of-order queue, but not yet ran", readyCount)
559 }
560 if needed > 0 {
561 return fmt.Errorf("%d blobs are still needed as dependencies; a sample: %s", needed, needSample.Bytes())
562 }
563
564 nerrmu.Lock()
565 if nerr != 0 {
566 return fmt.Errorf("%d blobs failed to re-index", nerr)
567 }
568 if err := x.initDeletesCache(); err != nil {
569 return err
570 }
571 log.Printf("Index rebuild complete.")
572 return nil
573 }
574
575
576
577
578
579 func (x *Index) integrityCheck(timeout time.Duration) error {
580 t0 := time.Now()
581 x.logf("starting integrity check...")
582 defer func() {
583 x.logf("integrity check done (after %v)", time.Since(t0).Round(10*time.Millisecond))
584 }()
585 if x.blobSource == nil {
586 return errors.New("index: can't check sanity of index: no blobSource")
587 }
588
589
590
591 seen := make([]blob.Ref, 0)
592 notFound := make([]blob.Ref, 0)
593 enumCtx := context.TODO()
594 stopTime := time.NewTimer(timeout)
595 defer stopTime.Stop()
596 var errEOT = errors.New("time's out")
597 if err := blobserver.EnumerateAll(enumCtx, x.blobSource, func(sb blob.SizedRef) error {
598 select {
599 case <-stopTime.C:
600 return errEOT
601 default:
602 }
603 if _, err := x.GetBlobMeta(enumCtx, sb.Ref); err != nil {
604 if !os.IsNotExist(err) {
605 return err
606 }
607 notFound = append(notFound, sb.Ref)
608 return nil
609 }
610 seen = append(seen, sb.Ref)
611 return nil
612 }); err != nil && err != errEOT {
613 return err
614 }
615 if len(notFound) > 0 {
616
617 x.logf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen))
618 }
619 return nil
620 }
621
622 func queryPrefixString(s sorted.KeyValue, prefix string) sorted.Iterator {
623 if prefix == "" {
624 return s.Find("", "")
625 }
626 lastByte := prefix[len(prefix)-1]
627 if lastByte == 0xff {
628 panic("unsupported query prefix ending in 0xff")
629 }
630 end := prefix[:len(prefix)-1] + string(lastByte+1)
631 return s.Find(prefix, end)
632 }
633
634 func (x *Index) queryPrefixString(prefix string) sorted.Iterator {
635 return queryPrefixString(x.s, prefix)
636 }
637
638 func queryPrefix(s sorted.KeyValue, key *keyType, args ...interface{}) sorted.Iterator {
639 return queryPrefixString(s, key.Prefix(args...))
640 }
641
642 func (x *Index) queryPrefix(key *keyType, args ...interface{}) sorted.Iterator {
643 return x.queryPrefixString(key.Prefix(args...))
644 }
645
646 func closeIterator(it sorted.Iterator, perr *error) {
647 err := it.Close()
648 if err != nil && *perr == nil {
649 *perr = err
650 }
651 }
652
653
654
655 func (x *Index) schemaVersion() int {
656 schemaVersionStr, err := x.s.Get(keySchemaVersion.name)
657 if err != nil {
658 if errors.Is(err, sorted.ErrNotFound) {
659 return 0
660 }
661 panic(fmt.Sprintf("Could not get index schema version: %v", err))
662 }
663 schemaVersion, err := strconv.Atoi(schemaVersionStr)
664 if err != nil {
665 panic(fmt.Sprintf("Bogus index schema version: %q", schemaVersionStr))
666 }
667 return schemaVersion
668 }
669
// deletion records one delete claim against a blob.
type deletion struct {
	// deleter is the blobref of the delete claim.
	deleter blob.Ref
	// when is the claim's date.
	when time.Time
}

// byDeletionDate implements sort.Interface, ordering deletions from oldest
// to newest.
type byDeletionDate []deletion

func (d byDeletionDate) Len() int           { return len(d) }
func (d byDeletionDate) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d byDeletionDate) Less(i, j int) bool { return d[i].when.Before(d[j].when) }

// deletionCache maps each deleted blob to the delete claims targeting it.
// IsDeleted holds the embedded RWMutex's read lock while reading m.
type deletionCache struct {
	sync.RWMutex
	m map[blob.Ref][]deletion
}

// newDeletionCache returns an empty deletionCache.
func newDeletionCache() *deletionCache {
	return &deletionCache{
		m: make(map[blob.Ref][]deletion),
	}
}
691
692
693
694 func (x *Index) initDeletesCache() (err error) {
695 x.deletes = newDeletionCache()
696 it := x.queryPrefix(keyDeleted)
697 defer closeIterator(it, &err)
698 for it.Next() {
699 cl, ok := kvDeleted(it.Key())
700 if !ok {
701 return fmt.Errorf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key())
702 }
703 targetDeletions := append(x.deletes.m[cl.Target],
704 deletion{
705 deleter: cl.BlobRef,
706 when: cl.Date,
707 })
708 sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
709 x.deletes.m[cl.Target] = targetDeletions
710 }
711 return err
712 }
713
714 func kvDeleted(k string) (c camtypes.Claim, ok bool) {
715
716 keyPart := strings.Split(k, "|")
717 if len(keyPart) != 4 {
718 return
719 }
720 if keyPart[0] != "deleted" {
721 return
722 }
723 target, ok := blob.Parse(keyPart[1])
724 if !ok {
725 return
726 }
727 claimRef, ok := blob.Parse(keyPart[3])
728 if !ok {
729 return
730 }
731 date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[2]))
732 if err != nil {
733 return
734 }
735 return camtypes.Claim{
736 BlobRef: claimRef,
737 Target: target,
738 Date: date,
739 Type: string(schema.DeleteClaim),
740 }, true
741 }
742
743
744
745 func (x *Index) IsDeleted(br blob.Ref) bool {
746 if x.deletes == nil {
747
748
749 return x.isDeletedNoCache(br)
750 }
751 x.deletes.RLock()
752 defer x.deletes.RUnlock()
753 return x.isDeleted(br)
754 }
755
756
757 func (x *Index) isDeleted(br blob.Ref) bool {
758 deletes, ok := x.deletes.m[br]
759 if !ok {
760 return false
761 }
762 for _, v := range deletes {
763 if !x.isDeleted(v.deleter) {
764 return true
765 }
766 }
767 return false
768 }
769
770
771 func (x *Index) isDeletedNoCache(br blob.Ref) bool {
772 var err error
773 it := x.queryPrefix(keyDeleted, br)
774 for it.Next() {
775 cl, ok := kvDeleted(it.Key())
776 if !ok {
777 panic(fmt.Sprintf("Bogus keyDeleted entry key: want |\"deleted\"|<deleted blobref>|<reverse claimdate>|<deleter claim>|, got %q", it.Key()))
778 }
779 if !x.isDeletedNoCache(cl.BlobRef) {
780 closeIterator(it, &err)
781 if err != nil {
782
783 panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
784 }
785 return true
786 }
787 }
788 closeIterator(it, &err)
789 if err != nil {
790
791 panic(fmt.Sprintf("Could not close iterator on keyDeleted: %v", err))
792 }
793 return false
794 }
795
796
797
798
799
800
801
802
803 func (x *Index) GetRecentPermanodes(ctx context.Context, dest chan<- camtypes.RecentPermanode, owner blob.Ref, limit int, before time.Time) (err error) {
804 defer close(dest)
805
806 keyId, err := x.KeyId(ctx, owner)
807 if errors.Is(err, sorted.ErrNotFound) {
808 x.logf("no recent permanodes because keyId for owner %v not found", owner)
809 return nil
810 }
811 if err != nil {
812 x.logf("error fetching keyId for owner %v: %v", owner, err)
813 return err
814 }
815
816 sent := 0
817 var seenPermanode dupSkipper
818
819 if before.IsZero() {
820 before = time.Now()
821 }
822
823 it := x.queryPrefix(keyRecentPermanode, keyId)
824 defer closeIterator(it, &err)
825 for it.Next() {
826 permaStr := it.Value()
827 parts := strings.SplitN(it.Key(), "|", 4)
828 if len(parts) != 4 {
829 continue
830 }
831 mTime, _ := time.Parse(time.RFC3339, unreverseTimeString(parts[2]))
832 permaRef, ok := blob.Parse(permaStr)
833 if !ok {
834 continue
835 }
836 if x.IsDeleted(permaRef) {
837 continue
838 }
839 if seenPermanode.Dup(permaStr) {
840 continue
841 }
842
843 if !mTime.Before(before) {
844 continue
845 }
846 dest <- camtypes.RecentPermanode{
847 Permanode: permaRef,
848 Signer: owner,
849 LastModTime: mTime,
850 }
851 sent++
852 if sent == limit {
853 break
854 }
855 }
856 return nil
857 }
858
859 func (x *Index) AppendClaims(ctx context.Context, dst []camtypes.Claim, permaNode blob.Ref,
860 signerFilter string,
861 attrFilter string) ([]camtypes.Claim, error) {
862 if x.corpus != nil {
863 return x.corpus.AppendClaims(ctx, dst, permaNode, signerFilter, attrFilter)
864 }
865 var (
866 err error
867 it sorted.Iterator
868 )
869 var signerRefs SignerRefSet
870 if signerFilter != "" {
871 signerRefs, err = x.signerRefs(ctx, signerFilter)
872 if err != nil {
873 return dst, err
874 }
875 if len(signerRefs) == 0 {
876 return dst, nil
877 }
878 it = x.queryPrefix(keyPermanodeClaim, permaNode, signerFilter)
879 } else {
880 it = x.queryPrefix(keyPermanodeClaim, permaNode)
881 }
882 defer closeIterator(it, &err)
883
884
885
886
887 var mustHave string
888 if attrFilter != "" && urle(attrFilter) == attrFilter {
889 mustHave = attrFilter
890 }
891
892 for it.Next() {
893 val := it.Value()
894 if mustHave != "" && !strings.Contains(val, mustHave) {
895 continue
896 }
897 cl, ok := kvClaim(it.Key(), val, blob.Parse)
898 if !ok {
899 continue
900 }
901 if x.IsDeleted(cl.BlobRef) {
902 continue
903 }
904 if attrFilter != "" && cl.Attr != attrFilter {
905 continue
906 }
907
908
909 if signerFilter != "" && !signerRefs.blobMatches(cl.Signer) {
910 continue
911 }
912 dst = append(dst, cl)
913 }
914 return dst, nil
915 }
916
917 func kvClaim(k, v string, blobParse func(string) (blob.Ref, bool)) (c camtypes.Claim, ok bool) {
918 const nKeyPart = 5
919 const nValPart = 4
920 var keya [nKeyPart]string
921 var vala [nValPart]string
922 keyPart := strutil.AppendSplitN(keya[:0], k, "|", -1)
923 valPart := strutil.AppendSplitN(vala[:0], v, "|", -1)
924 if len(keyPart) < nKeyPart || len(valPart) < nValPart {
925 return
926 }
927 signerRef, ok := blobParse(valPart[3])
928 if !ok {
929 return
930 }
931 permaNode, ok := blobParse(keyPart[1])
932 if !ok {
933 return
934 }
935 claimRef, ok := blobParse(keyPart[4])
936 if !ok {
937 return
938 }
939 date, err := time.Parse(time.RFC3339, keyPart[3])
940 if err != nil {
941 return
942 }
943 return camtypes.Claim{
944 BlobRef: claimRef,
945 Signer: signerRef,
946 Permanode: permaNode,
947 Date: date,
948 Type: urld(valPart[0]),
949 Attr: urld(valPart[1]),
950 Value: urld(valPart[2]),
951 }, true
952 }
953
954 func (x *Index) GetBlobMeta(ctx context.Context, br blob.Ref) (camtypes.BlobMeta, error) {
955 if x.corpus != nil {
956 return x.corpus.GetBlobMeta(ctx, br)
957 }
958 key := "meta:" + br.String()
959 meta, err := x.s.Get(key)
960 if errors.Is(err, sorted.ErrNotFound) {
961 err = os.ErrNotExist
962 }
963 if err != nil {
964 return camtypes.BlobMeta{}, err
965 }
966 pos := strings.Index(meta, "|")
967 if pos < 0 {
968 panic(fmt.Sprintf("Bogus index row for key %q: got value %q", key, meta))
969 }
970 size, err := strconv.ParseUint(meta[:pos], 10, 32)
971 if err != nil {
972 return camtypes.BlobMeta{}, err
973 }
974 mime := meta[pos+1:]
975 return camtypes.BlobMeta{
976 Ref: br,
977 Size: uint32(size),
978 CamliType: camliTypeFromMIME(mime),
979 }, nil
980 }
981
982
983 func (x *Index) HasLegacySHA1() (ok bool, err error) {
984 if x.corpus != nil {
985 return x.corpus.hasLegacySHA1, err
986 }
987 it := x.queryPrefix(keyWholeToFileRef, "sha1-")
988 defer closeIterator(it, &err)
989 for it.Next() {
990 return true, err
991 }
992 return false, err
993 }
994
995 func (x *Index) KeyId(ctx context.Context, signer blob.Ref) (string, error) {
996 if x.corpus != nil {
997 return x.corpus.KeyId(ctx, signer)
998 }
999 return x.s.Get("signerkeyid:" + signer.String())
1000 }
1001
1002
1003
1004 func (x *Index) signerRefs(ctx context.Context, keyID string) (SignerRefSet, error) {
1005 if x.corpus != nil {
1006 return x.corpus.signerRefs[keyID], nil
1007 }
1008 it := x.queryPrefixString(keySignerKeyID.name)
1009 var err error
1010 var refs SignerRefSet
1011 defer closeIterator(it, &err)
1012 prefix := keySignerKeyID.name + ":"
1013 for it.Next() {
1014 if it.Value() == keyID {
1015 refs = append(refs, strings.TrimPrefix(it.Key(), prefix))
1016 }
1017 }
1018 return refs, nil
1019 }
1020
1021 func (x *Index) PermanodeOfSignerAttrValue(ctx context.Context, signer blob.Ref, attr, val string) (permaNode blob.Ref, err error) {
1022 keyId, err := x.KeyId(ctx, signer)
1023 if errors.Is(err, sorted.ErrNotFound) {
1024 return blob.Ref{}, os.ErrNotExist
1025 }
1026 if err != nil {
1027 return blob.Ref{}, err
1028 }
1029 it := x.queryPrefix(keySignerAttrValue, keyId, attr, val)
1030 defer closeIterator(it, &err)
1031 for it.Next() {
1032 permaRef, ok := blob.Parse(it.Value())
1033 if ok && !x.IsDeleted(permaRef) {
1034 return permaRef, nil
1035 }
1036 }
1037 return blob.Ref{}, os.ErrNotExist
1038 }
1039
1040
1041
1042
1043 func (x *Index) SearchPermanodesWithAttr(ctx context.Context, dest chan<- blob.Ref, request *camtypes.PermanodeByAttrRequest) (err error) {
1044 defer close(dest)
1045 if request.FuzzyMatch {
1046
1047 return errors.New("TODO: SearchPermanodesWithAttr: generic indexer doesn't support FuzzyMatch on PermanodeByAttrRequest")
1048 }
1049 if request.Attribute == "" {
1050 return errors.New("index: missing Attribute in SearchPermanodesWithAttr")
1051 }
1052 if !IsIndexedAttribute(request.Attribute) {
1053 return fmt.Errorf("SearchPermanodesWithAttr: called with a non-indexed attribute %q", request.Attribute)
1054 }
1055
1056 keyId, err := x.KeyId(ctx, request.Signer)
1057 if errors.Is(err, sorted.ErrNotFound) {
1058 return nil
1059 }
1060 if err != nil {
1061 return err
1062 }
1063 seen := make(map[string]bool)
1064 var it sorted.Iterator
1065 if request.Query == "" {
1066 it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute)
1067 } else {
1068 it = x.queryPrefix(keySignerAttrValue, keyId, request.Attribute, request.Query)
1069 }
1070 defer closeIterator(it, &err)
1071 before := request.At
1072 if before.IsZero() {
1073 before = time.Now()
1074 }
1075 for it.Next() {
1076 cl, ok := kvSignerAttrValue(it.Key(), it.Value())
1077 if !ok {
1078 continue
1079 }
1080 if x.IsDeleted(cl.BlobRef) {
1081 continue
1082 }
1083 if x.IsDeleted(cl.Permanode) {
1084 continue
1085 }
1086 if cl.Date.After(before) {
1087 continue
1088 }
1089 pnstr := cl.Permanode.String()
1090 if seen[pnstr] {
1091 continue
1092 }
1093 seen[pnstr] = true
1094
1095 dest <- cl.Permanode
1096 if len(seen) == request.MaxResults {
1097 break
1098 }
1099 }
1100 return nil
1101 }
1102
1103 func kvSignerAttrValue(k, v string) (c camtypes.Claim, ok bool) {
1104
1105 keyPart := strings.Split(k, "|")
1106 valPart := strings.Split(v, "|")
1107 if len(keyPart) != 6 || len(valPart) != 1 {
1108
1109 log.Printf("bogus keySignerAttrValue index entry: %q = %q", k, v)
1110 return
1111 }
1112 if keyPart[0] != "signerattrvalue" {
1113 return
1114 }
1115 date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
1116 if err != nil {
1117 log.Printf("bogus time in keySignerAttrValue index entry: %q", keyPart[4])
1118 return
1119 }
1120 claimRef, ok := blob.Parse(keyPart[5])
1121 if !ok {
1122 log.Printf("bogus claim in keySignerAttrValue index entry: %q", keyPart[5])
1123 return
1124 }
1125 permaNode, ok := blob.Parse(valPart[0])
1126 if !ok {
1127 log.Printf("bogus permanode in keySignerAttrValue index entry: %q", valPart[0])
1128 return
1129 }
1130 return camtypes.Claim{
1131 BlobRef: claimRef,
1132 Permanode: permaNode,
1133 Date: date,
1134 Attr: urld(keyPart[2]),
1135 Value: urld(keyPart[3]),
1136 }, true
1137 }
1138
1139 func (x *Index) PathsOfSignerTarget(ctx context.Context, signer, target blob.Ref) (paths []*camtypes.Path, err error) {
1140 paths = []*camtypes.Path{}
1141 keyId, err := x.KeyId(ctx, signer)
1142 if err != nil {
1143 if errors.Is(err, sorted.ErrNotFound) {
1144 err = nil
1145 }
1146 return
1147 }
1148
1149 mostRecent := make(map[string]*camtypes.Path)
1150 maxClaimDates := make(map[string]time.Time)
1151
1152 it := x.queryPrefix(keyPathBackward, keyId, target)
1153 defer closeIterator(it, &err)
1154 for it.Next() {
1155 p, ok, active := kvPathBackward(it.Key(), it.Value())
1156 if !ok {
1157 continue
1158 }
1159 if x.IsDeleted(p.Claim) {
1160 continue
1161 }
1162 if x.IsDeleted(p.Base) {
1163 continue
1164 }
1165
1166 key := p.Base.String() + "/" + p.Suffix
1167 if p.ClaimDate.After(maxClaimDates[key]) {
1168 maxClaimDates[key] = p.ClaimDate
1169 if active {
1170 mostRecent[key] = &p
1171 } else {
1172 delete(mostRecent, key)
1173 }
1174 }
1175 }
1176 for _, v := range mostRecent {
1177 paths = append(paths, v)
1178 }
1179 return paths, nil
1180 }
1181
1182 func kvPathBackward(k, v string) (p camtypes.Path, ok bool, active bool) {
1183
1184 keyPart := strings.Split(k, "|")
1185 valPart := strings.Split(v, "|")
1186 if len(keyPart) != 4 || len(valPart) != 4 {
1187
1188 log.Printf("bogus keyPathBackward index entry: %q = %q", k, v)
1189 return
1190 }
1191 if keyPart[0] != "signertargetpath" {
1192 return
1193 }
1194 target, ok := blob.Parse(keyPart[2])
1195 if !ok {
1196 log.Printf("bogus target in keyPathBackward index entry: %q", keyPart[2])
1197 return
1198 }
1199 claim, ok := blob.Parse(keyPart[3])
1200 if !ok {
1201 log.Printf("bogus claim in keyPathBackward index entry: %q", keyPart[3])
1202 return
1203 }
1204 date, err := time.Parse(time.RFC3339, valPart[0])
1205 if err != nil {
1206 log.Printf("bogus date in keyPathBackward index entry: %q", valPart[0])
1207 return
1208 }
1209 base, ok := blob.Parse(valPart[1])
1210 if !ok {
1211 log.Printf("bogus base in keyPathBackward index entry: %q", valPart[1])
1212 return
1213 }
1214 if valPart[2] == "Y" {
1215 active = true
1216 }
1217 return camtypes.Path{
1218 Claim: claim,
1219 Base: base,
1220 Target: target,
1221 ClaimDate: date,
1222 Suffix: urld(valPart[3]),
1223 }, true, active
1224 }
1225
1226 func (x *Index) PathsLookup(ctx context.Context, signer, base blob.Ref, suffix string) (paths []*camtypes.Path, err error) {
1227 paths = []*camtypes.Path{}
1228 keyId, err := x.KeyId(ctx, signer)
1229 if err != nil {
1230 if errors.Is(err, sorted.ErrNotFound) {
1231 err = nil
1232 }
1233 return
1234 }
1235
1236 it := x.queryPrefix(keyPathForward, keyId, base, suffix)
1237 defer closeIterator(it, &err)
1238 for it.Next() {
1239 p, ok, active := kvPathForward(it.Key(), it.Value())
1240 if !ok {
1241 continue
1242 }
1243 if x.IsDeleted(p.Claim) {
1244 continue
1245 }
1246 if x.IsDeleted(p.Target) {
1247 continue
1248 }
1249
1250
1251
1252
1253 _ = active
1254
1255 paths = append(paths, &p)
1256 }
1257 return
1258 }
1259
1260 func kvPathForward(k, v string) (p camtypes.Path, ok bool, active bool) {
1261
1262 keyPart := strings.Split(k, "|")
1263 valPart := strings.Split(v, "|")
1264 if len(keyPart) != 6 || len(valPart) != 2 {
1265
1266 log.Printf("bogus keyPathForward index entry: %q = %q", k, v)
1267 return
1268 }
1269 if keyPart[0] != "path" {
1270 return
1271 }
1272 base, ok := blob.Parse(keyPart[2])
1273 if !ok {
1274 log.Printf("bogus base in keyPathForward index entry: %q", keyPart[2])
1275 return
1276 }
1277 date, err := time.Parse(time.RFC3339, unreverseTimeString(keyPart[4]))
1278 if err != nil {
1279 log.Printf("bogus date in keyPathForward index entry: %q", keyPart[4])
1280 return
1281 }
1282 claim, ok := blob.Parse(keyPart[5])
1283 if !ok {
1284 log.Printf("bogus claim in keyPathForward index entry: %q", keyPart[5])
1285 return
1286 }
1287 if valPart[0] == "Y" {
1288 active = true
1289 }
1290 target, ok := blob.Parse(valPart[1])
1291 if !ok {
1292 log.Printf("bogus target in keyPathForward index entry: %q", valPart[1])
1293 return
1294 }
1295 return camtypes.Path{
1296 Claim: claim,
1297 Base: base,
1298 Target: target,
1299 ClaimDate: date,
1300 Suffix: urld(keyPart[3]),
1301 }, true, active
1302 }
1303
1304 func (x *Index) PathLookup(ctx context.Context, signer, base blob.Ref, suffix string, at time.Time) (*camtypes.Path, error) {
1305 paths, err := x.PathsLookup(ctx, signer, base, suffix)
1306 if err != nil {
1307 return nil, err
1308 }
1309 var (
1310 newest = int64(0)
1311 atSeconds = int64(0)
1312 best *camtypes.Path
1313 )
1314
1315 if !at.IsZero() {
1316 atSeconds = at.Unix()
1317 }
1318
1319 for _, path := range paths {
1320 t := path.ClaimDate
1321 secs := t.Unix()
1322 if atSeconds != 0 && secs > atSeconds {
1323
1324 continue
1325 }
1326 if newest > secs {
1327
1328 continue
1329 }
1330
1331 newest, best = secs, path
1332 }
1333 if best == nil {
1334 return nil, os.ErrNotExist
1335 }
1336 return best, nil
1337 }
1338
1339 func (x *Index) existingFileSchemas(wholeRef blob.Ref) (schemaRefs []blob.Ref, err error) {
1340 it := x.queryPrefix(keyWholeToFileRef, wholeRef)
1341 defer closeIterator(it, &err)
1342 for it.Next() {
1343 keyPart := strings.Split(it.Key(), "|")[1:]
1344 if len(keyPart) < 2 {
1345 continue
1346 }
1347 ref, ok := blob.Parse(keyPart[1])
1348 if ok {
1349 schemaRefs = append(schemaRefs, ref)
1350 }
1351 }
1352 return schemaRefs, nil
1353 }
1354
1355
1356 type WholeRefToFile map[string][]blob.Ref
1357
1358
1359 func (x *Index) ExistingFileSchemas(wholeRef ...blob.Ref) (WholeRefToFile, error) {
1360 schemaRefs := make(WholeRefToFile)
1361 for _, v := range wholeRef {
1362 newRefs, err := x.existingFileSchemas(v)
1363 if err != nil {
1364 return nil, err
1365 }
1366 schemaRefs[v.String()] = newRefs
1367 }
1368 return schemaRefs, nil
1369 }
1370
1371 func (x *Index) loadKey(key string, val *string, err *error, wg *sync.WaitGroup) {
1372 defer wg.Done()
1373 *val, *err = x.s.Get(key)
1374 }
1375
1376 func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.FileInfo, error) {
1377 if x.corpus != nil {
1378 return x.corpus.GetFileInfo(ctx, fileRef)
1379 }
1380 ikey := "fileinfo|" + fileRef.String()
1381 tkey := keyFileTimes.name + "|" + fileRef.String()
1382
1383 wg := new(sync.WaitGroup)
1384 wg.Add(2)
1385 var iv, tv string
1386 var ierr, terr error
1387 go x.loadKey(ikey, &iv, &ierr, wg)
1388 go x.loadKey(tkey, &tv, &terr, wg)
1389 wg.Wait()
1390
1391 if errors.Is(ierr, sorted.ErrNotFound) {
1392 return camtypes.FileInfo{}, os.ErrNotExist
1393 }
1394 if ierr != nil {
1395 return camtypes.FileInfo{}, ierr
1396 }
1397 valPart := strings.Split(iv, "|")
1398 if len(valPart) < 3 {
1399 x.logf("bogus key %q = %q", ikey, iv)
1400 return camtypes.FileInfo{}, os.ErrNotExist
1401 }
1402 var wholeRef blob.Ref
1403 if len(valPart) >= 4 {
1404 wholeRef, _ = blob.Parse(valPart[3])
1405 }
1406 size, err := strconv.ParseInt(valPart[0], 10, 64)
1407 if err != nil {
1408 x.logf("bogus integer at position 0 in key %q = %q", ikey, iv)
1409 return camtypes.FileInfo{}, os.ErrNotExist
1410 }
1411 fileName := urld(valPart[1])
1412 fi := camtypes.FileInfo{
1413 Size: size,
1414 FileName: fileName,
1415 MIMEType: urld(valPart[2]),
1416 WholeRef: wholeRef,
1417 }
1418
1419 if tv != "" {
1420 times := strings.Split(urld(tv), ",")
1421 updateFileInfoTimes(&fi, times)
1422 }
1423
1424 return fi, nil
1425 }
1426
1427 func updateFileInfoTimes(fi *camtypes.FileInfo, times []string) {
1428 if len(times) == 0 {
1429 return
1430 }
1431 fi.Time = types.ParseTime3339OrNil(times[0])
1432 if len(times) == 2 {
1433 fi.ModTime = types.ParseTime3339OrNil(times[1])
1434 }
1435 }
1436
1437
1438 func kvImageInfo(v []byte) (ii camtypes.ImageInfo, ok bool) {
1439 pipei := bytes.IndexByte(v, '|')
1440 if pipei < 0 {
1441 return
1442 }
1443 w, err := strutil.ParseUintBytes(v[:pipei], 10, 16)
1444 if err != nil {
1445 return
1446 }
1447 h, err := strutil.ParseUintBytes(v[pipei+1:], 10, 16)
1448 if err != nil {
1449 return
1450 }
1451 ii.Width = uint16(w)
1452 ii.Height = uint16(h)
1453 return ii, true
1454 }
1455
1456 func (x *Index) GetImageInfo(ctx context.Context, fileRef blob.Ref) (camtypes.ImageInfo, error) {
1457 if x.corpus != nil {
1458 return x.corpus.GetImageInfo(ctx, fileRef)
1459 }
1460
1461
1462 key := keyImageSize.Key(fileRef.String())
1463 v, err := x.s.Get(key)
1464 if errors.Is(err, sorted.ErrNotFound) {
1465 err = os.ErrNotExist
1466 }
1467 if err != nil {
1468 return camtypes.ImageInfo{}, err
1469 }
1470 ii, ok := kvImageInfo([]byte(v))
1471 if !ok {
1472 return camtypes.ImageInfo{}, fmt.Errorf("index: bogus key %q = %q", key, v)
1473 }
1474 return ii, nil
1475 }
1476
1477 func (x *Index) GetMediaTags(ctx context.Context, fileRef blob.Ref) (tags map[string]string, err error) {
1478 if x.corpus != nil {
1479 return x.corpus.GetMediaTags(ctx, fileRef)
1480 }
1481 fi, err := x.GetFileInfo(ctx, fileRef)
1482 if err != nil {
1483 return nil, err
1484 }
1485 it := x.queryPrefix(keyMediaTag, fi.WholeRef.String())
1486 defer closeIterator(it, &err)
1487 for it.Next() {
1488 if tags == nil {
1489 tags = make(map[string]string)
1490 }
1491 tags[it.Key()] = it.Value()
1492 }
1493 return tags, nil
1494 }
1495
1496 func (x *Index) GetFileLocation(ctx context.Context, fileRef blob.Ref) (camtypes.Location, error) {
1497 if x.corpus != nil {
1498 lat, long, ok := x.corpus.FileLatLong(fileRef)
1499 if !ok {
1500 return camtypes.Location{}, os.ErrNotExist
1501 }
1502
1503 if math.IsNaN(long) || math.IsNaN(lat) {
1504 return camtypes.Location{}, fmt.Errorf("latitude or longitude in corpus for %v is NaN. Reindex to fix it", fileRef)
1505 }
1506 return camtypes.Location{Latitude: lat, Longitude: long}, nil
1507 }
1508 fi, err := x.GetFileInfo(ctx, fileRef)
1509 if err != nil {
1510 return camtypes.Location{}, err
1511 }
1512 it := x.queryPrefixString(keyEXIFGPS.Key(fi.WholeRef.String()))
1513 defer closeIterator(it, &err)
1514 if !it.Next() {
1515 return camtypes.Location{}, os.ErrNotExist
1516 }
1517
1518 var lat, long float64
1519 key, v := it.Key(), it.Value()
1520 pipe := strings.Index(v, "|")
1521 if pipe < 0 {
1522 return camtypes.Location{}, fmt.Errorf("index: bogus key %q = %q", key, v)
1523 }
1524 lat, err = strconv.ParseFloat(v[:pipe], 64)
1525 if err != nil {
1526 return camtypes.Location{}, fmt.Errorf("index: bogus value at position 0 in key %q = %q", key, v)
1527 }
1528 long, err = strconv.ParseFloat(v[pipe+1:], 64)
1529 if err != nil {
1530 return camtypes.Location{}, fmt.Errorf("index: bogus value at position 1 in key %q = %q", key, v)
1531 }
1532 if math.IsNaN(long) || math.IsNaN(lat) {
1533 return camtypes.Location{}, fmt.Errorf("latitude or longitude in index for %v is NaN. Reindex to fix it", fileRef)
1534 }
1535 return camtypes.Location{Latitude: lat, Longitude: long}, nil
1536 }
1537
1538 func (x *Index) EdgesTo(ref blob.Ref, opts *camtypes.EdgesToOpts) (edges []*camtypes.Edge, err error) {
1539 it := x.queryPrefix(keyEdgeBackward, ref)
1540 defer closeIterator(it, &err)
1541 permanodeParents := make(map[string]*camtypes.Edge)
1542 for it.Next() {
1543 edge, ok := kvEdgeBackward(it.Key(), it.Value())
1544 if !ok {
1545 continue
1546 }
1547 if x.IsDeleted(edge.From) {
1548 continue
1549 }
1550 if x.IsDeleted(edge.BlobRef) {
1551 continue
1552 }
1553 edge.To = ref
1554 if edge.FromType == schema.TypePermanode {
1555 permanodeParents[edge.From.String()] = edge
1556 } else {
1557 edges = append(edges, edge)
1558 }
1559 }
1560 for _, e := range permanodeParents {
1561 edges = append(edges, e)
1562 }
1563 return edges, nil
1564 }
1565
1566 func kvEdgeBackward(k, v string) (edge *camtypes.Edge, ok bool) {
1567
1568 keyPart := strings.Split(k, "|")
1569 valPart := strings.Split(v, "|")
1570 if len(keyPart) != 4 || len(valPart) != 2 {
1571
1572 log.Printf("bogus keyEdgeBackward index entry: %q = %q", k, v)
1573 return
1574 }
1575 if keyPart[0] != "edgeback" {
1576 return
1577 }
1578 parentRef, ok := blob.Parse(keyPart[2])
1579 if !ok {
1580 log.Printf("bogus parent in keyEdgeBackward index entry: %q", keyPart[2])
1581 return
1582 }
1583 blobRef, ok := blob.Parse(keyPart[3])
1584 if !ok {
1585 log.Printf("bogus blobref in keyEdgeBackward index entry: %q", keyPart[3])
1586 return
1587 }
1588 return &camtypes.Edge{
1589 From: parentRef,
1590 FromType: schema.CamliType(valPart[0]),
1591 FromTitle: valPart[1],
1592 BlobRef: blobRef,
1593 }, true
1594 }
1595
1596
1597 func (x *Index) GetDirMembers(ctx context.Context, dir blob.Ref, dest chan<- blob.Ref, limit int) (err error) {
1598 defer close(dest)
1599
1600 sent := 0
1601 if x.corpus != nil {
1602 children, err := x.corpus.GetDirChildren(ctx, dir)
1603 if err != nil {
1604 return err
1605 }
1606 for child := range children {
1607 dest <- child
1608 sent++
1609 if sent == limit {
1610 break
1611 }
1612 }
1613 return nil
1614 }
1615
1616 it := x.queryPrefix(keyStaticDirChild, dir.String())
1617 defer closeIterator(it, &err)
1618 for it.Next() {
1619 keyPart := strings.Split(it.Key(), "|")
1620 if len(keyPart) != 3 {
1621 return fmt.Errorf("index: bogus key keyStaticDirChild = %q", it.Key())
1622 }
1623
1624 child, ok := blob.Parse(keyPart[2])
1625 if !ok {
1626 continue
1627 }
1628 dest <- child
1629 sent++
1630 if sent == limit {
1631 break
1632 }
1633 }
1634 return nil
1635 }
1636
1637 func kvBlobMeta(k, v string) (bm camtypes.BlobMeta, ok bool) {
1638 refStr := k[len("meta:"):]
1639 br, ok := blob.Parse(refStr)
1640 if !ok {
1641 return
1642 }
1643 pipe := strings.Index(v, "|")
1644 if pipe < 0 {
1645 return
1646 }
1647 size, err := strconv.ParseUint(v[:pipe], 10, 32)
1648 if err != nil {
1649 return
1650 }
1651 return camtypes.BlobMeta{
1652 Ref: br,
1653 Size: uint32(size),
1654 CamliType: camliTypeFromMIME(v[pipe+1:]),
1655 }, true
1656 }
1657
1658 func kvBlobMeta_bytes(k, v []byte) (bm camtypes.BlobMeta, ok bool) {
1659 ref := k[len("meta:"):]
1660 br, ok := blob.ParseBytes(ref)
1661 if !ok {
1662 return
1663 }
1664 pipe := bytes.IndexByte(v, '|')
1665 if pipe < 0 {
1666 return
1667 }
1668 size, err := strutil.ParseUintBytes(v[:pipe], 10, 32)
1669 if err != nil {
1670 return
1671 }
1672 return camtypes.BlobMeta{
1673 Ref: br,
1674 Size: uint32(size),
1675 CamliType: camliTypeFromMIME_bytes(v[pipe+1:]),
1676 }, true
1677 }
1678
1679 func enumerateBlobMeta(s sorted.KeyValue, cb func(camtypes.BlobMeta) error) (err error) {
1680 it := queryPrefixString(s, "meta:")
1681 defer closeIterator(it, &err)
1682 for it.Next() {
1683 bm, ok := kvBlobMeta(it.Key(), it.Value())
1684 if !ok {
1685 continue
1686 }
1687 if err := cb(bm); err != nil {
1688 return err
1689 }
1690 }
1691 return nil
1692 }
1693
1694 var errStopIteration = errors.New("stop iteration")
1695
1696
1697
1698
1699 func (x *Index) EnumerateBlobMeta(ctx context.Context, fn func(camtypes.BlobMeta) bool) error {
1700 if x.corpus != nil {
1701 var err error
1702 var n int
1703 done := ctx.Done()
1704 x.corpus.EnumerateBlobMeta(func(m camtypes.BlobMeta) bool {
1705
1706 n++
1707 if n%256 == 0 {
1708 select {
1709 case <-done:
1710 err = ctx.Err()
1711 return false
1712 default:
1713
1714 }
1715 }
1716 return fn(m)
1717 })
1718 return err
1719 }
1720 done := ctx.Done()
1721 err := enumerateBlobMeta(x.s, func(bm camtypes.BlobMeta) error {
1722 select {
1723 case <-done:
1724 return ctx.Err()
1725 default:
1726 if !fn(bm) {
1727 return errStopIteration
1728 }
1729 return nil
1730 }
1731 })
1732 if errors.Is(err, errStopIteration) {
1733 err = nil
1734 }
1735 return err
1736 }
1737
1738
1739 func (x *Index) Storage() sorted.KeyValue { return x.s }
1740
1741
1742
1743
1744 func (x *Index) Close() error {
1745 if cl, ok := x.s.(io.Closer); ok {
1746 return cl.Close()
1747 }
1748 return nil
1749 }
1750
1751
1752 func (x *Index) initNeededMaps() (err error) {
1753 x.deletes = newDeletionCache()
1754 it := x.queryPrefix(keyMissing)
1755 defer closeIterator(it, &err)
1756 for it.Next() {
1757 key := it.KeyBytes()
1758 pair := key[len("missing|"):]
1759 pipe := bytes.IndexByte(pair, '|')
1760 if pipe < 0 {
1761 return fmt.Errorf("Bogus missing key %q", key)
1762 }
1763 have, ok1 := blob.ParseBytes(pair[:pipe])
1764 missing, ok2 := blob.ParseBytes(pair[pipe+1:])
1765 if !ok1 || !ok2 {
1766 return fmt.Errorf("Bogus missing key %q", key)
1767 }
1768 x.noteNeededMemory(have, missing)
1769 }
1770 return
1771 }
1772
1773 func (x *Index) noteNeeded(have, missing blob.Ref) error {
1774 if err := x.s.Set(keyMissing.Key(have, missing), "1"); err != nil {
1775 return err
1776 }
1777 x.noteNeededMemory(have, missing)
1778 return nil
1779 }
1780
1781 func (x *Index) noteNeededMemory(have, missing blob.Ref) {
1782 x.needs[have] = append(x.needs[have], missing)
1783 x.neededBy[missing] = append(x.neededBy[missing], have)
1784 }
1785
1786 const camliTypeMIMEPrefix = "application/json; camliType="
1787
1788 var camliTypeMIMEPrefixBytes = []byte(camliTypeMIMEPrefix)
1789
1790
1791
1792 func camliTypeFromMIME(mime string) schema.CamliType {
1793 if v := strings.TrimPrefix(mime, camliTypeMIMEPrefix); v != mime {
1794 return schema.CamliType(v)
1795 }
1796 return ""
1797 }
1798
1799 func camliTypeFromMIME_bytes(mime []byte) schema.CamliType {
1800 if v := bytes.TrimPrefix(mime, camliTypeMIMEPrefixBytes); len(v) != len(mime) {
1801 return schema.CamliType(strutil.StringFromBytes(v))
1802 }
1803 return ""
1804 }
1805
1806
1807
// IsIndexedAttribute reports whether attr is one of the attributes this
// package indexes: camliRoot, camliImportRoot, tag, or title. Matching is
// exact and case-sensitive.
func IsIndexedAttribute(attr string) bool {
	return attr == "camliRoot" || attr == "camliImportRoot" || attr == "tag" || attr == "title"
}
1815
1816
1817
1818
1819
// IsBlobReferenceAttribute reports whether attr is an attribute whose values
// are references to other blobs. Currently only "camliMember" qualifies.
func IsBlobReferenceAttribute(attr string) bool {
	return attr == "camliMember"
}
1827
// IsFulltextAttribute reports whether attr is one of the attributes treated
// as full text: "tag" or "title". Matching is exact and case-sensitive.
func IsFulltextAttribute(attr string) bool {
	return attr == "tag" || attr == "title"
}