/*
Copyright 2011 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package blobserver

import (
	"context"
	"errors"
	"io"
	"net/http"
	"time"

	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/constants"
)

// MaxBlobSize is the maximum size of a single blob in Perkeep.
const MaxBlobSize = constants.MaxBlobSize

var (
	ErrCorruptBlob = errors.New("corrupt blob; digest doesn't match")

	// ErrNotImplemented should be returned by methods that are not implemented.
	ErrNotImplemented = errors.New("not implemented")
)

// BlobReceiver is the interface for receiving blobs.
type BlobReceiver interface {
	// ReceiveBlob accepts a newly uploaded blob and writes it to
	// permanent storage.
	//
	// Implementations of BlobReceiver downstream of the HTTP
	// server can trust that the source isn't larger than
	// MaxBlobSize and that its digest matches the provided blob
	// ref. (If not, the read of the source will fail before EOF.)
	//
	// To ensure those guarantees, callers should not call
	// ReceiveBlob directly but instead use either
	// blobserver.Receive or blobserver.ReceiveString, which also
	// take care of notifying the BlobReceiver's "BlobHub"
	// notification bus for observers.
	ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error)
}

// BlobStatter is the interface for checking the size and existence of blobs.
type BlobStatter interface {
	// StatBlobs checks for the existence of blobs, calling fn in
	// serial for each found blob, in any order, but with no
	// duplicates. The blobs slice should not have duplicates.
	//
	// If fn returns an error, StatBlobs returns with that value
	// and makes no further calls to fn.
	//
	// StatBlobs does not return an error on missing blobs, only
	// on failure to stat blobs.
	StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error
}

type StatReceiver interface {
	BlobReceiver
	BlobStatter
}

type BlobEnumerator interface {
	// EnumerateBlobs sends at most limit blob.SizedRef values into dest,
	// sorted, as long as they are lexicographically greater than
	// after (if provided).
	// limit will be supplied and sanity checked by the caller.
	// EnumerateBlobs must close the channel, even if limit
	// was hit and more blobs remain, an error is returned, or
	// the ctx is canceled.
	EnumerateBlobs(ctx context.Context,
		dest chan<- blob.SizedRef,
		after string,
		limit int) error

	// TODO: remove limit from this interface, since the caller
	// can cancel? See if that would simplify implementations and
	// callers.
}

// BlobAndToken is the value used by the BlobStreamer interface,
// containing both a Blob and a continuation token.
type BlobAndToken struct {
	*blob.Blob
	// Token is the continuation token to resume streaming
	// starting at this blob in the future.
	Token string
}
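
// enumerateAllExample is an illustrative sketch, not part of the
// original API, showing how a caller might drive a BlobEnumerator:
// it requests pages of up to pageSize blobs and uses the last digest
// seen as the "after" cursor for the next call. The function name,
// the pageSize value, and the visitor callback fn are assumptions for
// illustration only; this package may already provide similar helpers.
func enumerateAllExample(ctx context.Context, e BlobEnumerator, fn func(blob.SizedRef)) error {
	const pageSize = 1000 // assumed page size; any sane limit works
	after := ""
	for {
		ch := make(chan blob.SizedRef, pageSize)
		errc := make(chan error, 1)
		cur := after
		go func() {
			// EnumerateBlobs closes ch even on error or cancellation,
			// so the range below always terminates.
			errc <- e.EnumerateBlobs(ctx, ch, cur, pageSize)
		}()
		n := 0
		for sb := range ch {
			n++
			after = sb.Ref.String() // blobs arrive sorted, so the last ref is the cursor
			fn(sb)
		}
		if err := <-errc; err != nil {
			return err
		}
		if n < pageSize {
			return nil // short page: enumeration is complete
		}
	}
}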
// BlobStreamer is an optional interface that may be implemented by
// Storage implementations.
type BlobStreamer interface {
	// StreamBlobs sends blobs to dest in an unspecified order. It is
	// expected that a Storage implementation implementing
	// BlobStreamer will send blobs to dest in the most efficient
	// order possible.
	//
	// The provided continuation token resumes the stream at a
	// point. To start from the beginning, send the empty string.
	// The token is opaque and must never be interpreted; its
	// format may change between versions of the server.
	//
	// If the context is canceled, the error value is
	// context.Canceled.
	//
	// StreamBlobs must unconditionally close dest before
	// returning, and it must return context.Canceled if
	// ctx.Done() becomes readable.
	//
	// When StreamBlobs reaches the end, the return value is nil.
	StreamBlobs(ctx context.Context, dest chan<- BlobAndToken, contToken string) error
}

// Cache is the minimal interface expected of a blob cache.
type Cache interface {
	blob.Fetcher
	BlobReceiver
	BlobStatter
}

type BlobReceiveConfiger interface {
	BlobReceiver
	Configer
}

type Config struct {
	Writable    bool
	Readable    bool
	Deletable   bool
	CanLongPoll bool

	// URLBase is the "http://host:port" and optional path (but
	// without trailing slash) to have "/camli/*" appended.
	URLBase       string
	HandlerFinder FindHandlerByTyper
}

type BlobRemover interface {
	// RemoveBlobs removes 0 or more blobs. Removal of
	// non-existent items isn't an error. Returns failure if any
	// items existed but failed to be deleted.
	// ErrNotImplemented may be returned for storage types not
	// implementing removal.
	// If RemoveBlobs returns an error, it's possible that either
	// none or only some of the blobs were deleted.
	RemoveBlobs(ctx context.Context, blobs []blob.Ref) error
}

// Storage is the interface that must be implemented by a blobserver
// storage type. (e.g. localdisk, s3, encrypt, shard, replica, remote)
type Storage interface {
	blob.Fetcher
	BlobReceiver
	BlobStatter
	BlobEnumerator
	BlobRemover
}

type FetcherEnumerator interface {
	blob.Fetcher
	BlobEnumerator
}

// StorageHandler is a storage implementation that also exports an HTTP
// status page.
type StorageHandler interface {
	Storage
	http.Handler
}

// ShutdownStorage is an optional interface for storage
// implementations which can be asked to shut down
// cleanly. Regardless, all implementations should be able to survive
// crashes without data loss.
type ShutdownStorage interface {
	Storage
	io.Closer
}

// WholeRefFetcher is an optional fast-path interface exposed by the
// 'blobpacked' blob storage implementation, which packs pieces of
// files together and can efficiently serve them contiguously.
type WholeRefFetcher interface {
	// OpenWholeRef returns a ReadCloser reading from offset bytes
	// into wholeRef (the blobref of an entire file).
	//
	// The returned wholeSize is the size of the file, without
	// subtracting any offset.
	//
	// The err will be os.ErrNotExist if the wholeRef is not
	// known.
	OpenWholeRef(wholeRef blob.Ref, offset int64) (rc io.ReadCloser, wholeSize int64, err error)
}
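
// streamFromExample is an illustrative sketch, not part of the
// original API, of consuming a BlobStreamer. It receives blobs until
// the stream ends or ctx is canceled, passing each to fn and
// remembering the continuation token of the last blob seen. Resuming
// with that token later restarts at that blob, so it may be delivered
// again. The function name, buffer size, and callback are assumptions
// for illustration only.
func streamFromExample(ctx context.Context, bs BlobStreamer, startToken string, fn func(*blob.Blob)) (lastToken string, err error) {
	dest := make(chan BlobAndToken, 16) // assumed buffer size
	errc := make(chan error, 1)
	go func() {
		// StreamBlobs unconditionally closes dest before returning,
		// so the range below always terminates.
		errc <- bs.StreamBlobs(ctx, dest, startToken)
	}()
	lastToken = startToken
	for bt := range dest {
		lastToken = bt.Token // opaque; stored but never interpreted
		fn(bt.Blob)
	}
	return lastToken, <-errc
}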
// A GenerationNotSupportedError explains why a Storage
// value implemented the Generationer interface but failed due
// to a wrapped Storage value not implementing the interface.
type GenerationNotSupportedError string

func (s GenerationNotSupportedError) Error() string { return string(s) }

/*
Generationer is an optional interface and an optimization and paranoia
facility for clients which can be implemented by Storage
implementations.

If the client sees the same random string in multiple upload sessions,
it assumes that the blobserver still has all the same blobs and that
it's the same server. This mechanism is not fundamental to
Perkeep's operation: the client could also check each blob before
uploading, or enumerate all blobs from the server. This is purely
an optimization so clients can mix this value into their "is this file
uploaded?" local cache keys.
*/
type Generationer interface {
	// StorageGeneration returns a Storage's initialization time and
	// a unique random string (or UUID). Implementations
	// should call ResetStorageGeneration on demand if no
	// information is known.
	// The error will be of type GenerationNotSupportedError if an underlying
	// storage target doesn't support the Generationer interface.
	StorageGeneration() (initTime time.Time, random string, err error)

	// ResetStorageGeneration deletes the information returned by
	// StorageGeneration and re-generates it.
	ResetStorageGeneration() error
}

type Configer interface {
	Config() *Config
}

type StorageConfiger interface {
	Storage
	Configer
}

// MaxEnumerateConfig is an optional interface implemented by Storage
// implementations to advertise the maximum number of items they can
// enumerate at once.
type MaxEnumerateConfig interface {
	Storage

	// MaxEnumerate returns the max that this storage interface is
	// capable of enumerating at once.
	MaxEnumerate() int
}
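
// chooseEnumerateLimitExample is an illustrative sketch, not part of
// the original API: given a Storage and the limit a caller would like
// to use, it caps the limit at the storage's advertised MaxEnumerate
// when the optional MaxEnumerateConfig interface is implemented. The
// function name and the wanted parameter are assumptions for
// illustration only.
func chooseEnumerateLimitExample(s Storage, wanted int) int {
	if mec, ok := s.(MaxEnumerateConfig); ok {
		if n := mec.MaxEnumerate(); n > 0 && n < wanted {
			return n
		}
	}
	return wanted
}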