1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package s3
18
19 import (
20 "context"
21 "fmt"
22 "path"
23
24 "github.com/aws/aws-sdk-go/aws"
25 "github.com/aws/aws-sdk-go/service/s3"
26 "perkeep.org/pkg/blob"
27 "perkeep.org/pkg/blobserver"
28 )
29
30
31
const (
	// s3MaxKeys is the enumeration limit at or above which EnumerateBlobs
	// leaves the MaxKeys request parameter unset and accepts whatever page
	// size S3 uses by default. 1000 matches the largest page S3 documents
	// for ListObjectsV2.
	s3MaxKeys = 1000
)
33
34 var _ blobserver.MaxEnumerateConfig = (*s3Storage)(nil)
35
36 func (sto *s3Storage) MaxEnumerate() int { return 1000 }
37
38 func (sto *s3Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (retErr error) {
39 defer close(dest)
40 if faultEnumerate.FailErr(&retErr) {
41 return
42 }
43
44 var maxKeys *int64
45 if limit < s3MaxKeys {
46 maxKeys = aws.Int64(int64(limit))
47 }
48
49 keysGotten := 0
50
51 err := sto.client.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
52 Bucket: &sto.bucket,
53 StartAfter: aws.String(sto.dirPrefix + after),
54 MaxKeys: maxKeys,
55 Prefix: &sto.dirPrefix,
56 }, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
57 for _, obj := range page.Contents {
58 dir, file := path.Split(*obj.Key)
59 if dir != sto.dirPrefix {
60 continue
61 }
62 if file == after {
63 continue
64 }
65 br, ok := blob.Parse(file)
66 if !ok {
67 retErr = fmt.Errorf("non-Perkeep object named %q found in %v s3 bucket", file, sto.bucket)
68 return false
69 }
70 select {
71 case dest <- blob.SizedRef{Ref: br, Size: uint32(*obj.Size)}:
72 case <-ctx.Done():
73 retErr = ctx.Err()
74 return false
75 }
76 keysGotten++
77 if keysGotten >= limit {
78 return false
79 }
80 }
81 return true
82 })
83 if err == nil {
84 err = retErr
85 }
86
87 if err != nil {
88 return fmt.Errorf("s3 EnumerateBlobs: %v", err)
89 }
90 return nil
91 }