Home Download Docs Code Community
     1	/*
     2	Copyright 2015 The Perkeep AUTHORS
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package blobpacked
    18	
    19	// TODO: the test coverage is a little weak here. TestPacked has a large
    20	// integration test, but some Read/Close paths aren't tested well.
    21	
    22	import (
    23		"context"
    24		"errors"
    25		"fmt"
    26		"io"
    27		"log"
    28		"os"
    29		"sort"
    30	
    31		"perkeep.org/pkg/blob"
    32		"perkeep.org/pkg/conv"
    33	)
    34	
// zipPart is some of the state from the "w:sha1-xxx:n" meta rows.
// See docs on storage.meta.
//
// One zipPart describes a single contiguous piece of a whole file as
// stored inside one zip blob: zipRef is the blob that is fetched, and
// zipOff and len give the byte range passed to SubFetch to read this
// piece's bytes. OpenWholeRef fills in idx from the meta row's key and
// the remaining fields from the row's value.
type zipPart struct {
	idx    uint32 // 0, 1, ..., N-1 (of N zips making up the wholeref)
	zipRef blob.Ref
	zipOff uint32 // offset inside zip to get the uncompressed content
	len    uint32 // number of bytes to read starting at zipOff
}
    43	
    44	// byZipIndex sorts zip parts by their numeric index.
    45	// This was necessary because they may appear in lexical order from
    46	// the sorted.KeyValue as "1", "10", "11", "2", "3".
    47	type byZipIndex []zipPart
    48	
    49	func (s byZipIndex) Len() int           { return len(s) }
    50	func (s byZipIndex) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
    51	func (s byZipIndex) Less(i, j int) bool { return s[i].idx < s[j].idx }
    52	
    53	func (s *storage) OpenWholeRef(wholeRef blob.Ref, offset int64) (rc io.ReadCloser, wholeSize int64, err error) {
    54		ctx := context.TODO()
    55		// See comment before the storage.meta field for the keys/values
    56		// being scanned here.
    57		startKey := wholeMetaPrefix + wholeRef.String()
    58		it := s.meta.Find(startKey, startKey+";")
    59		if it == nil {
    60			panic("nil iterator")
    61		}
    62		rows := 0
    63		var parts []zipPart
    64		var nZipWant uint32
    65		for it.Next() {
    66			rows++
    67			k := it.KeyBytes()
    68			if len(k) == len(startKey) {
    69				if rows != 1 {
    70					// Should be first. Confused.
    71					break
    72				}
    73				if err := conv.ParseFields(it.ValueBytes(), &wholeSize, &nZipWant); err != nil {
    74					it.Close()
    75					return nil, 0, err
    76				}
    77				continue
    78			}
    79			var zp zipPart
    80			if k[len(startKey)] != ':' {
    81				// Unexpected key. Confused.
    82				break
    83			}
    84			if err := conv.ParseFields(k[len(startKey)+len(":"):], &zp.idx); err != nil {
    85				it.Close()
    86				return nil, 0, fmt.Errorf("blobpacked: error parsing meta key %q: %v", k, err)
    87			}
    88			// "<zipchunk-blobref> <offset-in-zipchunk-blobref> <offset-in-whole_u64> <length_u32>"
    89			var ignore uint64
    90			if err := conv.ParseFields(it.ValueBytes(), &zp.zipRef, &zp.zipOff, &ignore, &zp.len); err != nil {
    91				it.Close()
    92				return nil, 0, fmt.Errorf("blobpacked: error parsing meta key %q = %q: %v", k, it.ValueBytes(), err)
    93			}
    94			parts = append(parts, zp)
    95		}
    96		if err := it.Close(); err != nil {
    97			return nil, 0, err
    98		}
    99		if rows == 0 || uint32(len(parts)) != nZipWant {
   100			return nil, 0, os.ErrNotExist
   101		}
   102		sort.Sort(byZipIndex(parts))
   103		for i, zp := range parts {
   104			if zp.idx != uint32(i) {
   105				log.Printf("blobpacked: discontiguous or overlapping index for wholeref %v", wholeRef)
   106				return nil, 0, os.ErrNotExist
   107			}
   108		}
   109		needSkip := offset
   110		for len(parts) > 0 && needSkip >= int64(parts[0].len) {
   111			needSkip -= int64(parts[0].len)
   112			parts = parts[1:]
   113		}
   114		if len(parts) > 0 && needSkip > 0 {
   115			parts[0].zipOff += uint32(needSkip)
   116			parts[0].len -= uint32(needSkip)
   117		}
   118		rc = &wholeFromZips{
   119			ctx:    ctx,
   120			src:    s.large,
   121			remain: wholeSize - offset,
   122			zp:     parts,
   123		}
   124		return rc, wholeSize, nil
   125	}
   126	
// wholeFromZips is an io.ReadCloser that stitches together
// a wholeRef from the inside of 0+ blobpacked zip files.
//
// Parts are opened lazily, one at a time: initCur takes the next entry
// off zp and opens it with src.SubFetch (passing ctx), and Read consumes
// that reader until its curRemain bytes are used up.
//
// remain counts the bytes of the whole file still to be returned; Read
// decrements it and initCur records io.EOF in err once it is <= 0.
// err is sticky: once it is set, Read returns it without reading further.
// closed is set by Close, after which Read fails.
type wholeFromZips struct {
	ctx    context.Context
	src    blob.SubFetcher
	err    error // sticky
	closed bool
	remain int64

	// cur if non-nil is the reader to read on the next Read.
	// It has curRemain bytes remaining.
	cur       io.ReadCloser
	curRemain uint32

	// zp are the zip parts to open next, as cur is exhausted.
	zp []zipPart
}
   144	
   145	func (zr *wholeFromZips) Read(p []byte) (n int, err error) {
   146		if zr.closed {
   147			return 0, errors.New("blobpacked: Read on Closed wholeref reader")
   148		}
   149		zr.initCur()
   150		if zr.err != nil {
   151			return 0, zr.err
   152		}
   153		if uint32(len(p)) > zr.curRemain {
   154			p = p[:zr.curRemain]
   155		}
   156		n, err = zr.cur.Read(p)
   157		if int64(n) > int64(zr.curRemain) || n < 0 {
   158			panic("Reader returned bogus number of bytes read")
   159		}
   160		zr.curRemain -= uint32(n)
   161		zr.remain -= int64(n)
   162		if zr.curRemain == 0 {
   163			zr.cur.Close()
   164			zr.cur = nil
   165		}
   166		if err == io.EOF && zr.remain > 0 {
   167			err = nil
   168		} else if err == nil && zr.remain == 0 {
   169			err = io.EOF
   170		}
   171		return n, err
   172	}
   173	
   174	func (zr *wholeFromZips) initCur() {
   175		if zr.err != nil || zr.cur != nil {
   176			return
   177		}
   178		if zr.remain <= 0 {
   179			zr.err = io.EOF
   180			return
   181		}
   182		if len(zr.zp) == 0 {
   183			zr.err = io.ErrUnexpectedEOF
   184			return
   185		}
   186		zp := zr.zp[0]
   187		zr.zp = zr.zp[1:]
   188		rc, err := zr.src.SubFetch(zr.ctx, zp.zipRef, int64(zp.zipOff), int64(zp.len))
   189		if err != nil {
   190			if err == os.ErrNotExist {
   191				err = fmt.Errorf("blobpacked: error opening next part of file: %v", err)
   192			}
   193			zr.err = err
   194			return
   195		}
   196		zr.cur = rc
   197		zr.curRemain = zp.len
   198	}
   199	
   200	func (zr *wholeFromZips) Close() error {
   201		if zr.closed {
   202			return nil
   203		}
   204		zr.closed = true
   205		var err error
   206		if zr.cur != nil {
   207			err = zr.cur.Close()
   208			zr.cur = nil
   209		}
   210		return err
   211	}
Website layout inspired by memcached.
Content by the authors.