1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package index
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 _ "image/gif"
25 _ "image/jpeg"
26 _ "image/png"
27 "io"
28 "log"
29 "math"
30 "os"
31 "path/filepath"
32 "sort"
33 "strconv"
34 "strings"
35 "sync"
36 "time"
37
38 "github.com/hjfreyer/taglib-go/taglib"
39 "github.com/rwcarlsen/goexif/exif"
40 "github.com/rwcarlsen/goexif/tiff"
41 _ "go4.org/media/heif"
42 "go4.org/readerutil"
43 "go4.org/types"
44 "perkeep.org/internal/images"
45 "perkeep.org/internal/magic"
46 "perkeep.org/internal/media"
47 "perkeep.org/pkg/blob"
48 "perkeep.org/pkg/blobserver"
49 "perkeep.org/pkg/jsonsign"
50 "perkeep.org/pkg/schema"
51 )
52
53 func init() {
54 t, err := time.Parse(time.RFC3339, msdosEpoch)
55 if err != nil {
56 panic(fmt.Sprintf("Cannot parse MSDOS epoch: %v", err))
57 }
58 msdosEpochTime = t
59 }
60
61 type mutationMap struct {
62
63
64
65
66
67 signerBlobRef blob.Ref
68 signerID string
69 kv map[string]string
70
71
72
73
74
75
76 deletes []schema.Claim
77 }
78
79 func (mm *mutationMap) Set(k, v string) {
80 if mm.kv == nil {
81 mm.kv = make(map[string]string)
82 }
83 mm.kv[k] = v
84 }
85
86 func (mm *mutationMap) noteDelete(deleteClaim schema.Claim) {
87 mm.deletes = append(mm.deletes, deleteClaim)
88 }
89
90 func blobsFilteringOut(v []blob.Ref, x blob.Ref) []blob.Ref {
91 switch len(v) {
92 case 0:
93 return nil
94 case 1:
95 if v[0] == x {
96 return nil
97 }
98 return v
99 }
100 nl := v[:0]
101 for _, vb := range v {
102 if vb != x {
103 nl = append(nl, vb)
104 }
105 }
106 return nl
107 }
108
109 func (ix *Index) indexBlob(ctx context.Context, br blob.Ref) error {
110 rc, _, err := ix.blobSource.Fetch(ctx, br)
111 if err != nil {
112 return fmt.Errorf("index: failed to fetch %v for reindexing: %v", br, err)
113 }
114 defer rc.Close()
115 if _, err := blobserver.Receive(ctx, ix, br, rc); err != nil {
116 return err
117 }
118 return nil
119 }
120
121
122
123 func (ix *Index) indexReadyBlobs(ctx context.Context) {
124 defer ix.reindexWg.Done()
125
126 popReadyReindex := func() (blob.Ref, bool) {
127 ix.Lock()
128 defer ix.Unlock()
129
130 if len(ix.readyReindex) == 0 {
131 return blob.Ref{}, false
132 }
133 var br blob.Ref
134 for br = range ix.readyReindex {
135 break
136 }
137 delete(ix.readyReindex, br)
138
139 return br, true
140 }
141
142 failed := make(map[blob.Ref]bool)
143 for br, ok := popReadyReindex(); ok; br, ok = popReadyReindex() {
144 if err := ix.indexBlob(ctx, br); err != nil {
145 log.Printf("out-of-order indexBlob(%v) = %v", br, err)
146 failed[br] = true
147 }
148 }
149
150 ix.Lock()
151 defer ix.Unlock()
152 for br := range failed {
153 ix.readyReindex[br] = true
154 }
155 }
156
157
158
159
160 func (ix *Index) noteBlobIndexed(br blob.Ref) {
161 for _, needer := range ix.neededBy[br] {
162 newNeeds := blobsFilteringOut(ix.needs[needer], br)
163 if len(newNeeds) == 0 {
164 ix.readyReindex[needer] = true
165 delete(ix.needs, needer)
166 ix.reindexWg.Add(1)
167 go ix.indexReadyBlobs(context.Background())
168 } else {
169 ix.needs[needer] = newNeeds
170 }
171 }
172 delete(ix.neededBy, br)
173 }
174
175 func (ix *Index) removeAllMissingEdges(br blob.Ref) {
176 var toDelete []string
177 it := ix.queryPrefix(keyMissing, br)
178 for it.Next() {
179 toDelete = append(toDelete, it.Key())
180 }
181 if err := it.Close(); err != nil {
182
183 log.Printf("Iterator close error: %v", err)
184 }
185 for _, k := range toDelete {
186 if err := ix.s.Delete(k); err != nil {
187 log.Printf("Error deleting key %s: %v", k, err)
188 }
189 }
190 }
191
192 func (ix *Index) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) {
193
194 sniffer := NewBlobSniffer(blobRef)
195 written, err := io.Copy(sniffer, source)
196 if err != nil {
197 return blob.SizedRef{}, err
198 }
199 sbr := blob.SizedRef{Ref: blobRef, Size: uint32(written)}
200
201 ix.Lock()
202 defer ix.Unlock()
203
204 missingDeps := false
205 defer func() {
206 if err == nil {
207 ix.noteBlobIndexed(blobRef)
208 if !missingDeps {
209 ix.removeAllMissingEdges(blobRef)
210 }
211 }
212 }()
213
214
215
216
217
218
219
220 if haveVal, haveErr := ix.s.Get("have:" + blobRef.String()); haveErr == nil {
221 if strings.HasSuffix(haveVal, "|indexed") {
222 if allowReindex, _ := strconv.ParseBool(os.Getenv("CAMLI_REDO_INDEX_ON_RECEIVE")); allowReindex {
223 if debugEnv {
224 log.Printf("index: reindexing %v", sbr)
225 }
226 } else {
227 if debugEnv {
228 log.Printf("index: ignoring upload of already-indexed %v", sbr)
229 }
230 return sbr, nil
231 }
232 }
233 }
234
235 sniffer.Parse()
236
237 fetcher := &missTrackFetcher{
238 fetcher: ix.blobSource,
239 }
240
241 mm, err := ix.populateMutationMap(ctx, fetcher, blobRef, sniffer)
242 if debugEnv {
243 log.Printf("index of %v: mm=%v, err=%v", blobRef, mm, err)
244 }
245 if err != nil {
246 if err != errMissingDep {
247 return blob.SizedRef{}, err
248 }
249 fetcher.mu.Lock()
250 defer fetcher.mu.Unlock()
251 if len(fetcher.missing) == 0 {
252 panic("errMissingDep happened, but no fetcher.missing recorded")
253 }
254 missingDeps = true
255 allRecorded := true
256 for _, missing := range fetcher.missing {
257 if err := ix.noteNeeded(blobRef, missing); err != nil {
258 allRecorded = false
259 }
260 }
261 if allRecorded {
262
263
264
265
266 return sbr, nil
267 }
268 return blob.SizedRef{}, err
269 }
270
271 if err := ix.commit(mm); err != nil {
272 return blob.SizedRef{}, err
273 }
274
275 if c := ix.corpus; c != nil {
276 if err = c.addBlob(ctx, blobRef, mm); err != nil {
277 return blob.SizedRef{}, err
278 }
279 }
280
281
282
283
284
285
286
287
288 return blob.SizedRef{Ref: blobRef, Size: uint32(written)}, nil
289 }
290
291
292
293
294 func (ix *Index) commit(mm *mutationMap) error {
295
296
297
298 ix.deletes.Lock()
299 defer ix.deletes.Unlock()
300 bm := ix.s.BeginBatch()
301 for k, v := range mm.kv {
302 bm.Set(k, v)
303 }
304 err := ix.s.CommitBatch(bm)
305 if err != nil {
306 return err
307 }
308 for _, cl := range mm.deletes {
309 if err := ix.updateDeletesCache(cl); err != nil {
310 return fmt.Errorf("Could not update the deletes cache after deletion from %v: %v", cl, err)
311 }
312 }
313 return nil
314 }
315
316 func (ix *Index) verifySignature(ctx context.Context, fetcher *missTrackFetcher, schemaBlob *schema.Blob) (*jsonsign.VerifyRequest, error) {
317 tf := &trackErrorsFetcher{f: fetcher}
318 vr := jsonsign.NewVerificationRequest(schemaBlob.JSON(), blob.NewSerialFetcher(ix.KeyFetcher, tf))
319 _, err := vr.Verify(ctx)
320
321 if err != nil {
322
323
324 if tf.hasErrNotExist() {
325 return nil, errMissingDep
326 }
327 return nil, err
328 }
329
330 return vr, nil
331 }
332
333 func (ix *Index) populateMutationMapForSchema(ctx context.Context, fetcher *missTrackFetcher, schemaBlob *schema.Blob, mm *mutationMap) error {
334 switch schemaBlob.Type() {
335 case schema.TypePermanode:
336 _, err := ix.verifySignature(ctx, fetcher, schemaBlob)
337 return err
338 case schema.TypeClaim:
339 vr, err := ix.verifySignature(ctx, fetcher, schemaBlob)
340 if err != nil {
341 return err
342 }
343 return ix.populateClaim(ctx, fetcher, schemaBlob, vr, mm)
344 case schema.TypeFile:
345 return ix.populateFile(ctx, fetcher, schemaBlob, mm)
346 case schema.TypeDirectory:
347 return ix.populateDir(ctx, fetcher, schemaBlob, mm)
348 default:
349 return nil
350 }
351 }
352
353
354
355
356
357
358 func (ix *Index) populateMutationMap(ctx context.Context, fetcher *missTrackFetcher, br blob.Ref, sniffer *BlobSniffer) (*mutationMap, error) {
359 mm := &mutationMap{
360 kv: map[string]string{
361 "meta:" + br.String(): fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType()),
362 },
363 }
364 var err error
365 if schemaBlob, ok := sniffer.SchemaBlob(); ok {
366 err = ix.populateMutationMapForSchema(ctx, fetcher, schemaBlob, mm)
367 }
368
369 if err != nil && err != errMissingDep {
370 return nil, err
371 }
372 var haveVal string
373 if err == errMissingDep {
374 haveVal = fmt.Sprintf("%d", sniffer.Size())
375 } else {
376 haveVal = fmt.Sprintf("%d|indexed", sniffer.Size())
377 }
378 mm.kv["have:"+br.String()] = haveVal
379 if len(fetcher.missing) == 0 {
380
381
382
383
384
385 return mm, nil
386 }
387 return mm, err
388 }
389
390
391 type keepFirstN struct {
392 N int
393 Bytes []byte
394 }
395
396 func (w *keepFirstN) Write(p []byte) (n int, err error) {
397 if n := w.N - len(w.Bytes); n > 0 {
398 if n > len(p) {
399 n = len(p)
400 }
401 w.Bytes = append(w.Bytes, p[:n]...)
402 }
403 return len(p), nil
404 }
405
406
407
408 type missTrackFetcher struct {
409 fetcher blob.Fetcher
410
411 mu sync.Mutex
412 missing []blob.Ref
413 }
414
415 func (f *missTrackFetcher) Fetch(ctx context.Context, br blob.Ref) (blob io.ReadCloser, size uint32, err error) {
416 blob, size, err = f.fetcher.Fetch(ctx, br)
417 if err == os.ErrNotExist {
418 f.mu.Lock()
419 defer f.mu.Unlock()
420 f.missing = append(f.missing, br)
421 }
422 return
423 }
424
425
426 type trackErrorsFetcher struct {
427 mu sync.RWMutex
428 errs []error
429
430 f blob.Fetcher
431 }
432
433 func (tf *trackErrorsFetcher) Fetch(ctx context.Context, br blob.Ref) (blob io.ReadCloser, size uint32, err error) {
434 blob, size, err = tf.f.Fetch(ctx, br)
435 if err != nil {
436 tf.mu.Lock()
437 defer tf.mu.Unlock()
438 tf.errs = append(tf.errs, err)
439 }
440 return
441 }
442
443
444
445 func (tf *trackErrorsFetcher) hasErrNotExist() bool {
446 tf.mu.RLock()
447 defer tf.mu.RUnlock()
448 if len(tf.errs) == 0 {
449 return false
450 }
451 for _, v := range tf.errs {
452 if v != os.ErrNotExist {
453 return false
454 }
455 }
456 return true
457 }
458
459
460 type filePrefixReader interface {
461 io.Reader
462 io.ReaderAt
463 }
464
465
466
467 func readPrefixOrFile(prefix []byte, fetcher blob.Fetcher, b *schema.Blob, fn func(filePrefixReader) error) (err error) {
468 pr := bytes.NewReader(prefix)
469 err = fn(pr)
470 if err == io.EOF || err == io.ErrUnexpectedEOF {
471 var fr *schema.FileReader
472 fr, err = b.NewFileReader(fetcher)
473 if err == nil {
474 err = fn(fr)
475 fr.Close()
476 }
477 }
478 return err
479 }
480
481 const msdosEpoch = "1980-01-01T00:00:00Z"
482
483 var (
484 exifDebug, _ = strconv.ParseBool(os.Getenv("CAMLI_DEBUG_IMAGES"))
485 debugEnv, _ = strconv.ParseBool(os.Getenv("CAMLI_DEBUG"))
486 msdosEpochTime time.Time
487 )
488
489
490
491 func (ix *Index) populateFile(ctx context.Context, fetcher blob.Fetcher, b *schema.Blob, mm *mutationMap) (err error) {
492 var times []time.Time
493 times = append(times, b.ModTime())
494
495 blobRef := b.BlobRef()
496 tf := &trackErrorsFetcher{f: fetcher.(*missTrackFetcher)}
497 fr, err := b.NewFileReader(tf)
498 if err != nil {
499 return err
500 }
501 defer fr.Close()
502 mimeType, mr := magic.MIMETypeFromReader(fr)
503 if mimeType == "" {
504 mimeType = magic.MIMETypeByExtension(filepath.Ext(b.FileName()))
505 }
506
507 h := blob.NewHash()
508 var copyDest io.Writer = h
509 var imageBuf *keepFirstN
510 if strings.HasPrefix(mimeType, "image/") {
511 imageBuf = &keepFirstN{N: 512 << 10}
512 copyDest = io.MultiWriter(copyDest, imageBuf)
513 }
514 size, err := io.Copy(copyDest, mr)
515 if err != nil {
516 if tf.hasErrNotExist() {
517 return errMissingDep
518 }
519 return err
520 }
521 wholeRef := blob.RefFromHash(h)
522
523 if imageBuf != nil {
524 var conf images.Config
525 decodeConfig := func(r filePrefixReader) error {
526 conf, err = images.DecodeConfig(r)
527 return err
528 }
529 if err := readPrefixOrFile(imageBuf.Bytes, fetcher, b, decodeConfig); err == nil {
530 mm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height)))
531 } else if debugEnv {
532 log.Printf("index: WARNING: image decodeConfig: %v", err)
533 }
534
535 exifData := imageBuf.Bytes
536 if conf.HEICEXIF != nil {
537 exifData = conf.HEICEXIF
538 }
539 var ft time.Time
540 fileTime := func(r filePrefixReader) error {
541 ft, err = schema.FileTime(r)
542 return err
543 }
544
545 if err = readPrefixOrFile(exifData, fetcher, b, fileTime); err == nil {
546 times = append(times, ft)
547 } else if debugEnv {
548 log.Printf("index: WARNING: image fileTime: %v", err)
549
550 }
551 if exifDebug {
552 log.Printf("filename %q exif = %v, %v", b.FileName(), ft, err)
553 }
554
555
556 indexEXIFData := func(r filePrefixReader) error {
557 return indexEXIF(wholeRef, r, mm)
558 }
559 if err = readPrefixOrFile(exifData, fetcher, b, indexEXIFData); err != nil {
560 if exifDebug {
561 log.Printf("error parsing EXIF: %v", err)
562 }
563 }
564 }
565
566 var sortTimes []time.Time
567 for _, t := range times {
568 if !t.IsZero() {
569 sortTimes = append(sortTimes, t)
570 }
571 }
572 sort.Sort(types.ByTime(sortTimes))
573 var time3339s string
574 switch {
575 case len(sortTimes) == 1:
576 time3339s = types.Time3339(sortTimes[0]).String()
577 case len(sortTimes) >= 2:
578 oldest, newest := sortTimes[0], sortTimes[len(sortTimes)-1]
579
580
581
582 if oldest.After(msdosEpochTime) {
583 time3339s = types.Time3339(oldest).String() + "," + types.Time3339(newest).String()
584 } else {
585 time3339s = types.Time3339(newest).String()
586 }
587 }
588
589 mm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1")
590 mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mimeType, wholeRef))
591 mm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s))
592
593 if strings.HasPrefix(mimeType, "audio/") {
594 indexMusic(io.NewSectionReader(fr, 0, fr.Size()), wholeRef, mm)
595 }
596
597 return nil
598 }
599
600 func tagFormatString(tag *tiff.Tag) string {
601 switch tag.Format() {
602 case tiff.IntVal:
603 return "int"
604 case tiff.RatVal:
605 return "rat"
606 case tiff.FloatVal:
607 return "float"
608 case tiff.StringVal:
609 return "string"
610 }
611 return ""
612 }
613
614 type exifWalkFunc func(name exif.FieldName, tag *tiff.Tag) error
615
616 func (f exifWalkFunc) Walk(name exif.FieldName, tag *tiff.Tag) error { return f(name, tag) }
617
618 var errEXIFPanic = errors.New("EXIF library panicked while walking fields")
619
620 func indexEXIF(wholeRef blob.Ref, r io.Reader, mm *mutationMap) (err error) {
621 var tiffErr error
622 ex, err := exif.Decode(r)
623 if err != nil {
624 tiffErr = err
625 if exif.IsCriticalError(err) {
626 if exif.IsShortReadTagValueError(err) {
627 return io.ErrUnexpectedEOF
628 }
629 return
630 }
631 log.Printf("Non critical TIFF decoding error: %v", err)
632 }
633 defer func() {
634
635
636
637
638 if e := recover(); e != nil {
639 err = errEXIFPanic
640 }
641 }()
642
643 err = ex.Walk(exifWalkFunc(func(name exif.FieldName, tag *tiff.Tag) error {
644 tagFmt := tagFormatString(tag)
645 if tagFmt == "" {
646 return nil
647 }
648 key := keyEXIFTag.Key(wholeRef, fmt.Sprintf("%04x", tag.Id))
649 numComp := int(tag.Count)
650 if tag.Format() == tiff.StringVal {
651 numComp = 1
652 }
653 var val bytes.Buffer
654 val.WriteString(keyEXIFTag.Val(tagFmt, numComp, ""))
655 if tag.Format() == tiff.StringVal {
656 str, err := tag.StringVal()
657 if err != nil {
658 log.Printf("Invalid EXIF string data: %v", err)
659 return nil
660 }
661 if containsUnsafeRawStrByte(str) {
662 val.WriteString(urle(str))
663 } else {
664 val.WriteString(str)
665 }
666 } else {
667 for i := 0; i < int(tag.Count); i++ {
668 if i > 0 {
669 val.WriteByte('|')
670 }
671 switch tagFmt {
672 case "int":
673 v, err := tag.Int(i)
674 if err != nil {
675 log.Printf("Invalid EXIF int data: %v", err)
676 return nil
677 }
678 fmt.Fprintf(&val, "%d", v)
679 case "rat":
680 n, d, err := tag.Rat2(i)
681 if err != nil {
682 log.Printf("Invalid EXIF rat data: %v", err)
683 return nil
684 }
685 fmt.Fprintf(&val, "%d/%d", n, d)
686 case "float":
687 v, err := tag.Float(i)
688 if err != nil {
689 log.Printf("Invalid EXIF float data: %v", err)
690 return nil
691 }
692 fmt.Fprintf(&val, "%v", v)
693 default:
694 panic("shouldn't get here")
695 }
696 }
697 }
698 valStr := val.String()
699 mm.Set(key, valStr)
700 return nil
701 }))
702 if err != nil {
703 return
704 }
705
706 if exif.IsGPSError(tiffErr) {
707 log.Printf("Invalid EXIF GPS data: %v", tiffErr)
708 return nil
709 }
710 if lat, long, err := ex.LatLong(); err == nil {
711 if math.Abs(long) > 180.0 || math.Abs(lat) > 90.0 {
712 log.Printf("Long, lat outside allowed range: %v, %v", long, lat)
713 return nil
714 }
715 if math.IsNaN(long) || math.IsNaN(lat) {
716 log.Print("Latitude or Longitude is NaN")
717 return nil
718 }
719
720
721 mm.Set(keyEXIFGPS.Key(wholeRef), keyEXIFGPS.Val(fmt.Sprintf("%.7f", lat), fmt.Sprintf("%.7f", long)))
722 } else if !exif.IsTagNotPresentError(err) {
723 log.Printf("Invalid EXIF GPS data: %v", err)
724 }
725 return nil
726 }
727
728
729 func indexMusic(r readerutil.SizeReaderAt, wholeRef blob.Ref, mm *mutationMap) {
730 tag, err := taglib.Decode(r, r.Size())
731 if err != nil {
732 log.Print("index: error parsing tag: ", err)
733 return
734 }
735
736 var footerLength int64 = 0
737 if hasTag, err := media.HasID3v1Tag(r); err != nil {
738 log.Print("index: unable to check for ID3v1 tag: ", err)
739 return
740 } else if hasTag {
741 footerLength = media.ID3v1TagLength
742 }
743
744
745 audioStart := int64(tag.TagSize())
746 audioSize := r.Size() - audioStart - footerLength
747 hash := blob.NewHash()
748 if _, err := io.Copy(hash, io.NewSectionReader(r, audioStart, audioSize)); err != nil {
749 log.Print("index: error generating hash of audio data: ", err)
750 return
751 }
752 mediaRef := blob.RefFromHash(hash)
753
754 duration, err := media.GetMPEGAudioDuration(io.NewSectionReader(r, audioStart, audioSize))
755 if err != nil {
756 log.Print("index: unable to calculate audio duration: ", err)
757 duration = 0
758 }
759
760 var yearStr, trackStr, discStr, durationStr string
761 if !tag.Year().IsZero() {
762 const justYearLayout = "2006"
763 yearStr = tag.Year().Format(justYearLayout)
764 }
765 if tag.Track() != 0 {
766 trackStr = fmt.Sprintf("%d", tag.Track())
767 }
768 if tag.Disc() != 0 {
769 discStr = fmt.Sprintf("%d", tag.Disc())
770 }
771 if duration != 0 {
772 durationStr = fmt.Sprintf("%d", duration/time.Millisecond)
773 }
774
775
776
777 tags := map[string]string{
778 "title": tag.Title(),
779 "artist": tag.Artist(),
780 "album": tag.Album(),
781 "genre": tag.Genre(),
782 "musicbrainzalbumid": tag.CustomFrames()["MusicBrainz Album Id"],
783 "year": yearStr,
784 "track": trackStr,
785 "disc": discStr,
786 "mediaref": mediaRef.String(),
787 "durationms": durationStr,
788 }
789
790 for tag, value := range tags {
791 if value != "" {
792 mm.Set(keyMediaTag.Key(wholeRef, tag), keyMediaTag.Val(value))
793 }
794 }
795 }
796
797
798
799 func (ix *Index) populateDir(ctx context.Context, fetcher blob.Fetcher, b *schema.Blob, mm *mutationMap) error {
800 blobRef := b.BlobRef()
801
802
803
804 tf := &trackErrorsFetcher{f: fetcher.(*missTrackFetcher)}
805 dr, err := b.NewDirReader(ctx, tf)
806 if err != nil {
807
808
809
810 log.Printf("index: error indexing directory, creating NewDirReader %s: %v", blobRef, err)
811 return nil
812 }
813 sts, err := dr.StaticSet(ctx)
814 if err != nil {
815 if tf.hasErrNotExist() {
816 return errMissingDep
817 }
818 log.Printf("index: error indexing directory: can't get StaticSet: %v\n", err)
819 return nil
820 }
821
822 mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), "", blob.Ref{}))
823 for _, br := range sts {
824 mm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1")
825 }
826 return nil
827 }
828
829 var errMissingDep = errors.New("blob was not fully indexed because of a missing dependency")
830
831
832
833 func (ix *Index) populateDeleteClaim(ctx context.Context, cl schema.Claim, vr *jsonsign.VerifyRequest, mm *mutationMap) error {
834 br := cl.Blob().BlobRef()
835 target := cl.Target()
836 if !target.Valid() {
837 log.Print(fmt.Errorf("no valid target for delete claim %v", br))
838 return nil
839 }
840 meta, err := ix.GetBlobMeta(ctx, target)
841 if err != nil {
842 if err == os.ErrNotExist {
843 if err := ix.noteNeeded(br, target); err != nil {
844 return fmt.Errorf("could not note that delete claim %v depends on %v: %v", br, target, err)
845 }
846 return errMissingDep
847 }
848 log.Print(fmt.Errorf("Could not get mime type of target blob %v: %v", target, err))
849 return nil
850 }
851
852 if meta.CamliType != schema.TypePermanode && meta.CamliType != schema.TypeClaim {
853 log.Print(fmt.Errorf("delete claim target in %v is neither a permanode nor a claim: %v", br, meta.CamliType))
854 return nil
855 }
856 mm.Set(keyDeleted.Key(target, cl.ClaimDateString(), br), "")
857 if meta.CamliType == schema.TypeClaim {
858 return nil
859 }
860 recentKey := keyRecentPermanode.Key(vr.SignerKeyId, cl.ClaimDateString(), br)
861 mm.Set(recentKey, target.String())
862 attr, value := cl.Attribute(), cl.Value()
863 claimKey := keyPermanodeClaim.Key(target, vr.SignerKeyId, cl.ClaimDateString(), br)
864 mm.Set(claimKey, keyPermanodeClaim.Val(cl.ClaimType(), attr, value, vr.CamliSigner))
865 return nil
866 }
867
868 func (ix *Index) populateClaim(ctx context.Context, fetcher *missTrackFetcher, b *schema.Blob, vr *jsonsign.VerifyRequest, mm *mutationMap) error {
869 br := b.BlobRef()
870
871 claim, ok := b.AsClaim()
872 if !ok {
873
874 return nil
875 }
876
877 verifiedKeyId := vr.SignerKeyId
878 mm.signerID = verifiedKeyId
879 mm.signerBlobRef = vr.CamliSigner
880 mm.Set(keySignerKeyID.name+":"+vr.CamliSigner.String(), verifiedKeyId)
881
882 if claim.ClaimType() == string(schema.DeleteClaim) {
883 if err := ix.populateDeleteClaim(ctx, claim, vr, mm); err != nil {
884 return err
885 }
886 mm.noteDelete(claim)
887 return nil
888 }
889
890 pnbr := claim.ModifiedPermanode()
891 if !pnbr.Valid() {
892
893 return nil
894 }
895
896 attr, value := claim.Attribute(), claim.Value()
897 recentKey := keyRecentPermanode.Key(verifiedKeyId, claim.ClaimDateString(), br)
898 mm.Set(recentKey, pnbr.String())
899 claimKey := keyPermanodeClaim.Key(pnbr, verifiedKeyId, claim.ClaimDateString(), br)
900 mm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value, vr.CamliSigner))
901
902 if strings.HasPrefix(attr, "camliPath:") {
903 targetRef, ok := blob.Parse(value)
904 if ok {
905
906
907
908
909
910 suffix := attr[len("camliPath:"):]
911 active := "Y"
912 if claim.ClaimType() == "del-attribute" {
913 active = "N"
914 }
915 baseRef := pnbr
916 claimRef := br
917
918 key := keyPathBackward.Key(verifiedKeyId, targetRef, claimRef)
919 val := keyPathBackward.Val(claim.ClaimDateString(), baseRef, active, suffix)
920 mm.Set(key, val)
921
922 key = keyPathForward.Key(verifiedKeyId, baseRef, suffix, claim.ClaimDateString(), claimRef)
923 val = keyPathForward.Val(active, targetRef)
924 mm.Set(key, val)
925 }
926 }
927
928 if claim.ClaimType() != string(schema.DelAttributeClaim) && IsIndexedAttribute(attr) {
929 key := keySignerAttrValue.Key(verifiedKeyId, attr, value, claim.ClaimDateString(), br)
930 mm.Set(key, keySignerAttrValue.Val(pnbr))
931 }
932
933 if IsBlobReferenceAttribute(attr) {
934 targetRef, ok := blob.Parse(value)
935 if ok {
936 key := keyEdgeBackward.Key(targetRef, pnbr, br)
937 mm.Set(key, keyEdgeBackward.Val("permanode", ""))
938 }
939 }
940
941 return nil
942 }
943
944
945
946 func (ix *Index) updateDeletesCache(deleteClaim schema.Claim) error {
947 target := deleteClaim.Target()
948 deleter := deleteClaim.Blob()
949 when, err := deleter.ClaimDate()
950 if err != nil {
951 return fmt.Errorf("Could not get date of delete claim %v: %v", deleteClaim, err)
952 }
953 targetDeletions := append(ix.deletes.m[target],
954 deletion{
955 deleter: deleter.BlobRef(),
956 when: when,
957 })
958 sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
959 ix.deletes.m[target] = targetDeletions
960 return nil
961 }