Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package schema
    18	
    19	import (
    20		"context"
    21		"errors"
    22		"fmt"
    23		"io"
    24		"io/ioutil"
    25		"log"
    26		"os"
    27		"sync"
    28		"time"
    29	
    30		"perkeep.org/pkg/blob"
    31		"perkeep.org/pkg/env"
    32	
    33		"go4.org/readerutil"
    34		"go4.org/syncutil"
    35		"go4.org/syncutil/singleflight"
    36		"go4.org/types"
    37	)
    38	
// A FileReader reads the bytes of "file" and "bytes" schema blobrefs.
type FileReader struct {
	// Immutable stuff:
	*io.SectionReader             // provides Read, Seek, and Size.
	parent            *FileReader // or nil. for sub-region readers to find the top.
	rootOff           int64       // this FileReader's offset from the root
	fetcher           blob.Fetcher // source of all blob fetches
	ss                *superset    // the parsed "file" or "bytes" schema blob
	size              int64 // total number of bytes

	sfg singleflight.Group // for loading blobrefs for ssm

	blobmu   sync.Mutex // guards lastBlob
	lastBlob *blob.Blob // most recently fetched blob; cuts dup reads up to 85x

	ssmmu sync.Mutex             // guards ssm
	ssm   map[blob.Ref]*superset // blobref -> superset
}
    57	
// Compile-time check that *FileReader provides the read/seek/close
// surface that callers rely on.
var _ interface {
	io.Seeker
	io.ReaderAt
	io.Reader
	io.Closer
	Size() int64
} = (*FileReader)(nil)
    65	
    66	// NewFileReader returns a new FileReader reading the contents of fileBlobRef,
    67	// fetching blobs from fetcher.  The fileBlobRef must be of a "bytes" or "file"
    68	// schema blob.
    69	//
    70	// The caller should call Close on the FileReader when done reading.
    71	func NewFileReader(ctx context.Context, fetcher blob.Fetcher, fileBlobRef blob.Ref) (*FileReader, error) {
    72		// TODO(bradfitz): rename this into bytes reader? but for now it's still
    73		//                 named FileReader, but can also read a "bytes" schema.
    74		if !fileBlobRef.Valid() {
    75			return nil, errors.New("schema/filereader: NewFileReader blobref invalid")
    76		}
    77		rc, _, err := fetcher.Fetch(ctx, fileBlobRef)
    78		if err != nil {
    79			return nil, fmt.Errorf("schema/filereader: fetching file schema blob: %v", err)
    80		}
    81		defer rc.Close()
    82		ss, err := parseSuperset(rc)
    83		if err != nil {
    84			return nil, fmt.Errorf("schema/filereader: decoding file schema blob: %v", err)
    85		}
    86		ss.BlobRef = fileBlobRef
    87		if ss.Type != "file" && ss.Type != "bytes" {
    88			return nil, fmt.Errorf("schema/filereader: expected \"file\" or \"bytes\" schema blob, got %q", ss.Type)
    89		}
    90		fr, err := ss.NewFileReader(fetcher)
    91		if err != nil {
    92			return nil, fmt.Errorf("schema/filereader: creating FileReader for %s: %v", fileBlobRef, err)
    93		}
    94		return fr, nil
    95	}
    96	
// NewFileReader returns a FileReader for the "file" or "bytes" schema
// blob b, delegating to the blob's underlying superset. Chunk blobs
// are fetched from fetcher during subsequent reads.
func (b *Blob) NewFileReader(fetcher blob.Fetcher) (*FileReader, error) {
	return b.ss.NewFileReader(fetcher)
}
   100	
   101	// NewFileReader returns a new FileReader, reading bytes and blobs
   102	// from the provided fetcher.
   103	//
   104	// NewFileReader does no fetch operation on the fetcher itself.  The
   105	// fetcher is only used in subsequent read operations.
   106	//
   107	// An error is only returned if the type of the superset is not either
   108	// "file" or "bytes".
   109	func (ss *superset) NewFileReader(fetcher blob.Fetcher) (*FileReader, error) {
   110		if ss.Type != "file" && ss.Type != "bytes" {
   111			return nil, fmt.Errorf("schema/filereader: Superset not of type \"file\" or \"bytes\"")
   112		}
   113		size := int64(ss.SumPartsSize())
   114		fr := &FileReader{
   115			fetcher: fetcher,
   116			ss:      ss,
   117			size:    size,
   118			ssm:     make(map[blob.Ref]*superset),
   119		}
   120		fr.SectionReader = io.NewSectionReader(fr, 0, size)
   121		return fr, nil
   122	}
   123	
   124	// LoadAllChunks starts a process of loading all chunks of this file
   125	// as quickly as possible. The contents are immediately discarded, so
   126	// it is assumed that the fetcher is a caching fetcher.
   127	func (fr *FileReader) LoadAllChunks() {
   128		// TODO: ask the underlying blobserver to do this if it would
   129		// prefer.  Some blobservers (like blobpacked) might not want
   130		// to do this at all.
   131		go fr.loadAllChunksSync(context.Background())
   132	}
   133	
// loadAllChunksSync iterates over every chunk of fr and fetches each
// one, at most 20 concurrently, reading (and discarding) one byte to
// fault the blob into any cache the fetcher maintains. It is best
// effort: fetch and iteration errors are ignored, and it may return
// while the last in-flight fetches are still completing.
func (fr *FileReader) loadAllChunksSync(ctx context.Context) {
	gate := syncutil.NewGate(20) // num readahead chunk loads at a time
	fr.ForeachChunk(ctx, func(_ []blob.Ref, p BytesPart) error {
		if !p.BlobRef.Valid() {
			// Part with no blobRef (e.g. a zero region); nothing to fetch.
			return nil
		}
		gate.Start()
		go func(br blob.Ref) {
			defer gate.Done()
			rc, _, err := fr.fetcher.Fetch(ctx, br)
			if err == nil {
				defer rc.Close()
				var b [1]byte
				rc.Read(b[:]) // fault in the blob
			}
		}(p.BlobRef)
		return nil
	})
}
   153	
   154	// UnixMtime returns the file schema's UnixMtime field, or the zero value.
   155	func (fr *FileReader) UnixMtime() time.Time {
   156		t, err := time.Parse(time.RFC3339, fr.ss.UnixMtime)
   157		if err != nil {
   158			return time.Time{}
   159		}
   160		return t
   161	}
   162	
// FileName returns the file schema's filename, if any,
// as reported by the underlying schema blob.
func (fr *FileReader) FileName() string { return fr.ss.FileNameString() }
   165	
   166	func (fr *FileReader) ModTime() time.Time { return fr.ss.ModTime() }
   167	
   168	func (fr *FileReader) FileMode() os.FileMode { return fr.ss.FileMode() }
   169	
   170	func (fr *FileReader) SchemaBlobRef() blob.Ref { return fr.ss.BlobRef }
   171	
// Close currently does nothing. It exists to satisfy io.Closer;
// a FileReader holds no resources that need explicit release.
func (fr *FileReader) Close() error { return nil }
   174	
   175	func (fr *FileReader) ReadAt(p []byte, offset int64) (n int, err error) {
   176		if offset < 0 {
   177			return 0, errors.New("schema/filereader: negative offset")
   178		}
   179		if offset >= fr.Size() {
   180			return 0, io.EOF
   181		}
   182		want := len(p)
   183		for len(p) > 0 && err == nil {
   184			rc, err := fr.readerForOffset(context.TODO(), offset)
   185			if err != nil {
   186				return n, err
   187			}
   188			var n1 int
   189			n1, err = io.ReadFull(rc, p)
   190			rc.Close()
   191			if err == io.EOF || err == io.ErrUnexpectedEOF {
   192				err = nil
   193			}
   194			if n1 == 0 {
   195				break
   196			}
   197			p = p[n1:]
   198			offset += int64(n1)
   199			n += n1
   200		}
   201		if n < want && err == nil {
   202			err = io.ErrUnexpectedEOF
   203		}
   204		return n, err
   205	}
   206	
// ForeachChunk calls fn for each chunk of fr, in order.
//
// The schemaPath argument will be the path from the "file" or "bytes"
// schema blob down to possibly other "bytes" schema blobs, the final
// one of which references the given BytesPart. The BytesPart will be
// the actual chunk. The fn function will not be called with
// BytesParts referencing a "BytesRef"; those are followed recursively
// instead. The fn function must not retain or mutate schemaPath.
//
// If fn returns an error, iteration stops and that error is returned
// from ForeachChunk. Other errors may be returned from ForeachChunk
// if schema blob fetches fail.
func (fr *FileReader) ForeachChunk(ctx context.Context, fn func(schemaPath []blob.Ref, p BytesPart) error) error {
	// Start with a nil path; foreachChunk appends fr's own blobref
	// (and those of any nested "bytes" blobs) as it descends.
	return fr.foreachChunk(ctx, fn, nil)
}
   222	
// foreachChunk is the recursive worker for ForeachChunk. path is the
// chain of schema blobrefs from the root down to fr's parent; fr's own
// blobref is appended before its parts are walked.
func (fr *FileReader) foreachChunk(ctx context.Context, fn func([]blob.Ref, BytesPart) error, path []blob.Ref) error {
	path = append(path, fr.ss.BlobRef)
	for _, bp := range fr.ss.Parts {
		// A part must reference either raw bytes (blobRef) or a
		// nested "bytes" schema blob (bytesRef), never both.
		if bp.BytesRef.Valid() && bp.BlobRef.Valid() {
			return fmt.Errorf("part in %v illegally contained both a blobRef and bytesRef", fr.ss.BlobRef)
		}
		if bp.BytesRef.Valid() {
			// Indirect part: recurse into the nested "bytes" schema.
			ss, err := fr.getSuperset(ctx, bp.BytesRef)
			if err != nil {
				return err
			}
			subfr, err := ss.NewFileReader(fr.fetcher)
			if err != nil {
				return err
			}
			subfr.parent = fr
			if err := subfr.foreachChunk(ctx, fn, path); err != nil {
				return err
			}
		} else {
			// Leaf part (raw chunk or zero hole): hand it to fn.
			if err := fn(path, *bp); err != nil {
				return err
			}
		}
	}
	return nil
}
   250	
   251	func (fr *FileReader) rootReader() *FileReader {
   252		if fr.parent != nil {
   253			return fr.parent.rootReader()
   254		}
   255		return fr
   256	}
   257	
   258	func (fr *FileReader) getBlob(ctx context.Context, br blob.Ref) (*blob.Blob, error) {
   259		if root := fr.rootReader(); root != fr {
   260			return root.getBlob(ctx, br)
   261		}
   262		fr.blobmu.Lock()
   263		last := fr.lastBlob
   264		fr.blobmu.Unlock()
   265		if last != nil && last.Ref() == br {
   266			return last, nil
   267		}
   268		blob, err := blob.FromFetcher(ctx, fr.fetcher, br)
   269		if err != nil {
   270			return nil, err
   271		}
   272	
   273		fr.blobmu.Lock()
   274		fr.lastBlob = blob
   275		fr.blobmu.Unlock()
   276		return blob, nil
   277	}
   278	
// getSuperset fetches and parses the schema blob br, caching parsed
// results in fr.ssm on the root FileReader so all sub-readers share
// them. Concurrent requests for the same blobref are collapsed into
// one fetch via the singleflight group.
func (fr *FileReader) getSuperset(ctx context.Context, br blob.Ref) (*superset, error) {
	if root := fr.rootReader(); root != fr {
		// The cache lives on the root reader; delegate upward.
		return root.getSuperset(ctx, br)
	}
	brStr := br.String()
	ssi, err := fr.sfg.Do(brStr, func() (interface{}, error) {
		// Fast path: already fetched and parsed.
		fr.ssmmu.Lock()
		ss, ok := fr.ssm[br]
		fr.ssmmu.Unlock()
		if ok {
			return ss, nil
		}
		rc, _, err := fr.fetcher.Fetch(ctx, br)
		if err != nil {
			return nil, fmt.Errorf("schema/filereader: fetching file schema blob: %v", err)
		}
		defer rc.Close()
		ss, err = parseSuperset(rc)
		if err != nil {
			return nil, err
		}
		ss.BlobRef = br
		fr.ssmmu.Lock()
		defer fr.ssmmu.Unlock()
		fr.ssm[br] = ss
		return ss, nil
	})
	if err != nil {
		return nil, err
	}
	return ssi.(*superset), nil
}
   311	
   312	var debug = env.IsDebug()
   313	
   314	// readerForOffset returns a ReadCloser that reads some number of bytes and then EOF
   315	// from the provided offset.  Seeing EOF doesn't mean the end of the whole file; just the
   316	// chunk at that offset.  The caller must close the ReadCloser when done reading.
   317	// The provided context is not used after the method returns.
   318	func (fr *FileReader) readerForOffset(ctx context.Context, off int64) (io.ReadCloser, error) {
   319		if debug {
   320			log.Printf("(%p) readerForOffset %d + %d = %d", fr, fr.rootOff, off, fr.rootOff+off)
   321		}
   322		if off < 0 {
   323			panic("negative offset")
   324		}
   325		if off >= fr.size {
   326			return types.EmptyBody, nil
   327		}
   328		offRemain := off
   329		var skipped int64
   330		parts := fr.ss.Parts
   331		for len(parts) > 0 && parts[0].Size <= uint64(offRemain) {
   332			offRemain -= int64(parts[0].Size)
   333			skipped += int64(parts[0].Size)
   334			parts = parts[1:]
   335		}
   336		if len(parts) == 0 {
   337			return types.EmptyBody, nil
   338		}
   339		p0 := parts[0]
   340		var rsc readerutil.ReadSeekCloser
   341		var err error
   342		switch {
   343		case p0.BlobRef.Valid() && p0.BytesRef.Valid():
   344			return nil, fmt.Errorf("part illegally contained both a blobRef and bytesRef")
   345		case !p0.BlobRef.Valid() && !p0.BytesRef.Valid():
   346			return ioutil.NopCloser(
   347				io.LimitReader(zeroReader{},
   348					int64(p0.Size-uint64(offRemain)))), nil
   349		case p0.BlobRef.Valid():
   350			blob, err := fr.getBlob(ctx, p0.BlobRef)
   351			if err != nil {
   352				return nil, err
   353			}
   354			byteReader, err := blob.ReadAll(ctx)
   355			if err != nil {
   356				return nil, err
   357			}
   358			rsc = struct {
   359				io.ReadSeeker
   360				io.Closer
   361			}{
   362				byteReader,
   363				ioutil.NopCloser(nil),
   364			}
   365		case p0.BytesRef.Valid():
   366			var ss *superset
   367			ss, err = fr.getSuperset(ctx, p0.BytesRef)
   368			if err != nil {
   369				return nil, err
   370			}
   371			rsc, err = ss.NewFileReader(fr.fetcher)
   372			if err == nil {
   373				subFR := rsc.(*FileReader)
   374				subFR.parent = fr.rootReader()
   375				subFR.rootOff = fr.rootOff + skipped
   376			}
   377		}
   378		if err != nil {
   379			return nil, err
   380		}
   381		offRemain += int64(p0.Offset)
   382		if offRemain > 0 {
   383			newPos, err := rsc.Seek(offRemain, os.SEEK_SET)
   384			if err != nil {
   385				return nil, err
   386			}
   387			if newPos != offRemain {
   388				panic("Seek didn't work")
   389			}
   390		}
   391		return struct {
   392			io.Reader
   393			io.Closer
   394		}{
   395			io.LimitReader(rsc, int64(p0.Size)),
   396			rsc,
   397		}, nil
   398	}
   399	
   400	type zeroReader struct{}
   401	
   402	func (zeroReader) Read(p []byte) (n int, err error) {
   403		for i := range p {
   404			p[i] = 0
   405		}
   406		return len(p), nil
   407	}
Website layout inspired by memcached.
Content by the authors.