Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package namespace implements the "namespace" blobserver storage type.
    18	//
    19	// A namespace storage is backed by another storage target but only
    20	// has access and visibility to a subset of the blobs which have been
    21	// uploaded through this namespace. The list of accessible blobs is
    22	// stored in the provided "inventory" sorted key/value target.
    23	package namespace // import "perkeep.org/pkg/blobserver/namespace"
    24	
    25	import (
    26		"bytes"
    27		"context"
    28		"fmt"
    29		"io"
    30		"log"
    31		"os"
    32		"strconv"
    33	
    34		"go4.org/jsonconfig"
    35		"perkeep.org/pkg/blob"
    36		"perkeep.org/pkg/blobserver"
    37		"perkeep.org/pkg/sorted"
    38	
    39		"go4.org/strutil"
    40	)
    41	
    42	type nsto struct {
    43		inventory sorted.KeyValue
    44		master    blobserver.Storage
    45	}
    46	
    47	func init() {
    48		blobserver.RegisterStorageConstructor("namespace", blobserver.StorageConstructor(newFromConfig))
    49	}
    50	
    51	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
    52		sto := &nsto{}
    53		invConf := config.RequiredObject("inventory")
    54		masterName := config.RequiredString("storage")
    55		if err := config.Validate(); err != nil {
    56			return nil, err
    57		}
    58		sto.inventory, err = sorted.NewKeyValue(invConf)
    59		if err != nil {
    60			return nil, fmt.Errorf("Invalid 'inventory' configuration: %v", err)
    61		}
    62		sto.master, err = ld.GetStorage(masterName)
    63		if err != nil {
    64			return nil, fmt.Errorf("Invalid 'storage' configuration: %v", err)
    65		}
    66		return sto, nil
    67	}
    68	
    69	func (ns *nsto) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
    70		defer close(dest)
    71		done := ctx.Done()
    72	
    73		it := ns.inventory.Find(after, "")
    74		first := true
    75		for limit > 0 && it.Next() {
    76			if first {
    77				first = false
    78				if after != "" && it.Key() == after {
    79					continue
    80				}
    81			}
    82			br, ok := blob.ParseBytes(it.KeyBytes())
    83			size, err := strutil.ParseUintBytes(it.ValueBytes(), 10, 32)
    84			if !ok || err != nil {
    85				log.Printf("Bogus namespace key %q / value %q", it.Key(), it.Value())
    86				continue
    87			}
    88			select {
    89			case dest <- blob.SizedRef{Ref: br, Size: uint32(size)}:
    90			case <-done:
    91				return ctx.Err()
    92			}
    93			limit--
    94		}
    95		if err := it.Close(); err != nil {
    96			return err
    97		}
    98		return nil
    99	}
   100	
   101	func (ns *nsto) Fetch(ctx context.Context, br blob.Ref) (rc io.ReadCloser, size uint32, err error) {
   102		invSizeStr, err := ns.inventory.Get(br.String())
   103		if err == sorted.ErrNotFound {
   104			err = os.ErrNotExist
   105			return
   106		}
   107		if err != nil {
   108			return
   109		}
   110		invSize, err := strconv.ParseUint(invSizeStr, 10, 32)
   111		if err != nil {
   112			return
   113		}
   114		rc, size, err = ns.master.Fetch(ctx, br)
   115		if err != nil {
   116			return
   117		}
   118		if size != uint32(invSize) {
   119			log.Printf("namespace: on blob %v, unexpected inventory size %d for master size %d", br, invSize, size)
   120			return nil, 0, os.ErrNotExist
   121		}
   122		return rc, size, nil
   123	}
   124	
   125	func (ns *nsto) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
   126		var buf bytes.Buffer
   127		size, err := io.Copy(&buf, src)
   128		if err != nil {
   129			return
   130		}
   131	
   132		// Check if a duplicate blob, already uploaded previously.
   133		if _, ierr := ns.inventory.Get(br.String()); ierr == nil {
   134			return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
   135		}
   136	
   137		sb, err = ns.master.ReceiveBlob(ctx, br, &buf)
   138		if err != nil {
   139			return
   140		}
   141	
   142		err = ns.inventory.Set(br.String(), strconv.Itoa(int(size)))
   143		return
   144	}
   145	
   146	func (ns *nsto) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   147		for _, br := range blobs {
   148			if err := ns.inventory.Delete(br.String()); err != nil {
   149				return err
   150			}
   151		}
   152		return nil
   153	}
   154	
   155	func (ns *nsto) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   156		for _, br := range blobs {
   157			invSizeStr, err := ns.inventory.Get(br.String())
   158			if err == sorted.ErrNotFound {
   159				continue
   160			}
   161			if err != nil {
   162				return err
   163			}
   164			invSize, err := strconv.ParseUint(invSizeStr, 10, 32)
   165			if err != nil {
   166				log.Printf("Bogus namespace key %q / value %q", br.String(), invSizeStr)
   167			}
   168			if err := fn(blob.SizedRef{Ref: br, Size: uint32(invSize)}); err != nil {
   169				return err
   170			}
   171		}
   172		return nil
   173	}
Website layout inspired by memcached.
Content by the authors.