/*
Copyright 2011 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schema

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"go4.org/rollsum"
	"go4.org/syncutil"

	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
)
const (
	// maxBlobSize is the largest blob we ever make when cutting up
	// a file.
	maxBlobSize = 1 << 20

	// firstChunkSize is the ideal size of the first chunk of a
	// file.  It's kept smaller for the benefit of the file(1)
	// command, which likes to read 96 kB on Linux and 256 kB on
	// OS X, and of tools that read only a file's head to extract
	// EXIF metadata from JPEGs, ID3 tags from MP3s, etc.
	// (Nautilus, the OS X Finder, and the like.)
	// The first chunk may be larger than this if cutting the file
	// here would create a small subsequent chunk (e.g. for a file
	// one byte larger than firstChunkSize).
	firstChunkSize = 256 << 10

	// bufioReaderSize is an explicit size for our bufio.Reader,
	// so we don't rely on NewReader's implicit size.
	// We care about the buffer size because it affects how far
	// in advance we can detect EOF from an io.Reader that doesn't
	// know its size.  Detecting EOF bufioReaderSize bytes early
	// means we can plan for the final chunk.
	bufioReaderSize = 32 << 10

	// tooSmallThreshold is the minimum chunk size: rolling
	// checksum boundaries are ignored while the chunk being
	// built is still smaller than this.
	tooSmallThreshold = 64 << 10
)

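// A worked example of how these thresholds interact: a 300 KB stream
// would be cut into a 256 KB first chunk (at the firstChunkSize mark)
// plus a single ~44 KB final chunk, because rollsum boundaries inside
// the tail are ignored while the chunk under construction is below
// tooSmallThreshold, and no split happens once EOF has been seen.
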
// WriteFileFromReaderWithModTime creates and uploads a "file" JSON schema
// composed of chunks of r, also uploading the chunks.  The returned
// BlobRef is of the JSON file schema blob.
// Both filename and modTime are optional.
func WriteFileFromReaderWithModTime(ctx context.Context, bs blobserver.StatReceiver, filename string, modTime time.Time, r io.Reader) (blob.Ref, error) {
	if strings.Contains(filename, "/") {
		return blob.Ref{}, fmt.Errorf("schema.WriteFileFromReader: filename %q shouldn't contain a slash", filename)
	}

	m := NewFileMap(filename)
	if !modTime.IsZero() {
		m.SetModTime(modTime)
	}
	return WriteFileMap(ctx, bs, m, r)
}

// WriteFileFromReader creates and uploads a "file" JSON schema
// composed of chunks of r, also uploading the chunks.  The returned
// BlobRef is of the JSON file schema blob.
// The filename is optional.
func WriteFileFromReader(ctx context.Context, bs blobserver.StatReceiver, filename string, r io.Reader) (blob.Ref, error) {
	return WriteFileFromReaderWithModTime(ctx, bs, filename, time.Time{}, r)
}
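
// A minimal usage sketch, assuming f is an open *os.File and bs is any
// blobserver.StatReceiver (an uploading client, for example):
//
//	fi, err := f.Stat()
//	if err != nil {
//		return blob.Ref{}, err
//	}
//	return WriteFileFromReaderWithModTime(ctx, bs, fi.Name(), fi.ModTime(), f)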

// WriteFileMap uploads chunks of r to bs while populating file and
// finally uploading file's Blob. The returned blobref is of file's
// JSON blob.
func WriteFileMap(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) (blob.Ref, error) {
	return writeFileMapRolling(ctx, bs, file, r)
}

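// serverHasBlob reports whether bs already has the blob br.  A
// "not exist" stat result is reported as have == false with a nil
// error; any other stat failure is returned as err.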
func serverHasBlob(ctx context.Context, bs blobserver.BlobStatter, br blob.Ref) (have bool, err error) {
	_, err = blobserver.StatBlob(ctx, bs, br)
	if err == nil {
		have = true
	} else if err == os.ErrNotExist {
		err = nil
	}
	return
}

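// A span is a node in the tree of file chunks.  The blob br covers the
// byte range [from, to) of the input; children, if any, cover the bytes
// immediately preceding from.  bits records the rollsum strength of the
// boundary that ended the span: spans that ended on weaker boundaries
// become children of the next span that ends on a stronger one.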
type span struct {
	from, to int64
	bits     int
	br       blob.Ref
	children []span
}

func (s *span) isSingleBlob() bool {
	return len(s.children) == 0
}

func (s *span) size() int64 {
	size := s.to - s.from
	for _, cs := range s.children {
		size += cs.size()
	}
	return size
}

// noteEOFReader keeps track of when it's seen EOF, but otherwise
// delegates entirely to r.
type noteEOFReader struct {
	r      io.Reader
	sawEOF bool
}

func (r *noteEOFReader) Read(p []byte) (n int, err error) {
	n, err = r.r.Read(p)
	if err == io.EOF {
		r.sawEOF = true
	}
	return
}

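// uploadString uploads s to bs under its precomputed blobref br,
// skipping the write when a stat shows bs already has the blob.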
func uploadString(ctx context.Context, bs blobserver.StatReceiver, br blob.Ref, s string) (blob.Ref, error) {
	if !br.Valid() {
		panic("invalid blobref")
	}
	hasIt, err := serverHasBlob(ctx, bs, br)
	if err != nil {
		return blob.Ref{}, err
	}
	if hasIt {
		return br, nil
	}
	_, err = blobserver.ReceiveNoHash(ctx, bs, br, strings.NewReader(s))
	if err != nil {
		return blob.Ref{}, err
	}
	return br, nil
}

// uploadBytes populates bb (a builder of either type "bytes" or
// "file", which is a superset of "bytes"), setting its size to the
// provided size and its parts from the provided spans.  The bytes or
// file schema blob is uploaded and its blobref is returned.
func uploadBytes(ctx context.Context, bs blobserver.StatReceiver, bb *Builder, size int64, s []span) *uploadBytesFuture {
	future := newUploadBytesFuture()
	parts := []BytesPart{}
	addBytesParts(ctx, bs, &parts, s, future)

	if err := bb.PopulateParts(size, parts); err != nil {
		future.errc <- err
		return future
	}

	// Hack until perkeep.org/issue/102 is fixed. If we happen to upload
	// the "file" schema before any of its parts arrive, then the indexer
	// can get confused.  So wait on the parts before, and then upload
	// the "file" blob afterwards.
	if bb.Type() == TypeFile {
		future.errc <- nil
		_, err := future.Get() // err may be non-nil if any child part failed
		future = newUploadBytesFuture()
		if err != nil {
			future.errc <- err
			return future
		}
	}

	json := bb.Blob().JSON()
	br := blob.RefFromString(json)
	future.br = br
	go func() {
		_, err := uploadString(ctx, bs, br, json)
		future.errc <- err
	}()
	return future
}

func newUploadBytesFuture() *uploadBytesFuture {
	return &uploadBytesFuture{
		errc: make(chan error, 1),
	}
}

// An uploadBytesFuture is an eager result of a still-in-progress uploadBytes call.
// Call Get to wait and get its final result.
type uploadBytesFuture struct {
	br       blob.Ref
	errc     chan error
	children []*uploadBytesFuture
}

// BlobRef returns the optimistic blobref of this uploadBytes call without blocking.
func (f *uploadBytesFuture) BlobRef() blob.Ref {
	return f.br
}

// Get blocks for all children and returns any final error.
func (f *uploadBytesFuture) Get() (blob.Ref, error) {
	for _, f := range f.children {
		if _, err := f.Get(); err != nil {
			return blob.Ref{}, err
		}
	}
	return f.br, <-f.errc
}

// addBytesParts uploads the provided spans to bs, appending elements to *dst.
func addBytesParts(ctx context.Context, bs blobserver.StatReceiver, dst *[]BytesPart, spans []span, parent *uploadBytesFuture) {
	for _, sp := range spans {
		if len(sp.children) == 1 && sp.children[0].isSingleBlob() {
			// Remove an occasional useless indirection of
			// what would become a bytes schema blob
			// pointing to a single blobref.  Just promote
			// the blobref child instead.
			child := sp.children[0]
			*dst = append(*dst, BytesPart{
				BlobRef: child.br,
				Size:    uint64(child.size()),
			})
			sp.children = nil
		}
		if len(sp.children) > 0 {
			childrenSize := int64(0)
			for _, cs := range sp.children {
				childrenSize += cs.size()
			}
			future := uploadBytes(ctx, bs, newBytes(), childrenSize, sp.children)
			parent.children = append(parent.children, future)
			*dst = append(*dst, BytesPart{
				BytesRef: future.BlobRef(),
				Size:     uint64(childrenSize),
			})
		}
		if sp.from == sp.to {
			panic("Shouldn't happen. " + fmt.Sprintf("weird span with same from & to: %#v", sp))
		}
		*dst = append(*dst, BytesPart{
			BlobRef: sp.br,
			Size:    uint64(sp.to - sp.from),
		})
	}
}

// writeFileMapRolling uploads chunks of r to bs while populating file and
// finally uploading file's blob. The returned blobref is of file's
// JSON blob. It uses a rolling checksum to pick the chunk boundaries.
func writeFileMapRolling(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) (blob.Ref, error) {
	n, spans, err := writeFileChunks(ctx, bs, file, r)
	if err != nil {
		return blob.Ref{}, err
	}
	// The top-level content parts
	return uploadBytes(ctx, bs, file, n, spans).Get()
}

// WriteFileChunks uploads chunks of r to bs while populating file.
// It does not upload file.
func WriteFileChunks(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) error {
	size, spans, err := writeFileChunks(ctx, bs, file, r)
	if err != nil {
		return err
	}
	parts := []BytesPart{}
	future := newUploadBytesFuture()
	addBytesParts(ctx, bs, &parts, spans, future)
	future.errc <- nil // Get will still block on addBytesParts' children
	if _, err := future.Get(); err != nil {
		return err
	}
	return file.PopulateParts(size, parts)
}

func writeFileChunks(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) (n int64, spans []span, outerr error) {
	src := &noteEOFReader{r: r}
	bufr := bufio.NewReaderSize(src, bufioReaderSize)
	spans = []span{} // the tree of spans, cut on interesting rollsum boundaries
	rs := rollsum.New()
	var last int64
	var buf bytes.Buffer
	blobSize := 0 // size of the next blob being built; should equal buf.Len()

	const chunksInFlight = 32 // at ~64 KB chunks, this is ~2 MB of memory per file
	gatec := syncutil.NewGate(chunksInFlight)
	firsterrc := make(chan error, 1)

	// uploadLastSpan runs in the same goroutine as the loop below and is
	// responsible for starting the upload of buf's contents.  It returns
	// false if an error has been seen and the loop below should stop.
	uploadLastSpan := func() bool {
		chunk := buf.String()
		buf.Reset()
		br := blob.RefFromString(chunk)
		spans[len(spans)-1].br = br
		select {
		case outerr = <-firsterrc:
			return false
		default:
			// No error seen so far, continue.
		}
		gatec.Start()
		go func() {
			defer gatec.Done()
			if _, err := uploadString(ctx, bs, br, chunk); err != nil {
				select {
				case firsterrc <- err:
				default:
				}
			}
		}()
		return true
	}

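	// Read the input a byte at a time, feeding the rolling checksum.
	// A chunk is cut when the checksum fires on a strong enough
	// boundary, when the chunk reaches maxBlobSize, or at the
	// firstChunkSize mark; at each cut, earlier spans that ended on
	// weaker boundaries are demoted to children of the new span.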
	for {
		c, err := bufr.ReadByte()
		if err == io.EOF {
			if n != last {
				spans = append(spans, span{from: last, to: n})
				if !uploadLastSpan() {
					return
				}
			}
			break
		}
		if err != nil {
			return 0, nil, err
		}

		buf.WriteByte(c)
		n++
		blobSize++
		rs.Roll(c)

		var bits int
		onRollSplit := rs.OnSplit()
		switch {
		case blobSize == maxBlobSize:
			bits = 20 // arbitrary node weight; 1<<20 == 1 MB
		case src.sawEOF:
			// Don't split. End is coming soon enough.
			continue
		case onRollSplit && n > firstChunkSize && blobSize > tooSmallThreshold:
			bits = rs.Bits()
		case n == firstChunkSize:
			bits = 18 // 1 << 18 == 256 KB
		default:
			// Don't split.
			continue
		}
		blobSize = 0

		// Take any spans from the end of the spans slice that
		// have a smaller 'bits' score and make them children
		// of this node.
		var children []span
		childrenFrom := len(spans)
		for childrenFrom > 0 && spans[childrenFrom-1].bits < bits {
			childrenFrom--
		}
		if nCopy := len(spans) - childrenFrom; nCopy > 0 {
			children = make([]span, nCopy)
			copy(children, spans[childrenFrom:])
			spans = spans[:childrenFrom]
		}

		spans = append(spans, span{from: last, to: n, bits: bits, children: children})
		last = n
		if !uploadLastSpan() {
			return
		}
	}

	// outerr is only ever set inside uploadLastSpan, whose false return
	// already caused a naked return from inside the loop above; this
	// check is just defensive.
	if outerr != nil {
		return 0, nil, outerr
	}

	// Wait for all uploads to finish, one way or another, and then
	// see if any generated errors.
	// Once this loop is done, we own all the tokens in gatec, so nobody
	// else can have one outstanding.
	for i := 0; i < chunksInFlight; i++ {
		gatec.Start()
	}
	select {
	case err := <-firsterrc:
		return 0, nil, err
	default:
	}

	return n, spans, nil
}