1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package encrypt
18
19 import (
20 "bytes"
21 "container/heap"
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "log"
27 "os"
28 "sort"
29 "strconv"
30 "strings"
31 "sync"
32
33 "perkeep.org/internal/pools"
34 "perkeep.org/pkg/blob"
35 "perkeep.org/pkg/blobserver"
36 "perkeep.org/pkg/sorted"
37 )
38
39
40
41
42
43
44 const (
45
46 FullMetaBlobSize = 10 * 1000
47
48 SmallMetaCountLimit = 100
49 )
50
51 type metaBlob struct {
52 br blob.Ref
53 plains []blob.Ref
54 }
55
56
57
58 type metaBlobHeap struct {
59 sync.Mutex
60 s []*metaBlob
61 }
62
63 var _ heap.Interface = (*metaBlobHeap)(nil)
64
65 func (h *metaBlobHeap) Push(x interface{}) {
66 h.s = append(h.s, x.(*metaBlob))
67 }
68
69 func (h *metaBlobHeap) Pop() interface{} {
70 l := len(h.s)
71 v := h.s[l-1]
72 h.s = h.s[:l-1]
73 return v
74 }
75
76 func (h *metaBlobHeap) Less(i, j int) bool {
77 return len(h.s[i].plains) < len(h.s[j].plains)
78 }
79
80 func (h *metaBlobHeap) Len() int { return len(h.s) }
81 func (h *metaBlobHeap) Swap(i, j int) { h.s[i], h.s[j] = h.s[j], h.s[i] }
82
83 func (s *storage) recordMeta(b *metaBlob) {
84 if len(b.plains) > FullMetaBlobSize {
85 return
86 }
87
88 s.smallMeta.Lock()
89 defer s.smallMeta.Unlock()
90 heap.Push(s.smallMeta, b)
91
92
93
94 if s.smallMeta.Len() > SmallMetaCountLimit {
95 var plains, toDelete []blob.Ref
96 for s.smallMeta.Len() > 0 {
97 meta := heap.Pop(s.smallMeta).(*metaBlob)
98 plains = append(plains, meta.plains...)
99 toDelete = append(toDelete, meta.br)
100 if len(plains) > FullMetaBlobSize {
101 go s.makePackedMetaBlob(plains, toDelete)
102 plains, toDelete = nil, nil
103 }
104 }
105 if len(toDelete) == 1 {
106 heap.Push(s.smallMeta, &metaBlob{br: toDelete[0], plains: plains})
107 } else if len(toDelete) > 1 {
108 go s.makePackedMetaBlob(plains, toDelete)
109 }
110 }
111 }
112
113 func (s *storage) makePackedMetaBlob(plains, toDelete []blob.Ref) {
114 ctx := context.Background()
115
116 sort.Sort(blob.ByRef(plains))
117
118 metaBytes := pools.BytesBuffer()
119 defer pools.PutBuffer(metaBytes)
120
121 metaBytes.WriteString("#camlistore/encmeta=2\n")
122 metaBytes.Grow(len(plains[0].String()) * len(plains) * 2)
123 for _, plain := range plains {
124 p := plain.String()
125 metaBytes.WriteString(p)
126 metaBytes.WriteString("/")
127 v, err := s.index.Get(p)
128 if err != nil {
129 log.Printf("encrypt: failed to find the index entry for %s while packing: %v", p, err)
130 return
131 }
132 metaBytes.WriteString(v)
133 metaBytes.WriteString("\n")
134 }
135
136 encBytes := pools.BytesBuffer()
137 defer pools.PutBuffer(encBytes)
138
139 if err := s.encryptBlob(encBytes, metaBytes); err != nil {
140 log.Printf("encrypt: failed to encrypt meta: %v", err)
141 return
142 }
143
144 metaBR := blob.RefFromBytes(encBytes.Bytes())
145 metaSB, err := blobserver.ReceiveNoHash(ctx, s.meta, metaBR, encBytes)
146 if err != nil {
147 log.Printf("encrypt: failed to upload a packed meta: %v", err)
148 return
149 }
150 if len(plains) < FullMetaBlobSize {
151 s.recordMeta(&metaBlob{br: metaSB.Ref, plains: plains})
152 }
153 if err := s.meta.RemoveBlobs(ctx, toDelete); err != nil {
154 log.Printf("encrypt: failed to delete small meta blobs: %v", err)
155 }
156 log.Printf("encrypt: packed %d small meta blobs into one (%d refs)", len(toDelete), len(plains))
157 }
158
159
160 func (s *storage) makeSingleMetaBlob(plainBR, encBR blob.Ref, plainSize uint32) ([]byte, error) {
161
162 encBytes := bytes.NewBuffer(nil)
163
164 plain := fmt.Sprintf("#camlistore/encmeta=2\n%s/%d/%s\n", plainBR, plainSize, encBR)
165 if err := s.encryptBlob(encBytes, bytes.NewBufferString(plain)); err != nil {
166 return nil, err
167 }
168
169 return encBytes.Bytes(), nil
170 }
171
172 func packIndexEntry(plainSize uint32, encBR blob.Ref) string {
173 return fmt.Sprintf("%d/%s", plainSize, encBR)
174 }
175
176 func unpackIndexEntry(s string) (plainSize uint32, encBR blob.Ref, err error) {
177 parts := strings.Split(s, "/")
178 if len(parts) != 2 {
179 err = fmt.Errorf("malformed index entry %q", s)
180 return
181 }
182 size, err := strconv.ParseUint(parts[0], 10, 32)
183 if err != nil {
184 err = fmt.Errorf("malformed index entry %q: %s", s, err)
185 return
186 }
187 plainSize = uint32(size)
188 encBR = blob.ParseOrZero(parts[1])
189 if !encBR.Valid() {
190 err = fmt.Errorf("malformed index entry %q: %s", s, err)
191 }
192 return
193 }
194
195
196 func (s *storage) fetchMeta(ctx context.Context, b blob.Ref) (plainSize uint32, encBR blob.Ref, err error) {
197 v, err := s.index.Get(b.String())
198 if err == sorted.ErrNotFound {
199 err = os.ErrNotExist
200 }
201 if err != nil {
202 return 0, blob.Ref{}, err
203 }
204 return unpackIndexEntry(v)
205 }
206
207
208
209
210
211 func (s *storage) processEncryptedMetaBlob(br blob.Ref, dat []byte) error {
212 plainBytes := pools.BytesBuffer()
213 defer pools.PutBuffer(plainBytes)
214
215 if err := s.decryptBlob(plainBytes, bytes.NewBuffer(dat)); err != nil {
216 return err
217 }
218
219 header, err := plainBytes.ReadString('\n')
220 if err != nil {
221 return errors.New("No first line")
222 }
223 if header != "#camlistore/encmeta=2\n" {
224 if len(header) > 80 {
225 header = header[:80]
226 }
227 return fmt.Errorf("unsupported first line %q", header)
228 }
229 var plains []blob.Ref
230 for {
231 line, err := plainBytes.ReadString('\n')
232 if err != nil && len(line) != 0 {
233 return io.ErrUnexpectedEOF
234 } else if err != nil {
235 break
236 }
237 parts := strings.Split(strings.TrimRight(line, "\n"), "/")
238 if len(parts) != 3 {
239 if len(line) > 80 {
240 line = line[:80]
241 }
242 return fmt.Errorf("malformed line %q", line)
243 }
244
245
246 plainBR, ok := blob.ParseKnown(parts[0])
247 if !ok {
248 if len(line) > 80 {
249 line = line[:80]
250 }
251 return fmt.Errorf("malformed line %q", line)
252 }
253 plains = append(plains, plainBR)
254 if err := s.index.Set(parts[0], parts[1]+"/"+parts[2]); err != nil {
255 return err
256 }
257 }
258 s.recordMeta(&metaBlob{br: br, plains: plains})
259 return nil
260 }
261
262 func (s *storage) readAllMetaBlobs() error {
263 type encMB struct {
264 br blob.Ref
265 dat []byte
266 err error
267 }
268 metac := make(chan encMB, 16)
269
270 const maxInFlight = 5
271 var gate = make(chan bool, maxInFlight)
272
273 var stopEnumerate = make(chan bool)
274 enumErrc := make(chan error, 1)
275 go func() {
276 var wg sync.WaitGroup
277 ctx := context.TODO()
278 enumErrc <- blobserver.EnumerateAll(ctx, s.meta, func(sb blob.SizedRef) error {
279 select {
280 case <-stopEnumerate:
281 return errors.New("enumeration stopped")
282 default:
283 }
284
285 wg.Add(1)
286 gate <- true
287 go func() {
288 defer wg.Done()
289 defer func() { <-gate }()
290 rc, _, err := s.meta.Fetch(ctx, sb.Ref)
291 if err != nil {
292 metac <- encMB{sb.Ref, nil, fmt.Errorf("fetch failed: %v", err)}
293 return
294 }
295 defer rc.Close()
296 all, err := io.ReadAll(rc)
297 if err != nil {
298 metac <- encMB{sb.Ref, nil, fmt.Errorf("read failed: %v", err)}
299 return
300 }
301 metac <- encMB{sb.Ref, all, nil}
302 }()
303 return nil
304 })
305 wg.Wait()
306 close(metac)
307 }()
308
309 for mi := range metac {
310 err := mi.err
311 if err == nil {
312 err = s.processEncryptedMetaBlob(mi.br, mi.dat)
313 }
314 if err != nil {
315 close(stopEnumerate)
316 go func() {
317 for range metac {
318 }
319 }()
320
321
322
323 return fmt.Errorf("error with meta blob %v: %v", mi.br, err)
324 }
325 }
326
327 return <-enumErrc
328 }