1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
35 package proxycache
36
import (
	"bytes"
	"context"
	"errors"
	"io"
	"log"
	"os"
	"sync"

	"go4.org/jsonconfig"
	"go4.org/syncutil"
	"perkeep.org/internal/lru"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
)
51
52
53 type Storage struct {
54 origin blobserver.Storage
55 cache blobserver.Storage
56
57 debug bool
58 maxCacheBytes int64
59
60 mu sync.Mutex
61 lru *lru.Cache
62 cacheBytes int64
63 }
64
65 var (
66 _ blobserver.Storage = (*Storage)(nil)
67 _ blob.SubFetcher = (*Storage)(nil)
68
69
70 )
71
72
73
74 func New(maxBytes int64, cache, origin blobserver.Storage) *Storage {
75 sto := &Storage{
76 origin: origin,
77 cache: cache,
78 lru: lru.NewUnlocked(0),
79 maxCacheBytes: maxBytes,
80 }
81 return sto
82 }
83
84 func init() {
85 blobserver.RegisterStorageConstructor("proxycache", blobserver.StorageConstructor(newFromConfig))
86 }
87
88 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
89 var (
90 origin = config.RequiredString("origin")
91 cache = config.RequiredString("cache")
92 maxCacheBytes = config.OptionalInt64("maxCacheBytes", 512<<20)
93 )
94 if err := config.Validate(); err != nil {
95 return nil, err
96 }
97 cacheSto, err := ld.GetStorage(cache)
98 if err != nil {
99 return nil, err
100 }
101 originSto, err := ld.GetStorage(origin)
102 if err != nil {
103 return nil, err
104 }
105 return New(maxCacheBytes, cacheSto, originSto), nil
106 }
107
108
109
110 func (sto *Storage) removeOldest() bool {
111 ctx := context.TODO()
112 k, v := sto.lru.RemoveOldest()
113 if v == nil {
114 return false
115 }
116 sb := v.(blob.SizedRef)
117
118
119 err := sto.cache.RemoveBlobs(ctx, []blob.Ref{sb.Ref})
120 if err != nil {
121 log.Printf("proxycache: could not remove oldest blob %v (%d bytes): %v", sb.Ref, sb.Size, err)
122 sto.lru.Add(k, v)
123 return false
124 }
125 if sto.debug {
126 log.Printf("proxycache: removed blob %v (%d bytes)", sb.Ref, sb.Size)
127 }
128 sto.cacheBytes -= int64(sb.Size)
129 return true
130 }
131
132 func (sto *Storage) touch(sb blob.SizedRef) {
133 key := sb.Ref.String()
134
135 sto.mu.Lock()
136 defer sto.mu.Unlock()
137
138 _, old := sto.lru.Get(key)
139 if !old {
140 sto.lru.Add(key, sb)
141 sto.cacheBytes += int64(sb.Size)
142
143
144 for sto.cacheBytes > sto.maxCacheBytes {
145 if !sto.removeOldest() {
146 break
147 }
148 }
149 }
150 }
151
152 func (sto *Storage) Fetch(ctx context.Context, b blob.Ref) (rc io.ReadCloser, size uint32, err error) {
153 rc, size, err = sto.cache.Fetch(ctx, b)
154 if err == nil {
155 sto.touch(blob.SizedRef{Ref: b, Size: size})
156 return
157 }
158 if err != os.ErrNotExist {
159 log.Printf("warning: proxycache cache fetch error for %v: %v", b, err)
160 }
161 rc, size, err = sto.origin.Fetch(ctx, b)
162 if err != nil {
163 return
164 }
165 all, err := io.ReadAll(rc)
166 if err != nil {
167 return
168 }
169 if _, err := blobserver.Receive(ctx, sto.cache, b, bytes.NewReader(all)); err != nil {
170 log.Printf("populating proxycache cache for %v: %v", b, err)
171 } else {
172 sto.touch(blob.SizedRef{Ref: b, Size: size})
173 }
174 return io.NopCloser(bytes.NewReader(all)), size, nil
175 }
176
177 func (sto *Storage) SubFetch(ctx context.Context, ref blob.Ref, offset, length int64) (io.ReadCloser, error) {
178 if sf, ok := sto.cache.(blob.SubFetcher); ok {
179 rc, err := sf.SubFetch(ctx, ref, offset, length)
180 if err == nil {
181 return rc, nil
182 }
183 if err != os.ErrNotExist && err != blob.ErrUnimplemented {
184 log.Printf("proxycache: error fetching from cache %T: %v", sto.cache, err)
185 }
186 }
187 if sf, ok := sto.origin.(blob.SubFetcher); ok {
188 return sf.SubFetch(ctx, ref, offset, length)
189 }
190 return nil, blob.ErrUnimplemented
191 }
192
193 func (sto *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
194 need := map[blob.Ref]bool{}
195 for _, br := range blobs {
196 need[br] = true
197 }
198 err := sto.cache.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
199 sto.touch(sb)
200 delete(need, sb.Ref)
201 return fn(sb)
202 })
203 if err != nil {
204 return err
205 }
206 if len(need) == 0 {
207
208 return nil
209 }
210
211 blobs = make([]blob.Ref, 0, len(need))
212 for br := range need {
213 blobs = append(blobs, br)
214 }
215 return sto.origin.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
216 sto.touch(sb)
217 return fn(sb)
218 })
219 }
220
221 func (sto *Storage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (blob.SizedRef, error) {
222
223 var buf bytes.Buffer
224 if _, err := io.Copy(&buf, src); err != nil {
225 return blob.SizedRef{}, err
226 }
227
228 sb, err := sto.origin.ReceiveBlob(ctx, br, bytes.NewReader(buf.Bytes()))
229 if err != nil {
230 return sb, err
231 }
232
233 if _, err := sto.cache.ReceiveBlob(ctx, br, bytes.NewReader(buf.Bytes())); err != nil {
234 log.Printf("proxycache: ignoring error populating blob %v in cache: %v", br, err)
235 } else {
236 sto.touch(sb)
237 }
238 return sb, err
239 }
240
241 func (sto *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
242 var gr syncutil.Group
243 gr.Go(func() error {
244 return sto.cache.RemoveBlobs(ctx, blobs)
245 })
246 gr.Go(func() error {
247 return sto.origin.RemoveBlobs(ctx, blobs)
248 })
249 gr.Wait()
250 return gr.Err()
251 }
252
253 func (sto *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
254 return sto.origin.EnumerateBlobs(ctx, dest, after, limit)
255 }