1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package files
18
19 import (
20 "context"
21 "fmt"
22 "log"
23 "os"
24 "path/filepath"
25 "sort"
26 "strings"
27 "sync"
28
29 "perkeep.org/pkg/blob"
30 )
31
32 type readBlobRequest struct {
33 ch chan<- blob.SizedRef
34 after string
35 remain *int
36 dirRoot string
37
38
39 blobPrefix, pathInto string
40 }
41
42 type enumerateError struct {
43 msg string
44 err error
45 }
46
47 func (ee *enumerateError) Error() string {
48 return fmt.Sprintf("files enumerate error: %s: %v", ee.msg, ee.err)
49 }
50
51
// readBlobs walks the directory opts.dirRoot/opts.pathInto in sorted entry
// order and sends to opts.ch every blob whose name sorts strictly after
// opts.after, decrementing *opts.remain per blob and stopping when it hits 0.
//
// Entries accepted by skipDir are ignored. Two-hex-digit shard directories
// and any other subdirectory are recursed into, extending opts.blobPrefix
// with the directory name; subtrees whose prefix sorts before opts.after are
// pruned. Regular files ending in ".dat" whose base name parses as a blob ref
// are reported with their on-disk size. An empty directory inside a
// "queue-" tree (but not the queue root itself) is handed to
// ds.tryRemoveDir in a new goroutine.
//
// Stats for this directory's entries are started ahead of time in the
// background so the sequential walk usually finds them ready. Concurrency is
// limited by ds.statGate (assumed to be a counting gate whose Start blocks
// until a slot is free and Done frees it — confirm against its type).
//
// It returns ctx.Err() if ctx is done while a send is pending, or an
// *enumerateError if reading the directory or stat-ing an entry fails.
func (ds *Storage) readBlobs(ctx context.Context, opts readBlobRequest) error {
	dirFullPath := filepath.Join(opts.dirRoot, opts.pathInto)
	names, err := ds.fs.ReadDirNames(dirFullPath)
	if err != nil {
		return &enumerateError{"readdirnames of " + dirFullPath, err}
	}
	if len(names) == 0 {
		// Clean up an empty blob dir inside a queue, but never the queue
		// root directory itself.
		if strings.Contains(dirFullPath, "queue-") &&
			!strings.Contains(filepath.Base(dirFullPath), "queue-") {
			go ds.tryRemoveDir(dirFullPath)
		}
		return nil
	}
	sort.Strings(names)

	// Shard directories are recognized by name alone, so only the other
	// entries need a stat to learn whether they are files or directories.
	stat := make(map[string]*future, len(names))
	toStat := make([]func(), 0, len(names))
	for _, name := range names {
		if skipDir(name) || isShardDir(name) {
			continue
		}
		fullFile := filepath.Join(dirFullPath, name)
		f := newFuture(func() (os.FileInfo, error) {
			fi, err := ds.fs.Stat(fullFile)
			if err != nil {
				return nil, &enumerateError{"stat", err}
			}
			return fi, nil
		})
		stat[name] = f
		toStat = append(toStat, f.ForceLoad)
	}

	// Pre-stat entries in the background. stop is closed when readBlobs
	// returns, so no more stats are scheduled once their results can no
	// longer be used (limit reached, error, or cancellation). A future the
	// walk needs before it was scheduled is simply loaded by Get itself.
	stop := make(chan struct{})
	defer close(stop)
	go func() {
		for _, load := range toStat {
			select {
			case <-stop:
				return
			default:
			}
			ds.statGate.Start()
			load := load
			go func() {
				// Release the gate slot only after the stat completes;
				// releasing it first would leave concurrency unbounded.
				defer ds.statGate.Done()
				load()
			}()
		}
	}()

	for _, name := range names {
		if *opts.remain == 0 {
			return nil
		}
		if skipDir(name) {
			continue
		}

		var fi os.FileInfo
		isDir := isShardDir(name)
		if !isDir {
			fi, err = stat[name].Get()
			if err != nil {
				return err
			}
			isDir = fi.IsDir()
		}

		if isDir {
			var newBlobPrefix string
			if opts.blobPrefix == "" {
				newBlobPrefix = name + "-"
			} else {
				newBlobPrefix = opts.blobPrefix + name
			}
			// Prune the subtree when its prefix sorts strictly before the
			// same-length prefix of opts.after: every blob in it would then
			// sort before opts.after.
			if len(opts.after) > 0 {
				compareLen := len(newBlobPrefix)
				if len(opts.after) < compareLen {
					compareLen = len(opts.after)
				}
				if newBlobPrefix[:compareLen] < opts.after[:compareLen] {
					continue
				}
			}
			ropts := opts
			ropts.blobPrefix = newBlobPrefix
			ropts.pathInto = filepath.Join(opts.pathInto, name)
			if err := ds.readBlobs(ctx, ropts); err != nil {
				return err
			}
			continue
		}

		// Regular file: fi was obtained above.
		if !strings.HasSuffix(name, ".dat") {
			continue
		}
		blobName := strings.TrimSuffix(name, ".dat")
		if blobName <= opts.after {
			continue
		}
		blobRef, ok := blob.Parse(blobName)
		if !ok {
			continue
		}
		select {
		case opts.ch <- blob.SizedRef{Ref: blobRef, Size: uint32(fi.Size())}:
			(*opts.remain)--
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	return nil
}
167
// EnumerateBlobs walks the storage root and sends to dest each blob whose
// name sorts strictly after `after`, stopping once limit blobs have been
// sent. dest is always closed before EnumerateBlobs returns. A limit of 0
// logs a warning and results in nothing being sent.
func (ds *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
	defer close(dest)

	if limit == 0 {
		log.Printf("Warning: files.EnumerateBlobs called with a limit of 0")
	}

	// readBlobs decrements this shared counter as blobs are sent, so it must
	// be a separate variable rather than the limit parameter passed by value.
	remaining := limit
	req := readBlobRequest{
		ch:      dest,
		after:   after,
		remain:  &remaining,
		dirRoot: ds.root,
	}
	return ds.readBlobs(ctx, req)
}
182
// skipDir reports whether name is one of the reserved directory names that
// blob enumeration ignores entirely: "partition", "cache", or "packed".
func skipDir(name string) bool {
	switch name {
	case "partition", "cache", "packed":
		return true
	default:
		return false
	}
}
190
// isShardDir reports whether name has the form of a shard directory:
// exactly two lowercase hexadecimal digits, such as "0a" or "ff".
func isShardDir(name string) bool {
	if len(name) != 2 {
		return false
	}
	return isHex(name[0]) && isHex(name[1])
}

// isHex reports whether b is an ASCII decimal digit or a lowercase
// letter in the range 'a' through 'f'.
func isHex(b byte) bool {
	switch {
	case b >= '0' && b <= '9':
		return true
	case b >= 'a' && b <= 'f':
		return true
	default:
		return false
	}
}
198
199
200 type future struct {
201 once sync.Once
202 f func() (os.FileInfo, error)
203 v os.FileInfo
204 err error
205 }
206
207 func newFuture(f func() (os.FileInfo, error)) *future {
208 return &future{f: f}
209 }
210
211 func (f *future) Get() (os.FileInfo, error) {
212 f.once.Do(f.run)
213 return f.v, f.err
214 }
215
216 func (f *future) ForceLoad() {
217 f.Get()
218 }
219
220 func (f *future) run() { f.v, f.err = f.f() }