1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package s3
18
19 import (
20 "context"
21 "io"
22
23 "github.com/aws/aws-sdk-go/aws"
24 "github.com/aws/aws-sdk-go/service/s3/s3manager"
25 "go4.org/readerutil"
26 "perkeep.org/pkg/blob"
27 )
28
29 func (sto *s3Storage) ReceiveBlob(ctx context.Context, b blob.Ref, source io.Reader) (sr blob.SizedRef, err error) {
30 if faultReceive.FailErr(&err) {
31 return
32 }
33
34
35
36
37
38 if size, ok := readerutil.Size(source); ok {
39 if err := sto.doUpload(ctx, b, source); err != nil {
40 return sr, err
41 }
42 return blob.SizedRef{Ref: b, Size: uint32(size)}, nil
43 }
44
45 cr := readerutil.CountingReader{
46 Reader: source,
47 N: aws.Int64(0),
48 }
49 if err = sto.doUpload(ctx, b, cr); err != nil {
50 return sr, err
51 }
52 return blob.SizedRef{Ref: b, Size: uint32(*cr.N)}, nil
53 }
54
55 func (sto *s3Storage) doUpload(ctx context.Context, b blob.Ref, r io.Reader) error {
56 uploader := s3manager.NewUploaderWithClient(sto.client)
57
58 _, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
59 Bucket: &sto.bucket,
60 Key: aws.String(sto.dirPrefix + b.String()),
61 Body: r,
62 })
63 return err
64 }