1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package diskpacked
18
19 import (
20 "bufio"
21 "bytes"
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "log"
27 "os"
28 "strconv"
29
30 "go4.org/jsonconfig"
31 "perkeep.org/pkg/blob"
32 "perkeep.org/pkg/sorted"
33
34
35 _ "perkeep.org/pkg/sorted/kvfile"
36 _ "perkeep.org/pkg/sorted/leveldb"
37 _ "perkeep.org/pkg/sorted/sqlite"
38 )
39
40
41 func Reindex(ctx context.Context, root string, overwrite bool, indexConf jsonconfig.Obj) (err error) {
42
43 var s = &storage{root: root}
44 index, err := newIndex(root, indexConf)
45 if err != nil {
46 return err
47 }
48 defer func() {
49 closeErr := index.Close()
50
51
52
53 if err == nil {
54 err = closeErr
55 }
56 }()
57
58 for i := 0; i >= 0; i++ {
59 fh, err := os.Open(s.filename(i))
60 if err != nil {
61 if os.IsNotExist(err) {
62 break
63 }
64 return err
65 }
66 err = s.reindexOne(ctx, index, overwrite, i)
67 fh.Close()
68 if err != nil {
69 return err
70 }
71 }
72 return nil
73 }
74
75 func (s *storage) reindexOne(ctx context.Context, index sorted.KeyValue, overwrite bool, packID int) error {
76
77 var batch sorted.BatchMutation
78 if overwrite {
79 batch = index.BeginBatch()
80 }
81 allOk := true
82
83 verbose := ctxGetVerbose(ctx)
84 misses := make(map[blob.Ref]string, 8)
85 err := s.walkPack(verbose, packID,
86 func(packID int, ref blob.Ref, offset int64, size uint32) error {
87 if !ref.Valid() {
88 if verbose {
89 log.Printf("found deleted blob in %d at %d with size %d", packID, offset, size)
90 }
91 return nil
92 }
93 meta := blobMeta{packID, offset, size}.String()
94 if overwrite && batch != nil {
95 batch.Set(ref.String(), meta)
96 return nil
97 }
98 delete(misses, ref)
99 if old, err := index.Get(ref.String()); err != nil {
100 allOk = false
101 if errors.Is(err, sorted.ErrNotFound) {
102 log.Println(ref.String() + ": cannot find in index!")
103 } else {
104 log.Println(ref.String()+": error getting from index: ", err.Error())
105 }
106 } else if old != meta {
107 if old > meta {
108 misses[ref] = meta
109 log.Printf("WARN: possible duplicate blob %s", ref.String())
110 } else {
111 allOk = false
112 log.Printf("ERROR: index mismatch for %s - index=%s, meta=%s!", ref.String(), old, meta)
113 }
114 }
115 return nil
116 })
117 if err != nil {
118 return err
119 }
120
121 for ref, meta := range misses {
122 log.Printf("ERROR: index mismatch for %s (%s)!", ref.String(), meta)
123 allOk = false
124 }
125
126 if overwrite && batch != nil {
127 if err := index.CommitBatch(batch); err != nil {
128 return err
129 }
130 } else if !allOk {
131 return fmt.Errorf("index does not match data in %d", packID)
132 }
133 return nil
134 }
135
136
137
138 func (s *storage) Walk(ctx context.Context,
139 walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error {
140
141 verbose := ctxGetVerbose(ctx)
142
143 for i := 0; i >= 0; i++ {
144 fh, err := os.Open(s.filename(i))
145 if err != nil {
146 if os.IsNotExist(err) {
147 break
148 }
149 return err
150 }
151 fh.Close()
152 if err = s.walkPack(verbose, i, walker); err != nil {
153 return err
154 }
155 }
156 return nil
157 }
158
159
160
161 func (s *storage) walkPack(verbose bool, packID int,
162 walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error {
163
164 fh, err := os.Open(s.filename(packID))
165 if err != nil {
166 return err
167 }
168 defer fh.Close()
169 name := fh.Name()
170
171 var (
172 pos int64
173 size uint32
174 ref blob.Ref
175 )
176
177 errAt := func(prefix, suffix string) error {
178 if prefix != "" {
179 prefix = prefix + " "
180 }
181 if suffix != "" {
182 suffix = " " + suffix
183 }
184 return fmt.Errorf(prefix+"at %d (0x%x) in %q:"+suffix, pos, pos, name)
185 }
186
187 br := bufio.NewReaderSize(fh, 512)
188 for {
189 if b, err := br.ReadByte(); err != nil {
190 if errors.Is(err, io.EOF) {
191 break
192 }
193 return errAt("error while reading", err.Error())
194 } else if b != '[' {
195 return errAt(fmt.Sprintf("found byte 0x%x", b), "but '[' should be here!")
196 }
197 chunk, err := br.ReadSlice(']')
198 if err != nil {
199 if errors.Is(err, io.EOF) {
200 break
201 }
202 return errAt("error reading blob header", err.Error())
203 }
204 m := len(chunk)
205 chunk = chunk[:m-1]
206 i := bytes.IndexByte(chunk, byte(' '))
207 if i <= 0 {
208 return errAt("", fmt.Sprintf("bad header format (no space in %q)", chunk))
209 }
210 size64, err := strconv.ParseUint(string(chunk[i+1:]), 10, 32)
211 if err != nil {
212 return errAt(fmt.Sprintf("cannot parse size %q as int", chunk[i+1:]), err.Error())
213 }
214 size = uint32(size64)
215
216 if deletedBlobRef.Match(chunk[:i]) {
217 ref = blob.Ref{}
218 if verbose {
219 log.Printf("found deleted at %d", pos)
220 }
221 } else {
222 var ok bool
223 ref, ok = blob.Parse(string(chunk[:i]))
224 if !ok {
225 return errAt("", fmt.Sprintf("cannot parse %q as blobref", chunk[:i]))
226 }
227 if verbose {
228 log.Printf("found %s at %d", ref, pos)
229 }
230 }
231 if err = walker(packID, ref, pos+1+int64(m), size); err != nil {
232 return err
233 }
234
235 pos += 1 + int64(m)
236
237
238 if pos, err = fh.Seek(pos+int64(size), 0); err != nil {
239 return errAt("", "cannot seek +"+strconv.FormatUint(size64, 10)+" bytes")
240 }
241
242 _, _ = io.CopyN(io.Discard, br, int64(br.Buffered()))
243 }
244 return nil
245 }
246
// verboseCtxKey is the context key under which the verbose flag is stored.
// Being an unexported type, it cannot collide with keys of other packages.
type verboseCtxKey struct{}

// ctxGetVerbose reports whether verbose output was requested on ctx.
// It returns false when the flag was never set.
func ctxGetVerbose(ctx context.Context) bool {
	verbose, ok := ctx.Value(verboseCtxKey{}).(bool)
	return ok && verbose
}

// CtxSetVerbose returns a context derived from ctx that carries the given
// verbose flag.
func CtxSetVerbose(ctx context.Context, verbose bool) context.Context {
	var key verboseCtxKey
	return context.WithValue(ctx, key, verbose)
}