1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 package encrypt
47
48 import (
49 "bufio"
50 "bytes"
51 "context"
52 "errors"
53 "fmt"
54 "io"
55 "log"
56 "os"
57 "time"
58
59 "filippo.io/age"
60 "go4.org/jsonconfig"
61 "go4.org/syncutil"
62 "perkeep.org/internal/pools"
63 "perkeep.org/pkg/blob"
64 "perkeep.org/pkg/blobserver"
65 "perkeep.org/pkg/sorted"
66 )
67
68 type storage struct {
69
70
71
72 index sorted.KeyValue
73
74
75 identity *age.X25519Identity
76
77
78 blobs blobserver.Storage
79
80
81
82
83
84
85
86 meta blobserver.Storage
87
88
89 smallMeta *metaBlobHeap
90 }
91
92
93
94
// version is the format marker written as the first byte of every encrypted
// blob and checked again before decrypting.
const version = 2
96
97
98 func (s *storage) encryptBlob(ciphertext, plaintext *bytes.Buffer) error {
99 if err := ciphertext.WriteByte(version); err != nil {
100 return fmt.Errorf("unable to write version byte: %w", err)
101 }
102 enc, err := age.Encrypt(ciphertext, s.identity.Recipient())
103 if err != nil {
104 return fmt.Errorf("unable to encrypt plaintext: %w", err)
105 }
106 if _, err := io.Copy(enc, plaintext); err != nil {
107 return fmt.Errorf("unable to encrypt plaintext: %w", err)
108 }
109 if err := enc.Close(); err != nil {
110 return fmt.Errorf("unable to encrypt plaintext: %w", err)
111 }
112 return nil
113 }
114
115
116 func (s *storage) decryptBlob(plaintext, ciphertext *bytes.Buffer) error {
117 if versionByte, err := ciphertext.ReadByte(); err != nil {
118 return fmt.Errorf("unable to read version byte: %w", err)
119 } else if versionByte != version {
120 return fmt.Errorf("unknown encrypted blob version: %d", versionByte)
121 }
122
123 dec, err := age.Decrypt(ciphertext, s.identity)
124 if err != nil {
125 return fmt.Errorf("unable to decrypt ciphertext: %w", err)
126 }
127 if _, err := io.Copy(plaintext, dec); err != nil {
128 return fmt.Errorf("unable to decrypt plaintext: %w", err)
129 }
130 return nil
131 }
132
133 func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
134 return blobserver.ErrNotImplemented
135 }
136
137 var statGate = syncutil.NewGate(20)
138
139 func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
140 return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
141 plainSize, _, err := s.fetchMeta(ctx, br)
142 switch err {
143 case nil:
144 return blob.SizedRef{Ref: br, Size: plainSize}, nil
145 case os.ErrNotExist:
146 return sb, nil
147 default:
148 return sb, err
149 }
150 })
151 }
152
153 func (s *storage) ReceiveBlob(ctx context.Context, plainBR blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
154
155 if plainSize, _, err := s.fetchMeta(ctx, plainBR); err == nil {
156 log.Println("encrypt: duplicated blob received", plainBR)
157 return blob.SizedRef{Ref: plainBR, Size: uint32(plainSize)}, nil
158 }
159
160 plainBytes := pools.BytesBuffer()
161 defer pools.PutBuffer(plainBytes)
162
163 hash := plainBR.Hash()
164 plainSize, err := io.Copy(io.MultiWriter(plainBytes, hash), source)
165 if err != nil {
166 return sb, err
167 }
168 if !plainBR.HashMatches(hash) {
169 return sb, blobserver.ErrCorruptBlob
170 }
171
172 encBytes := pools.BytesBuffer()
173 defer pools.PutBuffer(encBytes)
174
175 if err := s.encryptBlob(encBytes, plainBytes); err != nil {
176 return sb, fmt.Errorf("encrypt: error encrypting blob: %w", err)
177 }
178 encBR := blob.RefFromBytes(encBytes.Bytes())
179
180 if _, err = blobserver.ReceiveNoHash(ctx, s.blobs, encBR, encBytes); err != nil {
181 return sb, fmt.Errorf("encrypt: error writing encrypted blob %v (plaintext %v): %v", encBR, plainBR, err)
182 }
183
184 metaBytes, err := s.makeSingleMetaBlob(plainBR, encBR, uint32(plainSize))
185 if err != nil {
186 return sb, fmt.Errorf("encrypt: error making meta blob: %w", err)
187 }
188
189 metaBR := blob.RefFromBytes(metaBytes)
190 metaSB, err := blobserver.ReceiveNoHash(ctx, s.meta, metaBR, bytes.NewReader(metaBytes))
191 if err != nil {
192 return sb, fmt.Errorf("encrypt: error writing encrypted meta for plaintext %v (encrypted blob %v): %v", plainBR, encBR, err)
193 }
194 s.recordMeta(&metaBlob{br: metaSB.Ref, plains: []blob.Ref{plainBR}})
195
196 err = s.index.Set(plainBR.String(), packIndexEntry(uint32(plainSize), encBR))
197 if err != nil {
198 return sb, fmt.Errorf("encrypt: error updating index for encrypted %v (plaintext %v): %v", encBR, plainBR, err)
199 }
200
201 return blob.SizedRef{Ref: plainBR, Size: uint32(plainSize)}, nil
202 }
203
204 func (s *storage) Fetch(ctx context.Context, plainBR blob.Ref) (io.ReadCloser, uint32, error) {
205 plainSize, encBR, err := s.fetchMeta(ctx, plainBR)
206 if err != nil {
207 return nil, 0, err
208 }
209 encData, _, err := s.blobs.Fetch(ctx, encBR)
210 if err != nil {
211 return nil, 0, fmt.Errorf("encrypt: error fetching plaintext %s's encrypted %v blob: %v", plainBR, encBR, err)
212 }
213 defer encData.Close()
214
215 encBytes := pools.BytesBuffer()
216 defer pools.PutBuffer(encBytes)
217
218 encHash := encBR.Hash()
219 _, err = io.Copy(io.MultiWriter(encBytes, encHash), encData)
220 if err != nil {
221 return nil, 0, err
222 }
223
224
225
226
227
228 if !encBR.HashMatches(encHash) {
229 return nil, 0, blobserver.ErrCorruptBlob
230 }
231
232
233 plainBytes := bytes.NewBuffer(nil)
234 if err := s.decryptBlob(plainBytes, encBytes); err != nil {
235 return nil, 0, fmt.Errorf("encrypt: encrypted blob %s failed validation: %s", encBR, err)
236 }
237
238 return io.NopCloser(plainBytes), plainSize, nil
239 }
240
241 func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
242 defer close(dest)
243 iter := s.index.Find(after, "")
244 n := 0
245 for iter.Next() {
246 if iter.Key() == after {
247 continue
248 }
249
250 br := blob.MustParse(iter.Key())
251 plainSize, _, err := unpackIndexEntry(iter.Value())
252 if err != nil {
253 return fmt.Errorf("bogus encrypt index value %q: %s", iter.Value(), err)
254 }
255 select {
256 case dest <- blob.SizedRef{Ref: br, Size: plainSize}:
257 case <-ctx.Done():
258 return ctx.Err()
259 }
260 n++
261 if limit != 0 && n >= limit {
262 break
263 }
264 }
265 return iter.Close()
266 }
267
268 func init() {
269 blobserver.RegisterStorageConstructor("encrypt", blobserver.StorageConstructor(newFromConfig))
270 }
271
272 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (bs blobserver.Storage, err error) {
273 sto := &storage{}
274 agreement := config.RequiredString("I_AGREE")
275 const wantAgreement = "that encryption support hasn't been peer-reviewed, isn't finished, and its format might change."
276 if agreement != wantAgreement {
277 return nil, errors.New("use of the 'encrypt' target without the proper I_AGREE value")
278 }
279
280 keyFile := config.RequiredString("keyFile")
281 blobStorage := config.RequiredString("blobs")
282 metaStorage := config.RequiredString("meta")
283 metaConf := config.RequiredObject("metaIndex")
284 if err := config.Validate(); err != nil {
285 return nil, err
286 }
287
288 sto.index, err = sorted.NewKeyValueMaybeWipe(metaConf)
289 if err != nil {
290 return
291 }
292
293 sto.blobs, err = ld.GetStorage(blobStorage)
294 if err != nil {
295 return
296 }
297 sto.meta, err = ld.GetStorage(metaStorage)
298 if err != nil {
299 return
300 }
301
302 keyData, err := readKeyFile(keyFile)
303 if err != nil {
304 return nil, fmt.Errorf("error reading key file '%s': %w", keyFile, err)
305 }
306
307 identity, err := age.ParseX25519Identity(keyData)
308 if err != nil {
309 return nil, fmt.Errorf("error parsing x25519 identity: %w", err)
310 }
311 sto.identity = identity
312
313 start := time.Now()
314 log.Printf("Reading encryption metadata...")
315 sto.smallMeta = &metaBlobHeap{}
316 if err := sto.readAllMetaBlobs(); err != nil {
317 return nil, fmt.Errorf("error scanning metadata on start-up: %v", err)
318 }
319 log.Printf("Read all encryption metadata in %.3f seconds", time.Since(start).Seconds())
320
321 return sto, nil
322 }
323
324 func readKeyFile(keyFile string) (string, error) {
325 if err := checkKeyFilePermissions(keyFile); err != nil {
326 return "", fmt.Errorf("error checking key file permissions: %w", err)
327 }
328 f, err := os.Open(keyFile)
329 if err != nil {
330 return "", fmt.Errorf("error opening key file: %w", err)
331 }
332 defer f.Close()
333
334 keyScanner := bufio.NewScanner(f)
335 if !keyScanner.Scan() {
336 return "", errors.New("empty key file")
337 }
338 keyData := keyScanner.Text()
339
340 if keyScanner.Scan() {
341 return "", errors.New("key file contained multiple lines")
342 }
343
344 return keyData, keyScanner.Err()
345 }