1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19 package memory
20
21 import (
22 "bytes"
23 "context"
24 "fmt"
25 "io"
26 "os"
27 "sort"
28 "sync"
29 "sync/atomic"
30
31 "perkeep.org/internal/lru"
32 "perkeep.org/pkg/blob"
33 "perkeep.org/pkg/blobserver"
34
35 "go4.org/jsonconfig"
36 "go4.org/types"
37 )
38
39
40
41
42
43
44 type Storage struct {
45 maxSize int64
46
47 mu sync.RWMutex
48 m map[blob.Ref][]byte
49 size int64
50
51
52
53 lru *lru.Cache
54
55 blobsFetched int64
56 bytesFetched int64
57 }
58
59 var _ blobserver.BlobStreamer = (*Storage)(nil)
60
61 func init() {
62 blobserver.RegisterStorageConstructor("memory", blobserver.StorageConstructor(newFromConfig))
63 }
64
65 func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
66 if err := config.Validate(); err != nil {
67 return nil, err
68 }
69 return &Storage{}, nil
70 }
71
72
73
74 func NewCache(size int64) *Storage {
75 return &Storage{
76 maxSize: size,
77 lru: lru.New(0),
78 }
79 }
80
81 func (s *Storage) Fetch(ctx context.Context, ref blob.Ref) (file io.ReadCloser, size uint32, err error) {
82 s.mu.RLock()
83 defer s.mu.RUnlock()
84 if s.lru != nil {
85 s.lru.Get(ref.String())
86 }
87 if s.m == nil {
88 err = os.ErrNotExist
89 return
90 }
91 b, ok := s.m[ref]
92 if !ok {
93 err = os.ErrNotExist
94 return
95 }
96 size = uint32(len(b))
97 atomic.AddInt64(&s.blobsFetched, 1)
98 atomic.AddInt64(&s.bytesFetched, int64(len(b)))
99
100 return struct {
101 *io.SectionReader
102 io.Closer
103 }{
104 io.NewSectionReader(bytes.NewReader(b), 0, int64(size)),
105 types.NopCloser,
106 }, size, nil
107 }
108
109 func (s *Storage) SubFetch(ctx context.Context, ref blob.Ref, offset, length int64) (io.ReadCloser, error) {
110 if offset < 0 || length < 0 {
111 return nil, blob.ErrNegativeSubFetch
112 }
113 s.mu.RLock()
114 defer s.mu.RUnlock()
115 b, ok := s.m[ref]
116 if !ok {
117 return nil, os.ErrNotExist
118 }
119 if offset > int64(len(b)) {
120 return nil, blob.ErrOutOfRangeOffsetSubFetch
121 }
122 atomic.AddInt64(&s.blobsFetched, 1)
123 atomic.AddInt64(&s.bytesFetched, length)
124
125 return struct {
126 *io.SectionReader
127 io.Closer
128 }{
129 io.NewSectionReader(bytes.NewReader(b), offset, int64(length)),
130 types.NopCloser,
131 }, nil
132 }
133
134 func (s *Storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error) {
135 sb := blob.SizedRef{}
136 h := br.Hash()
137 if h == nil {
138 return sb, fmt.Errorf("Unsupported blobref hash for %s", br)
139 }
140 all, err := io.ReadAll(io.TeeReader(source, h))
141 if err != nil {
142 return sb, err
143 }
144 if !br.HashMatches(h) {
145
146
147
148 return sb, fmt.Errorf("Hash mismatch receiving blob %s", br)
149 }
150 s.mu.Lock()
151 defer s.mu.Unlock()
152 if s.m == nil {
153 s.m = make(map[blob.Ref][]byte)
154 }
155 _, had := s.m[br]
156 if !had {
157 s.m[br] = all
158 if s.lru != nil {
159 s.lru.Add(br.String(), nil)
160 }
161 s.size += int64(len(all))
162 for s.maxSize != 0 && s.size > s.maxSize {
163 if key, _ := s.lru.RemoveOldest(); key != "" {
164 s.removeBlobLocked(blob.MustParse(key))
165 } else {
166 break
167 }
168 }
169 }
170 return blob.SizedRef{Ref: br, Size: uint32(len(all))}, nil
171 }
172
173 func (s *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
174 for _, br := range blobs {
175 s.mu.RLock()
176 b, ok := s.m[br]
177 s.mu.RUnlock()
178 if ok {
179 if err := fn(blob.SizedRef{Ref: br, Size: uint32(len(b))}); err != nil {
180 return err
181 }
182 }
183 }
184 return nil
185 }
186
187 func (s *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
188 defer close(dest)
189 s.mu.RLock()
190 defer s.mu.RUnlock()
191
192
193
194
195
196
197
198 sorted := make([]blob.Ref, 0, len(s.m))
199 for br := range s.m {
200 sorted = append(sorted, br)
201 }
202 sort.Sort(blob.ByRef(sorted))
203
204 n := 0
205 for _, br := range sorted {
206 if after != "" && br.String() <= after {
207 continue
208 }
209 select {
210 case dest <- blob.SizedRef{Ref: br, Size: uint32(len(s.m[br]))}:
211 case <-ctx.Done():
212 return ctx.Err()
213 }
214 n++
215 if limit > 0 && n == limit {
216 break
217 }
218 }
219 return nil
220 }
221
222 func (s *Storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
223
224 defer close(dest)
225 s.mu.RLock()
226 defer s.mu.RUnlock()
227
228 sorted := make([]blob.Ref, 0, len(s.m))
229 for br := range s.m {
230 sorted = append(sorted, br)
231 }
232 sort.Sort(blob.ByRef(sorted))
233
234 for _, br := range sorted {
235 if br.String() < contToken {
236 continue
237 }
238 contents := s.m[br]
239 select {
240 case <-ctx.Done():
241 return ctx.Err()
242 case dest <- blobserver.BlobAndToken{
243 Blob: blob.NewBlob(br, uint32(len(contents)), func(ctx context.Context) ([]byte, error) {
244 return contents, nil
245 }),
246 Token: br.String(),
247 }:
248 }
249 }
250 return nil
251 }
252
253 func (s *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
254 s.mu.Lock()
255 defer s.mu.Unlock()
256 for _, br := range blobs {
257 s.removeBlobLocked(br)
258 }
259 return nil
260 }
261
262 func (s *Storage) removeBlobLocked(br blob.Ref) {
263 v, had := s.m[br]
264 if !had {
265 return
266 }
267 s.size -= int64(len(v))
268 delete(s.m, br)
269 }
270
271
272
273
274
275 func (s *Storage) BlobContents(br blob.Ref) (contents string, ok bool) {
276 s.mu.RLock()
277 defer s.mu.RUnlock()
278 b, ok := s.m[br]
279 if !ok {
280 return
281 }
282 return string(b), true
283 }
284
285
286 func (s *Storage) NumBlobs() int {
287 s.mu.RLock()
288 defer s.mu.RUnlock()
289 return len(s.m)
290 }
291
292
293 func (s *Storage) SumBlobSize() int64 {
294 s.mu.RLock()
295 defer s.mu.RUnlock()
296 return s.size
297 }
298
299
300 func (s *Storage) BlobrefStrings() []string {
301 s.mu.RLock()
302 defer s.mu.RUnlock()
303 sorted := make([]string, 0, len(s.m))
304 for br := range s.m {
305 sorted = append(sorted, br.String())
306 }
307 sort.Strings(sorted)
308 return sorted
309 }
310
311
312 func (s *Storage) Stats() (blobsFetched, bytesFetched int64) {
313 return atomic.LoadInt64(&s.blobsFetched), atomic.LoadInt64(&s.bytesFetched)
314 }