- Constants
- Variables
- func CreateHandler(typ string, loader Loader, config jsonconfig.Obj) (http.Handler, error)
- func EnumerateAll(ctx context.Context, src BlobEnumerator, fn func(blob.SizedRef) error) error
- func EnumerateAllFrom(ctx context.Context, src BlobEnumerator, after string, fn func(blob.SizedRef) error) error
- func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch func(blob.Ref), srcch, dstch <-chan blob.SizedRef)
- func MergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, sources []BlobEnumerator, after string, limit int) error
- func MergedEnumerateStorage(ctx context.Context, dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error
- func Receive(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)
- func ReceiveNoHash(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)
- func ReceiveString(ctx context.Context, dst BlobReceiver, s string) (blob.SizedRef, error)
- func RefTypes(src BlobEnumerator) ([]string, error)
- func RegisterHandlerConstructor(typ string, ctor HandlerConstructor)
- func RegisterStorageConstructor(typ string, ctor StorageConstructor)
- func StatBlob(ctx context.Context, bs BlobStatter, br blob.Ref) (blob.SizedRef, error)
- func StatBlobs(ctx context.Context, bs BlobStatter, blobs []blob.Ref) (map[blob.Ref]blob.SizedRef, error)
- func StatBlobsParallelHelper(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error,
gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error)) error
- func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref)
- type BlobAndToken
- type BlobEnumerator
- type BlobHub
- func GetHub(storage interface{}) BlobHub
- type BlobReceiveConfiger
- type BlobReceiver
- type BlobRemover
- type BlobStatter
- type BlobStreamer
- func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer
- type Cache
- type Config
- type Configer
- type FetcherEnumerator
- type FindHandlerByTyper
- type GenerationNotSupportedError
- func (s GenerationNotSupportedError) Error() string
- type Generationer
- type HandlerConstructor
- type HandlerIniter
- type Loader
- type MaxEnumerateConfig
- type NoImplStorage
- func (NoImplStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error
- func (NoImplStorage) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error)
- func (NoImplStorage) ReceiveBlob(context.Context, blob.Ref, io.Reader) (sb blob.SizedRef, err error)
- func (NoImplStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error
- func (NoImplStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error
- type ShutdownStorage
- type StatReceiver
- type Storage
- func CreateStorage(typ string, loader Loader, config jsonconfig.Obj) (Storage, error)
- type StorageConfiger
- type StorageConstructor
- type StorageHandler
- type WholeRefFetcher
Package files
blobhub.go
doc.go
enumerate.go
interface.go
mergedenum.go
multistream.go
noimpl.go
receive.go
registry.go
stat.go
sync.go
Constants
const MaxBlobSize = constants.MaxBlobSize
MaxBlobSize is the size of a single blob in Perkeep.
Variables
var (
ErrCorruptBlob = errors.New("corrupt blob; digest doesn't match")
ErrNotImplemented = errors.New("not implemented")
)
var ErrHandlerTypeNotFound = errors.New("requested handler type not loaded")
var ErrReadonly = errors.New("this blobserver is read only")
ErrReadonly is the error value returned by read-only blobservers.
func CreateHandler(typ string, loader Loader, config jsonconfig.Obj) (http.Handler, error)
CreateHandler instantiates an http Handler of type 'typ' from the
provided JSON configuration, and finding peer handlers and
configuration from the environment in 'loader'.
The handler 'typ' must have been previously registered with
RegisterHandlerConstructor.
func EnumerateAll(ctx context.Context, src BlobEnumerator, fn func(blob.SizedRef) error) error
EnumerateAll runs fn for each blob in src.
If fn returns an error, iteration stops and fn isn't called again.
EnumerateAll will not return concurrently with fn.
func EnumerateAllFrom(ctx context.Context, src BlobEnumerator, after string, fn func(blob.SizedRef) error) error
EnumerateAllFrom is like EnumerateAll, but takes an after parameter.
func ListMissingDestinationBlobs(destMissing chan<- blob.SizedRef, sizeMismatch func(blob.Ref), srcch, dstch <-chan blob.SizedRef)
ListMissingDestinationBlobs reads from 'srcch' and 'dstch' (sorted
enumerations of blobs from two blob servers) and sends to
'destMissing' any blobs which appear on the source but not at the
destination.
destMissing is closed at the end.
If an invalid (zero) blob from srcch or dstch arrives,
ListMissingDestinationBlobs stops.
func MergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, sources []BlobEnumerator, after string, limit int) error
MergedEnumerate implements the BlobEnumerator interface by
merge-joining 0 or more sources.
func MergedEnumerateStorage(ctx context.Context, dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error
MergedEnumerateStorage implements the BlobEnumerator interface by
merge-joining 0 or more sources.
In this version, the sources implement the Storage interface, even
though only the BlobEnumerator interface is used.
func Receive(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)
Receive wraps calling a BlobReceiver's ReceiveBlob method,
additionally providing verification of the src digest, and also
notifying the blob hub on success.
The error will be ErrCorruptBlob if the blobref didn't match.
func ReceiveNoHash(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error)
func ReceiveString(ctx context.Context, dst BlobReceiver, s string) (blob.SizedRef, error)
ReceiveString uploads the blob given by the string s to dst
and returns its blobref and size.
func RefTypes(src BlobEnumerator) ([]string, error)
RefTypes returns a list of blobref types appearing on the provided enumerator.
A blobref type is a string like "sha1", or whatever is on the left side
of the hyphen in a blobref.
To get the alphabet valid for the right side of the hyphen, use blob.TypeAlphabet(type).
func RegisterHandlerConstructor(typ string, ctor HandlerConstructor)
RegisterHandlerConstructor registers an http Handler constructor function
for a given handler type.
It is an error to register the same handler type twice.
func RegisterStorageConstructor(typ string, ctor StorageConstructor)
func StatBlob(ctx context.Context, bs BlobStatter, br blob.Ref) (blob.SizedRef, error)
StatBlob calls bs.StatBlobs to stat a single blob.
If the blob is not found, the error is os.ErrNotExist.
func StatBlobs(ctx context.Context, bs BlobStatter, blobs []blob.Ref) (map[blob.Ref]blob.SizedRef, error)
StatBlobs stats multiple blobs and returns a map
of the found refs to their sizes.
func StatBlobsParallelHelper(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error,
gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error)) error
StatBlobsParallelHelper is for use by blobserver implementations
that want to issue stats in parallel. This runs worker in multiple
goroutines (bounded by gate), but calls fn in serial, per the
BlobStatter contract, and stops once there's a failure.
The worker func should return two zero values to signal that a blob
doesn't exist. (This is different than the StatBlob func, which
returns os.ErrNotExist)
func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref)
WaitForBlob waits until deadline for blobs to arrive. If blobs is empty, any
blobs are waited on. Otherwise, those specific blobs are waited on.
When WaitForBlob returns, nothing may have happened.
type BlobAndToken struct {
*blob.Blob
Token string
}
BlobAndToken is the value used by the BlobStreamer interface,
containing both a Blob and a continuation token.
type BlobEnumerator interface {
EnumerateBlobs(ctx context.Context,
dest chan<- blob.SizedRef,
after string,
limit int) error
}
type BlobHub interface {
NotifyBlobReceived(blob.SizedRef) error
AddReceiveHook(func(blob.SizedRef) error)
RegisterListener(ch chan<- blob.Ref)
UnregisterListener(ch chan<- blob.Ref)
RegisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
UnregisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
}
func GetHub(storage interface{}) BlobHub
GetHub return a BlobHub for the given storage implementation.
type BlobReceiveConfiger interface {
BlobReceiver
Configer
}
type BlobReceiver interface {
ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error)
}
BlobReceiver is the interface for receiving blobs.
type BlobRemover interface {
RemoveBlobs(ctx context.Context, blobs []blob.Ref) error
}
type BlobStatter interface {
StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error
}
BlobStatter is the interface for checking the size and existence of blobs.
type BlobStreamer interface {
StreamBlobs(ctx context.Context, dest chan<- BlobAndToken, contToken string) error
}
func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer
NewMultiBlobStreamer concatenates multiple BlobStreamers into one.
type Cache interface {
blob.Fetcher
BlobReceiver
BlobStatter
}
Cache is the minimal interface expected of a blob cache.
type Config struct {
Writable bool
Readable bool
Deletable bool
CanLongPoll bool
URLBase string
HandlerFinder FindHandlerByTyper
}
type Configer interface {
Config() *Config
}
type FetcherEnumerator interface {
blob.Fetcher
BlobEnumerator
}
type FindHandlerByTyper interface {
FindHandlerByType(handlerType string) (prefix string, handler interface{}, err error)
AllHandlers() (map[string]string, map[string]interface{})
}
type GenerationNotSupportedError string
A GenerationNotSupportedError explains why a Storage
value implemented the Generationer interface but failed due
to a wrapped Storage value not implementing the interface.
func (GenerationNotSupportedError) Error
func (s GenerationNotSupportedError) Error() string
type Generationer interface {
StorageGeneration() (initTime time.Time, random string, err error)
ResetStorageGeneration() error
}
Generationer is an optional interface and an optimization and paranoia
facility for clients which can be implemented by Storage
implementations.
If the client sees the same random string in multiple upload sessions,
it assumes that the blobserver still has all the same blobs, and also
it's the same server. This mechanism is not fundamental to
Perkeep's operation: the client could also check each blob before
uploading, or enumerate all blobs from the server too. This is purely
an optimization so clients can mix this value into their "is this file
uploaded?" local cache keys.
type HandlerConstructor func(Loader, jsonconfig.Obj) (http.Handler, error)
A HandlerConstructor returns an http.Handler from a Loader
environment and a configuration.
type HandlerIniter interface {
InitHandler(FindHandlerByTyper) error
}
HandlerIniter is an optional interface which can be implemented
by Storage or http.Handlers (from StorageConstructor or HandlerConstructor)
to be called once all the handlers have been created.
type Loader interface {
FindHandlerByTyper
MyPrefix() string
BaseURL() string
GetHandlerType(prefix string) string
GetHandler(prefix string) (interface{}, error)
GetStorage(prefix string) (Storage, error)
}
type MaxEnumerateConfig interface {
Storage
MaxEnumerate() int
}
MaxEnumerateConfig is an optional interface implemented by Storage
interfaces to advertise their max value for how many items can
be enumerated at once.
type NoImplStorage struct{}
NoImplStorage is an implementation of Storage that returns a not
implemented error for all operations.
func (NoImplStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error
func (NoImplStorage) Fetch
func (NoImplStorage) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error)
func (NoImplStorage) ReceiveBlob(context.Context, blob.Ref, io.Reader) (sb blob.SizedRef, err error)
func (NoImplStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error
func (NoImplStorage) StatBlobs
func (NoImplStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error
type ShutdownStorage interface {
Storage
io.Closer
}
ShutdownStorage is an optional interface for storage
implementations which can be asked to shut down
cleanly. Regardless, all implementations should be able to survive
crashes without data loss.
type StatReceiver interface {
BlobReceiver
BlobStatter
}
type Storage interface {
blob.Fetcher
BlobReceiver
BlobStatter
BlobEnumerator
BlobRemover
}
Storage is the interface that must be implemented by a blobserver
storage type. (e.g. localdisk, s3, encrypt, shard, replica, remote)
func CreateStorage(typ string, loader Loader, config jsonconfig.Obj) (Storage, error)
type StorageConfiger interface {
Storage
Configer
}
type StorageConstructor func(Loader, jsonconfig.Obj) (Storage, error)
A StorageConstructor returns a Storage implementation from a Loader
environment and a configuration.
type StorageHandler interface {
Storage
http.Handler
}
StorageHandler is a storage implementation that also exports an HTTP
status page.
type WholeRefFetcher interface {
OpenWholeRef(wholeRef blob.Ref, offset int64) (rc io.ReadCloser, wholeSize int64, err error)
}
WholeRefFetcher is an optional fast-path interface exposed by the
'blobpacked' blob storage implementation, which packs pieces of
files together and can efficiently serve them contigously.