Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package diskpacked registers the "diskpacked" blobserver storage type,
    19	storing blobs packed together into monolithic data files
    20	with an index listing the sizes and offsets of the little blobs
    21	within the large files.
    22	
    23	Example low-level config:
    24	
    25		"/storage/": {
    26		    "handler": "storage-diskpacked",
    27		    "handlerArgs": {
    28		       "path": "/var/camlistore/blobs"
    29		     }
    30		},
    31	*/
    32	package diskpacked // import "perkeep.org/pkg/blobserver/diskpacked"
    33	
    34	import (
    35		"bufio"
    36		"bytes"
    37		"context"
    38		"errors"
    39		"expvar"
    40		"fmt"
    41		"io"
    42		"log"
    43		"os"
    44		"path/filepath"
    45		"regexp"
    46		"strings"
    47		"sync"
    48	
    49		"perkeep.org/pkg/blob"
    50		"perkeep.org/pkg/blobserver"
    51		"perkeep.org/pkg/blobserver/local"
    52		"perkeep.org/pkg/sorted"
    53	
    54		"go4.org/jsonconfig"
    55		"go4.org/lock"
    56		"go4.org/readerutil"
    57		"go4.org/strutil"
    58		"go4.org/syncutil"
    59		"go4.org/types"
    60	)
    61	
    62	// TODO(wathiede): replace with glog.V(2) when we decide our logging story.
    63	type debugT bool
    64	
    65	var debug = debugT(false)
    66	
    67	func (d debugT) Printf(format string, args ...interface{}) {
    68		if bool(d) {
    69			log.Printf(format, args...)
    70		}
    71	}
    72	
    73	func (d debugT) Println(args ...interface{}) {
    74		if bool(d) {
    75			log.Println(args...)
    76		}
    77	}
    78	
// defaultMaxFileSize is the pack file size limit applied by newStorage when
// the caller passes a maxFileSize <= 0.
const defaultMaxFileSize = 512 << 20 // 512MB
    80	
// storage is the diskpacked blobserver implementation: blobs are appended to
// numbered pack files under root, and an index maps each blob ref to its
// pack number, offset and size.
type storage struct {
	root        string          // pack directory, with trailing slashes trimmed by newStorage
	index       sorted.KeyValue // blob ref -> serialized blobMeta
	maxFileSize int64           // append starts a new pack once the current one grows past this

	writeLock io.Closer // Provided by lock.Lock, and guards other processes from accessing the file open for writes.

	*local.Generationer

	mu     sync.Mutex // Guards all I/O state.
	closed bool       // set by Close; append refuses to write afterwards
	writer *os.File   // pack file currently open for appending
	fds    []*os.File // read-only handles, indexed by pack number
	size   int64      // current length of the pack behind writer
}
    96	
    97	func (s *storage) String() string {
    98		return fmt.Sprintf("\"diskpacked\" blob packs at %s", s.root)
    99	}
   100	
// Published expvar counters. The per-file maps are keyed by pack file name,
// the "total" and open-fds maps by the storage root directory.
var (
	readVar     = expvar.NewMap("diskpacked-read-bytes")        // bytes read, per pack file
	readTotVar  = expvar.NewMap("diskpacked-total-read-bytes")  // bytes read, per storage root
	openFdsVar  = expvar.NewMap("diskpacked-open-fds")          // open pack handles, per storage root
	writeVar    = expvar.NewMap("diskpacked-write-bytes")       // bytes written, per pack file
	writeTotVar = expvar.NewMap("diskpacked-total-write-bytes") // bytes written, per storage root
)
   108	
   109	const defaultIndexType = sorted.DefaultKVFileType
   110	const defaultIndexFile = "index." + defaultIndexType
   111	
   112	// IsDir reports whether dir is a diskpacked directory.
   113	func IsDir(dir string) (bool, error) {
   114		_, err := os.Stat(filepath.Join(dir, defaultIndexFile))
   115		if os.IsNotExist(err) {
   116			return false, nil
   117		}
   118		return err == nil, err
   119	}
   120	
   121	// New returns a diskpacked storage implementation, adding blobs to
   122	// the provided directory. It doesn't delete any existing blob pack
   123	// files.
   124	func New(dir string) (blobserver.Storage, error) {
   125		var maxSize int64
   126		if dh, err := os.Open(dir); err == nil {
   127			var nBlobFiles, atMax int
   128			if fis, err := dh.Readdir(-1); err == nil {
   129				// Detect existing max size from size of files, if obvious, and set maxSize to that
   130				for _, fi := range fis {
   131					if nm := fi.Name(); strings.HasPrefix(nm, "pack-") && strings.HasSuffix(nm, ".blobs") {
   132						nBlobFiles++
   133						if s := fi.Size(); s > maxSize {
   134							maxSize, atMax = fi.Size(), 0
   135						} else if s == maxSize {
   136							atMax++
   137						}
   138					}
   139				}
   140			}
   141			// Believe the deduced size only if at least 2 files has that maximum size,
   142			// and all files (except one) has the same.
   143			if !(atMax > 1 && nBlobFiles == atMax+1) {
   144				maxSize = 0
   145			}
   146		}
   147		return newStorage(dir, maxSize, nil)
   148	}
   149	
   150	// newIndex returns a new sorted.KeyValue, using either the given config, or the default.
   151	func newIndex(root string, indexConf jsonconfig.Obj) (sorted.KeyValue, error) {
   152		if len(indexConf) > 0 {
   153			return sorted.NewKeyValueMaybeWipe(indexConf)
   154		}
   155		return sorted.NewKeyValueMaybeWipe(jsonconfig.Obj{
   156			"type": defaultIndexType,
   157			"file": filepath.Join(root, defaultIndexFile),
   158		})
   159	}
   160	
   161	// newStorage returns a new storage in path root with the given maxFileSize,
   162	// or defaultMaxFileSize (512MB) if <= 0
   163	func newStorage(root string, maxFileSize int64, indexConf jsonconfig.Obj) (s *storage, err error) {
   164		fi, err := os.Stat(root)
   165		if os.IsNotExist(err) {
   166			return nil, fmt.Errorf("storage root %q doesn't exist", root)
   167		}
   168		if err != nil {
   169			return nil, fmt.Errorf("failed to stat directory %q: %w", root, err)
   170		}
   171		if !fi.IsDir() {
   172			return nil, fmt.Errorf("storage root %q exists but is not a directory", root)
   173		}
   174		index, err := newIndex(root, indexConf)
   175		if err != nil {
   176			return nil, err
   177		}
   178		defer func() {
   179			if err != nil {
   180				index.Close()
   181			}
   182		}()
   183		if maxFileSize <= 0 {
   184			maxFileSize = defaultMaxFileSize
   185		}
   186		// Be consistent with trailing slashes.  Makes expvar stats for total
   187		// reads/writes consistent across diskpacked targets, regardless of what
   188		// people put in their low level config.
   189		root = strings.TrimRight(root, `\/`)
   190		s = &storage{
   191			root:         root,
   192			index:        index,
   193			maxFileSize:  maxFileSize,
   194			Generationer: local.NewGenerationer(root),
   195		}
   196		if err := s.openAllPacks(); err != nil {
   197			s.Close()
   198			return nil, err
   199		}
   200		s.mu.Lock()
   201		defer s.mu.Unlock()
   202		if _, _, err := s.StorageGeneration(); err != nil {
   203			return nil, fmt.Errorf("error initialization generation for %q: %w", root, err)
   204		}
   205		return s, nil
   206	}
   207	
   208	func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
   209		var (
   210			path        = config.RequiredString("path")
   211			maxFileSize = config.OptionalInt("maxFileSize", 0)
   212			indexConf   = config.OptionalObject("metaIndex")
   213		)
   214		if err := config.Validate(); err != nil {
   215			return nil, err
   216		}
   217		return newStorage(path, int64(maxFileSize), indexConf)
   218	}
   219	
   220	func init() {
   221		blobserver.RegisterStorageConstructor("diskpacked", blobserver.StorageConstructor(newFromConfig))
   222	}
   223	
   224	// openForRead will open pack file n for read and keep a handle to it in
   225	// s.fds.  os.IsNotExist returned if n >= the number of pack files in s.root.
   226	// This function is not thread safe, s.mu should be locked by the caller.
   227	func (s *storage) openForRead(n int) error {
   228		if n > len(s.fds) {
   229			panic(fmt.Sprintf("openForRead called out of order got %d, expected %d", n, len(s.fds)))
   230		}
   231	
   232		fn := s.filename(n)
   233		f, err := os.Open(fn)
   234		if err != nil {
   235			return err
   236		}
   237		openFdsVar.Add(s.root, 1)
   238		debug.Printf("diskpacked: opened for read %q", fn)
   239		s.fds = append(s.fds, f)
   240		return nil
   241	}
   242	
   243	// openForWrite will create or open pack file n for writes, create a lock
   244	// visible external to the process and seek to the end of the file ready for
   245	// appending new data.
   246	// This function is not thread safe, s.mu should be locked by the caller.
   247	func (s *storage) openForWrite(n int) error {
   248		fn := s.filename(n)
   249		l, err := lock.Lock(fn + ".lock")
   250		if err != nil {
   251			return err
   252		}
   253		f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0666)
   254		if err != nil {
   255			l.Close()
   256			return err
   257		}
   258		openFdsVar.Add(s.root, 1)
   259		debug.Printf("diskpacked: opened for write %q", fn)
   260	
   261		s.size, err = f.Seek(0, os.SEEK_END)
   262		if err != nil {
   263			f.Close()
   264			l.Close()
   265			return err
   266		}
   267	
   268		s.writer = f
   269		s.writeLock = l
   270		return nil
   271	}
   272	
   273	// closePack opens any pack file currently open for writing.
   274	func (s *storage) closePack() error {
   275		var err error
   276		if s.writer != nil {
   277			err = s.writer.Close()
   278			openFdsVar.Add(s.root, -1)
   279			s.writer = nil
   280		}
   281		if s.writeLock != nil {
   282			lerr := s.writeLock.Close()
   283			if err == nil {
   284				err = lerr
   285			}
   286			s.writeLock = nil
   287		}
   288		return err
   289	}
   290	
   291	// nextPack will close the current writer and release its lock if open,
   292	// open the next pack file in sequence for writing, grab its lock, set it
   293	// to the currently active writer, and open another copy for read-only use.
   294	// This function is not thread safe, s.mu should be locked by the caller.
   295	func (s *storage) nextPack() error {
   296		debug.Println("diskpacked: nextPack")
   297		s.size = 0
   298		if err := s.closePack(); err != nil {
   299			return err
   300		}
   301		n := len(s.fds)
   302		if err := s.openForWrite(n); err != nil {
   303			return err
   304		}
   305		return s.openForRead(n)
   306	}
   307	
   308	// openAllPacks opens read-only each pack file in s.root, populating s.fds.
   309	// The latest pack file will also have a writable handle opened.
   310	// This function is not thread safe, s.mu should be locked by the caller.
   311	func (s *storage) openAllPacks() error {
   312		debug.Println("diskpacked: openAllPacks")
   313		n := 0
   314		for {
   315			err := s.openForRead(n)
   316			if os.IsNotExist(err) {
   317				break
   318			}
   319			if err != nil {
   320				return err
   321			}
   322			n++
   323		}
   324	
   325		if n == 0 {
   326			// If no pack files are found, we create one open for read and write.
   327			return s.nextPack()
   328		}
   329	
   330		// If 1 or more pack files are found, open the last one read and write.
   331		return s.openForWrite(n - 1)
   332	}
   333	
   334	// Close index and all opened fds, with locking.
   335	func (s *storage) Close() error {
   336		s.mu.Lock()
   337		defer s.mu.Unlock()
   338		if s.closed {
   339			return nil
   340		}
   341		var closeErr error
   342		s.closed = true
   343		if err := s.index.Close(); err != nil {
   344			log.Println("diskpacked: closing index:", err)
   345		}
   346		for _, f := range s.fds {
   347			err := f.Close()
   348			openFdsVar.Add(s.root, -1)
   349			if err != nil {
   350				closeErr = err
   351			}
   352		}
   353		if err := s.closePack(); err != nil && closeErr == nil {
   354			closeErr = err
   355		}
   356		return closeErr
   357	}
   358	
   359	func (s *storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
   360		return s.fetch(br, 0, -1)
   361	}
   362	
   363	func (s *storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
   364		if offset < 0 || length < 0 {
   365			return nil, blob.ErrNegativeSubFetch
   366		}
   367		rc, _, err := s.fetch(br, offset, length)
   368		return rc, err
   369	}
   370	
   371	// length of -1 means all
   372	func (s *storage) fetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
   373		meta, err := s.meta(br)
   374		if err != nil {
   375			return nil, 0, err
   376		}
   377	
   378		if meta.file >= len(s.fds) {
   379			return nil, 0, fmt.Errorf("diskpacked: attempt to fetch blob from out of range pack file %d > %d", meta.file, len(s.fds))
   380		}
   381		rac := s.fds[meta.file]
   382		var rs io.ReadSeeker
   383		if length == -1 {
   384			// normal Fetch mode
   385			rs = io.NewSectionReader(rac, meta.offset, int64(meta.size))
   386		} else {
   387			if offset > int64(meta.size) {
   388				return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
   389			} else if offset+length > int64(meta.size) {
   390				length = int64(meta.size) - offset
   391			}
   392			rs = io.NewSectionReader(rac, meta.offset+offset, length)
   393		}
   394		fn := rac.Name()
   395		// Ensure entry is in map.
   396		readVar.Add(fn, 0)
   397		if v, ok := readVar.Get(fn).(*expvar.Int); ok {
   398			rs = readerutil.NewStatsReadSeeker(v, rs)
   399		}
   400		readTotVar.Add(s.root, 0)
   401		if v, ok := readTotVar.Get(s.root).(*expvar.Int); ok {
   402			rs = readerutil.NewStatsReadSeeker(v, rs)
   403		}
   404		rsc := struct {
   405			io.ReadSeeker
   406			io.Closer
   407		}{
   408			rs,
   409			types.NopCloser,
   410		}
   411		return rsc, meta.size, nil
   412	}
   413	
   414	func (s *storage) filename(file int) string {
   415		return filepath.Join(s.root, fmt.Sprintf("pack-%05d.blobs", file))
   416	}
   417	
   418	var removeGate = syncutil.NewGate(20) // arbitrary
   419	
   420	// RemoveBlobs removes the blobs from index and pads data with zero bytes
   421	func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   422		batch := s.index.BeginBatch()
   423		var wg syncutil.Group
   424		for _, br := range blobs {
   425			br := br
   426			removeGate.Start()
   427			batch.Delete(br.String())
   428			wg.Go(func() error {
   429				defer removeGate.Done()
   430				if err := s.delete(br); err != nil && !errors.Is(err, os.ErrNotExist) {
   431					return err
   432				}
   433				return nil
   434			})
   435		}
   436		err1 := wg.Err()
   437		err2 := s.index.CommitBatch(batch)
   438		if err1 != nil {
   439			return err1
   440		}
   441		return err2
   442	}
   443	
   444	var statGate = syncutil.NewGate(20) // arbitrary
   445	
   446	func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   447		return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
   448			m, err := s.meta(br)
   449			if err == nil {
   450				return m.SizedRef(br), nil
   451			}
   452			if errors.Is(err, os.ErrNotExist) {
   453				return sb, nil
   454			}
   455			return sb, err
   456		})
   457	}
   458	
   459	func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
   460		defer close(dest)
   461	
   462		t := s.index.Find(after, "")
   463		defer func() {
   464			closeErr := t.Close()
   465			if err == nil {
   466				err = closeErr
   467			}
   468		}()
   469		for i := 0; i < limit && t.Next(); {
   470			key := t.Key()
   471			if key <= after {
   472				// EnumerateBlobs' semantics are '>', but sorted.KeyValue.Find is '>='.
   473				continue
   474			}
   475			br, ok := blob.Parse(key)
   476			if !ok {
   477				return fmt.Errorf("diskpacked: couldn't parse index key %q", key)
   478			}
   479			m, ok := parseBlobMeta(t.Value())
   480			if !ok {
   481				return fmt.Errorf("diskpacked: couldn't parse index value %q: %q", key, t.Value())
   482			}
   483			select {
   484			case dest <- m.SizedRef(br):
   485			case <-ctx.Done():
   486				return ctx.Err()
   487			}
   488			i++
   489		}
   490		return nil
   491	}
   492	
   493	// The continuation token will be in the form: "<pack#> <offset>"
   494	func parseContToken(token string) (pack int, offset int64, err error) {
   495		// Special case
   496		if token == "" {
   497			pack = 0
   498			offset = 0
   499			return
   500		}
   501		_, err = fmt.Sscan(token, &pack, &offset)
   502	
   503		return
   504	}
   505	
   506	// readHeader parses "[sha1-fooooo 1234]" from r and returns the
   507	// number of bytes read (including the starting '[' and ending ']'),
   508	// the blobref bytes (not necessarily valid) and the number as a
   509	// uint32.
   510	// The consumed count returned is only valid if err == nil.
   511	// The returned digest slice is only valid until the next read from br.
   512	func readHeader(br *bufio.Reader) (consumed int, digest []byte, size uint32, err error) {
   513		line, err := br.ReadSlice(']')
   514		if err != nil {
   515			return
   516		}
   517		const minSize = len("[b-c 0]")
   518		sp := bytes.IndexByte(line, ' ')
   519		size64, err := strutil.ParseUintBytes(line[sp+1:len(line)-1], 10, 32)
   520		if len(line) < minSize || line[0] != '[' || line[len(line)-1] != ']' || sp < 0 || err != nil {
   521			return 0, nil, 0, errors.New("diskpacked: invalid header reader")
   522		}
   523		return len(line), line[1:sp], uint32(size64), nil
   524	}
   525	
// The header of deleted blobs has a digest in which the hash type is
// set to all 'x', the hash value is all '0', and has the correct size.
// deletedBlobRef matches such a digest so that StreamBlobs can skip the
// payload of a deleted blob.
var deletedBlobRef = regexp.MustCompile(`^x+-0+$`)

// Compile-time check that *storage implements blobserver.BlobStreamer.
var _ blobserver.BlobStreamer = (*storage)(nil)
   531	
   532	// StreamBlobs Implements the blobserver.StreamBlobs interface.
   533	func (s *storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
   534		defer close(dest)
   535	
   536		fileNum, offset, err := parseContToken(contToken)
   537		if err != nil {
   538			return errors.New("diskpacked: invalid continuation token")
   539		}
   540		debug.Printf("Continuing blob streaming from pack %s, offset %d",
   541			s.filename(fileNum), offset)
   542	
   543		fd, err := os.Open(s.filename(fileNum))
   544		if err != nil {
   545			return err
   546		}
   547		// fd will change over time; Close whichever is current when we exit.
   548		defer func() {
   549			if fd != nil { // may be nil on os.Open error below
   550				fd.Close()
   551			}
   552		}()
   553	
   554		// ContToken always refers to the exact next place we will read from.
   555		// Note that seeking past the end is legal on Unix and for io.Seeker,
   556		// but that will just result in a mostly harmless EOF.
   557		//
   558		// TODO: probably be stricter here and don't allow seek past
   559		// the end, since we know the size of closed files and the
   560		// size of the file diskpacked currently still writing.
   561		_, err = fd.Seek(offset, io.SeekStart)
   562		if err != nil {
   563			return err
   564		}
   565	
   566		const ioBufSize = 256 * 1024
   567	
   568		// We'll use bufio to avoid read system call overhead.
   569		r := bufio.NewReaderSize(fd, ioBufSize)
   570	
   571		for {
   572			//  Are we at the EOF of this pack?
   573			if _, err := r.Peek(1); err != nil {
   574				if !errors.Is(err, io.EOF) {
   575					return err
   576				}
   577				// EOF case; continue to the next pack, if any.
   578				fileNum++
   579				offset = 0
   580				fd.Close() // Close the previous pack
   581				fd, err = os.Open(s.filename(fileNum))
   582				if os.IsNotExist(err) {
   583					// We reached the end.
   584					return nil
   585				} else if err != nil {
   586					return err
   587				}
   588				r.Reset(fd)
   589				continue
   590			}
   591	
   592			thisOffset := offset // of current blob's header
   593			consumed, digest, size, err := readHeader(r)
   594			if err != nil {
   595				return err
   596			}
   597	
   598			offset += int64(consumed)
   599			if deletedBlobRef.Match(digest) {
   600				// Skip over deletion padding
   601				if _, err := io.CopyN(io.Discard, r, int64(size)); err != nil {
   602					return err
   603				}
   604				offset += int64(size)
   605				continue
   606			}
   607	
   608			ref, ok := blob.ParseBytes(digest)
   609			if !ok {
   610				return fmt.Errorf("diskpacked: Invalid blobref %q", digest)
   611			}
   612	
   613			// Finally, read and send the blob.
   614	
   615			// TODO: remove this allocation per blob. We can make one instead
   616			// outside of the loop, guarded by a mutex, and re-use it, only to
   617			// lock the mutex and clone it if somebody actually calls ReadFull
   618			// on the *blob.Blob. Otherwise callers just scanning all the blobs
   619			// to see if they have everything incur lots of garbage if they
   620			// don't open any blobs.
   621			data := make([]byte, size)
   622			if _, err := io.ReadFull(r, data); err != nil {
   623				return err
   624			}
   625			offset += int64(size)
   626			blob := blob.NewBlob(ref, size, func(context.Context) ([]byte, error) {
   627				return data, nil
   628			})
   629			select {
   630			case dest <- blobserver.BlobAndToken{
   631				Blob:  blob,
   632				Token: fmt.Sprintf("%d %d", fileNum, thisOffset),
   633			}:
   634				// Nothing.
   635			case <-ctx.Done():
   636				return ctx.Err()
   637			}
   638		}
   639	}
   640	
   641	func (s *storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) {
   642		var b bytes.Buffer
   643		n, err := b.ReadFrom(source)
   644		if err != nil {
   645			return
   646		}
   647	
   648		sbr = blob.SizedRef{Ref: br, Size: uint32(n)}
   649	
   650		// Check if it's a dup. Still accept it if the pack file on disk seems to be corrupt
   651		// or truncated.
   652		if m, err := s.meta(br); err == nil {
   653			fi, err := os.Stat(s.filename(m.file))
   654			if err == nil && fi.Size() >= m.offset+int64(m.size) {
   655				return sbr, nil
   656			}
   657		}
   658	
   659		err = s.append(sbr, &b)
   660		return
   661	}
   662	
   663	// append writes the provided blob to the current data file.
   664	func (s *storage) append(br blob.SizedRef, r io.Reader) error {
   665		s.mu.Lock()
   666		defer s.mu.Unlock()
   667		if s.closed {
   668			return errors.New("diskpacked: write to closed storage")
   669		}
   670	
   671		// to be able to undo the append
   672		origOffset := s.size
   673	
   674		fn := s.writer.Name()
   675		n, err := fmt.Fprintf(s.writer, "[%v %v]", br.Ref.String(), br.Size)
   676		s.size += int64(n)
   677		writeVar.Add(fn, int64(n))
   678		writeTotVar.Add(s.root, int64(n))
   679		if err != nil {
   680			return err
   681		}
   682	
   683		// TODO(adg): remove this seek and the offset check once confident
   684		offset, err := s.writer.Seek(0, io.SeekCurrent)
   685		if err != nil {
   686			return err
   687		}
   688		if offset != s.size {
   689			return fmt.Errorf("diskpacked: seek says offset = %d, we think %d",
   690				offset, s.size)
   691		}
   692		offset = s.size // make this a declaration once the above is removed
   693	
   694		n2, err := io.Copy(s.writer, r)
   695		s.size += n2
   696		writeVar.Add(fn, int64(n))
   697		writeTotVar.Add(s.root, int64(n))
   698		if err != nil {
   699			return err
   700		}
   701		if n2 != int64(br.Size) {
   702			return fmt.Errorf("diskpacked: written blob size %d didn't match size %d", n, br.Size)
   703		}
   704		if err = s.writer.Sync(); err != nil {
   705			return err
   706		}
   707	
   708		packIdx := len(s.fds) - 1
   709		if s.size > s.maxFileSize {
   710			if err := s.nextPack(); err != nil {
   711				return err
   712			}
   713		}
   714		err = s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String())
   715		if err != nil {
   716			if _, seekErr := s.writer.Seek(origOffset, io.SeekStart); seekErr != nil {
   717				log.Printf("ERROR seeking back to the original offset: %v", seekErr)
   718			} else if truncErr := s.writer.Truncate(origOffset); truncErr != nil {
   719				log.Printf("ERROR truncating file after index error: %v", truncErr)
   720			} else {
   721				s.size = origOffset
   722			}
   723		}
   724		return err
   725	}
   726	
   727	// meta fetches the metadata for the specified blob from the index.
   728	func (s *storage) meta(br blob.Ref) (m blobMeta, err error) {
   729		ms, err := s.index.Get(br.String())
   730		if err != nil {
   731			if errors.Is(err, sorted.ErrNotFound) {
   732				err = os.ErrNotExist
   733			}
   734			return
   735		}
   736		m, ok := parseBlobMeta(ms)
   737		if !ok {
   738			err = fmt.Errorf("diskpacked: bad blob metadata: %q", ms)
   739		}
   740		return
   741	}
   742	
   743	// blobMeta is the blob metadata stored in the index.
   744	type blobMeta struct {
   745		file   int
   746		offset int64
   747		size   uint32
   748	}
   749	
   750	func parseBlobMeta(s string) (m blobMeta, ok bool) {
   751		n, err := fmt.Sscan(s, &m.file, &m.offset, &m.size)
   752		return m, n == 3 && err == nil
   753	}
   754	
   755	func (m blobMeta) String() string {
   756		return fmt.Sprintf("%v %v %v", m.file, m.offset, m.size)
   757	}
   758	
   759	func (m blobMeta) SizedRef(br blob.Ref) blob.SizedRef {
   760		return blob.SizedRef{Ref: br, Size: m.size}
   761	}
Website layout inspired by memcached.
Content by the authors.