1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
34 package s3
35
36 import (
37 "context"
38 "fmt"
39 "net/http"
40 "strings"
41
42 "perkeep.org/pkg/blob"
43 "perkeep.org/pkg/blobserver"
44 "perkeep.org/pkg/blobserver/memory"
45 "perkeep.org/pkg/blobserver/proxycache"
46
47 "github.com/aws/aws-sdk-go/aws"
48 "github.com/aws/aws-sdk-go/aws/awserr"
49 "github.com/aws/aws-sdk-go/aws/credentials"
50 "github.com/aws/aws-sdk-go/aws/session"
51 "github.com/aws/aws-sdk-go/service/s3"
52 "github.com/aws/aws-sdk-go/service/s3/s3iface"
53 "go4.org/fault"
54 "go4.org/jsonconfig"
55 )
56
57 var (
58 _ blob.SubFetcher = (*s3Storage)(nil)
59 _ blobserver.MaxEnumerateConfig = (*s3Storage)(nil)
60 )
61
62 var (
63 faultReceive = fault.NewInjector("s3_receive")
64 faultEnumerate = fault.NewInjector("s3_enumerate")
65 faultStat = fault.NewInjector("s3_stat")
66 faultGet = fault.NewInjector("s3_get")
67 )
68
69 type s3Storage struct {
70 client s3iface.S3API
71 bucket string
72
73
74
75
76
77 dirPrefix string
78
79
80
81
82 hostname string
83 }
84
85 func (s *s3Storage) String() string {
86 if s.dirPrefix != "" {
87 return fmt.Sprintf("\"S3\" blob storage at host %q, bucket %q, directory %q", s.hostname, s.bucket, s.dirPrefix)
88 }
89 return fmt.Sprintf("\"S3\" blob storage at host %q, bucket %q", s.hostname, s.bucket)
90 }
91
92 func newFromConfig(l blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
93 return newFromConfigWithTransport(l, config, nil)
94 }
95
96
97
98
99
100 func newFromConfigWithTransport(_ blobserver.Loader, config jsonconfig.Obj, transport http.RoundTripper) (blobserver.Storage, error) {
101 hostname := config.OptionalString("hostname", "")
102 region := config.OptionalString("aws_region", "us-east-1")
103
104 cacheSize := config.OptionalInt64("cacheSize", 32<<20)
105 s3Cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(
106 config.RequiredString("aws_access_key"),
107 config.RequiredString("aws_secret_access_key"),
108 "",
109 ))
110 if hostname != "" {
111 s3Cfg.WithEndpoint(hostname)
112 }
113 s3Cfg.WithRegion(region)
114 if transport != nil {
115 httpClient := *http.DefaultClient
116 httpClient.Transport = transport
117 s3Cfg.WithHTTPClient(&httpClient)
118 }
119 awsSession := session.New(s3Cfg)
120
121 bucket := config.RequiredString("bucket")
122 var dirPrefix string
123 if parts := strings.SplitN(bucket, "/", 2); len(parts) > 1 {
124 dirPrefix = parts[1]
125 bucket = parts[0]
126 }
127 if dirPrefix != "" && !strings.HasSuffix(dirPrefix, "/") {
128 dirPrefix += "/"
129 }
130
131 skipStartupCheck := config.OptionalBool("skipStartupCheck", false)
132 if err := config.Validate(); err != nil {
133 return nil, err
134 }
135
136 ctx := context.TODO()
137 if !skipStartupCheck {
138 info, err := normalizeBucketLocation(ctx, awsSession, hostname, bucket, region)
139 if err != nil {
140 return nil, err
141 }
142 awsSession.Config.WithRegion(info.region)
143 awsSession.Config.WithEndpoint(info.endpoint)
144 if !info.isAWS {
145 awsSession.Config.WithS3ForcePathStyle(true)
146 }
147 } else {
148
149 awsSession.Config.WithS3ForcePathStyle(true)
150 }
151
152 sto := &s3Storage{
153 client: s3.New(awsSession),
154 bucket: bucket,
155 dirPrefix: dirPrefix,
156 hostname: hostname,
157 }
158
159 if cacheSize != 0 {
160
161
162
163 return proxycache.New(cacheSize<<2, memory.NewCache(cacheSize), sto), nil
164 }
165 return sto, nil
166 }
167
168 func init() {
169 blobserver.RegisterStorageConstructor("s3", blobserver.StorageConstructor(newFromConfig))
170 blobserver.RegisterStorageConstructor("b2", blobserver.StorageConstructor(newFromConfig))
171 }
172
173
174 func isNotFound(err error) bool {
175 if err == nil {
176 return false
177 }
178 if aerr, ok := err.(awserr.Error); ok {
179 return aerr.Code() == s3.ErrCodeNoSuchKey ||
180
181
182
183 aerr.Code() == "NotFound"
184 }
185 return false
186 }