/*
Copyright 2011 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package files

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"

	"perkeep.org/pkg/blob"
)

type readBlobRequest struct {
	ch      chan<- blob.SizedRef
	after   string
	remain  *int // limit countdown
	dirRoot string

	// Not used on initial request, only on recursion
	blobPrefix, pathInto string
}

type enumerateError struct {
	msg string
	err error
}

func (ee *enumerateError) Error() string {
	return fmt.Sprintf("files enumerate error: %s: %v", ee.msg, ee.err)
}

// readBlobs implements EnumerateBlobs. It calls itself recursively on subdirectories.
func (ds *Storage) readBlobs(ctx context.Context, opts readBlobRequest) error {
	dirFullPath := filepath.Join(opts.dirRoot, opts.pathInto)
	names, err := ds.fs.ReadDirNames(dirFullPath)
	if err != nil {
		return &enumerateError{"readdirnames of " + dirFullPath, err}
	}
	if len(names) == 0 {
		// remove empty blob dir if we are in a queue but not the queue root itself
		if strings.Contains(dirFullPath, "queue-") &&
			!strings.Contains(filepath.Base(dirFullPath), "queue-") {
			go ds.tryRemoveDir(dirFullPath)
		}
		return nil
	}
	sort.Strings(names)
	stat := make(map[string]*future) // name -> future<os.FileInfo>

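	// Queue up a lazy stat for each candidate entry so the stats can run
	// concurrently in the background while the sorted names are walked below.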
	var toStat []func()
	for _, name := range names {
		if skipDir(name) || isShardDir(name) {
			continue
		}
		fullFile := filepath.Join(dirFullPath, name)
		f := newFuture(func() (os.FileInfo, error) {
			fi, err := ds.fs.Stat(fullFile)
			if err != nil {
				return nil, &enumerateError{"stat", err}
			}
			return fi, nil
		})
		stat[name] = f
		toStat = append(toStat, f.ForceLoad)
	}

	// Start pre-statting things.
	go func() {
		for _, f := range toStat {
			ds.statGate.Start()
			f := f
			go func() {
				defer ds.statGate.Done()
				f()
			}()
		}
	}()

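	// Walk the sorted names in order: recurse into shard directories and
	// send a blob ref for each "*.dat" file, honoring opts.after and the
	// remaining limit.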
	for _, name := range names {
		if *opts.remain == 0 {
			return nil
		}
		if skipDir(name) {
			continue
		}

		isDir := isShardDir(name)
		if !isDir {
			fi, err := stat[name].Get()
			if err != nil {
				return err
			}
			isDir = fi.IsDir()
		}

		if isDir {
			var newBlobPrefix string
			if opts.blobPrefix == "" {
				newBlobPrefix = name + "-"
			} else {
				newBlobPrefix = opts.blobPrefix + name
			}
			if len(opts.after) > 0 {
				compareLen := len(newBlobPrefix)
				if len(opts.after) < compareLen {
					compareLen = len(opts.after)
				}
				if newBlobPrefix[:compareLen] < opts.after[:compareLen] {
					continue
				}
			}
			ropts := opts
			ropts.blobPrefix = newBlobPrefix
			ropts.pathInto = opts.pathInto + "/" + name
			if err := ds.readBlobs(ctx, ropts); err != nil {
				return err
			}
			continue
		}

		if !strings.HasSuffix(name, ".dat") {
			continue
		}

		fi, err := stat[name].Get()
		if err != nil {
			return err
		}

		if !fi.IsDir() {
			blobName := strings.TrimSuffix(name, ".dat")
			if blobName <= opts.after {
				continue
			}
			if blobRef, ok := blob.Parse(blobName); ok {
				select {
				case opts.ch <- blob.SizedRef{Ref: blobRef, Size: uint32(fi.Size())}:
					(*opts.remain)--
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
	}

	return nil
}

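// EnumerateBlobs sends at most limit blobs to dest, in sorted order,
// skipping blobs whose refs do not sort after the given after value.
// It closes dest when the enumeration finishes or fails.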
func (ds *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
	defer close(dest)
	if limit == 0 {
		log.Printf("Warning: files.EnumerateBlobs called with a limit of 0")
	}

	limitMutable := limit
	return ds.readBlobs(ctx, readBlobRequest{
		ch:      dest,
		dirRoot: ds.root,
		after:   after,
		remain:  &limitMutable,
	})
}

func skipDir(name string) bool {
	// The "partition" directory is old (removed from the codebase, but
	// likely still on disk for some people). The "cache" and "packed"
	// directories are used by the serverconfig/genconfig code as
	// default locations for most users.
	return name == "partition" || name == "cache" || name == "packed"
}

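// isShardDir reports whether name looks like a two-character hex
// fan-out directory (e.g. "a3").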
func isShardDir(name string) bool {
	return len(name) == 2 && isHex(name[0]) && isHex(name[1])
}

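// isHex reports whether b is a lowercase hexadecimal digit.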
func isHex(b byte) bool {
	return ('0' <= b && b <= '9') || ('a' <= b && b <= 'f')
}

// future is an os.FileInfo future.
type future struct {
	once sync.Once
	f    func() (os.FileInfo, error)
	v    os.FileInfo
	err  error
}

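// newFuture returns a future whose os.FileInfo is computed by f at
// most once, on first use.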
func newFuture(f func() (os.FileInfo, error)) *future {
	return &future{f: f}
}

func (f *future) Get() (os.FileInfo, error) {
	f.once.Do(f.run)
	return f.v, f.err
}

func (f *future) ForceLoad() {
	f.Get()
}

func (f *future) run() { f.v, f.err = f.f() }