Home Download Docs Code Community
     1	//go:build linux
     2	
     3	/*
     4	Copyright 2011 The Perkeep Authors
     5	
     6	Licensed under the Apache License, Version 2.0 (the "License");
     7	you may not use this file except in compliance with the License.
     8	You may obtain a copy of the License at
     9	
    10	     http://www.apache.org/licenses/LICENSE-2.0
    11	
    12	Unless required by applicable law or agreed to in writing, software
    13	distributed under the License is distributed on an "AS IS" BASIS,
    14	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15	See the License for the specific language governing permissions and
    16	limitations under the License.
    17	*/
    18	
    19	// Package fs implements a FUSE filesystem for Perkeep and is
    20	// used by the pk-mount binary.
    21	package fs // import "perkeep.org/pkg/fs"
    22	
    23	import (
    24		"context"
    25		"fmt"
    26		"io"
    27		"log"
    28		"os"
    29		"sync"
    30		"time"
    31	
    32		"perkeep.org/internal/lru"
    33		"perkeep.org/pkg/blob"
    34		"perkeep.org/pkg/client"
    35		"perkeep.org/pkg/schema"
    36	
    37		"bazil.org/fuse"
    38		fusefs "bazil.org/fuse/fs"
    39	)
    40	
    41	var (
    42		serverStart = time.Now()
    43		// Logger is used by the package to print all sorts of debugging statements. It
    44		// is up to the user of the package to SetOutput the Logger to reduce verbosity.
    45		Logger = log.New(os.Stderr, "PerkeepFS: ", log.LstdFlags)
    46	)
    47	
    48	type CamliFileSystem struct {
    49		fetcher blob.Fetcher
    50		client  *client.Client // or nil, if not doing search queries
    51		root    fusefs.Node
    52	
    53		// IgnoreOwners, if true, collapses all file ownership to the
    54		// uid/gid running the fuse filesystem, and sets all the
    55		// permissions to 0600/0700.
    56		IgnoreOwners bool
    57	
    58		blobToSchema *lru.Cache // ~map[blobstring]*schema.Blob
    59		nameToBlob   *lru.Cache // ~map[string]blob.Ref
    60		nameToAttr   *lru.Cache // ~map[string]*fuse.Attr
    61	}
    62	
    63	var _ fusefs.FS = (*CamliFileSystem)(nil)
    64	
    65	func newCamliFileSystem(fetcher blob.Fetcher) *CamliFileSystem {
    66		return &CamliFileSystem{
    67			fetcher:      fetcher,
    68			blobToSchema: lru.New(1024), // arbitrary; TODO: tunable/smarter?
    69			nameToBlob:   lru.New(1024), // arbitrary: TODO: tunable/smarter?
    70			nameToAttr:   lru.New(1024), // arbitrary: TODO: tunable/smarter?
    71		}
    72	}
    73	
    74	// NewDefaultCamliFileSystem returns a filesystem with a generic base, from which
    75	// users can navigate by blobref, tag, date, etc.
    76	func NewDefaultCamliFileSystem(client *client.Client, fetcher blob.Fetcher) *CamliFileSystem {
    77		if client == nil || fetcher == nil {
    78			panic("nil argument")
    79		}
    80		fs := newCamliFileSystem(fetcher)
    81		fs.root = &root{fs: fs} // root.go
    82		fs.client = client
    83		return fs
    84	}
    85	
    86	// NewRootedCamliFileSystem returns a CamliFileSystem with a node based on a blobref
    87	// as its base.
    88	func NewRootedCamliFileSystem(cli *client.Client, fetcher blob.Fetcher, root blob.Ref) (*CamliFileSystem, error) {
    89		fs := newCamliFileSystem(fetcher)
    90		fs.client = cli
    91	
    92		n, err := fs.newNodeFromBlobRef(root)
    93	
    94		if err != nil {
    95			return nil, err
    96		}
    97	
    98		fs.root = n
    99	
   100		return fs, nil
   101	}
   102	
   103	// node implements fuse.Node with a read-only Camli "file" or
   104	// "directory" blob.
   105	type node struct {
   106		fs      *CamliFileSystem
   107		blobref blob.Ref
   108	
   109		pnodeModTime time.Time // optionally set by recent.go; modtime of permanode
   110	
   111		dmu     sync.Mutex    // guards dirents. acquire before mu.
   112		dirents []fuse.Dirent // nil until populated once
   113	
   114		mu      sync.Mutex // guards rest
   115		attr    fuse.Attr
   116		meta    *schema.Blob
   117		lookMap map[string]blob.Ref
   118	}
   119	
   120	var _ fusefs.Node = (*node)(nil)
   121	
   122	func (n *node) Attr(ctx context.Context, a *fuse.Attr) error {
   123		if _, err := n.schema(ctx); err != nil {
   124			return handleEIOorEINTR(err)
   125		}
   126		*a = n.attr
   127		return nil
   128	}
   129	
   130	func (n *node) addLookupEntry(name string, ref blob.Ref) {
   131		n.mu.Lock()
   132		defer n.mu.Unlock()
   133		if n.lookMap == nil {
   134			n.lookMap = make(map[string]blob.Ref)
   135		}
   136		n.lookMap[name] = ref
   137	}
   138	
   139	var _ fusefs.NodeStringLookuper = (*node)(nil)
   140	
   141	func (n *node) Lookup(ctx context.Context, name string) (fusefs.Node, error) {
   142		if name == ".quitquitquit" {
   143			// TODO: only in dev mode
   144			log.Fatalf("Shutting down due to .quitquitquit lookup.")
   145		}
   146	
   147		// If we haven't done Readdir yet (dirents isn't set), then force a Readdir
   148		// call to populate lookMap.
   149		n.dmu.Lock()
   150		loaded := n.dirents != nil
   151		n.dmu.Unlock()
   152		if !loaded {
   153			n.ReadDirAll(ctx)
   154		}
   155	
   156		n.mu.Lock()
   157		defer n.mu.Unlock()
   158		ref, ok := n.lookMap[name]
   159		if !ok {
   160			return nil, fuse.ENOENT
   161		}
   162		return &node{fs: n.fs, blobref: ref}, nil
   163	}
   164	
   165	func (n *node) schema(ctx context.Context) (*schema.Blob, error) {
   166		n.mu.Lock()
   167		defer n.mu.Unlock()
   168		if n.meta != nil {
   169			return n.meta, nil
   170		}
   171		blob, err := n.fs.fetchSchemaMeta(ctx, n.blobref)
   172		if err == nil {
   173			n.meta = blob
   174			n.populateAttr()
   175		}
   176		return blob, err
   177	}
   178	
   179	func isWriteFlags(flags fuse.OpenFlags) bool {
   180		// TODO read/writeness are not flags, use O_ACCMODE
   181		return flags&fuse.OpenFlags(os.O_WRONLY|os.O_RDWR|os.O_APPEND|os.O_CREATE) != 0
   182	}
   183	
   184	var _ fusefs.NodeOpener = (*node)(nil)
   185	
   186	func (n *node) Open(ctx context.Context, req *fuse.OpenRequest, res *fuse.OpenResponse) (fusefs.Handle, error) {
   187		Logger.Printf("CAMLI Open on %v: %#v", n.blobref, req)
   188		if isWriteFlags(req.Flags) {
   189			return nil, fuse.EPERM
   190		}
   191		ss, err := n.schema(ctx)
   192		if err != nil {
   193			Logger.Printf("open of %v: %v", n.blobref, err)
   194			return nil, handleEIOorEINTR(err)
   195		}
   196		if ss.Type() == schema.TypeDirectory {
   197			return n, nil
   198		}
   199		fr, err := ss.NewFileReader(n.fs.fetcher)
   200		if err != nil {
   201			// Will only happen if ss.Type != "file" or "bytes"
   202			Logger.Printf("NewFileReader(%s) = %v", n.blobref, err)
   203			return nil, fuse.EIO
   204		}
   205		return &nodeReader{n: n, fr: fr}, nil
   206	}
   207	
   208	type nodeReader struct {
   209		n  *node
   210		fr *schema.FileReader
   211	}
   212	
   213	var _ fusefs.HandleReader = (*nodeReader)(nil)
   214	
   215	func (nr *nodeReader) Read(ctx context.Context, req *fuse.ReadRequest, res *fuse.ReadResponse) error {
   216		Logger.Printf("CAMLI nodeReader READ on %v: %#v", nr.n.blobref, req)
   217		if req.Offset >= nr.fr.Size() {
   218			return nil
   219		}
   220		size := req.Size
   221		if int64(size)+req.Offset >= nr.fr.Size() {
   222			size -= int((int64(size) + req.Offset) - nr.fr.Size())
   223		}
   224		buf := make([]byte, size)
   225		n, err := nr.fr.ReadAt(buf, req.Offset)
   226		if err == io.EOF {
   227			err = nil
   228		}
   229		if err != nil {
   230			Logger.Printf("camli read on %v at %d: %v", nr.n.blobref, req.Offset, err)
   231			return fuse.EIO
   232		}
   233		res.Data = buf[:n]
   234		return nil
   235	}
   236	
   237	var _ fusefs.HandleReleaser = (*nodeReader)(nil)
   238	
   239	func (nr *nodeReader) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
   240		Logger.Printf("CAMLI nodeReader RELEASE on %v", nr.n.blobref)
   241		nr.fr.Close()
   242		return nil
   243	}
   244	
   245	var _ fusefs.HandleReadDirAller = (*node)(nil)
   246	
   247	func (n *node) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
   248		Logger.Printf("CAMLI ReadDirAll on %v", n.blobref)
   249		n.dmu.Lock()
   250		defer n.dmu.Unlock()
   251		if n.dirents != nil {
   252			return n.dirents, nil
   253		}
   254	
   255		ss, err := n.schema(ctx)
   256		if err != nil {
   257			Logger.Printf("camli.ReadDirAll error on %v: %v", n.blobref, err)
   258			return nil, handleEIOorEINTR(err)
   259		}
   260		dr, err := schema.NewDirReader(ctx, n.fs.fetcher, ss.BlobRef())
   261		if err != nil {
   262			Logger.Printf("camli.ReadDirAll error on %v: %v", n.blobref, err)
   263			return nil, handleEIOorEINTR(err)
   264		}
   265		schemaEnts, err := dr.Readdir(ctx, -1)
   266		if err != nil {
   267			Logger.Printf("camli.ReadDirAll error on %v: %v", n.blobref, err)
   268			return nil, handleEIOorEINTR(err)
   269		}
   270		n.dirents = make([]fuse.Dirent, 0)
   271		for _, sent := range schemaEnts {
   272			if name := sent.FileName(); name != "" {
   273				n.addLookupEntry(name, sent.BlobRef())
   274				n.dirents = append(n.dirents, fuse.Dirent{Name: name})
   275			}
   276		}
   277		return n.dirents, nil
   278	}
   279	
   280	// populateAttr should only be called once n.ss is known to be set and
   281	// non-nil
   282	func (n *node) populateAttr() error {
   283		meta := n.meta
   284	
   285		n.attr.Mode = meta.FileMode()
   286	
   287		if n.fs.IgnoreOwners {
   288			n.attr.Uid = uint32(os.Getuid())
   289			n.attr.Gid = uint32(os.Getgid())
   290			executeBit := n.attr.Mode & 0100
   291			n.attr.Mode = (n.attr.Mode ^ n.attr.Mode.Perm()) | 0400 | executeBit
   292		} else {
   293			n.attr.Uid = uint32(meta.MapUid())
   294			n.attr.Gid = uint32(meta.MapGid())
   295		}
   296	
   297		// TODO: inode?
   298	
   299		if mt := meta.ModTime(); !mt.IsZero() {
   300			n.attr.Mtime = mt
   301		} else {
   302			n.attr.Mtime = n.pnodeModTime
   303		}
   304	
   305		switch meta.Type() {
   306		case schema.TypeFile:
   307			n.attr.Size = uint64(meta.PartsSize())
   308			n.attr.Blocks = 0 // TODO: set?
   309			n.attr.Mode |= 0400
   310		case schema.TypeDirectory:
   311			n.attr.Mode |= 0500
   312		case schema.TypeSymlink:
   313			n.attr.Mode |= 0400
   314		default:
   315			Logger.Printf("unknown attr ss.Type %q in populateAttr", meta.Type())
   316		}
   317		return nil
   318	}
   319	
   320	func (fs *CamliFileSystem) Root() (fusefs.Node, error) {
   321		return fs.root, nil
   322	}
   323	
   324	var _ fusefs.FSStatfser = (*CamliFileSystem)(nil)
   325	
   326	func (fs *CamliFileSystem) Statfs(ctx context.Context, req *fuse.StatfsRequest, res *fuse.StatfsResponse) error {
   327		// Make some stuff up, just to see if it makes "lsof" happy.
   328		res.Blocks = 1 << 35
   329		res.Bfree = 1 << 34
   330		res.Bavail = 1 << 34
   331		res.Files = 1 << 29
   332		res.Ffree = 1 << 28
   333		res.Namelen = 2048
   334		res.Bsize = 1024
   335		return nil
   336	}
   337	
   338	// Errors returned are:
   339	//
   340	//	os.ErrNotExist -- blob not found
   341	//	os.ErrInvalid -- not JSON or a camli schema blob
   342	func (fs *CamliFileSystem) fetchSchemaMeta(ctx context.Context, br blob.Ref) (*schema.Blob, error) {
   343		blobStr := br.String()
   344		if blob, ok := fs.blobToSchema.Get(blobStr); ok {
   345			return blob.(*schema.Blob), nil
   346		}
   347	
   348		rc, _, err := fs.fetcher.Fetch(ctx, br)
   349		if err != nil {
   350			return nil, err
   351		}
   352		defer rc.Close()
   353		blob, err := schema.BlobFromReader(br, rc)
   354		if err != nil {
   355			Logger.Printf("Error parsing %s as schema blob: %v", br, err)
   356			return nil, os.ErrInvalid
   357		}
   358		if blob.Type() == "" {
   359			Logger.Printf("blob %s is JSON but lacks camliType", br)
   360			return nil, os.ErrInvalid
   361		}
   362		fs.blobToSchema.Add(blobStr, blob)
   363		return blob, nil
   364	}
   365	
   366	// consolated logic for determining a node to mount based on an arbitrary blobref
   367	func (fs *CamliFileSystem) newNodeFromBlobRef(root blob.Ref) (fusefs.Node, error) {
   368		blob, err := fs.fetchSchemaMeta(context.TODO(), root)
   369		if err != nil {
   370			return nil, err
   371		}
   372	
   373		switch blob.Type() {
   374		case schema.TypeDirectory:
   375			n := &node{fs: fs, blobref: root, meta: blob}
   376			n.populateAttr()
   377			return n, nil
   378	
   379		case schema.TypePermanode:
   380			// other mutDirs listed in the default filesystem have names and are displayed
   381			return &mutDir{fs: fs, permanode: root, name: "-", children: make(map[string]mutFileOrDir)}, nil
   382		}
   383	
   384		return nil, fmt.Errorf("Blobref must be of a directory or permanode got a %v", blob.Type())
   385	}
   386	
   387	type notImplementDirNode struct{}
   388	
   389	var _ fusefs.Node = (*notImplementDirNode)(nil)
   390	
   391	func (notImplementDirNode) Attr(ctx context.Context, a *fuse.Attr) error {
   392		a.Mode = os.ModeDir | 0000
   393		a.Uid = uint32(os.Getuid())
   394		a.Gid = uint32(os.Getgid())
   395		return nil
   396	}
   397	
   398	type staticFileNode string
   399	
   400	var _ fusefs.Node = (*notImplementDirNode)(nil)
   401	
   402	func (s staticFileNode) Attr(ctx context.Context, a *fuse.Attr) error {
   403		a.Mode = 0400
   404		a.Uid = uint32(os.Getuid())
   405		a.Gid = uint32(os.Getgid())
   406		a.Size = uint64(len(s))
   407		a.Mtime = serverStart
   408		a.Ctime = serverStart
   409		return nil
   410	}
   411	
   412	var _ fusefs.HandleReader = (*staticFileNode)(nil)
   413	
   414	func (s staticFileNode) Read(ctx context.Context, req *fuse.ReadRequest, res *fuse.ReadResponse) error {
   415		if req.Offset > int64(len(s)) {
   416			return nil
   417		}
   418		s = s[req.Offset:]
   419		size := req.Size
   420		if size > len(s) {
   421			size = len(s)
   422		}
   423		res.Data = make([]byte, size)
   424		copy(res.Data, s)
   425		return nil
   426	}
Website layout inspired by memcached.
Content by the authors.