1
2
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
18
19
20
21 package fs
22
23 import (
24 "context"
25 "fmt"
26 "io"
27 "log"
28 "os"
29 "sync"
30 "time"
31
32 "perkeep.org/internal/lru"
33 "perkeep.org/pkg/blob"
34 "perkeep.org/pkg/client"
35 "perkeep.org/pkg/schema"
36
37 "bazil.org/fuse"
38 fusefs "bazil.org/fuse/fs"
39 )
40
var (
	// serverStart is captured once at package initialization and is reported
	// as the Mtime/Ctime of static file nodes.
	serverStart = time.Now()

	// Logger receives the package's diagnostic output. It writes to
	// os.Stderr with the "PerkeepFS: " prefix and the standard log flags.
	Logger = log.New(os.Stderr, "PerkeepFS: ", log.LstdFlags)
)
47
48 type CamliFileSystem struct {
49 fetcher blob.Fetcher
50 client *client.Client
51 root fusefs.Node
52
53
54
55
56 IgnoreOwners bool
57
58 blobToSchema *lru.Cache
59 nameToBlob *lru.Cache
60 nameToAttr *lru.Cache
61 }
62
63 var _ fusefs.FS = (*CamliFileSystem)(nil)
64
65 func newCamliFileSystem(fetcher blob.Fetcher) *CamliFileSystem {
66 return &CamliFileSystem{
67 fetcher: fetcher,
68 blobToSchema: lru.New(1024),
69 nameToBlob: lru.New(1024),
70 nameToAttr: lru.New(1024),
71 }
72 }
73
74
75
76 func NewDefaultCamliFileSystem(client *client.Client, fetcher blob.Fetcher) *CamliFileSystem {
77 if client == nil || fetcher == nil {
78 panic("nil argument")
79 }
80 fs := newCamliFileSystem(fetcher)
81 fs.root = &root{fs: fs}
82 fs.client = client
83 return fs
84 }
85
86
87
88 func NewRootedCamliFileSystem(cli *client.Client, fetcher blob.Fetcher, root blob.Ref) (*CamliFileSystem, error) {
89 fs := newCamliFileSystem(fetcher)
90 fs.client = cli
91
92 n, err := fs.newNodeFromBlobRef(root)
93
94 if err != nil {
95 return nil, err
96 }
97
98 fs.root = n
99
100 return fs, nil
101 }
102
103
104
105 type node struct {
106 fs *CamliFileSystem
107 blobref blob.Ref
108
109 pnodeModTime time.Time
110
111 dmu sync.Mutex
112 dirents []fuse.Dirent
113
114 mu sync.Mutex
115 attr fuse.Attr
116 meta *schema.Blob
117 lookMap map[string]blob.Ref
118 }
119
120 var _ fusefs.Node = (*node)(nil)
121
122 func (n *node) Attr(ctx context.Context, a *fuse.Attr) error {
123 if _, err := n.schema(ctx); err != nil {
124 return handleEIOorEINTR(err)
125 }
126 *a = n.attr
127 return nil
128 }
129
130 func (n *node) addLookupEntry(name string, ref blob.Ref) {
131 n.mu.Lock()
132 defer n.mu.Unlock()
133 if n.lookMap == nil {
134 n.lookMap = make(map[string]blob.Ref)
135 }
136 n.lookMap[name] = ref
137 }
138
139 var _ fusefs.NodeStringLookuper = (*node)(nil)
140
141 func (n *node) Lookup(ctx context.Context, name string) (fusefs.Node, error) {
142 if name == ".quitquitquit" {
143
144 log.Fatalf("Shutting down due to .quitquitquit lookup.")
145 }
146
147
148
149 n.dmu.Lock()
150 loaded := n.dirents != nil
151 n.dmu.Unlock()
152 if !loaded {
153 n.ReadDirAll(ctx)
154 }
155
156 n.mu.Lock()
157 defer n.mu.Unlock()
158 ref, ok := n.lookMap[name]
159 if !ok {
160 return nil, fuse.ENOENT
161 }
162 return &node{fs: n.fs, blobref: ref}, nil
163 }
164
165 func (n *node) schema(ctx context.Context) (*schema.Blob, error) {
166 n.mu.Lock()
167 defer n.mu.Unlock()
168 if n.meta != nil {
169 return n.meta, nil
170 }
171 blob, err := n.fs.fetchSchemaMeta(ctx, n.blobref)
172 if err == nil {
173 n.meta = blob
174 n.populateAttr()
175 }
176 return blob, err
177 }
178
179 func isWriteFlags(flags fuse.OpenFlags) bool {
180
181 return flags&fuse.OpenFlags(os.O_WRONLY|os.O_RDWR|os.O_APPEND|os.O_CREATE) != 0
182 }
183
184 var _ fusefs.NodeOpener = (*node)(nil)
185
186 func (n *node) Open(ctx context.Context, req *fuse.OpenRequest, res *fuse.OpenResponse) (fusefs.Handle, error) {
187 Logger.Printf("CAMLI Open on %v: %#v", n.blobref, req)
188 if isWriteFlags(req.Flags) {
189 return nil, fuse.EPERM
190 }
191 ss, err := n.schema(ctx)
192 if err != nil {
193 Logger.Printf("open of %v: %v", n.blobref, err)
194 return nil, handleEIOorEINTR(err)
195 }
196 if ss.Type() == schema.TypeDirectory {
197 return n, nil
198 }
199 fr, err := ss.NewFileReader(n.fs.fetcher)
200 if err != nil {
201
202 Logger.Printf("NewFileReader(%s) = %v", n.blobref, err)
203 return nil, fuse.EIO
204 }
205 return &nodeReader{n: n, fr: fr}, nil
206 }
207
208 type nodeReader struct {
209 n *node
210 fr *schema.FileReader
211 }
212
213 var _ fusefs.HandleReader = (*nodeReader)(nil)
214
215 func (nr *nodeReader) Read(ctx context.Context, req *fuse.ReadRequest, res *fuse.ReadResponse) error {
216 Logger.Printf("CAMLI nodeReader READ on %v: %#v", nr.n.blobref, req)
217 if req.Offset >= nr.fr.Size() {
218 return nil
219 }
220 size := req.Size
221 if int64(size)+req.Offset >= nr.fr.Size() {
222 size -= int((int64(size) + req.Offset) - nr.fr.Size())
223 }
224 buf := make([]byte, size)
225 n, err := nr.fr.ReadAt(buf, req.Offset)
226 if err == io.EOF {
227 err = nil
228 }
229 if err != nil {
230 Logger.Printf("camli read on %v at %d: %v", nr.n.blobref, req.Offset, err)
231 return fuse.EIO
232 }
233 res.Data = buf[:n]
234 return nil
235 }
236
237 var _ fusefs.HandleReleaser = (*nodeReader)(nil)
238
239 func (nr *nodeReader) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
240 Logger.Printf("CAMLI nodeReader RELEASE on %v", nr.n.blobref)
241 nr.fr.Close()
242 return nil
243 }
244
245 var _ fusefs.HandleReadDirAller = (*node)(nil)
246
247 func (n *node) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
248 Logger.Printf("CAMLI ReadDirAll on %v", n.blobref)
249 n.dmu.Lock()
250 defer n.dmu.Unlock()
251 if n.dirents != nil {
252 return n.dirents, nil
253 }
254
255 ss, err := n.schema(ctx)
256 if err != nil {
257 Logger.Printf("camli.ReadDirAll error on %v: %v", n.blobref, err)
258 return nil, handleEIOorEINTR(err)
259 }
260 dr, err := schema.NewDirReader(ctx, n.fs.fetcher, ss.BlobRef())
261 if err != nil {
262 Logger.Printf("camli.ReadDirAll error on %v: %v", n.blobref, err)
263 return nil, handleEIOorEINTR(err)
264 }
265 schemaEnts, err := dr.Readdir(ctx, -1)
266 if err != nil {
267 Logger.Printf("camli.ReadDirAll error on %v: %v", n.blobref, err)
268 return nil, handleEIOorEINTR(err)
269 }
270 n.dirents = make([]fuse.Dirent, 0)
271 for _, sent := range schemaEnts {
272 if name := sent.FileName(); name != "" {
273 n.addLookupEntry(name, sent.BlobRef())
274 n.dirents = append(n.dirents, fuse.Dirent{Name: name})
275 }
276 }
277 return n.dirents, nil
278 }
279
280
281
282 func (n *node) populateAttr() error {
283 meta := n.meta
284
285 n.attr.Mode = meta.FileMode()
286
287 if n.fs.IgnoreOwners {
288 n.attr.Uid = uint32(os.Getuid())
289 n.attr.Gid = uint32(os.Getgid())
290 executeBit := n.attr.Mode & 0100
291 n.attr.Mode = (n.attr.Mode ^ n.attr.Mode.Perm()) | 0400 | executeBit
292 } else {
293 n.attr.Uid = uint32(meta.MapUid())
294 n.attr.Gid = uint32(meta.MapGid())
295 }
296
297
298
299 if mt := meta.ModTime(); !mt.IsZero() {
300 n.attr.Mtime = mt
301 } else {
302 n.attr.Mtime = n.pnodeModTime
303 }
304
305 switch meta.Type() {
306 case schema.TypeFile:
307 n.attr.Size = uint64(meta.PartsSize())
308 n.attr.Blocks = 0
309 n.attr.Mode |= 0400
310 case schema.TypeDirectory:
311 n.attr.Mode |= 0500
312 case schema.TypeSymlink:
313 n.attr.Mode |= 0400
314 default:
315 Logger.Printf("unknown attr ss.Type %q in populateAttr", meta.Type())
316 }
317 return nil
318 }
319
320 func (fs *CamliFileSystem) Root() (fusefs.Node, error) {
321 return fs.root, nil
322 }
323
324 var _ fusefs.FSStatfser = (*CamliFileSystem)(nil)
325
326 func (fs *CamliFileSystem) Statfs(ctx context.Context, req *fuse.StatfsRequest, res *fuse.StatfsResponse) error {
327
328 res.Blocks = 1 << 35
329 res.Bfree = 1 << 34
330 res.Bavail = 1 << 34
331 res.Files = 1 << 29
332 res.Ffree = 1 << 28
333 res.Namelen = 2048
334 res.Bsize = 1024
335 return nil
336 }
337
338
339
340
341
342 func (fs *CamliFileSystem) fetchSchemaMeta(ctx context.Context, br blob.Ref) (*schema.Blob, error) {
343 blobStr := br.String()
344 if blob, ok := fs.blobToSchema.Get(blobStr); ok {
345 return blob.(*schema.Blob), nil
346 }
347
348 rc, _, err := fs.fetcher.Fetch(ctx, br)
349 if err != nil {
350 return nil, err
351 }
352 defer rc.Close()
353 blob, err := schema.BlobFromReader(br, rc)
354 if err != nil {
355 Logger.Printf("Error parsing %s as schema blob: %v", br, err)
356 return nil, os.ErrInvalid
357 }
358 if blob.Type() == "" {
359 Logger.Printf("blob %s is JSON but lacks camliType", br)
360 return nil, os.ErrInvalid
361 }
362 fs.blobToSchema.Add(blobStr, blob)
363 return blob, nil
364 }
365
366
367 func (fs *CamliFileSystem) newNodeFromBlobRef(root blob.Ref) (fusefs.Node, error) {
368 blob, err := fs.fetchSchemaMeta(context.TODO(), root)
369 if err != nil {
370 return nil, err
371 }
372
373 switch blob.Type() {
374 case schema.TypeDirectory:
375 n := &node{fs: fs, blobref: root, meta: blob}
376 n.populateAttr()
377 return n, nil
378
379 case schema.TypePermanode:
380
381 return &mutDir{fs: fs, permanode: root, name: "-", children: make(map[string]mutFileOrDir)}, nil
382 }
383
384 return nil, fmt.Errorf("Blobref must be of a directory or permanode got a %v", blob.Type())
385 }
386
387 type notImplementDirNode struct{}
388
389 var _ fusefs.Node = (*notImplementDirNode)(nil)
390
391 func (notImplementDirNode) Attr(ctx context.Context, a *fuse.Attr) error {
392 a.Mode = os.ModeDir | 0000
393 a.Uid = uint32(os.Getuid())
394 a.Gid = uint32(os.Getgid())
395 return nil
396 }
397
398 type staticFileNode string
399
400 var _ fusefs.Node = (*notImplementDirNode)(nil)
401
402 func (s staticFileNode) Attr(ctx context.Context, a *fuse.Attr) error {
403 a.Mode = 0400
404 a.Uid = uint32(os.Getuid())
405 a.Gid = uint32(os.Getgid())
406 a.Size = uint64(len(s))
407 a.Mtime = serverStart
408 a.Ctime = serverStart
409 return nil
410 }
411
412 var _ fusefs.HandleReader = (*staticFileNode)(nil)
413
414 func (s staticFileNode) Read(ctx context.Context, req *fuse.ReadRequest, res *fuse.ReadResponse) error {
415 if req.Offset > int64(len(s)) {
416 return nil
417 }
418 s = s[req.Offset:]
419 size := req.Size
420 if size > len(s) {
421 size = len(s)
422 }
423 res.Data = make([]byte, size)
424 copy(res.Data, s)
425 return nil
426 }