/*
Copyright 2011 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package index

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	_ "image/gif"
	_ "image/jpeg"
	_ "image/png"
	"io"
	"log"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/hjfreyer/taglib-go/taglib"
	"github.com/rwcarlsen/goexif/exif"
	"github.com/rwcarlsen/goexif/tiff"
	_ "go4.org/media/heif"
	"go4.org/readerutil"
	"go4.org/types"
	"perkeep.org/internal/images"
	"perkeep.org/internal/magic"
	"perkeep.org/internal/media"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	"perkeep.org/pkg/jsonsign"
	"perkeep.org/pkg/schema"
)

func init() {
	t, err := time.Parse(time.RFC3339, msdosEpoch)
	if err != nil {
		panic(fmt.Sprintf("Cannot parse MSDOS epoch: %v", err))
	}
	msdosEpochTime = t
}

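// mutationMap holds the index mutations resulting from receiving a single
// blob: the key/value rows to commit in one batch, plus bookkeeping for
// delete claims.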
type mutationMap struct {
	// When the mutations are from a claim, signerBlobRef is the signer of the
	// claim, and signerID is its matching GPG key ID. They are copied out of kv because,
	// when adding the corresponding entries in the corpus, the signerBlobRef-signerID
	// relation needs to be known before the claim mutations themselves, so we need to
	// make sure the keySignerKeyID entry is always added first.
	signerBlobRef blob.Ref
	signerID      string
	kv            map[string]string // the keys and values we populate

	// We record if we get a delete claim, so we can update
	// the deletes cache right after committing the mutation.
	//
	// TODO(mpl): we only need to keep track of one claim so far,
	// but I chose a slice for when we need to do multi-claims?
	deletes []schema.Claim
}

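// Set records the key/value pair to be written when the mutation map is
// committed, allocating the underlying map if necessary.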
func (mm *mutationMap) Set(k, v string) {
	if mm.kv == nil {
		mm.kv = make(map[string]string)
	}
	mm.kv[k] = v
}

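// noteDelete records deleteClaim so the deletes cache can be updated right
// after the mutations are committed.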
func (mm *mutationMap) noteDelete(deleteClaim schema.Claim) {
	mm.deletes = append(mm.deletes, deleteClaim)
}

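// blobsFilteringOut returns the refs in v with all occurrences of x removed.
// It may filter in place and reuse v's backing array.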
func blobsFilteringOut(v []blob.Ref, x blob.Ref) []blob.Ref {
	switch len(v) {
	case 0:
		return nil
	case 1:
		if v[0] == x {
			return nil
		}
		return v
	}
	nl := v[:0]
	for _, vb := range v {
		if vb != x {
			nl = append(nl, vb)
		}
	}
	return nl
}

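// indexBlob fetches br from the index's blob source and indexes it by
// feeding it back through blobserver.Receive.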
func (ix *Index) indexBlob(ctx context.Context, br blob.Ref) error {
	rc, _, err := ix.blobSource.Fetch(ctx, br)
	if err != nil {
		return fmt.Errorf("index: failed to fetch %v for reindexing: %v", br, err)
	}
	defer rc.Close()
	if _, err := blobserver.Receive(ctx, ix, br, rc); err != nil {
		return err
	}
	return nil
}

// indexReadyBlobs indexes blobs that were recently marked as ready to be
// reindexed, once the blobs they depend on have been indexed.
func (ix *Index) indexReadyBlobs(ctx context.Context) {
	defer ix.reindexWg.Done()

	popReadyReindex := func() (blob.Ref, bool) {
		ix.Lock()
		defer ix.Unlock()

		if len(ix.readyReindex) == 0 {
			return blob.Ref{}, false
		}
		var br blob.Ref
		for br = range ix.readyReindex {
			break
		}
		delete(ix.readyReindex, br)

		return br, true
	}

	failed := make(map[blob.Ref]bool)
	for br, ok := popReadyReindex(); ok; br, ok = popReadyReindex() {
		if err := ix.indexBlob(ctx, br); err != nil {
			log.Printf("out-of-order indexBlob(%v) = %v", br, err)
			failed[br] = true
		}
	}

	ix.Lock()
	defer ix.Unlock()
	for br := range failed {
		ix.readyReindex[br] = true
	}
}

// noteBlobIndexed checks whether the recent indexing of br now allows the
// blobs that were depending on br to be indexed in turn. If so, they are
// reindexed asynchronously by indexReadyBlobs.
func (ix *Index) noteBlobIndexed(br blob.Ref) {
	for _, needer := range ix.neededBy[br] {
		newNeeds := blobsFilteringOut(ix.needs[needer], br)
		if len(newNeeds) == 0 {
			ix.readyReindex[needer] = true
			delete(ix.needs, needer)
			ix.reindexWg.Add(1)
			go ix.indexReadyBlobs(context.Background())
		} else {
			ix.needs[needer] = newNeeds
		}
	}
	delete(ix.neededBy, br)
}

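// removeAllMissingEdges deletes all the keyMissing index rows stored under
// br, cleaning up the missing-dependency edges recorded for it.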
func (ix *Index) removeAllMissingEdges(br blob.Ref) {
	var toDelete []string
	it := ix.queryPrefix(keyMissing, br)
	for it.Next() {
		toDelete = append(toDelete, it.Key())
	}
	if err := it.Close(); err != nil {
		// TODO: Care? Can lazily clean up later.
		log.Printf("Iterator close error: %v", err)
	}
	for _, k := range toDelete {
		if err := ix.s.Delete(k); err != nil {
			log.Printf("Error deleting key %s: %v", k, err)
		}
	}
}

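// ReceiveBlob implements blobserver.BlobReceiver for the index. It reads and
// sniffs the blob from source, populates and commits the corresponding index
// mutations, and updates the in-memory corpus if one is attached. If the blob
// depends on blobs that haven't been indexed yet, the missing dependencies
// are recorded instead, and the blob is reindexed later once they arrive.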
func (ix *Index) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) {
	// Read from source before acquiring ix.Lock (Issue 878):
	sniffer := NewBlobSniffer(blobRef)
	written, err := io.Copy(sniffer, source)
	if err != nil {
		return blob.SizedRef{}, err
	}
	sbr := blob.SizedRef{Ref: blobRef, Size: uint32(written)}

	ix.Lock()
	defer ix.Unlock()

	missingDeps := false
	defer func() {
		if err == nil {
			ix.noteBlobIndexed(blobRef)
			if !missingDeps {
				ix.removeAllMissingEdges(blobRef)
			}
		}
	}()

	// By default, return immediately if it looks like we already
	// have indexed this blob before.  But if the user has
	// CAMLI_REDO_INDEX_ON_RECEIVE set in their environment,
	// always index it. This is generally only useful when working
	// on the indexing code and retroactively indexing a subset of
	// content without forcing a global reindexing.
	if haveVal, haveErr := ix.s.Get("have:" + blobRef.String()); haveErr == nil {
		if strings.HasSuffix(haveVal, "|indexed") {
			if allowReindex, _ := strconv.ParseBool(os.Getenv("CAMLI_REDO_INDEX_ON_RECEIVE")); allowReindex {
				if debugEnv {
					log.Printf("index: reindexing %v", sbr)
				}
			} else {
				if debugEnv {
					log.Printf("index: ignoring upload of already-indexed %v", sbr)
				}
				return sbr, nil
			}
		}
	}

	sniffer.Parse()

	fetcher := &missTrackFetcher{
		fetcher: ix.blobSource,
	}

	mm, err := ix.populateMutationMap(ctx, fetcher, blobRef, sniffer)
	if debugEnv {
		log.Printf("index of %v: mm=%v, err=%v", blobRef, mm, err)
	}
	if err != nil {
		if err != errMissingDep {
			return blob.SizedRef{}, err
		}
		fetcher.mu.Lock()
		defer fetcher.mu.Unlock()
		if len(fetcher.missing) == 0 {
			panic("errMissingDep happened, but no fetcher.missing recorded")
		}
		missingDeps = true
		allRecorded := true
		for _, missing := range fetcher.missing {
			if err := ix.noteNeeded(blobRef, missing); err != nil {
				allRecorded = false
			}
		}
		if allRecorded {
			// Lie and say things are good. We've
			// successfully recorded that the blob isn't
			// indexed, but we'll reindex it later once
			// the dependent blobs arrive.
			return sbr, nil
		}
		return blob.SizedRef{}, err
	}

	if err := ix.commit(mm); err != nil {
		return blob.SizedRef{}, err
	}

	if c := ix.corpus; c != nil {
		if err = c.addBlob(ctx, blobRef, mm); err != nil {
			return blob.SizedRef{}, err
		}
	}

	// TODO(bradfitz): log levels? These are generally noisy
	// (especially in tests, like search/handler_test), but I
	// could see it being useful in production. For now, disabled:
	//
	// mimeType := sniffer.MIMEType()
	// log.Printf("indexer: received %s; type=%v; truncated=%v", blobRef, mimeType, sniffer.IsTruncated())

	return blob.SizedRef{Ref: blobRef, Size: uint32(written)}, nil
}

// commit writes the contents of the mutationMap to a batch
// mutation and commits that batch. It also updates the deletes
// cache.
func (ix *Index) commit(mm *mutationMap) error {
	// We want the update of the deletes cache to be atomic
	// with the transaction commit, so we lock here instead
	// of within updateDeletesCache.
	ix.deletes.Lock()
	defer ix.deletes.Unlock()
	bm := ix.s.BeginBatch()
	for k, v := range mm.kv {
		bm.Set(k, v)
	}
	err := ix.s.CommitBatch(bm)
	if err != nil {
		return err
	}
	for _, cl := range mm.deletes {
		if err := ix.updateDeletesCache(cl); err != nil {
			return fmt.Errorf("Could not update the deletes cache after deletion from %v: %v", cl, err)
		}
	}
	return nil
}

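// verifySignature verifies the signature of the given schema blob. It
// returns errMissingDep if verification failed only because blobs it
// needed could not be found.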
func (ix *Index) verifySignature(ctx context.Context, fetcher *missTrackFetcher, schemaBlob *schema.Blob) (*jsonsign.VerifyRequest, error) {
	tf := &trackErrorsFetcher{f: fetcher}
	vr := jsonsign.NewVerificationRequest(schemaBlob.JSON(), blob.NewSerialFetcher(ix.KeyFetcher, tf))
	_, err := vr.Verify(ctx)

	if err != nil {
		// TODO(bradfitz): ask if the vr.Err.(jsonsign.Error).IsPermanent() and retry
		// later if it's not permanent?
		if tf.hasErrNotExist() {
			return nil, errMissingDep
		}
		return nil, err
	}

	return vr, nil
}

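// populateMutationMapForSchema populates mm according to the schema blob's
// type. Schema types other than permanode, claim, file, and directory are
// ignored.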
func (ix *Index) populateMutationMapForSchema(ctx context.Context, fetcher *missTrackFetcher, schemaBlob *schema.Blob, mm *mutationMap) error {
	switch schemaBlob.Type() {
	case schema.TypePermanode:
		_, err := ix.verifySignature(ctx, fetcher, schemaBlob)
		return err
	case schema.TypeClaim:
		vr, err := ix.verifySignature(ctx, fetcher, schemaBlob)
		if err != nil {
			return err
		}
		return ix.populateClaim(ctx, fetcher, schemaBlob, vr, mm)
	case schema.TypeFile:
		return ix.populateFile(ctx, fetcher, schemaBlob, mm)
	case schema.TypeDirectory:
		return ix.populateDir(ctx, fetcher, schemaBlob, mm)
	default:
		return nil
	}
}

// populateMutationMap populates keys & values that will be committed
// into the returned map.
//
// The blobref can be trusted at this point (it's been fully consumed
// and verified to match), and the sniffer has been populated.
func (ix *Index) populateMutationMap(ctx context.Context, fetcher *missTrackFetcher, br blob.Ref, sniffer *BlobSniffer) (*mutationMap, error) {
	mm := &mutationMap{
		kv: map[string]string{
			"meta:" + br.String(): fmt.Sprintf("%d|%s", sniffer.Size(), sniffer.MIMEType()),
		},
	}
	var err error
	if schemaBlob, ok := sniffer.SchemaBlob(); ok {
		err = ix.populateMutationMapForSchema(ctx, fetcher, schemaBlob, mm)
	}

	if err != nil && err != errMissingDep {
		return nil, err
	}
	var haveVal string
	if err == errMissingDep {
		haveVal = fmt.Sprintf("%d", sniffer.Size())
	} else {
		haveVal = fmt.Sprintf("%d|indexed", sniffer.Size())
	}
	mm.kv["have:"+br.String()] = haveVal
	if len(fetcher.missing) == 0 {
		// If err == nil, we're good. Else (err == errMissingDep), we
		// know the error did not come from a fetching miss (because
		// len(fetcher.missing) == 0), but from an index miss. Therefore
		// we know the miss has already been noted and will be dealt with
		// later, so we can also pretend everything's fine.
		return mm, nil
	}
	return mm, err
}

// keepFirstN keeps the first N bytes written to it in Bytes.
type keepFirstN struct {
	N     int
	Bytes []byte
}

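// Write appends to w.Bytes until it holds w.N bytes and silently discards
// the rest. It never fails and always reports len(p) bytes written.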
func (w *keepFirstN) Write(p []byte) (n int, err error) {
	if n := w.N - len(w.Bytes); n > 0 {
		if n > len(p) {
			n = len(p)
		}
		w.Bytes = append(w.Bytes, p[:n]...)
	}
	return len(p), nil
}

// missTrackFetcher is a blob.Fetcher that records which blob(s) it
// failed to load from its underlying fetcher.
type missTrackFetcher struct {
	fetcher blob.Fetcher

	mu      sync.Mutex // guards missing
	missing []blob.Ref
}

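// Fetch fetches br from the underlying fetcher, recording br in f.missing
// when the fetcher reports os.ErrNotExist.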
func (f *missTrackFetcher) Fetch(ctx context.Context, br blob.Ref) (blob io.ReadCloser, size uint32, err error) {
	blob, size, err = f.fetcher.Fetch(ctx, br)
	if err == os.ErrNotExist {
		f.mu.Lock()
		defer f.mu.Unlock()
		f.missing = append(f.missing, br)
	}
	return
}

// trackErrorsFetcher is a blob.Fetcher that records to errs all Fetch errors.
type trackErrorsFetcher struct {
	mu   sync.RWMutex
	errs []error

	f blob.Fetcher
}

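// Fetch fetches br from the wrapped fetcher and records any error it
// returns.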
func (tf *trackErrorsFetcher) Fetch(ctx context.Context, br blob.Ref) (blob io.ReadCloser, size uint32, err error) {
	blob, size, err = tf.f.Fetch(ctx, br)
	if err != nil {
		tf.mu.Lock()
		defer tf.mu.Unlock()
		tf.errs = append(tf.errs, err)
	}
	return
}

// hasErrNotExist reports whether tf recorded at least one error and all
// of the errors it recorded are os.ErrNotExist errors.
func (tf *trackErrorsFetcher) hasErrNotExist() bool {
	tf.mu.RLock()
	defer tf.mu.RUnlock()
	if len(tf.errs) == 0 {
		return false
	}
	for _, v := range tf.errs {
		if v != os.ErrNotExist {
			return false
		}
	}
	return true
}

// filePrefixReader is an interface satisfied by both *bytes.Reader and
// *schema.FileReader, for use in readPrefixOrFile.
type filePrefixReader interface {
	io.Reader
	io.ReaderAt
}

// readPrefixOrFile executes a given func with a reader on the passed prefix
// and, if the func returns io.EOF or io.ErrUnexpectedEOF (i.e. the prefix
// was not long enough), falls back to calling it again with a reader on the
// whole file.
func readPrefixOrFile(prefix []byte, fetcher blob.Fetcher, b *schema.Blob, fn func(filePrefixReader) error) (err error) {
	pr := bytes.NewReader(prefix)
	err = fn(pr)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		var fr *schema.FileReader
		fr, err = b.NewFileReader(fetcher)
		if err == nil {
			err = fn(fr)
			fr.Close()
		}
	}
	return err
}

const msdosEpoch = "1980-01-01T00:00:00Z"

var (
	exifDebug, _   = strconv.ParseBool(os.Getenv("CAMLI_DEBUG_IMAGES"))
	debugEnv, _    = strconv.ParseBool(os.Getenv("CAMLI_DEBUG"))
	msdosEpochTime time.Time
)

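// populateFile adds to mm the index rows for the file schema blob b: its
// size, file name, MIME type, whole-file ref, creation/modification times,
// and, when applicable, image dimensions, EXIF data, and audio tags.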
// b: the parsed file schema blob
// mm: keys to populate
func (ix *Index) populateFile(ctx context.Context, fetcher blob.Fetcher, b *schema.Blob, mm *mutationMap) (err error) {
	var times []time.Time // all creation or mod times seen; may be zero
	times = append(times, b.ModTime())

	blobRef := b.BlobRef()
	tf := &trackErrorsFetcher{f: fetcher.(*missTrackFetcher)}
	fr, err := b.NewFileReader(tf)
	if err != nil {
		return err
	}
	defer fr.Close()
	mimeType, mr := magic.MIMETypeFromReader(fr)
	if mimeType == "" {
		mimeType = magic.MIMETypeByExtension(filepath.Ext(b.FileName()))
	}

	h := blob.NewHash()
	var copyDest io.Writer = h
	var imageBuf *keepFirstN // or nil
	if strings.HasPrefix(mimeType, "image/") {
		imageBuf = &keepFirstN{N: 512 << 10}
		copyDest = io.MultiWriter(copyDest, imageBuf)
	}
	size, err := io.Copy(copyDest, mr)
	if err != nil {
		if tf.hasErrNotExist() {
			return errMissingDep
		}
		return err
	}
	wholeRef := blob.RefFromHash(h)

	if imageBuf != nil {
		var conf images.Config
		decodeConfig := func(r filePrefixReader) error {
			conf, err = images.DecodeConfig(r)
			return err
		}
		if err := readPrefixOrFile(imageBuf.Bytes, fetcher, b, decodeConfig); err == nil {
			mm.Set(keyImageSize.Key(blobRef), keyImageSize.Val(fmt.Sprint(conf.Width), fmt.Sprint(conf.Height)))
		} else if debugEnv {
			log.Printf("index: WARNING: image decodeConfig: %v", err)
		}

		exifData := imageBuf.Bytes
		if conf.HEICEXIF != nil {
			exifData = conf.HEICEXIF
		}
		var ft time.Time
		fileTime := func(r filePrefixReader) error {
			ft, err = schema.FileTime(r)
			return err
		}

		if err = readPrefixOrFile(exifData, fetcher, b, fileTime); err == nil {
			times = append(times, ft)
		} else if debugEnv {
			log.Printf("index: WARNING: image fileTime: %v", err)
		}
		if exifDebug {
			log.Printf("filename %q exif = %v, %v", b.FileName(), ft, err)
		}

		// TODO(mpl): find (generate?) more broken EXIF images to experiment with.
		indexEXIFData := func(r filePrefixReader) error {
			return indexEXIF(wholeRef, r, mm)
		}
		if err = readPrefixOrFile(exifData, fetcher, b, indexEXIFData); err != nil {
			if exifDebug {
				log.Printf("error parsing EXIF: %v", err)
			}
		}
	}

	var sortTimes []time.Time
	for _, t := range times {
		if !t.IsZero() {
			sortTimes = append(sortTimes, t)
		}
	}
	sort.Sort(types.ByTime(sortTimes))
	var time3339s string
	switch {
	case len(sortTimes) == 1:
		time3339s = types.Time3339(sortTimes[0]).String()
	case len(sortTimes) >= 2:
		oldest, newest := sortTimes[0], sortTimes[len(sortTimes)-1]
		// Common enough exception: unset creation time from an MSDOS
		// system (which is the default in zip files). So if we have
		// another time to use, just ignore the MSDOS epoch one.
		if oldest.After(msdosEpochTime) {
			time3339s = types.Time3339(oldest).String() + "," + types.Time3339(newest).String()
		} else {
			time3339s = types.Time3339(newest).String()
		}
	}

	mm.Set(keyWholeToFileRef.Key(wholeRef, blobRef), "1")
	mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(size, b.FileName(), mimeType, wholeRef))
	mm.Set(keyFileTimes.Key(blobRef), keyFileTimes.Val(time3339s))

	if strings.HasPrefix(mimeType, "audio/") {
		indexMusic(io.NewSectionReader(fr, 0, fr.Size()), wholeRef, mm)
	}

	return nil
}

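// tagFormatString returns a short name for the EXIF tag's data format, or
// the empty string for formats that aren't indexed.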
func tagFormatString(tag *tiff.Tag) string {
	switch tag.Format() {
	case tiff.IntVal:
		return "int"
	case tiff.RatVal:
		return "rat"
	case tiff.FloatVal:
		return "float"
	case tiff.StringVal:
		return "string"
	}
	return ""
}

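// exifWalkFunc adapts an ordinary function to the exif.Walker interface
// expected by (*exif.Exif).Walk.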
type exifWalkFunc func(name exif.FieldName, tag *tiff.Tag) error

func (f exifWalkFunc) Walk(name exif.FieldName, tag *tiff.Tag) error { return f(name, tag) }

var errEXIFPanic = errors.New("EXIF library panicked while walking fields")

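// indexEXIF decodes the EXIF fields found in r and records them in mm,
// keyed by wholeRef, along with the GPS position when present and valid.
// It returns io.ErrUnexpectedEOF when r was too short, so that callers can
// retry with the whole file.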
func indexEXIF(wholeRef blob.Ref, r io.Reader, mm *mutationMap) (err error) {
	var tiffErr error
	ex, err := exif.Decode(r)
	if err != nil {
		tiffErr = err
		if exif.IsCriticalError(err) {
			if exif.IsShortReadTagValueError(err) {
				return io.ErrUnexpectedEOF // trigger a retry with whole file
			}
			return
		}
		log.Printf("Non-critical TIFF decoding error: %v", err)
	}
	defer func() {
		// The EXIF library panics if you access a field past
		// what the file contains.  Be paranoid and just
		// recover here, instead of crashing on an invalid
		// EXIF file.
		if e := recover(); e != nil {
			err = errEXIFPanic
		}
	}()

	err = ex.Walk(exifWalkFunc(func(name exif.FieldName, tag *tiff.Tag) error {
		tagFmt := tagFormatString(tag)
		if tagFmt == "" {
			return nil
		}
		key := keyEXIFTag.Key(wholeRef, fmt.Sprintf("%04x", tag.Id))
		numComp := int(tag.Count)
		if tag.Format() == tiff.StringVal {
			numComp = 1
		}
		var val bytes.Buffer
		val.WriteString(keyEXIFTag.Val(tagFmt, numComp, ""))
		if tag.Format() == tiff.StringVal {
			str, err := tag.StringVal()
			if err != nil {
				log.Printf("Invalid EXIF string data: %v", err)
				return nil
			}
			if containsUnsafeRawStrByte(str) {
				val.WriteString(urle(str))
			} else {
				val.WriteString(str)
			}
		} else {
			for i := 0; i < int(tag.Count); i++ {
				if i > 0 {
					val.WriteByte('|')
				}
				switch tagFmt {
				case "int":
					v, err := tag.Int(i)
					if err != nil {
						log.Printf("Invalid EXIF int data: %v", err)
						return nil
					}
					fmt.Fprintf(&val, "%d", v)
				case "rat":
					n, d, err := tag.Rat2(i)
					if err != nil {
						log.Printf("Invalid EXIF rat data: %v", err)
						return nil
					}
					fmt.Fprintf(&val, "%d/%d", n, d)
				case "float":
					v, err := tag.Float(i)
					if err != nil {
						log.Printf("Invalid EXIF float data: %v", err)
						return nil
					}
					fmt.Fprintf(&val, "%v", v)
				default:
					panic("shouldn't get here")
				}
			}
		}
		valStr := val.String()
		mm.Set(key, valStr)
		return nil
	}))
	if err != nil {
		return
	}

	if exif.IsGPSError(tiffErr) {
		log.Printf("Invalid EXIF GPS data: %v", tiffErr)
		return nil
	}
	if lat, long, err := ex.LatLong(); err == nil {
		if math.Abs(long) > 180.0 || math.Abs(lat) > 90.0 {
			log.Printf("Long, lat outside allowed range: %v, %v", long, lat)
			return nil
		}
		if math.IsNaN(long) || math.IsNaN(lat) {
			log.Print("Latitude or Longitude is NaN")
			return nil
		}
		// index 7 places fixed precision (~10mm worst case at equator)
		// http://stackoverflow.com/a/1947615/114581
		mm.Set(keyEXIFGPS.Key(wholeRef), keyEXIFGPS.Val(fmt.Sprintf("%.7f", lat), fmt.Sprintf("%.7f", long)))
	} else if !exif.IsTagNotPresentError(err) {
		log.Printf("Invalid EXIF GPS data: %v", err)
	}
	return nil
}

// indexMusic adds mutations to index the wholeRef by attached metadata and other properties.
func indexMusic(r readerutil.SizeReaderAt, wholeRef blob.Ref, mm *mutationMap) {
	tag, err := taglib.Decode(r, r.Size())
	if err != nil {
		log.Print("index: error parsing tag: ", err)
		return
	}

	var footerLength int64
	if hasTag, err := media.HasID3v1Tag(r); err != nil {
		log.Print("index: unable to check for ID3v1 tag: ", err)
		return
	} else if hasTag {
		footerLength = media.ID3v1TagLength
	}

	// Generate a hash of the audio portion of the file (i.e. excluding ID3v1 and v2 tags).
	audioStart := int64(tag.TagSize())
	audioSize := r.Size() - audioStart - footerLength
	hash := blob.NewHash()
	if _, err := io.Copy(hash, io.NewSectionReader(r, audioStart, audioSize)); err != nil {
		log.Print("index: error generating hash of audio data: ", err)
		return
	}
	mediaRef := blob.RefFromHash(hash)

	duration, err := media.GetMPEGAudioDuration(io.NewSectionReader(r, audioStart, audioSize))
	if err != nil {
		log.Print("index: unable to calculate audio duration: ", err)
		duration = 0
	}

	var yearStr, trackStr, discStr, durationStr string
	if !tag.Year().IsZero() {
		const justYearLayout = "2006"
		yearStr = tag.Year().Format(justYearLayout)
	}
	if tag.Track() != 0 {
		trackStr = fmt.Sprintf("%d", tag.Track())
	}
	if tag.Disc() != 0 {
		discStr = fmt.Sprintf("%d", tag.Disc())
	}
	if duration != 0 {
		durationStr = fmt.Sprintf("%d", duration/time.Millisecond)
	}

	// Note: if you add to this map, please update
	// pkg/search/query.go's MediaTagConstraint Tag docs.
	tags := map[string]string{
		"title":              tag.Title(),
		"artist":             tag.Artist(),
		"album":              tag.Album(),
		"genre":              tag.Genre(),
		"musicbrainzalbumid": tag.CustomFrames()["MusicBrainz Album Id"],
		"year":               yearStr,
		"track":              trackStr,
		"disc":               discStr,
		"mediaref":           mediaRef.String(),
		"durationms":         durationStr,
	}

	for tag, value := range tags {
		if value != "" {
			mm.Set(keyMediaTag.Key(wholeRef, tag), keyMediaTag.Val(value))
		}
	}
}

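// populateDir adds to mm the index rows for the directory schema blob b:
// its file info and one row per static-set child.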
// b: the parsed file schema blob
// mm: keys to populate
func (ix *Index) populateDir(ctx context.Context, fetcher blob.Fetcher, b *schema.Blob, mm *mutationMap) error {
	blobRef := b.BlobRef()
	// TODO(bradfitz): move the NewDirReader and FileName method off *schema.Blob and onto
	// StaticFile/StaticDirectory or something.

	tf := &trackErrorsFetcher{f: fetcher.(*missTrackFetcher)}
	dr, err := b.NewDirReader(ctx, tf)
	if err != nil {
		// TODO(bradfitz): propagate up a transient failure
		// error type, so we can retry indexing files in the
		// future if blobs are only temporarily unavailable.
		log.Printf("index: error indexing directory, creating NewDirReader %s: %v", blobRef, err)
		return nil
	}
	sts, err := dr.StaticSet(ctx)
	if err != nil {
		if tf.hasErrNotExist() {
			return errMissingDep
		}
		log.Printf("index: error indexing directory: can't get StaticSet: %v", err)
		return nil
	}

	mm.Set(keyFileInfo.Key(blobRef), keyFileInfo.Val(len(sts), b.FileName(), "", blob.Ref{}))
	for _, br := range sts {
		mm.Set(keyStaticDirChild.Key(blobRef, br.String()), "1")
	}
	return nil
}

var errMissingDep = errors.New("blob was not fully indexed because of a missing dependency")

// populateDeleteClaim adds to mm the entries resulting from the delete claim cl.
// It is assumed cl is a valid claim, and vr has already been verified.
func (ix *Index) populateDeleteClaim(ctx context.Context, cl schema.Claim, vr *jsonsign.VerifyRequest, mm *mutationMap) error {
	br := cl.Blob().BlobRef()
	target := cl.Target()
	if !target.Valid() {
		log.Printf("no valid target for delete claim %v", br)
		return nil
	}
	meta, err := ix.GetBlobMeta(ctx, target)
	if err != nil {
		if err == os.ErrNotExist {
			if err := ix.noteNeeded(br, target); err != nil {
				return fmt.Errorf("could not note that delete claim %v depends on %v: %v", br, target, err)
			}
			return errMissingDep
		}
		log.Printf("Could not get mime type of target blob %v: %v", target, err)
		return nil
	}

	if meta.CamliType != schema.TypePermanode && meta.CamliType != schema.TypeClaim {
		log.Printf("delete claim target in %v is neither a permanode nor a claim: %v", br, meta.CamliType)
		return nil
	}
	mm.Set(keyDeleted.Key(target, cl.ClaimDateString(), br), "")
	if meta.CamliType == schema.TypeClaim {
		return nil
	}
	recentKey := keyRecentPermanode.Key(vr.SignerKeyId, cl.ClaimDateString(), br)
	mm.Set(recentKey, target.String())
	attr, value := cl.Attribute(), cl.Value()
	claimKey := keyPermanodeClaim.Key(target, vr.SignerKeyId, cl.ClaimDateString(), br)
	mm.Set(claimKey, keyPermanodeClaim.Val(cl.ClaimType(), attr, value, vr.CamliSigner))
	return nil
}

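// populateClaim adds to mm the index rows for the signed claim blob b,
// whose signature has already been verified as vr.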
func (ix *Index) populateClaim(ctx context.Context, fetcher *missTrackFetcher, b *schema.Blob, vr *jsonsign.VerifyRequest, mm *mutationMap) error {
	br := b.BlobRef()

	claim, ok := b.AsClaim()
	if !ok {
		// Skip bogus claim with malformed permanode.
		return nil
	}

	verifiedKeyId := vr.SignerKeyId
	mm.signerID = verifiedKeyId
	mm.signerBlobRef = vr.CamliSigner
	mm.Set(keySignerKeyID.name+":"+vr.CamliSigner.String(), verifiedKeyId)

	if claim.ClaimType() == string(schema.DeleteClaim) {
		if err := ix.populateDeleteClaim(ctx, claim, vr, mm); err != nil {
			return err
		}
		mm.noteDelete(claim)
		return nil
	}

	pnbr := claim.ModifiedPermanode()
	if !pnbr.Valid() {
		// A different type of claim; not modifying a permanode.
		return nil
	}

	attr, value := claim.Attribute(), claim.Value()
	recentKey := keyRecentPermanode.Key(verifiedKeyId, claim.ClaimDateString(), br)
	mm.Set(recentKey, pnbr.String())
	claimKey := keyPermanodeClaim.Key(pnbr, verifiedKeyId, claim.ClaimDateString(), br)
	mm.Set(claimKey, keyPermanodeClaim.Val(claim.ClaimType(), attr, value, vr.CamliSigner))

	if strings.HasPrefix(attr, "camliPath:") {
		targetRef, ok := blob.Parse(value)
		if ok {
			// TODO: deal with set-attribute vs. del-attribute
			// properly? I think we get it for free when
			// del-attribute has no Value, but we need to deal
			// with the case where they explicitly delete the
			// current value.
			suffix := attr[len("camliPath:"):]
			active := "Y"
			if claim.ClaimType() == "del-attribute" {
				active = "N"
			}
			baseRef := pnbr
			claimRef := br

			key := keyPathBackward.Key(verifiedKeyId, targetRef, claimRef)
			val := keyPathBackward.Val(claim.ClaimDateString(), baseRef, active, suffix)
			mm.Set(key, val)

			key = keyPathForward.Key(verifiedKeyId, baseRef, suffix, claim.ClaimDateString(), claimRef)
			val = keyPathForward.Val(active, targetRef)
			mm.Set(key, val)
		}
	}

	if claim.ClaimType() != string(schema.DelAttributeClaim) && IsIndexedAttribute(attr) {
		key := keySignerAttrValue.Key(verifiedKeyId, attr, value, claim.ClaimDateString(), br)
		mm.Set(key, keySignerAttrValue.Val(pnbr))
	}

	if IsBlobReferenceAttribute(attr) {
		targetRef, ok := blob.Parse(value)
		if ok {
			key := keyEdgeBackward.Key(targetRef, pnbr, br)
			mm.Set(key, keyEdgeBackward.Val("permanode", ""))
		}
	}

	return nil
}

// updateDeletesCache updates the index deletes cache with the delete claim
// deleteClaim, which is trusted to be a valid delete claim.
func (ix *Index) updateDeletesCache(deleteClaim schema.Claim) error {
	target := deleteClaim.Target()
	deleter := deleteClaim.Blob()
	when, err := deleter.ClaimDate()
	if err != nil {
		return fmt.Errorf("Could not get date of delete claim %v: %v", deleteClaim, err)
	}
	targetDeletions := append(ix.deletes.m[target],
		deletion{
			deleter: deleter.BlobRef(),
			when:    when,
		})
	sort.Sort(sort.Reverse(byDeletionDate(targetDeletions)))
	ix.deletes.m[target] = targetDeletions
	return nil
}