Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package diskpacked
    18	
    19	import (
    20		"bufio"
    21		"bytes"
    22		"context"
    23		"errors"
    24		"fmt"
    25		"io"
    26		"log"
    27		"os"
    28		"strconv"
    29	
    30		"go4.org/jsonconfig"
    31		"perkeep.org/pkg/blob"
    32		"perkeep.org/pkg/sorted"
    33	
    34		// possible index formats
    35		_ "perkeep.org/pkg/sorted/kvfile"
    36		_ "perkeep.org/pkg/sorted/leveldb"
    37		_ "perkeep.org/pkg/sorted/sqlite"
    38	)
    39	
    40	// Reindex rewrites the index files of the diskpacked .pack files
    41	func Reindex(ctx context.Context, root string, overwrite bool, indexConf jsonconfig.Obj) (err error) {
    42		// there is newStorage, but that may open a file for writing
    43		var s = &storage{root: root}
    44		index, err := newIndex(root, indexConf)
    45		if err != nil {
    46			return err
    47		}
    48		defer func() {
    49			closeErr := index.Close()
    50			// just returning the first error - if the index or disk is corrupt
    51			// and can't close, it's very likely these two errors are related and
    52			// have the same root cause.
    53			if err == nil {
    54				err = closeErr
    55			}
    56		}()
    57	
    58		for i := 0; i >= 0; i++ {
    59			fh, err := os.Open(s.filename(i))
    60			if err != nil {
    61				if os.IsNotExist(err) {
    62					break
    63				}
    64				return err
    65			}
    66			err = s.reindexOne(ctx, index, overwrite, i)
    67			fh.Close()
    68			if err != nil {
    69				return err
    70			}
    71		}
    72		return nil
    73	}
    74	
    75	func (s *storage) reindexOne(ctx context.Context, index sorted.KeyValue, overwrite bool, packID int) error {
    76	
    77		var batch sorted.BatchMutation
    78		if overwrite {
    79			batch = index.BeginBatch()
    80		}
    81		allOk := true
    82	
    83		verbose := ctxGetVerbose(ctx)
    84		misses := make(map[blob.Ref]string, 8)
    85		err := s.walkPack(verbose, packID,
    86			func(packID int, ref blob.Ref, offset int64, size uint32) error {
    87				if !ref.Valid() {
    88					if verbose {
    89						log.Printf("found deleted blob in %d at %d with size %d", packID, offset, size)
    90					}
    91					return nil
    92				}
    93				meta := blobMeta{packID, offset, size}.String()
    94				if overwrite && batch != nil {
    95					batch.Set(ref.String(), meta)
    96					return nil
    97				}
    98				delete(misses, ref)
    99				if old, err := index.Get(ref.String()); err != nil {
   100					allOk = false
   101					if errors.Is(err, sorted.ErrNotFound) {
   102						log.Println(ref.String() + ": cannot find in index!")
   103					} else {
   104						log.Println(ref.String()+": error getting from index: ", err.Error())
   105					}
   106				} else if old != meta {
   107					if old > meta {
   108						misses[ref] = meta
   109						log.Printf("WARN: possible duplicate blob %s", ref.String())
   110					} else {
   111						allOk = false
   112						log.Printf("ERROR: index mismatch for %s - index=%s, meta=%s!", ref.String(), old, meta)
   113					}
   114				}
   115				return nil
   116			})
   117		if err != nil {
   118			return err
   119		}
   120	
   121		for ref, meta := range misses {
   122			log.Printf("ERROR: index mismatch for %s (%s)!", ref.String(), meta)
   123			allOk = false
   124		}
   125	
   126		if overwrite && batch != nil {
   127			if err := index.CommitBatch(batch); err != nil {
   128				return err
   129			}
   130		} else if !allOk {
   131			return fmt.Errorf("index does not match data in %d", packID)
   132		}
   133		return nil
   134	}
   135	
   136	// Walk walks the storage and calls the walker callback with each blobref
   137	// stops if walker returns non-nil error, and returns that
   138	func (s *storage) Walk(ctx context.Context,
   139		walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error {
   140	
   141		verbose := ctxGetVerbose(ctx)
   142	
   143		for i := 0; i >= 0; i++ {
   144			fh, err := os.Open(s.filename(i))
   145			if err != nil {
   146				if os.IsNotExist(err) {
   147					break
   148				}
   149				return err
   150			}
   151			fh.Close()
   152			if err = s.walkPack(verbose, i, walker); err != nil {
   153				return err
   154			}
   155		}
   156		return nil
   157	}
   158	
   159	// walkPack walks the given pack and calls the walker callback with each blobref.
   160	// Stops if walker returns non-nil error and returns that.
   161	func (s *storage) walkPack(verbose bool, packID int,
   162		walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error {
   163	
   164		fh, err := os.Open(s.filename(packID))
   165		if err != nil {
   166			return err
   167		}
   168		defer fh.Close()
   169		name := fh.Name()
   170	
   171		var (
   172			pos  int64
   173			size uint32
   174			ref  blob.Ref
   175		)
   176	
   177		errAt := func(prefix, suffix string) error {
   178			if prefix != "" {
   179				prefix = prefix + " "
   180			}
   181			if suffix != "" {
   182				suffix = " " + suffix
   183			}
   184			return fmt.Errorf(prefix+"at %d (0x%x) in %q:"+suffix, pos, pos, name)
   185		}
   186	
   187		br := bufio.NewReaderSize(fh, 512)
   188		for {
   189			if b, err := br.ReadByte(); err != nil {
   190				if errors.Is(err, io.EOF) {
   191					break
   192				}
   193				return errAt("error while reading", err.Error())
   194			} else if b != '[' {
   195				return errAt(fmt.Sprintf("found byte 0x%x", b), "but '[' should be here!")
   196			}
   197			chunk, err := br.ReadSlice(']')
   198			if err != nil {
   199				if errors.Is(err, io.EOF) {
   200					break
   201				}
   202				return errAt("error reading blob header", err.Error())
   203			}
   204			m := len(chunk)
   205			chunk = chunk[:m-1]
   206			i := bytes.IndexByte(chunk, byte(' '))
   207			if i <= 0 {
   208				return errAt("", fmt.Sprintf("bad header format (no space in %q)", chunk))
   209			}
   210			size64, err := strconv.ParseUint(string(chunk[i+1:]), 10, 32)
   211			if err != nil {
   212				return errAt(fmt.Sprintf("cannot parse size %q as int", chunk[i+1:]), err.Error())
   213			}
   214			size = uint32(size64)
   215	
   216			if deletedBlobRef.Match(chunk[:i]) {
   217				ref = blob.Ref{}
   218				if verbose {
   219					log.Printf("found deleted at %d", pos)
   220				}
   221			} else {
   222				var ok bool
   223				ref, ok = blob.Parse(string(chunk[:i]))
   224				if !ok {
   225					return errAt("", fmt.Sprintf("cannot parse %q as blobref", chunk[:i]))
   226				}
   227				if verbose {
   228					log.Printf("found %s at %d", ref, pos)
   229				}
   230			}
   231			if err = walker(packID, ref, pos+1+int64(m), size); err != nil {
   232				return err
   233			}
   234	
   235			pos += 1 + int64(m)
   236			// TODO(tgulacsi): not just seek, but check the hashes of the files
   237			// maybe with a different command-line flag, only.
   238			if pos, err = fh.Seek(pos+int64(size), 0); err != nil {
   239				return errAt("", "cannot seek +"+strconv.FormatUint(size64, 10)+" bytes")
   240			}
   241			// drain the buffer after the underlying reader Seeks
   242			_, _ = io.CopyN(io.Discard, br, int64(br.Buffered()))
   243		}
   244		return nil
   245	}
   246	
   247	type verboseCtxKey struct{}
   248	
   249	func ctxGetVerbose(ctx context.Context) bool {
   250		b, _ := ctx.Value(verboseCtxKey{}).(bool)
   251		return b
   252	}
   253	func CtxSetVerbose(ctx context.Context, verbose bool) context.Context {
   254		return context.WithValue(ctx, verboseCtxKey{}, verbose)
   255	}
Website layout inspired by memcached.
Content by the authors.