Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package schema
    18	
    19	import (
    20		"context"
    21		"errors"
    22		"fmt"
    23		"io"
    24		"log"
    25		"os"
    26		"sync"
    27		"time"
    28	
    29		"perkeep.org/pkg/blob"
    30		"perkeep.org/pkg/env"
    31	
    32		"go4.org/readerutil"
    33		"go4.org/syncutil"
    34		"go4.org/syncutil/singleflight"
    35		"go4.org/types"
    36	)
    37	
// A FileReader reads the bytes of "file" and "bytes" schema blobrefs.
//
// Read, Seek, and Size come from the embedded *io.SectionReader,
// which is backed by FileReader.ReadAt; ReadAt serves reads
// chunk-by-chunk from the underlying blob.Fetcher. Sub-region
// readers (for nested "bytes" parts) link to the root via parent so
// caches (lastBlob, ssm) are shared across the whole reader tree.
type FileReader struct {
	// Immutable stuff:
	*io.SectionReader             // provides Read, Seek, and Size.
	parent            *FileReader // or nil. for sub-region readers to find the top.
	rootOff           int64       // this FileReader's offset from the root
	fetcher           blob.Fetcher
	ss                *superset
	size              int64 // total number of bytes

	sfg singleflight.Group // for loading blobrefs for ssm

	blobmu   sync.Mutex // guards lastBlob
	lastBlob *blob.Blob // most recently fetched blob; cuts dup reads up to 85x

	ssmmu sync.Mutex             // guards ssm
	ssm   map[blob.Ref]*superset // blobref -> superset
}
    56	
// Compile-time assertion that *FileReader provides the full
// read/seek/close/size surface callers rely on.
var _ interface {
	io.Seeker
	io.ReaderAt
	io.Reader
	io.Closer
	Size() int64
} = (*FileReader)(nil)
    64	
    65	// NewFileReader returns a new FileReader reading the contents of fileBlobRef,
    66	// fetching blobs from fetcher.  The fileBlobRef must be of a "bytes" or "file"
    67	// schema blob.
    68	//
    69	// The caller should call Close on the FileReader when done reading.
    70	func NewFileReader(ctx context.Context, fetcher blob.Fetcher, fileBlobRef blob.Ref) (*FileReader, error) {
    71		// TODO(bradfitz): rename this into bytes reader? but for now it's still
    72		//                 named FileReader, but can also read a "bytes" schema.
    73		if !fileBlobRef.Valid() {
    74			return nil, errors.New("schema/filereader: NewFileReader blobref invalid")
    75		}
    76		rc, _, err := fetcher.Fetch(ctx, fileBlobRef)
    77		if err != nil {
    78			return nil, fmt.Errorf("schema/filereader: fetching file schema blob: %w", err)
    79		}
    80		defer rc.Close()
    81		ss, err := parseSuperset(rc)
    82		if err != nil {
    83			return nil, fmt.Errorf("schema/filereader: decoding file schema blob: %w", err)
    84		}
    85		ss.BlobRef = fileBlobRef
    86		if ss.Type != "file" && ss.Type != "bytes" {
    87			return nil, fmt.Errorf("schema/filereader: expected \"file\" or \"bytes\" schema blob, got %q", ss.Type)
    88		}
    89		fr, err := ss.NewFileReader(fetcher)
    90		if err != nil {
    91			return nil, fmt.Errorf("schema/filereader: creating FileReader for %s: %w", fileBlobRef, err)
    92		}
    93		return fr, nil
    94	}
    95	
// NewFileReader returns a FileReader reading the contents of b,
// fetching referenced blobs from fetcher. The blob must be of type
// "file" or "bytes"; otherwise an error is returned.
func (b *Blob) NewFileReader(fetcher blob.Fetcher) (*FileReader, error) {
	return b.ss.NewFileReader(fetcher)
}
    99	
   100	// NewFileReader returns a new FileReader, reading bytes and blobs
   101	// from the provided fetcher.
   102	//
   103	// NewFileReader does no fetch operation on the fetcher itself.  The
   104	// fetcher is only used in subsequent read operations.
   105	//
   106	// An error is only returned if the type of the superset is not either
   107	// "file" or "bytes".
   108	func (ss *superset) NewFileReader(fetcher blob.Fetcher) (*FileReader, error) {
   109		if ss.Type != "file" && ss.Type != "bytes" {
   110			return nil, fmt.Errorf("schema/filereader: Superset not of type \"file\" or \"bytes\"")
   111		}
   112		size := int64(ss.SumPartsSize())
   113		fr := &FileReader{
   114			fetcher: fetcher,
   115			ss:      ss,
   116			size:    size,
   117			ssm:     make(map[blob.Ref]*superset),
   118		}
   119		fr.SectionReader = io.NewSectionReader(fr, 0, size)
   120		return fr, nil
   121	}
   122	
// LoadAllChunks starts a process of loading all chunks of this file
// as quickly as possible. The contents are immediately discarded, so
// it is assumed that the fetcher is a caching fetcher.
//
// It returns immediately; loading proceeds in a background goroutine.
func (fr *FileReader) LoadAllChunks() {
	// TODO: ask the underlying blobserver to do this if it would
	// prefer.  Some blobservers (like blobpacked) might not want
	// to do this at all.
	go fr.loadAllChunksSync(context.Background())
}
   132	
// loadAllChunksSync walks every chunk of fr and fetches it, reading
// one byte of each to fault it in and discarding the contents. At
// most 20 fetches run concurrently. NOTE(review): despite the name,
// the gate only bounds concurrency; up to 20 fetch goroutines may
// still be in flight when this returns. Fetch and Read errors are
// deliberately ignored (best-effort readahead).
func (fr *FileReader) loadAllChunksSync(ctx context.Context) {
	gate := syncutil.NewGate(20) // num readahead chunk loads at a time
	fr.ForeachChunk(ctx, func(_ []blob.Ref, p BytesPart) error {
		if !p.BlobRef.Valid() {
			// Zero-filled (hole) part: nothing to fetch.
			return nil
		}
		gate.Start()
		go func(br blob.Ref) {
			defer gate.Done()
			rc, _, err := fr.fetcher.Fetch(ctx, br)
			if err == nil {
				defer rc.Close()
				var b [1]byte
				rc.Read(b[:]) // fault in the blob
			}
		}(p.BlobRef)
		return nil
	})
}
   152	
   153	// UnixMtime returns the file schema's UnixMtime field, or the zero value.
   154	func (fr *FileReader) UnixMtime() time.Time {
   155		t, err := time.Parse(time.RFC3339, fr.ss.UnixMtime)
   156		if err != nil {
   157			return time.Time{}
   158		}
   159		return t
   160	}
   161	
// FileName returns the file schema's filename, if any.
func (fr *FileReader) FileName() string { return fr.ss.FileNameString() }

// ModTime returns the modification time recorded in the schema blob.
func (fr *FileReader) ModTime() time.Time { return fr.ss.ModTime() }

// FileMode returns the permission/mode bits recorded in the schema blob.
func (fr *FileReader) FileMode() os.FileMode { return fr.ss.FileMode() }

// SchemaBlobRef returns the blobref of the "file" or "bytes" schema
// blob this FileReader was created from.
func (fr *FileReader) SchemaBlobRef() blob.Ref { return fr.ss.BlobRef }

// Close currently does nothing.
func (fr *FileReader) Close() error { return nil }
   173	
   174	func (fr *FileReader) ReadAt(p []byte, offset int64) (n int, err error) {
   175		if offset < 0 {
   176			return 0, errors.New("schema/filereader: negative offset")
   177		}
   178		if offset >= fr.Size() {
   179			return 0, io.EOF
   180		}
   181		want := len(p)
   182		for len(p) > 0 && err == nil {
   183			rc, err := fr.readerForOffset(context.TODO(), offset)
   184			if err != nil {
   185				return n, err
   186			}
   187			var n1 int
   188			n1, err = io.ReadFull(rc, p)
   189			rc.Close()
   190			if err == io.EOF || err == io.ErrUnexpectedEOF {
   191				err = nil
   192			}
   193			if n1 == 0 {
   194				break
   195			}
   196			p = p[n1:]
   197			offset += int64(n1)
   198			n += n1
   199		}
   200		if n < want && err == nil {
   201			err = io.ErrUnexpectedEOF
   202		}
   203		return n, err
   204	}
   205	
// ForeachChunk calls fn for each chunk of fr, in order.
//
// The schemaPath argument will be the path from the "file" or "bytes"
// schema blob down to possibly other "bytes" schema blobs, the final
// one of which references the given BytesPart. The BytesPart will be
// the actual chunk. The fn function will not be called with
// BytesParts referencing a "BytesRef"; those are followed recursively
// instead. The fn function must not retain or mutate schemaPath.
//
// The provided ctx is used when fetching nested "bytes" schema blobs.
//
// If fn returns an error, iteration stops and that error is returned
// from ForeachChunk. Other errors may be returned from ForeachChunk
// if schema blob fetches fail.
func (fr *FileReader) ForeachChunk(ctx context.Context, fn func(schemaPath []blob.Ref, p BytesPart) error) error {
	return fr.foreachChunk(ctx, fn, nil)
}
   221	
// foreachChunk is the recursive worker for ForeachChunk. path is the
// chain of schema blobrefs from the root down to fr's own parent; fr
// appends its own blobref before walking its parts. The slice is
// extended in place across recursive calls, which is why fn must not
// retain or mutate it (see ForeachChunk).
func (fr *FileReader) foreachChunk(ctx context.Context, fn func([]blob.Ref, BytesPart) error, path []blob.Ref) error {
	path = append(path, fr.ss.BlobRef)
	for _, bp := range fr.ss.Parts {
		// A part may carry a raw-data blobRef or a nested "bytes"
		// schema bytesRef, but never both.
		if bp.BytesRef.Valid() && bp.BlobRef.Valid() {
			return fmt.Errorf("part in %v illegally contained both a blobRef and bytesRef", fr.ss.BlobRef)
		}
		if bp.BytesRef.Valid() {
			// Nested "bytes" schema blob: recurse into it rather
			// than reporting it as a chunk.
			ss, err := fr.getSuperset(ctx, bp.BytesRef)
			if err != nil {
				return err
			}
			subfr, err := ss.NewFileReader(fr.fetcher)
			if err != nil {
				return err
			}
			subfr.parent = fr
			if err := subfr.foreachChunk(ctx, fn, path); err != nil {
				return err
			}
		} else {
			// Leaf chunk (data blob or zero hole): report it.
			if err := fn(path, *bp); err != nil {
				return err
			}
		}
	}
	return nil
}
   249	
   250	func (fr *FileReader) rootReader() *FileReader {
   251		if fr.parent != nil {
   252			return fr.parent.rootReader()
   253		}
   254		return fr
   255	}
   256	
   257	func (fr *FileReader) getBlob(ctx context.Context, br blob.Ref) (*blob.Blob, error) {
   258		if root := fr.rootReader(); root != fr {
   259			return root.getBlob(ctx, br)
   260		}
   261		fr.blobmu.Lock()
   262		last := fr.lastBlob
   263		fr.blobmu.Unlock()
   264		if last != nil && last.Ref() == br {
   265			return last, nil
   266		}
   267		blob, err := blob.FromFetcher(ctx, fr.fetcher, br)
   268		if err != nil {
   269			return nil, err
   270		}
   271	
   272		fr.blobmu.Lock()
   273		fr.lastBlob = blob
   274		fr.blobmu.Unlock()
   275		return blob, nil
   276	}
   277	
// getSuperset returns the parsed superset for the schema blob br,
// fetching it at most once and caching it in ssm on the root
// FileReader. Concurrent requests for the same blobref are collapsed
// by the singleflight group.
func (fr *FileReader) getSuperset(ctx context.Context, br blob.Ref) (*superset, error) {
	// Delegate to the root so the ssm cache is shared by all
	// sub-region readers.
	if root := fr.rootReader(); root != fr {
		return root.getSuperset(ctx, br)
	}
	brStr := br.String()
	ssi, err := fr.sfg.Do(brStr, func() (interface{}, error) {
		// Check the cache first; an earlier flight may have filled it.
		fr.ssmmu.Lock()
		ss, ok := fr.ssm[br]
		fr.ssmmu.Unlock()
		if ok {
			return ss, nil
		}
		rc, _, err := fr.fetcher.Fetch(ctx, br)
		if err != nil {
			return nil, fmt.Errorf("schema/filereader: fetching file schema blob: %w", err)
		}
		defer rc.Close()
		ss, err = parseSuperset(rc)
		if err != nil {
			return nil, err
		}
		ss.BlobRef = br
		fr.ssmmu.Lock()
		defer fr.ssmmu.Unlock()
		fr.ssm[br] = ss
		return ss, nil
	})
	if err != nil {
		return nil, err
	}
	return ssi.(*superset), nil
}
   310	
// debug enables verbose chunk-read logging; set via the environment
// (see env.IsDebug).
var debug = env.IsDebug()
   312	
   313	// readerForOffset returns a ReadCloser that reads some number of bytes and then EOF
   314	// from the provided offset.  Seeing EOF doesn't mean the end of the whole file; just the
   315	// chunk at that offset.  The caller must close the ReadCloser when done reading.
   316	// The provided context is not used after the method returns.
   317	func (fr *FileReader) readerForOffset(ctx context.Context, off int64) (io.ReadCloser, error) {
   318		if debug {
   319			log.Printf("(%p) readerForOffset %d + %d = %d", fr, fr.rootOff, off, fr.rootOff+off)
   320		}
   321		if off < 0 {
   322			panic("negative offset")
   323		}
   324		if off >= fr.size {
   325			return types.EmptyBody, nil
   326		}
   327		offRemain := off
   328		var skipped int64
   329		parts := fr.ss.Parts
   330		for len(parts) > 0 && parts[0].Size <= uint64(offRemain) {
   331			offRemain -= int64(parts[0].Size)
   332			skipped += int64(parts[0].Size)
   333			parts = parts[1:]
   334		}
   335		if len(parts) == 0 {
   336			return types.EmptyBody, nil
   337		}
   338		p0 := parts[0]
   339		var rsc readerutil.ReadSeekCloser
   340		var err error
   341		switch {
   342		case p0.BlobRef.Valid() && p0.BytesRef.Valid():
   343			return nil, fmt.Errorf("part illegally contained both a blobRef and bytesRef")
   344		case !p0.BlobRef.Valid() && !p0.BytesRef.Valid():
   345			return io.NopCloser(
   346				io.LimitReader(zeroReader{},
   347					int64(p0.Size-uint64(offRemain)))), nil
   348		case p0.BlobRef.Valid():
   349			blob, err := fr.getBlob(ctx, p0.BlobRef)
   350			if err != nil {
   351				return nil, err
   352			}
   353			byteReader, err := blob.ReadAll(ctx)
   354			if err != nil {
   355				return nil, err
   356			}
   357			rsc = struct {
   358				io.ReadSeeker
   359				io.Closer
   360			}{
   361				byteReader,
   362				io.NopCloser(nil),
   363			}
   364		case p0.BytesRef.Valid():
   365			var ss *superset
   366			ss, err = fr.getSuperset(ctx, p0.BytesRef)
   367			if err != nil {
   368				return nil, err
   369			}
   370			rsc, err = ss.NewFileReader(fr.fetcher)
   371			if err == nil {
   372				subFR := rsc.(*FileReader)
   373				subFR.parent = fr.rootReader()
   374				subFR.rootOff = fr.rootOff + skipped
   375			}
   376		}
   377		if err != nil {
   378			return nil, err
   379		}
   380		offRemain += int64(p0.Offset)
   381		if offRemain > 0 {
   382			newPos, err := rsc.Seek(offRemain, os.SEEK_SET)
   383			if err != nil {
   384				return nil, err
   385			}
   386			if newPos != offRemain {
   387				panic("Seek didn't work")
   388			}
   389		}
   390		return struct {
   391			io.Reader
   392			io.Closer
   393		}{
   394			io.LimitReader(rsc, int64(p0.Size)),
   395			rsc,
   396		}, nil
   397	}
   398	
   399	type zeroReader struct{}
   400	
   401	func (zeroReader) Read(p []byte) (n int, err error) {
   402		for i := range p {
   403			p[i] = 0
   404		}
   405		return len(p), nil
   406	}
Website layout inspired by memcached.
Content by the authors.