Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package schema
    18	
    19	import (
    20		"context"
    21		"encoding/json"
    22		"errors"
    23		"fmt"
    24		"io"
    25	
    26		"go4.org/syncutil"
    27	
    28		"perkeep.org/pkg/blob"
    29	)
    30	
    31	// A DirReader reads the entries of a "directory" schema blob's
    32	// referenced "static-set" blob.
    33	type DirReader struct {
    34		fetcher blob.Fetcher
    35		ss      *superset
    36	
    37		staticSet []blob.Ref
    38		current   int
    39	}
    40	
    41	// NewDirReader creates a new directory reader and prepares to
    42	// fetch the static-set entries
    43	func NewDirReader(ctx context.Context, fetcher blob.Fetcher, dirBlobRef blob.Ref) (*DirReader, error) {
    44		ss := new(superset)
    45		err := ss.setFromBlobRef(ctx, fetcher, dirBlobRef)
    46		if err != nil {
    47			return nil, err
    48		}
    49		if ss.Type != "directory" {
    50			return nil, fmt.Errorf("schema/dirreader: expected \"directory\" schema blob for %s, got %q", dirBlobRef, ss.Type)
    51		}
    52		dr, err := ss.NewDirReader(fetcher)
    53		if err != nil {
    54			return nil, fmt.Errorf("schema/dirreader: creating DirReader for %s: %v", dirBlobRef, err)
    55		}
    56		dr.current = 0
    57		return dr, nil
    58	}
    59	
    60	func (b *Blob) NewDirReader(ctx context.Context, fetcher blob.Fetcher) (*DirReader, error) {
    61		return b.ss.NewDirReader(fetcher)
    62	}
    63	
    64	func (ss *superset) NewDirReader(fetcher blob.Fetcher) (*DirReader, error) {
    65		if ss.Type != "directory" {
    66			return nil, fmt.Errorf("Superset not of type \"directory\"")
    67		}
    68		return &DirReader{fetcher: fetcher, ss: ss}, nil
    69	}
    70	
    71	func (ss *superset) setFromBlobRef(ctx context.Context, fetcher blob.Fetcher, blobRef blob.Ref) error {
    72		if !blobRef.Valid() {
    73			return errors.New("schema/dirreader: blobref invalid")
    74		}
    75		ss.BlobRef = blobRef
    76		rc, _, err := fetcher.Fetch(ctx, blobRef)
    77		if err != nil {
    78			return fmt.Errorf("schema/dirreader: fetching schema blob %s: %v", blobRef, err)
    79		}
    80		defer rc.Close()
    81		if err := json.NewDecoder(rc).Decode(ss); err != nil {
    82			return fmt.Errorf("schema/dirreader: decoding schema blob %s: %v", blobRef, err)
    83		}
    84		return nil
    85	}
    86	
    87	// StaticSet returns the whole of the static set members of that directory
    88	func (dr *DirReader) StaticSet(ctx context.Context) ([]blob.Ref, error) {
    89		if dr.staticSet != nil {
    90			return dr.staticSet, nil
    91		}
    92		staticSetBlobref := dr.ss.Entries
    93		if !staticSetBlobref.Valid() {
    94			return nil, errors.New("schema/dirreader: Invalid blobref")
    95		}
    96		members, err := staticSet(ctx, staticSetBlobref, dr.fetcher)
    97		if err != nil {
    98			return nil, err
    99		}
   100		dr.staticSet = members
   101		return dr.staticSet, nil
   102	}
   103	
   104	func staticSet(ctx context.Context, staticSetBlobref blob.Ref, fetcher blob.Fetcher) ([]blob.Ref, error) {
   105		rsc, _, err := fetcher.Fetch(ctx, staticSetBlobref)
   106		if err != nil {
   107			return nil, fmt.Errorf("schema/dirreader: fetching schema blob %s: %v", staticSetBlobref, err)
   108		}
   109		defer rsc.Close()
   110		ss, err := parseSuperset(rsc)
   111		if err != nil {
   112			return nil, fmt.Errorf("schema/dirreader: decoding schema blob %s: %v", staticSetBlobref, err)
   113		}
   114		if ss.Type != "static-set" {
   115			return nil, fmt.Errorf("schema/dirreader: expected \"static-set\" schema blob for %s, got %q", staticSetBlobref, ss.Type)
   116		}
   117		var members []blob.Ref
   118		if len(ss.Members) > 0 {
   119			// We have fileRefs or dirRefs in ss.Members, so we are either in the static-set
   120			// of a small directory, or one of the "leaf" subsets of a large directory spread.
   121			for _, member := range ss.Members {
   122				if !member.Valid() {
   123					return nil, fmt.Errorf("schema/dirreader: invalid (static-set member) blobref referred by \"static-set\" schema blob %v", staticSetBlobref)
   124				}
   125				members = append(members, member)
   126			}
   127			return members, nil
   128		}
   129		// We are either at the top static-set of a large directory, or in a "non-leaf"
   130		// subset of a large directory.
   131		for _, toMerge := range ss.MergeSets {
   132			if !toMerge.Valid() {
   133				return nil, fmt.Errorf("schema/dirreader: invalid (static-set subset) blobref referred by \"static-set\" schema blob %v", staticSetBlobref)
   134			}
   135			// TODO(mpl): do it concurrently
   136			subset, err := staticSet(ctx, toMerge, fetcher)
   137			if err != nil {
   138				return nil, fmt.Errorf("schema/dirreader: could not get members of %q, subset of %v: %v", toMerge, staticSetBlobref, err)
   139			}
   140			members = append(members, subset...)
   141		}
   142		return members, nil
   143	}
   144	
   145	// Readdir implements the Directory interface.
   146	func (dr *DirReader) Readdir(ctx context.Context, n int) (entries []DirectoryEntry, err error) {
   147		sts, err := dr.StaticSet(ctx)
   148		if err != nil {
   149			return nil, fmt.Errorf("schema/dirreader: can't get StaticSet: %v", err)
   150		}
   151		up := dr.current + n
   152		if n <= 0 {
   153			dr.current = 0
   154			up = len(sts)
   155		} else {
   156			if n > (len(sts) - dr.current) {
   157				err = io.EOF
   158				up = len(sts)
   159			}
   160		}
   161	
   162		// TODO(bradfitz): push down information to the fetcher
   163		// (e.g. cachingfetcher -> remote client http) that we're
   164		// going to load a bunch, so the HTTP client (if not using
   165		// SPDY) can do discovery and see if the server supports a
   166		// batch handler, then get them all in one round-trip, rather
   167		// than attacking the server with hundreds of parallel TLS
   168		// setups.
   169	
   170		type res struct {
   171			ent DirectoryEntry
   172			err error
   173		}
   174		var cs []chan res
   175	
   176		// Kick off all directory entry loads.
   177		gate := syncutil.NewGate(20) // Limit IO concurrency
   178		for _, entRef := range sts[dr.current:up] {
   179			c := make(chan res, 1)
   180			cs = append(cs, c)
   181			gate.Start()
   182			go func(entRef blob.Ref) {
   183				defer gate.Done()
   184				entry, err := NewDirectoryEntryFromBlobRef(ctx, dr.fetcher, entRef)
   185				c <- res{entry, err}
   186			}(entRef)
   187		}
   188	
   189		for _, c := range cs {
   190			res := <-c
   191			if res.err != nil {
   192				return nil, fmt.Errorf("schema/dirreader: can't create dirEntry: %v", res.err)
   193			}
   194			entries = append(entries, res.ent)
   195		}
   196		return entries, nil
   197	}
Website layout inspired by memcached.
Content by the authors.