1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
40 package overlay
41
42 import (
43 "context"
44 "fmt"
45 "io"
46 "log"
47 "os"
48 "time"
49
50 "go4.org/jsonconfig"
51 "perkeep.org/pkg/blob"
52 "perkeep.org/pkg/blobserver"
53 "perkeep.org/pkg/sorted"
54 )
55
56 func init() {
57 blobserver.RegisterStorageConstructor("overlay", blobserver.StorageConstructor(newFromConfig))
58 }
59
60
61 type readOnlyStorage interface {
62 blob.Fetcher
63 blobserver.BlobEnumerator
64 blobserver.BlobStatter
65 }
66
67 type overlayStorage struct {
68 lower readOnlyStorage
69
70
71 deleted sorted.KeyValue
72
73
74 upper blobserver.Storage
75 }
76
77 func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
78 var (
79 lowerPrefix = conf.RequiredString("lower")
80 upperPrefix = conf.RequiredString("upper")
81 deletedConf = conf.OptionalObject("deleted")
82 )
83 if err := conf.Validate(); err != nil {
84 return nil, err
85 }
86
87 lower, err := ld.GetStorage(lowerPrefix)
88 if err != nil {
89 return nil, fmt.Errorf("failed to load lower at %s: %v", lowerPrefix, err)
90 }
91 upper, err := ld.GetStorage(upperPrefix)
92 if err != nil {
93 return nil, fmt.Errorf("failed to load upper at %s: %v", upperPrefix, err)
94 }
95 var deleted sorted.KeyValue
96 if len(deletedConf) != 0 {
97 deleted, err = sorted.NewKeyValueMaybeWipe(deletedConf)
98 if err != nil {
99 return nil, fmt.Errorf("failed to setup deleted: %v", err)
100 }
101 }
102
103 sto := &overlayStorage{
104 lower: lower,
105 upper: upper,
106 deleted: deleted,
107 }
108
109 return sto, nil
110 }
111
112 func (sto *overlayStorage) Close() error {
113 if sto.deleted == nil {
114 return nil
115 }
116 return sto.deleted.Close()
117 }
118
119
120 func (sto *overlayStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
121 sb, err = sto.upper.ReceiveBlob(ctx, br, src)
122 if err == nil && sto.deleted != nil {
123 err = sto.deleted.Delete(br.String())
124 }
125 return sb, err
126 }
127
128
129 func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
130 if sto.deleted == nil {
131 return blobserver.ErrNotImplemented
132 }
133
134 err := sto.upper.RemoveBlobs(ctx, blobs)
135 if err != nil {
136 return err
137 }
138
139 m := sto.deleted.BeginBatch()
140 for _, br := range blobs {
141 m.Set(br.String(), "1")
142 }
143 return sto.deleted.CommitBatch(m)
144 }
145
146 func (sto *overlayStorage) isDeleted(br blob.Ref) bool {
147 if sto.deleted == nil {
148 return false
149 }
150
151 _, err := sto.deleted.Get(br.String())
152 if err == nil {
153 return true
154 }
155
156 if err != sorted.ErrNotFound {
157 log.Printf("overlayStorage error accessing deleted: %v", err)
158 }
159
160 return false
161 }
162
163
164
165 func (sto *overlayStorage) Fetch(ctx context.Context, br blob.Ref) (file io.ReadCloser, size uint32, err error) {
166 if sto.isDeleted(br) {
167 return nil, 0, os.ErrNotExist
168 }
169
170 file, size, err = sto.upper.Fetch(ctx, br)
171 if err != os.ErrNotExist {
172 return file, size, err
173 }
174
175 return sto.lower.Fetch(ctx, br)
176 }
177
178
179 func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
180 exists := make([]blob.Ref, 0, len(blobs))
181 for _, br := range blobs {
182 if !sto.isDeleted(br) {
183 exists = append(exists, br)
184 }
185 }
186
187 seen := make(map[blob.Ref]struct{}, len(exists))
188
189 err := sto.upper.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error {
190 seen[sbr.Ref] = struct{}{}
191 return f(sbr)
192 })
193
194 if err != nil {
195 return err
196 }
197
198 lowerBlobs := make([]blob.Ref, 0, len(exists))
199 for _, br := range exists {
200 if _, s := seen[br]; !s {
201 lowerBlobs = append(lowerBlobs, br)
202 }
203 }
204
205 return sto.lower.StatBlobs(ctx, lowerBlobs, f)
206 }
207
208
209 func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
210 defer close(dest)
211
212 enums := []blobserver.BlobEnumerator{sto.lower, sto.upper}
213
214
215 sent := 0
216 for sent < limit {
217 ch := make(chan blob.SizedRef)
218 errch := make(chan error, 1)
219 go func() {
220 errch <- blobserver.MergedEnumerate(ctx, ch, enums, after, limit-sent)
221 }()
222
223 var last blob.Ref
224
225
226 seen := 0
227 for sbr := range ch {
228 seen++
229 if !sto.isDeleted(sbr.Ref) {
230 log.Println(sent, sbr.Ref)
231 dest <- sbr
232 sent++
233 }
234 last = sbr.Ref
235 }
236
237 if err := <-errch; err != nil {
238 return err
239 }
240
241
242 if seen == 0 {
243 return nil
244 }
245
246
247 after = last.String()
248 }
249
250 return nil
251 }
252
253 func (sto *overlayStorage) StorageGeneration() (initTime time.Time, random string, err error) {
254 if gener, ok := sto.upper.(blobserver.Generationer); ok {
255 return gener.StorageGeneration()
256 }
257 err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
258 return
259 }
260
261 func (sto *overlayStorage) ResetStorageGeneration() error {
262 if gener, ok := sto.upper.(blobserver.Generationer); ok {
263 return gener.ResetStorageGeneration()
264 }
265 return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
266 }