1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22
23 package files
24
25 import (
26 "context"
27 "fmt"
28 "io"
29 "math"
30 "os"
31 "path/filepath"
32 "sync"
33
34 "perkeep.org/pkg/blob"
35 "perkeep.org/pkg/blobserver"
36
37 "go4.org/syncutil"
38 )
39
40
41
42
43
44 type VFS interface {
45 Remove(string) error
46 RemoveDir(string) error
47 Stat(string) (os.FileInfo, error)
48 Lstat(string) (os.FileInfo, error)
49 Open(string) (ReadableFile, error)
50 MkdirAll(path string, perm os.FileMode) error
51
52
53 Rename(oldname, newname string) error
54
55
56 TempFile(dir, prefix string) (WritableFile, error)
57
58 ReadDirNames(dir string) ([]string, error)
59 }
60
61
62
63 type WritableFile interface {
64 io.Writer
65 io.Closer
66
67
68 Name() string
69
70 Sync() error
71 }
72
73
74
75 type ReadableFile interface {
76 io.Reader
77 io.Seeker
78 io.Closer
79 }
80
81 type Storage struct {
82 fs VFS
83 root string
84
85
86
87 dirLockMu *sync.RWMutex
88
89
90 statGate *syncutil.Gate
91
92
93
94
95
96 tmpFileGate *syncutil.Gate
97 }
98
99
100
101 func (s *Storage) SetNewFileGate(g *syncutil.Gate) { s.tmpFileGate = g }
102
103 func NewStorage(fs VFS, root string) *Storage {
104 return &Storage{
105 fs: fs,
106 root: root,
107 dirLockMu: new(sync.RWMutex),
108 statGate: syncutil.NewGate(10),
109 }
110 }
111
112 func (ds *Storage) tryRemoveDir(dir string) {
113 ds.dirLockMu.Lock()
114 defer ds.dirLockMu.Unlock()
115 ds.fs.RemoveDir(dir)
116 }
117
118 func (ds *Storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
119 return ds.fetch(ctx, br, 0, -1)
120 }
121
122 func (ds *Storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
123 if offset < 0 || length < 0 {
124 return nil, blob.ErrNegativeSubFetch
125 }
126 rc, _, err := ds.fetch(ctx, br, offset, length)
127 return rc, err
128 }
129
130
131 func u32(n int64) uint32 {
132 if n < 0 || n > math.MaxUint32 {
133 panic("bad size " + fmt.Sprint(n))
134 }
135 return uint32(n)
136 }
137
138
139 func (ds *Storage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
140
141 fileName := ds.blobPath(br)
142 stat, err := ds.fs.Stat(fileName)
143 if os.IsNotExist(err) {
144 return nil, 0, os.ErrNotExist
145 }
146 size = u32(stat.Size())
147 file, err := ds.fs.Open(fileName)
148 if err != nil {
149 if os.IsNotExist(err) {
150 err = os.ErrNotExist
151 }
152 return nil, 0, err
153 }
154
155 if length < 0 && offset == 0 {
156 return file, size, nil
157 }
158
159 if offset < 0 || offset > stat.Size() {
160 if offset < 0 {
161 return nil, 0, blob.ErrNegativeSubFetch
162 }
163 return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
164 }
165 if offset != 0 {
166 if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset {
167 file.Close()
168 return nil, 0, fmt.Errorf("localdisk: error seeking to %d: got %v, %v", offset, at, err)
169 }
170 }
171 return struct {
172 io.Reader
173 io.Closer
174 }{
175 Reader: io.LimitReader(file, length),
176 Closer: file,
177 }, 0 , nil
178 }
179
180 func (ds *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
181 for _, blob := range blobs {
182 fileName := ds.blobPath(blob)
183 err := ds.fs.Remove(fileName)
184 switch {
185 case err == nil:
186 continue
187 case os.IsNotExist(err):
188
189 continue
190 default:
191 return err
192 }
193 }
194 return nil
195 }
196
197 func blobFileBaseName(b blob.Ref) string {
198 return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
199 }
200
201 func (ds *Storage) blobDirectory(b blob.Ref) string {
202 d := b.Digest()
203 if len(d) < 4 {
204 d = d + "____"
205 }
206 return filepath.Join(ds.root, b.HashName(), d[0:2], d[2:4])
207 }
208
209 func (ds *Storage) blobPath(b blob.Ref) string {
210 return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b))
211 }
212
213 const maxParallelStats = 20
214
215 var statGate = syncutil.NewGate(maxParallelStats)
216
217 func (ds *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
218 return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(ref blob.Ref) (sb blob.SizedRef, err error) {
219 fi, err := ds.fs.Stat(ds.blobPath(ref))
220 switch {
221 case err == nil && fi.Mode().IsRegular():
222 return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil
223 case err != nil && !os.IsNotExist(err):
224 return sb, err
225 }
226 return sb, nil
227
228 })
229 }