1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
40 package overlay
41
42 import (
43 "context"
44 "errors"
45 "fmt"
46 "io"
47 "log"
48 "os"
49 "time"
50
51 "go4.org/jsonconfig"
52
53 "perkeep.org/pkg/blob"
54 "perkeep.org/pkg/blobserver"
55 "perkeep.org/pkg/sorted"
56 )
57
58 func init() {
59 blobserver.RegisterStorageConstructor("overlay", blobserver.StorageConstructor(newFromConfig))
60 }
61
62
63 type readOnlyStorage interface {
64 blob.Fetcher
65 blobserver.BlobEnumerator
66 blobserver.BlobStatter
67 }
68
69 type overlayStorage struct {
70 lower readOnlyStorage
71
72
73 deleted sorted.KeyValue
74
75
76 upper blobserver.Storage
77 }
78
79 func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
80 var (
81 lowerPrefix = conf.RequiredString("lower")
82 upperPrefix = conf.RequiredString("upper")
83 deletedConf = conf.OptionalObject("deleted")
84 )
85 if err := conf.Validate(); err != nil {
86 return nil, err
87 }
88
89 lower, err := ld.GetStorage(lowerPrefix)
90 if err != nil {
91 return nil, fmt.Errorf("failed to load lower at %s: %w", lowerPrefix, err)
92 }
93 upper, err := ld.GetStorage(upperPrefix)
94 if err != nil {
95 return nil, fmt.Errorf("failed to load upper at %s: %w", upperPrefix, err)
96 }
97 var deleted sorted.KeyValue
98 if len(deletedConf) != 0 {
99 deleted, err = sorted.NewKeyValueMaybeWipe(deletedConf)
100 if err != nil {
101 return nil, fmt.Errorf("failed to setup deleted: %w", err)
102 }
103 }
104
105 sto := &overlayStorage{
106 lower: lower,
107 upper: upper,
108 deleted: deleted,
109 }
110
111 return sto, nil
112 }
113
114 func (sto *overlayStorage) Close() error {
115 if sto.deleted == nil {
116 return nil
117 }
118 return sto.deleted.Close()
119 }
120
121
122 func (sto *overlayStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
123 sb, err = sto.upper.ReceiveBlob(ctx, br, src)
124 if err == nil && sto.deleted != nil {
125 err = sto.deleted.Delete(br.String())
126 }
127 return sb, err
128 }
129
130
131 func (sto *overlayStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
132 if sto.deleted == nil {
133 return blobserver.ErrNotImplemented
134 }
135
136 err := sto.upper.RemoveBlobs(ctx, blobs)
137 if err != nil {
138 return err
139 }
140
141 m := sto.deleted.BeginBatch()
142 for _, br := range blobs {
143 m.Set(br.String(), "1")
144 }
145 return sto.deleted.CommitBatch(m)
146 }
147
148 func (sto *overlayStorage) isDeleted(br blob.Ref) bool {
149 if sto.deleted == nil {
150 return false
151 }
152
153 _, err := sto.deleted.Get(br.String())
154 if err == nil {
155 return true
156 }
157
158 if !errors.Is(err, sorted.ErrNotFound) {
159 log.Printf("overlayStorage error accessing deleted: %v", err)
160 }
161
162 return false
163 }
164
165
166
167 func (sto *overlayStorage) Fetch(ctx context.Context, br blob.Ref) (file io.ReadCloser, size uint32, err error) {
168 if sto.isDeleted(br) {
169 return nil, 0, os.ErrNotExist
170 }
171
172 file, size, err = sto.upper.Fetch(ctx, br)
173 if !errors.Is(err, os.ErrNotExist) {
174 return file, size, err
175 }
176
177 return sto.lower.Fetch(ctx, br)
178 }
179
180
181 func (sto *overlayStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
182 exists := make([]blob.Ref, 0, len(blobs))
183 for _, br := range blobs {
184 if !sto.isDeleted(br) {
185 exists = append(exists, br)
186 }
187 }
188
189 seen := make(map[blob.Ref]struct{}, len(exists))
190
191 err := sto.upper.StatBlobs(ctx, exists, func(sbr blob.SizedRef) error {
192 seen[sbr.Ref] = struct{}{}
193 return f(sbr)
194 })
195
196 if err != nil {
197 return err
198 }
199
200 lowerBlobs := make([]blob.Ref, 0, len(exists))
201 for _, br := range exists {
202 if _, s := seen[br]; !s {
203 lowerBlobs = append(lowerBlobs, br)
204 }
205 }
206
207 return sto.lower.StatBlobs(ctx, lowerBlobs, f)
208 }
209
210
211 func (sto *overlayStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
212 defer close(dest)
213
214 enums := []blobserver.BlobEnumerator{sto.lower, sto.upper}
215
216
217 sent := 0
218 for sent < limit {
219 ch := make(chan blob.SizedRef)
220 errch := make(chan error, 1)
221 go func() {
222 errch <- blobserver.MergedEnumerate(ctx, ch, enums, after, limit-sent)
223 }()
224
225 var last blob.Ref
226
227
228 seen := 0
229 for sbr := range ch {
230 seen++
231 if !sto.isDeleted(sbr.Ref) {
232 log.Println(sent, sbr.Ref)
233 dest <- sbr
234 sent++
235 }
236 last = sbr.Ref
237 }
238
239 if err := <-errch; err != nil {
240 return err
241 }
242
243
244 if seen == 0 {
245 return nil
246 }
247
248
249 after = last.String()
250 }
251
252 return nil
253 }
254
255 func (sto *overlayStorage) StorageGeneration() (initTime time.Time, random string, err error) {
256 if gener, ok := sto.upper.(blobserver.Generationer); ok {
257 return gener.StorageGeneration()
258 }
259 err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
260 return
261 }
262
263 func (sto *overlayStorage) ResetStorageGeneration() error {
264 if gener, ok := sto.upper.(blobserver.Generationer); ok {
265 return gener.ResetStorageGeneration()
266 }
267 return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.upper))
268 }