import "perkeep/pkg/blobserver"

Overview

Package blobserver defines how raw blobs are stored and accessed.

Index

Constants
Variables
func CreateHandler(typ string, loader Loader, config jsonconfig.Obj) (http.Handler, error)
func EnumerateAll(ctx context.Context, src BlobEnumerator, fn func(blob.SizedRef) error) error
func EnumerateAllFrom(ctx context.Context, src BlobEnumerator, after string, fn func(blob.SizedRef) error) error
func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch func(blob.Ref), srcch, dstch <-chan blob.SizedRef)
func MergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, sources []BlobEnumerator, after string, limit int) error
func MergedEnumerateStorage(ctx context.Context, dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error
func Receive(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)
func ReceiveNoHash(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)
func ReceiveString(ctx context.Context, dst BlobReceiver, s string) (blob.SizedRef, error)
func RefTypes(src BlobEnumerator) ([]string, error)
func RegisterHandlerConstructor(typ string, ctor HandlerConstructor)
func RegisterStorageConstructor(typ string, ctor StorageConstructor)
func StatBlob(ctx context.Context, bs BlobStatter, br blob.Ref) (blob.SizedRef, error)
func StatBlobs(ctx context.Context, bs BlobStatter, blobs []blob.Ref) (map[blob.Ref]blob.SizedRef, error)
func StatBlobsParallelHelper(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error, gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error)) error
func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref)
type BlobAndToken
type BlobEnumerator
type BlobHub
    func GetHub(storage interface{}) BlobHub
type BlobReceiveConfiger
type BlobReceiver
type BlobRemover
type BlobStatter
type BlobStreamer
    func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer
type Cache
type Config
type Configer
type FetcherEnumerator
type FindHandlerByTyper
type GenerationNotSupportedError
    func (s GenerationNotSupportedError) Error() string
type Generationer
type HandlerConstructor
type HandlerIniter
type Loader
type MaxEnumerateConfig
type NoImplStorage
    func (NoImplStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error
    func (NoImplStorage) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error)
    func (NoImplStorage) ReceiveBlob(context.Context, blob.Ref, io.Reader) (sb blob.SizedRef, err error)
    func (NoImplStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error
    func (NoImplStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error
type ShutdownStorage
type StatReceiver
type Storage
    func CreateStorage(typ string, loader Loader, config jsonconfig.Obj) (Storage, error)
type StorageConfiger
type StorageConstructor
type StorageHandler
type WholeRefFetcher

Package files

blobhub.go doc.go enumerate.go interface.go mergedenum.go multistream.go noimpl.go receive.go registry.go stat.go sync.go

Constants

const MaxBlobSize = constants.MaxBlobSize

MaxBlobSize is the maximum size of a single blob in Perkeep.

Variables

var (
    ErrCorruptBlob = errors.New("corrupt blob; digest doesn't match")

    // ErrNotImplemented should be returned in methods where the function is not implemented
    ErrNotImplemented = errors.New("not implemented")
)
var ErrHandlerTypeNotFound = errors.New("requested handler type not loaded")
var ErrReadonly = errors.New("this blobserver is read only")

ErrReadonly is the error value returned by read-only blobservers.

func CreateHandler

func CreateHandler(typ string, loader Loader, config jsonconfig.Obj) (http.Handler, error)

CreateHandler instantiates an http Handler of type 'typ' from the provided JSON configuration, using 'loader' to find peer handlers and configuration from the environment.

The handler 'typ' must have been previously registered with RegisterHandlerConstructor.

func EnumerateAll

func EnumerateAll(ctx context.Context, src BlobEnumerator, fn func(blob.SizedRef) error) error

EnumerateAll runs fn for each blob in src. If fn returns an error, iteration stops and fn isn't called again. EnumerateAll will not return concurrently with fn.

func EnumerateAllFrom

func EnumerateAllFrom(ctx context.Context, src BlobEnumerator, after string, fn func(blob.SizedRef) error) error

EnumerateAllFrom is like EnumerateAll, but only enumerates blobs whose refs are lexicographically greater than the after parameter.

func ListMissingDestinationBlobs

func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch func(blob.Ref), srcch, dstch <-chan blob.SizedRef)

ListMissingDestinationBlobs reads from 'srcch' and 'dstch' (sorted enumerations of blobs from two blob servers) and sends to 'destMissing' any blobs which appear on the source but not at the destination.

destMissing is closed at the end.

If an invalid (zero) blob from srcch or dstch arrives, ListMissingDestinationBlobs stops.
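A merge-join like the one ListMissingDestinationBlobs describes can be sketched as follows. SizedRef, listMissing, and feed are hypothetical stand-ins. Unlike the documented function, this sketch ends when srcch is closed rather than when a zero blob arrives. Both inputs must already be sorted, so a single pass over each stream suffices.

```go
package main

import "fmt"

// SizedRef is a hypothetical stand-in for blob.SizedRef.
type SizedRef struct {
	Ref  string
	Size uint32
}

// listMissing merge-joins two sorted streams. Refs present on src but absent
// from dst go to destMissing; refs on both sides with different sizes are
// reported to sizeMismatch. destMissing is closed on return.
func listMissing(destMissing chan<- SizedRef, sizeMismatch func(string), srcch, dstch <-chan SizedRef) {
	defer close(destMissing)
	src, srcOK := <-srcch
	dst, dstOK := <-dstch
	for srcOK {
		switch {
		case !dstOK || src.Ref < dst.Ref:
			// dst is exhausted or already past src.Ref: src.Ref is missing.
			destMissing <- src
			src, srcOK = <-srcch
		case src.Ref > dst.Ref:
			// dst holds a blob that src doesn't have; skip it.
			dst, dstOK = <-dstch
		default:
			// Same ref on both sides.
			if src.Size != dst.Size {
				sizeMismatch(src.Ref)
			}
			src, srcOK = <-srcch
			dst, dstOK = <-dstch
		}
	}
	// Drain dstch so its producer isn't left blocked.
	for dstOK {
		_, dstOK = <-dstch
	}
}

// feed is a hypothetical helper that streams blobs on a channel, then closes it.
func feed(blobs ...SizedRef) <-chan SizedRef {
	ch := make(chan SizedRef)
	go func() {
		defer close(ch)
		for _, b := range blobs {
			ch <- b
		}
	}()
	return ch
}

func main() {
	missing := make(chan SizedRef)
	var mismatched []string
	go listMissing(missing, func(ref string) { mismatched = append(mismatched, ref) },
		feed(SizedRef{"a", 1}, SizedRef{"b", 2}, SizedRef{"d", 4}),
		feed(SizedRef{"b", 9}, SizedRef{"c", 3}))
	for sb := range missing {
		fmt.Println("missing:", sb.Ref) // prints a, then d
	}
	fmt.Println("size mismatch:", mismatched) // size mismatch: [b]
}
```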

func MergedEnumerate

func MergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, sources []BlobEnumerator, after string, limit int) error

MergedEnumerate implements the BlobEnumerator interface by merge-joining 0 or more sources.

func MergedEnumerateStorage

func MergedEnumerateStorage(ctx context.Context, dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error

MergedEnumerateStorage implements the BlobEnumerator interface by merge-joining 0 or more sources.

In this version, the sources implement the Storage interface, even though only the BlobEnumerator interface is used.
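The merge-join behind MergedEnumerate can be illustrated on in-memory data. In this sketch, which is an assumption and not the package's code, each source is a sorted slice rather than a BlobEnumerator streaming into a channel, and the context is omitted. The function repeatedly takes the smallest next ref across all sources, emits a ref present in several sources only once, and honors after and limit.

```go
package main

import "fmt"

// SizedRef is a hypothetical stand-in for blob.SizedRef.
type SizedRef struct {
	Ref  string
	Size uint32
}

// mergedEnumerate merge-joins already-sorted sources, returning at most limit
// refs greater than after, in sorted order, with duplicates emitted once.
func mergedEnumerate(sources [][]SizedRef, after string, limit int) []SizedRef {
	pos := make([]int, len(sources)) // next unread index in each source
	for i, src := range sources {
		for pos[i] < len(src) && src[pos[i]].Ref <= after {
			pos[i]++
		}
	}
	var out []SizedRef
	for len(out) < limit {
		// Pick the source whose next ref is smallest.
		best := -1
		for i, src := range sources {
			if pos[i] < len(src) && (best < 0 || src[pos[i]].Ref < sources[best][pos[best]].Ref) {
				best = i
			}
		}
		if best < 0 {
			break // every source is exhausted
		}
		next := sources[best][pos[best]]
		out = append(out, next)
		// Advance every source sitting on the same ref so it is emitted once.
		for i, src := range sources {
			if pos[i] < len(src) && src[pos[i]].Ref == next.Ref {
				pos[i]++
			}
		}
	}
	return out
}

func main() {
	a := []SizedRef{{"sha224-a", 1}, {"sha224-c", 3}, {"sha224-e", 5}}
	b := []SizedRef{{"sha224-b", 2}, {"sha224-c", 3}, {"sha224-f", 6}}
	for _, sb := range mergedEnumerate([][]SizedRef{a, b}, "sha224-a", 3) {
		fmt.Println(sb.Ref) // sha224-b, sha224-c, sha224-e
	}
}
```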

func Receive

func Receive(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)

Receive calls dst's ReceiveBlob method, additionally verifying that the digest of src matches br and notifying the blob hub on success. If the digest doesn't match br, the error is ErrCorruptBlob.

func ReceiveNoHash

func ReceiveNoHash(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)

func ReceiveString

func ReceiveString(ctx context.Context, dst BlobReceiver, s string) (blob.SizedRef, error)

ReceiveString uploads the blob given by the string s to dst and returns its blobref and size.

func RefTypes

func RefTypes(src BlobEnumerator) ([]string, error)

RefTypes returns a list of blobref types appearing on the provided enumerator. A blobref type is a string like "sha1", or whatever is on the left side of the hyphen in a blobref. To get the alphabet valid for the right side of the hyphen, use blob.TypeAlphabet(type).
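Extracting a blobref type amounts to taking the text left of the hyphen. The sketch below is hypothetical: it works on a slice of ref strings rather than a BlobEnumerator, the refs are placeholders, and sorting the result is a choice of the sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// refTypes returns the distinct hash-type prefixes (the part left of the
// first hyphen) among refs, sorted. Refs without a hyphen are skipped.
func refTypes(refs []string) []string {
	seen := map[string]bool{}
	var types []string
	for _, ref := range refs {
		typ, _, ok := strings.Cut(ref, "-")
		if !ok || seen[typ] {
			continue
		}
		seen[typ] = true
		types = append(types, typ)
	}
	sort.Strings(types)
	return types
}

func main() {
	fmt.Println(refTypes([]string{"sha224-abc", "sha1-abc", "sha224-def"})) // [sha1 sha224]
}
```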

func RegisterHandlerConstructor

func RegisterHandlerConstructor(typ string, ctor HandlerConstructor)

RegisterHandlerConstructor registers an http Handler constructor function for a given handler type.

It is an error to register the same handler type twice.

func RegisterStorageConstructor

func RegisterStorageConstructor(typ string, ctor StorageConstructor)
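RegisterHandlerConstructor and RegisterStorageConstructor, together with CreateHandler and CreateStorage, follow a type-name registry pattern. The sketch below illustrates that pattern under simplifying assumptions: Storage, StorageConstructor, memStorage, and the error text are hypothetical, a plain map replaces jsonconfig.Obj, and the Loader argument is dropped. It panics on duplicate registration, which is one way to enforce the documented rule that registering the same type twice is an error.

```go
package main

import (
	"fmt"
	"sync"
)

// Storage and StorageConstructor are hypothetical, simplified stand-ins
// (no Loader, and a plain map instead of jsonconfig.Obj).
type Storage interface{ Name() string }

type StorageConstructor func(config map[string]any) (Storage, error)

var (
	registryMu   sync.Mutex
	constructors = map[string]StorageConstructor{}
)

// registerStorageConstructor records ctor under typ; registering the same
// type twice is treated as a programming error, so it panics.
func registerStorageConstructor(typ string, ctor StorageConstructor) {
	registryMu.Lock()
	defer registryMu.Unlock()
	if _, dup := constructors[typ]; dup {
		panic("storage type registered twice: " + typ)
	}
	constructors[typ] = ctor
}

// createStorage looks up a previously registered constructor and runs it.
func createStorage(typ string, config map[string]any) (Storage, error) {
	registryMu.Lock()
	ctor, ok := constructors[typ]
	registryMu.Unlock()
	if !ok {
		return nil, fmt.Errorf("storage type %q not registered", typ)
	}
	return ctor(config)
}

// memStorage is a hypothetical trivial Storage.
type memStorage struct{}

func (memStorage) Name() string { return "memory" }

func main() {
	registerStorageConstructor("memory", func(map[string]any) (Storage, error) { return memStorage{}, nil })
	s, err := createStorage("memory", nil)
	fmt.Println(s.Name(), err) // memory <nil>
	_, err = createStorage("s3", nil)
	fmt.Println(err) // storage type "s3" not registered
}
```

Registration typically happens in package init functions, which is why a duplicate is a bug rather than a runtime condition to recover from.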

func StatBlob

func StatBlob(ctx context.Context, bs BlobStatter, br blob.Ref) (blob.SizedRef, error)

StatBlob calls bs.StatBlobs to stat a single blob. If the blob is not found, the error is os.ErrNotExist.

func StatBlobs

func StatBlobs(ctx context.Context, bs BlobStatter, blobs []blob.Ref) (map[blob.Ref]blob.SizedRef, error)

StatBlobs stats multiple blobs and returns a map of the found refs to their sizes.

func StatBlobsParallelHelper

func StatBlobsParallelHelper(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error,
    gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error)) error

StatBlobsParallelHelper is for use by blobserver implementations that want to issue stats in parallel. This runs worker in multiple goroutines (bounded by gate), but calls fn in serial, per the BlobStatter contract, and stops once there's a failure.

The worker func should return two zero values to signal that a blob doesn't exist. (This is different from the StatBlob func, which returns os.ErrNotExist.)

func WaitForBlob

func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref)

WaitForBlob waits until deadline for blobs to arrive. If blobs is empty, it waits for any blob; otherwise, it waits for those specific blobs. WaitForBlob reports nothing about the outcome: when it returns, none of the awaited blobs may have arrived, so callers must check again.

type BlobAndToken

type BlobAndToken struct {
    *blob.Blob
    // Token is the continuation token to resume streaming
    // starting at this blob in the future.
    Token string
}

BlobAndToken is the value used by the BlobStreamer interface, containing both a Blob and a continuation token.

type BlobEnumerator

type BlobEnumerator interface {
    // EnumerateBlobs sends at most limit blob.SizedRef values into dest,
    // sorted, as long as they are lexicographically greater than
    // after (if provided).
    // limit will be supplied and sanity checked by caller.
    // EnumerateBlobs must close the channel, even if limit
    // was hit and more blobs remain, an error is returned, or
    // the ctx is canceled.
    EnumerateBlobs(ctx context.Context,
        dest chan<- blob.SizedRef,
        after string,
        limit int) error
}

type BlobHub

type BlobHub interface {

    // NotifyBlobReceived notes that a storage target has successfully received
    // a blob and asynchronously notifies registered listeners.
    //
    // If any synchronous receive hooks are registered, they're run before
    // NotifyBlobReceived returns and their error is returned.
    NotifyBlobReceived(blob.SizedRef) error

    // AddReceiveHook adds a hook that is synchronously run
    // whenever blobs are received.  All registered hooks are run
    // on each blob upload but if more than one returns an error,
    // NotifyBlobReceived will only return one of the errors.
    AddReceiveHook(func(blob.SizedRef) error)

    RegisterListener(ch chan<- blob.Ref)
    UnregisterListener(ch chan<- blob.Ref)

    RegisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
    UnregisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
}

func GetHub

func GetHub(storage interface{}) BlobHub

GetHub returns a BlobHub for the given storage implementation.

type BlobReceiveConfiger

type BlobReceiveConfiger interface {
    BlobReceiver
    Configer
}

type BlobReceiver

type BlobReceiver interface {
    // ReceiveBlob accepts a newly uploaded blob and writes it to
    // permanent storage.
    //
    // Implementations of BlobReceiver downstream of the HTTP
    // server can trust that the source isn't larger than
    // MaxBlobSize and that its digest matches the provided blob
    // ref. (If not, the read of the source will fail before EOF)
    //
    // To ensure those guarantees, callers of ReceiveBlob should
    // not call ReceiveBlob directly but instead use either
    // blobserver.Receive or blobserver.ReceiveString, which also
    // take care of notifying the BlobReceiver's "BlobHub"
    // notification bus for observers.
    ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error)
}

BlobReceiver is the interface for receiving blobs.

type BlobRemover

type BlobRemover interface {
    // RemoveBlobs removes 0 or more blobs. Removal of
    // non-existent items isn't an error. Returns failure if any
    // items existed but failed to be deleted.
    // ErrNotImplemented may be returned for storage types not implementing removal.
    // If RemoveBlobs returns an error, it's possible that either
    // none or only some of the blobs were deleted.
    RemoveBlobs(ctx context.Context, blobs []blob.Ref) error
}

type BlobStatter

type BlobStatter interface {
    // StatBlobs checks for the existence of blobs, calling fn in
    // serial for each found blob, in any order, but with no
    // duplicates. The blobs slice should not have duplicates.
    //
    // If fn returns an error, StatBlobs returns with that value
    // and makes no further calls to fn.
    //
    // StatBlobs does not return an error on missing blobs, only
    // on failure to stat blobs.
    StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error
}

BlobStatter is the interface for checking the size and existence of blobs.

type BlobStreamer

type BlobStreamer interface {
    // BlobStreamer is an optional interface that may be implemented by
    // Storage implementations.
    //
    // StreamBlobs sends blobs to dest in an unspecified order. It is
    // expected that a Storage implementation implementing
    // BlobStreamer will send blobs to dest in the most efficient
    // order possible.
    //
    // The provided continuation token resumes the stream at a
    // point. To start from the beginning, send the empty string.
    // The token is opaque and must never be interpreted; its
    // format may change between versions of the server.
    //
    // If the context is canceled, the error value is
    // context.Canceled.
    //
    // StreamBlobs must unconditionally close dest before
    // returning, and it must return context.Canceled if
    // ctx.Done() becomes readable.
    //
    // When StreamBlobs reaches the end, the return value is nil.
    StreamBlobs(ctx context.Context, dest chan<- BlobAndToken, contToken string) error
}

func NewMultiBlobStreamer

func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer

NewMultiBlobStreamer concatenates multiple BlobStreamers into one.

type Cache

type Cache interface {
    blob.Fetcher
    BlobReceiver
    BlobStatter
}

Cache is the minimal interface expected of a blob cache.

type Config

type Config struct {
    Writable    bool
    Readable    bool
    Deletable   bool
    CanLongPoll bool

    // URLBase is the "http://host:port" and optional path (without a trailing slash) to which "/camli/*" is appended.
    URLBase       string
    HandlerFinder FindHandlerByTyper
}

type Configer

type Configer interface {
    Config() *Config
}

type FetcherEnumerator

type FetcherEnumerator interface {
    blob.Fetcher
    BlobEnumerator
}

type FindHandlerByTyper

type FindHandlerByTyper interface {
    // FindHandlerByType finds a handler by its handlerType and
    // returns its prefix and handler if it's loaded.  If it's not
    // loaded, the error will be ErrHandlerTypeNotFound.
    //
    // This is used by handlers to find siblings (such as the "ui" type handler)
    // which might have more knowledge about the configuration for discovery, etc.
    //
    // Note that if this is called during handler construction
    // time, only the prefix may be returned with a nil handler
    // and nil err.  Unlike GetHandler and GetStorage, this does
    // not cause the prefix to load immediately. At runtime (after
    // construction of all handlers), prefix and handler will
    // both be non-nil when err is nil.
    FindHandlerByType(handlerType string) (prefix string, handler interface{}, err error)

    // AllHandlers returns a map from prefix to handler type, and
    // a map from prefix to handler.
    AllHandlers() (map[string]string, map[string]interface{})
}

type GenerationNotSupportedError

type GenerationNotSupportedError string

A GenerationNotSupportedError explains why a Storage value implemented the Generationer interface but failed due to a wrapped Storage value not implementing the interface.

func (GenerationNotSupportedError) Error

func (s GenerationNotSupportedError) Error() string

type Generationer

type Generationer interface {
    // StorageGeneration returns a Storage's initialization time and
    // a unique random string (or UUID).  Implementations
    // should call ResetStorageGeneration on demand if no
    // information is known.
    // The error will be of type GenerationNotSupportedError if an underlying
    // storage target doesn't support the Generationer interface.
    StorageGeneration() (initTime time.Time, random string, err error)

    // ResetStorageGeneration deletes the information returned by
    // StorageGeneration and re-generates it.
    ResetStorageGeneration() error
}

Generationer is an optional interface that Storage implementations can implement as an optimization and paranoia facility for clients.

If the client sees the same random string in multiple upload sessions, it assumes that it is talking to the same server and that the server still has all the same blobs. This mechanism is not fundamental to Perkeep's operation: the client could also check each blob before uploading, or enumerate all blobs from the server. This is purely an optimization so clients can mix this value into their "is this file uploaded?" local cache keys.

type HandlerConstructor

type HandlerConstructor func(Loader, jsonconfig.Obj) (http.Handler, error)

A HandlerConstructor returns an http.Handler from a Loader environment and a configuration.

type HandlerIniter

type HandlerIniter interface {
    InitHandler(FindHandlerByTyper) error
}

HandlerIniter is an optional interface that can be implemented by a Storage or an http.Handler (as returned by a StorageConstructor or HandlerConstructor). Its InitHandler method is called once all the handlers have been created.

type Loader

type Loader interface {
    FindHandlerByTyper

    // MyPrefix returns the prefix of the handler currently being constructed,
    // with both leading and trailing slashes (e.g. "/ui/").
    MyPrefix() string

    // BaseURL returns the server's base URL, without trailing slash, and not including
    // the prefix (as returned by MyPrefix).
    BaseURL() string

    // GetHandlerType returns the handler's configured type, but does
    // not force it to start being loaded yet.
    GetHandlerType(prefix string) string // returns "" if unknown

    // GetHandler returns either a Storage or an http.Handler.
    // It forces the handler to be loaded and returns an error if
    // a cycle is created.
    GetHandler(prefix string) (interface{}, error)

    // GetStorage is like GetHandler but requires that the Handler be
    // a storage Handler.
    GetStorage(prefix string) (Storage, error)
}

type MaxEnumerateConfig

type MaxEnumerateConfig interface {
    Storage

    // MaxEnumerate returns the max that this storage interface is
    // capable of enumerating at once.
    MaxEnumerate() int
}

MaxEnumerateConfig is an optional interface implemented by Storage implementations to advertise the maximum number of items they can enumerate at once.

type NoImplStorage

type NoImplStorage struct{}

NoImplStorage is an implementation of Storage that returns a not implemented error for all operations.

func (NoImplStorage) EnumerateBlobs

func (NoImplStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error

func (NoImplStorage) Fetch

func (NoImplStorage) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error)

func (NoImplStorage) ReceiveBlob

func (NoImplStorage) ReceiveBlob(context.Context, blob.Ref, io.Reader) (sb blob.SizedRef, err error)

func (NoImplStorage) RemoveBlobs

func (NoImplStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error

func (NoImplStorage) StatBlobs

func (NoImplStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error
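A common Go idiom for a type like NoImplStorage is embedding: a partial implementation embeds it and overrides only the methods it supports, while still satisfying the full interface. The sketch below shows that idiom with hypothetical, trimmed-down types (Storage, noImplStorage, readOnly); it does not reproduce NoImplStorage's exact method signatures or return values.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotImplemented = errors.New("not implemented")

// Storage is a hypothetical, trimmed-down stand-in for blobserver.Storage.
type Storage interface {
	Fetch(ref string) ([]byte, error)
	ReceiveBlob(ref string, data []byte) error
	RemoveBlobs(refs []string) error
}

// noImplStorage plays the role of NoImplStorage in this sketch: every method fails.
type noImplStorage struct{}

func (noImplStorage) Fetch(string) ([]byte, error) { return nil, errNotImplemented }

func (noImplStorage) ReceiveBlob(string, []byte) error { return errNotImplemented }

func (noImplStorage) RemoveBlobs([]string) error { return errNotImplemented }

// readOnly embeds noImplStorage and overrides only Fetch, so it satisfies
// Storage while every other method still reports errNotImplemented.
type readOnly struct {
	noImplStorage
	blobs map[string][]byte
}

func (r readOnly) Fetch(ref string) ([]byte, error) {
	if b, ok := r.blobs[ref]; ok {
		return b, nil
	}
	return nil, errors.New("blob not found")
}

func main() {
	var s Storage = readOnly{blobs: map[string][]byte{"sha224-a": []byte("hi")}}
	b, err := s.Fetch("sha224-a")
	fmt.Println(string(b), err)                 // hi <nil>
	fmt.Println(s.ReceiveBlob("sha224-b", nil)) // not implemented
}
```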

type ShutdownStorage

type ShutdownStorage interface {
    Storage
    io.Closer
}

ShutdownStorage is an optional interface for storage implementations which can be asked to shut down cleanly. Regardless, all implementations should be able to survive crashes without data loss.

type StatReceiver

type StatReceiver interface {
    BlobReceiver
    BlobStatter
}

type Storage

type Storage interface {
    blob.Fetcher
    BlobReceiver
    BlobStatter
    BlobEnumerator
    BlobRemover
}

Storage is the interface that must be implemented by a blobserver storage type (e.g. localdisk, s3, encrypt, shard, replica, remote).

func CreateStorage

func CreateStorage(typ string, loader Loader, config jsonconfig.Obj) (Storage, error)

type StorageConfiger

type StorageConfiger interface {
    Storage
    Configer
}

type StorageConstructor

type StorageConstructor func(Loader, jsonconfig.Obj) (Storage, error)

A StorageConstructor returns a Storage implementation from a Loader environment and a configuration.

type StorageHandler

type StorageHandler interface {
    Storage
    http.Handler
}

StorageHandler is a storage implementation that also exports an HTTP status page.

type WholeRefFetcher

type WholeRefFetcher interface {
    // OpenWholeRef returns a ReadCloser reading from offset bytes
    // into wholeRef (the blobref of an entire file).
    //
    // The returned wholeSize is the size of the file, without
    // subtracting any offset.
    //
    // The err will be os.ErrNotExist if the wholeref is not
    // known.
    OpenWholeRef(wholeRef blob.Ref, offset int64) (rc io.ReadCloser, wholeSize int64, err error)
}

WholeRefFetcher is an optional fast-path interface exposed by the 'blobpacked' blob storage implementation, which packs pieces of files together and can efficiently serve them contiguously.

Subdirectories

Name      Synopsis
archiver      Package archiver zips lots of little blobs into bigger zip files and stores them somewhere.
azure      Package azure registers the "azure" blobserver storage type, storing blobs in a Microsoft Azure Blob Storage container.
blobpacked      Package blobpacked registers the "blobpacked" blobserver storage type, storing blobs initially as one physical blob per logical blob, but then rearranging little physical blobs into large contiguous blobs organized by how they'll likely be accessed.
cond      Package cond registers the "cond" conditional blobserver storage type to select routing of get/put operations on blobs to other storage targets as a function of their content.
dir      Package dir implements the blobserver Storage interface for a directory, detecting whether the directory is file-per-blob (localdisk) or diskpacked.
diskpacked      Package diskpacked registers the "diskpacked" blobserver storage type, storing blobs packed together into monolithic data files with an index listing the sizes and offsets of the little blobs within the large files.
encrypt      Package encrypt registers the "encrypt" blobserver storage type which stores all blobs and metadata with age encryption into other wrapped storage targets (e.g.
files      Package files implements the blobserver interface by storing each blob in its own file in nested directories.
gethandler      Package gethandler implements the HTTP handler for fetching blobs.
google     
     cloudstorage      Package cloudstorage registers the "googlecloudstorage" blob storage type, storing blobs on Google Cloud Storage (not Google Drive).
     drive      Package drive registers the "googledrive" blobserver storage type, storing blobs in a Google Drive folder.
          service      Package service translates blobserver.Storage methods into Google Drive API methods.
handlers      Package handlers implements the HTTP interface to the Perkeep blob server.
local      Package local implements functionality common to both the "localdisk" and "diskpacked" storage mechanisms.
localdisk      Package localdisk registers the "filesystem" blobserver storage type, storing blobs in a forest of sharded directories at the specified root.
memory      Package memory registers the "memory" blobserver storage type, storing blobs in an in-memory map.
mongo      Package mongo registers the "mongo" blobserver storage type, storing blobs using MongoDB.
namespace      Package namespace implements the "namespace" blobserver storage type.
overlay      Package overlay registers the "overlay" blobserver storage type that presents storage that is the result of overlaying a storage ("upper") on top of another storage ("lower").
protocol      Package protocol contains types for Perkeep protocol types.
proxycache      Package proxycache registers the "proxycache" blobserver storage type, which uses a provided blobserver as a cache for a second origin blobserver.
remote      Package remote registers the "remote" blobserver storage type, storing and fetching blobs from a remote Perkeep server over HTTPS.
replica      Package replica registers the "replica" blobserver storage type, providing synchronous replication to one or more backends.
s3      Package s3 registers the "s3" blobserver storage type, storing blobs in an Amazon Web Services' S3 storage bucket.
sftp      Package sftp registers the "sftp" blobserver storage type, storing blobs one-per-file in a forest of sharded directories to a remote SFTP server over an SSH connection.
shard      Package shard registers the "shard" blobserver storage type, predictably spraying blobs out over the provided backends based on their blobref.
stats      Package stats contains an in-memory StatReceiver that only stores sizes of received blobs but not their contents.
storagetest      Package storagetest tests blobserver.Storage implementations.
union      Package union registers the "union" read-only blobserver storage type to read from the given subsets, serving from the first one that responds.