1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20 package cloudstorage
21
22 import (
23 "bytes"
24 "context"
25 "crypto/sha1"
26 "errors"
27 "fmt"
28 "io"
29 "log"
30 "net/http"
31 "os"
32 "path"
33 "strings"
34 "time"
35
36 "perkeep.org/pkg/blob"
37 "perkeep.org/pkg/blobserver"
38 "perkeep.org/pkg/blobserver/memory"
39 "perkeep.org/pkg/constants"
40
41 "cloud.google.com/go/storage"
42 "go4.org/cloud/google/gcsutil"
43 "go4.org/ctxutil"
44 "go4.org/jsonconfig"
45 "go4.org/oauthutil"
46 "go4.org/syncutil"
47 "golang.org/x/oauth2"
48 "golang.org/x/oauth2/google"
49 "google.golang.org/api/option"
50 )
51
// Storage is a blobserver.Storage implementation backed by a Google
// Cloud Storage bucket, optionally rooted under a directory prefix
// within that bucket.
type Storage struct {
	// bucket is the GCS bucket name (without any directory suffix).
	bucket string

	// dirPrefix is the optional directory within the bucket under
	// which blobs are stored. It is either empty or ends in "/"
	// (normalized in newFromConfig).
	dirPrefix string
	client    *storage.Client
	// cache is an optional in-memory write-through blob cache;
	// nil when cacheSize is configured as 0.
	cache *memory.Storage

	// baseHTTPClient is an OAuth2-authenticated HTTP client used by
	// the gcsutil code paths (EnumerateBlobs, SubFetch), which do not
	// go through the storage.Client.
	baseHTTPClient *http.Client

	// genTime and genRandom back the blobserver.Generationer
	// implementation; both are derived from the bucket's attributes
	// at construction time.
	genTime   time.Time
	genRandom string
}
71
// Compile-time checks that *Storage implements the optional
// blobserver interfaces it advertises.
var (
	_ blob.SubFetcher               = (*Storage)(nil)
	_ blobserver.Generationer       = (*Storage)(nil)
	_ blobserver.MaxEnumerateConfig = (*Storage)(nil)
)
77
78 func (s *Storage) MaxEnumerate() int { return 1000 }
79
// StorageGeneration implements blobserver.Generationer, returning the
// generation time and random string computed from the bucket's
// attributes in newFromConfig.
func (s *Storage) StorageGeneration() (time.Time, string, error) {
	return s.genTime, s.genRandom, nil
}
83 func (s *Storage) ResetStorageGeneration() error { return errors.New("not supported") }
84
85 func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
86 var (
87 auth = config.RequiredObject("auth")
88 bucket = config.RequiredString("bucket")
89 cacheSize = config.OptionalInt64("cacheSize", 32<<20)
90
91 clientID = auth.RequiredString("client_id")
92 clientSecret = auth.OptionalString("client_secret", "")
93 refreshToken = auth.OptionalString("refresh_token", "")
94 )
95
96 if err := config.Validate(); err != nil {
97 return nil, err
98 }
99 if err := auth.Validate(); err != nil {
100 return nil, err
101 }
102
103 var dirPrefix string
104 if parts := strings.SplitN(bucket, "/", 2); len(parts) > 1 {
105 dirPrefix = parts[1]
106 bucket = parts[0]
107 }
108 if dirPrefix != "" && !strings.HasSuffix(dirPrefix, "/") {
109 dirPrefix += "/"
110 }
111 gs := &Storage{
112 bucket: bucket,
113 dirPrefix: dirPrefix,
114 }
115
116 var (
117 ctx = context.Background()
118 ts oauth2.TokenSource
119 cl *storage.Client
120 err error
121 )
122 if clientID == "auto" {
123 ts, err = google.DefaultTokenSource(ctx, storage.ScopeReadWrite)
124 if err != nil {
125 return nil, err
126 }
127 cl, err = storage.NewClient(ctx)
128 if err != nil {
129 return nil, err
130 }
131 } else {
132 if clientSecret == "" {
133 return nil, errors.New("missing required parameter 'client_secret'")
134 }
135 if refreshToken == "" {
136 return nil, errors.New("missing required parameter 'refresh_token'")
137 }
138 ts = oauthutil.NewRefreshTokenSource(&oauth2.Config{
139 Scopes: []string{storage.ScopeReadWrite},
140 Endpoint: google.Endpoint,
141 ClientID: clientID,
142 ClientSecret: clientSecret,
143 RedirectURL: oauthutil.TitleBarRedirectURL,
144 }, refreshToken)
145 cl, err = storage.NewClient(ctx, option.WithTokenSource(ts))
146 if err != nil {
147 return nil, err
148 }
149 }
150
151 gs.baseHTTPClient = oauth2.NewClient(ctx, ts)
152 gs.client = cl
153
154 if cacheSize != 0 {
155 gs.cache = memory.NewCache(cacheSize)
156 }
157
158 ba, err := gs.client.Bucket(gs.bucket).Attrs(ctx)
159 if err != nil {
160 return nil, fmt.Errorf("error statting bucket %q: %v", gs.bucket, err)
161 }
162 hash := sha1.New()
163 fmt.Fprintf(hash, "%v%v", ba.Created, ba.MetaGeneration)
164 gs.genRandom = fmt.Sprintf("%x", hash.Sum(nil))
165 gs.genTime = ba.Created
166
167 return gs, nil
168 }
169
170
171
172
173
174 func (s *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
175 defer close(dest)
176 ectx := context.WithValue(ctx, ctxutil.HTTPClient, s.baseHTTPClient)
177 objs, err := gcsutil.EnumerateObjects(ectx, s.bucket, s.dirPrefix+after, limit)
178 if err != nil {
179 log.Printf("gstorage EnumerateObjects: %v", err)
180 return err
181 }
182 for _, obj := range objs {
183 dir, file := path.Split(obj.Name)
184 if dir != s.dirPrefix {
185 continue
186 }
187 br, ok := blob.Parse(file)
188 if !ok {
189 return fmt.Errorf("Non-Perkeep object named %q found in bucket", file)
190 }
191 select {
192 case dest <- blob.SizedRef{Ref: br, Size: uint32(obj.Size)}:
193 case <-ctx.Done():
194 return ctx.Err()
195 }
196 }
197 return nil
198 }
199
200 func (s *Storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error) {
201 var buf bytes.Buffer
202 size, err := io.Copy(&buf, source)
203 if err != nil {
204 return blob.SizedRef{}, err
205 }
206
207 w := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewWriter(ctx)
208 if _, err := io.Copy(w, bytes.NewReader(buf.Bytes())); err != nil {
209 return blob.SizedRef{}, err
210 }
211 if err := w.Close(); err != nil {
212 return blob.SizedRef{}, err
213 }
214
215 if s.cache != nil {
216
217
218 blobserver.ReceiveNoHash(ctx, s.cache, br, bytes.NewReader(buf.Bytes()))
219 }
220 return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
221 }
222
223 var statGate = syncutil.NewGate(20)
224
225 func (s *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
226
227 return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
228 attrs, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Attrs(ctx)
229 if err == storage.ErrObjectNotExist {
230 return sb, nil
231 }
232 if err != nil {
233 return sb, err
234 }
235 size := attrs.Size
236 if size > constants.MaxBlobSize {
237 return sb, fmt.Errorf("blob %s stat size too large (%d)", br, size)
238 }
239 return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
240 })
241 }
242
243 func (s *Storage) Fetch(ctx context.Context, br blob.Ref) (rc io.ReadCloser, size uint32, err error) {
244 if s.cache != nil {
245 if rc, size, err = s.cache.Fetch(ctx, br); err == nil {
246 return
247 }
248 }
249 r, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewReader(ctx)
250 if err == storage.ErrObjectNotExist {
251 return nil, 0, os.ErrNotExist
252 }
253 if err != nil {
254 return nil, 0, err
255 }
256 if r.Size() >= 1<<32 {
257 r.Close()
258 return nil, 0, errors.New("object larger than a uint32")
259 }
260 size = uint32(r.Size())
261 if size > constants.MaxBlobSize {
262 r.Close()
263 return nil, size, errors.New("object too big")
264 }
265 return r, size, nil
266 }
267
268 func (s *Storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, err error) {
269 if offset < 0 || length < 0 {
270 return nil, blob.ErrNegativeSubFetch
271 }
272 ctx = context.WithValue(ctx, ctxutil.HTTPClient, s.baseHTTPClient)
273 rc, err = gcsutil.GetPartialObject(ctx, gcsutil.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}, offset, length)
274 if err == gcsutil.ErrInvalidRange {
275 return nil, blob.ErrOutOfRangeOffsetSubFetch
276 }
277 if err == storage.ErrObjectNotExist {
278 return nil, os.ErrNotExist
279 }
280 return rc, err
281 }
282
283 func (s *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
284 if s.cache != nil {
285 s.cache.RemoveBlobs(ctx, blobs)
286 }
287 gate := syncutil.NewGate(50)
288 var grp syncutil.Group
289 for i := range blobs {
290 gate.Start()
291 br := blobs[i]
292 grp.Go(func() error {
293 defer gate.Done()
294 err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Delete(ctx)
295 if err == storage.ErrObjectNotExist {
296 return nil
297 }
298 return err
299 })
300 }
301 return grp.Err()
302 }
303
304 func init() {
305 blobserver.RegisterStorageConstructor("googlecloudstorage", blobserver.StorageConstructor(newFromConfig))
306 }