/*
Copyright 2018 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Package files implements the blobserver interface by storing each blob
in its own file in nested directories. Users don't use the "files"
type directly; it's used by "localdisk" and in the future "sftp" and
"webdav".
*/
package files // import "perkeep.org/pkg/blobserver/files"

import (
	"context"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"sync"

	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"

	"go4.org/syncutil"
)

// VFS describes the virtual filesystem needed by this package.
//
// It is currently not possible to use this from other packages, but
// that will change.
type VFS interface {
	Remove(string) error // files, not directories
	RemoveDir(string) error
	Stat(string) (os.FileInfo, error)
	Lstat(string) (os.FileInfo, error)
	Open(string) (ReadableFile, error)
	MkdirAll(path string, perm os.FileMode) error

	// Rename is a POSIX-style rename, overwriting newname if it exists.
	Rename(oldname, newname string) error

	// TempFile should behave like os.CreateTemp
	TempFile(dir, prefix string) (WritableFile, error)

	ReadDirNames(dir string) ([]string, error)
}
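
// A VFS backed by the real filesystem might look like the following sketch
// (illustrative only; osFS here is a hypothetical type, the "localdisk"
// storage type is built on a real implementation of this kind, and *os.File
// already satisfies both ReadableFile and WritableFile):
//
//	type osFS struct{}
//
//	func (osFS) Remove(name string) error     { return os.Remove(name) }
//	func (osFS) RemoveDir(name string) error  { return os.Remove(name) }
//	func (osFS) Stat(name string) (os.FileInfo, error)  { return os.Stat(name) }
//	func (osFS) Lstat(name string) (os.FileInfo, error) { return os.Lstat(name) }
//	func (osFS) Open(name string) (ReadableFile, error) { return os.Open(name) }
//	func (osFS) MkdirAll(dir string, perm os.FileMode) error { return os.MkdirAll(dir, perm) }
//	func (osFS) Rename(oldname, newname string) error { return os.Rename(oldname, newname) }
//	func (osFS) TempFile(dir, prefix string) (WritableFile, error) { return os.CreateTemp(dir, prefix) }
//	func (osFS) ReadDirNames(dir string) ([]string, error) {
//		f, err := os.Open(dir)
//		if err != nil {
//			return nil, err
//		}
//		defer f.Close()
//		return f.Readdirnames(-1)
//	}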

// WritableFile is the interface required by files opened for Write
// from VFS.TempFile.
type WritableFile interface {
	io.Writer
	io.Closer
	// Name returns the full path to the file.
	// It should behave like (*os.File).Name.
	Name() string
	// Sync fsyncs the file, like (*os.File).Sync.
	Sync() error
}

// ReadableFile is the interface required by read-only files
// returned by VFS.Open.
type ReadableFile interface {
	io.Reader
	io.Seeker
	io.Closer
}

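// Storage is a blobserver storage implementation that stores each blob in
// its own file, in nested directories under root, with all filesystem
// access going through fs.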
type Storage struct {
	fs   VFS
	root string

	// dirLockMu must be held for writing when deleting an empty directory
	// and for read when receiving blobs.
	dirLockMu *sync.RWMutex

	// statGate limits how many pending Stat calls we have in flight.
	statGate *syncutil.Gate

	// tmpFileGate limits the number of temporary files open at the same
	// time, so we don't run into the max set by ulimit. It is nil on
	// systems (Windows) where we don't know the maximum number of open
	// file descriptors.
	tmpFileGate *syncutil.Gate
}

// SetNewFileGate sets a gate (counting semaphore) on the number of new files
// that may be opened for writing at a time.
func (s *Storage) SetNewFileGate(g *syncutil.Gate) { s.tmpFileGate = g }

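// NewStorage returns a Storage that stores each blob in its own file under
// root, using fs for all filesystem operations.
//
// A minimal construction sketch (illustrative; myFS stands for any VFS
// implementation, such as the one the "localdisk" storage type uses):
//
//	s := files.NewStorage(myFS{}, "/path/to/blobs")
//	s.SetNewFileGate(syncutil.NewGate(100)) // optionally bound open temp files
//	rc, size, err := s.Fetch(ctx, someRef)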
func NewStorage(fs VFS, root string) *Storage {
	return &Storage{
		fs:        fs,
		root:      root,
		dirLockMu: new(sync.RWMutex),
		statGate:  syncutil.NewGate(10), // arbitrary, but bounded; be more clever later?
	}
}

// tryRemoveDir attempts to remove dir, ignoring any error. It holds
// dirLockMu for writing so that the removal cannot race with a blob
// being received into that directory.
func (ds *Storage) tryRemoveDir(dir string) {
	ds.dirLockMu.Lock()
	defer ds.dirLockMu.Unlock()
	ds.fs.RemoveDir(dir) // ignore error
}

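// Fetch returns a ReadCloser for the contents of the blob br, along with
// its size. It returns os.ErrNotExist if no file for br exists.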
func (ds *Storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
	return ds.fetch(ctx, br, 0, -1)
}

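// SubFetch returns a reader for length bytes of the blob br, starting at
// offset. It returns blob.ErrNegativeSubFetch if offset or length is negative.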
func (ds *Storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
	if offset < 0 || length < 0 {
		return nil, blob.ErrNegativeSubFetch
	}
	rc, _, err := ds.fetch(ctx, br, offset, length)
	return rc, err
}

// u32 converts n to a uint32, or panics if n is out of range.
func u32(n int64) uint32 {
	if n < 0 || n > math.MaxUint32 {
		panic("bad size " + fmt.Sprint(n))
	}
	return uint32(n)
}

// fetch opens the file for br and returns a reader for it.
// A length of -1 means the entire file.
func (ds *Storage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
	// TODO: use ctx, if the os package ever supports that.
	fileName := ds.blobPath(br)
	stat, err := ds.fs.Stat(fileName)
	if os.IsNotExist(err) {
		return nil, 0, os.ErrNotExist
	}
	if err != nil {
		return nil, 0, err
	}
	size = u32(stat.Size())
	file, err := ds.fs.Open(fileName)
	if err != nil {
		if os.IsNotExist(err) {
			err = os.ErrNotExist
		}
		return nil, 0, err
	}
	// normal Fetch
	if length < 0 && offset == 0 {
		return file, size, nil
	}
	// SubFetch:
	if offset < 0 || offset > stat.Size() {
		if offset < 0 {
			return nil, 0, blob.ErrNegativeSubFetch
		}
		return nil, 0, blob.ErrOutOfRangeOffsetSubFetch
	}
	if offset != 0 {
		if at, err := file.Seek(offset, io.SeekStart); err != nil || at != offset {
			file.Close()
			return nil, 0, fmt.Errorf("files: error seeking to %d: got %v, %v", offset, at, err)
		}
	}
	return struct {
		io.Reader
		io.Closer
	}{
		Reader: io.LimitReader(file, length),
		Closer: file,
	}, 0 /* unused */, nil
}

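// RemoveBlobs removes the files for the given blobs. Removing a blob that
// does not exist is not an error.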
func (ds *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
	for _, blob := range blobs {
		fileName := ds.blobPath(blob)
		err := ds.fs.Remove(fileName)
		switch {
		case err == nil:
			continue
		case os.IsNotExist(err):
			// deleting already-deleted file; harmless.
			continue
		default:
			return err
		}
	}
	return nil
}

func blobFileBaseName(b blob.Ref) string {
	return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
}

func (ds *Storage) blobDirectory(b blob.Ref) string {
	d := b.Digest()
	if len(d) < 4 {
		d = d + "____"
	}
	return filepath.Join(ds.root, b.HashName(), d[0:2], d[2:4])
}

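// blobPath returns the full path of the file that holds b, rooted under
// ds.root. For a sha224 blob ref whose hex digest begins with "1f0a", for
// example, the path has the shape (digest truncated for illustration):
//
//	<root>/sha224/1f/0a/sha224-1f0a....dat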
func (ds *Storage) blobPath(b blob.Ref) string {
	return filepath.Join(ds.blobDirectory(b), blobFileBaseName(b))
}

const maxParallelStats = 20

var statGate = syncutil.NewGate(maxParallelStats)

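// StatBlobs stats the given blobs in parallel, bounded by statGate, and
// calls fn for each blob whose file exists. Blobs with no corresponding
// file are silently skipped.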
func (ds *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(ref blob.Ref) (sb blob.SizedRef, err error) {
		fi, err := ds.fs.Stat(ds.blobPath(ref))
		switch {
		case err == nil && fi.Mode().IsRegular():
			return blob.SizedRef{Ref: ref, Size: u32(fi.Size())}, nil
		case err != nil && !os.IsNotExist(err):
			return sb, err
		}
		return sb, nil
	})
}