Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package cloudstorage registers the "googlecloudstorage" blob storage type, storing blobs
    18	// on Google Cloud Storage (not Google Drive).
    19	// See https://cloud.google.com/products/cloud-storage
    20	package cloudstorage
    21	
    22	import (
    23		"bytes"
    24		"context"
    25		"crypto/sha1"
    26		"errors"
    27		"fmt"
    28		"io"
    29		"log"
    30		"net/http"
    31		"os"
    32		"path"
    33		"strings"
    34		"time"
    35	
    36		"perkeep.org/pkg/blob"
    37		"perkeep.org/pkg/blobserver"
    38		"perkeep.org/pkg/blobserver/memory"
    39		"perkeep.org/pkg/constants"
    40	
    41		"cloud.google.com/go/storage"
    42		"go4.org/cloud/google/gcsutil"
    43		"go4.org/ctxutil"
    44		"go4.org/jsonconfig"
    45		"go4.org/oauthutil"
    46		"go4.org/syncutil"
    47		"golang.org/x/oauth2"
    48		"golang.org/x/oauth2/google"
    49		"google.golang.org/api/option"
    50	)
    51	
// Storage is a blobserver.Storage implementation backed by a Google
// Cloud Storage bucket, optionally under a "directory" prefix within
// the bucket.
type Storage struct {
	bucket string // the gs bucket containing blobs
	// optional "directory" where the blobs are stored, instead of at the root of the bucket.
	// gcs is actually flat, which in effect just means that all the objects should have this
	// dirPrefix as a prefix of their key.
	// If non empty, it should be a slash separated path with a trailing slash and no starting
	// slash.
	dirPrefix string
	client    *storage.Client
	cache     *memory.Storage // or nil for no cache

	// an OAuth-authenticated HTTP client, for methods that can't yet use a
	// *storage.Client
	baseHTTPClient *http.Client

	// For blobserver.Generationer:
	genTime   time.Time // bucket creation time
	genRandom string    // hex SHA-1 over the bucket's creation time and meta-generation
}
    71	
// Compile-time checks that *Storage implements the optional blobserver
// interfaces beyond blobserver.Storage.
var (
	_ blob.SubFetcher               = (*Storage)(nil)
	_ blobserver.Generationer       = (*Storage)(nil)
	_ blobserver.MaxEnumerateConfig = (*Storage)(nil)
)
    77	
    78	func (s *Storage) MaxEnumerate() int { return 1000 }
    79	
    80	func (s *Storage) StorageGeneration() (time.Time, string, error) {
    81		return s.genTime, s.genRandom, nil
    82	}
    83	func (s *Storage) ResetStorageGeneration() error { return errors.New("not supported") }
    84	
    85	func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
    86		var (
    87			auth      = config.RequiredObject("auth")
    88			bucket    = config.RequiredString("bucket")
    89			cacheSize = config.OptionalInt64("cacheSize", 32<<20)
    90	
    91			clientID     = auth.RequiredString("client_id") // or "auto" for service accounts
    92			clientSecret = auth.OptionalString("client_secret", "")
    93			refreshToken = auth.OptionalString("refresh_token", "")
    94		)
    95	
    96		if err := config.Validate(); err != nil {
    97			return nil, err
    98		}
    99		if err := auth.Validate(); err != nil {
   100			return nil, err
   101		}
   102	
   103		var dirPrefix string
   104		if parts := strings.SplitN(bucket, "/", 2); len(parts) > 1 {
   105			dirPrefix = parts[1]
   106			bucket = parts[0]
   107		}
   108		if dirPrefix != "" && !strings.HasSuffix(dirPrefix, "/") {
   109			dirPrefix += "/"
   110		}
   111		gs := &Storage{
   112			bucket:    bucket,
   113			dirPrefix: dirPrefix,
   114		}
   115	
   116		var (
   117			ctx = context.Background()
   118			ts  oauth2.TokenSource
   119			cl  *storage.Client
   120			err error
   121		)
   122		if clientID == "auto" {
   123			ts, err = google.DefaultTokenSource(ctx, storage.ScopeReadWrite)
   124			if err != nil {
   125				return nil, err
   126			}
   127			cl, err = storage.NewClient(ctx)
   128			if err != nil {
   129				return nil, err
   130			}
   131		} else {
   132			if clientSecret == "" {
   133				return nil, errors.New("missing required parameter 'client_secret'")
   134			}
   135			if refreshToken == "" {
   136				return nil, errors.New("missing required parameter 'refresh_token'")
   137			}
   138			ts = oauthutil.NewRefreshTokenSource(&oauth2.Config{
   139				Scopes:       []string{storage.ScopeReadWrite},
   140				Endpoint:     google.Endpoint,
   141				ClientID:     clientID,
   142				ClientSecret: clientSecret,
   143				RedirectURL:  oauthutil.TitleBarRedirectURL,
   144			}, refreshToken)
   145			cl, err = storage.NewClient(ctx, option.WithTokenSource(ts))
   146			if err != nil {
   147				return nil, err
   148			}
   149		}
   150	
   151		gs.baseHTTPClient = oauth2.NewClient(ctx, ts)
   152		gs.client = cl
   153	
   154		if cacheSize != 0 {
   155			gs.cache = memory.NewCache(cacheSize)
   156		}
   157	
   158		ba, err := gs.client.Bucket(gs.bucket).Attrs(ctx)
   159		if err != nil {
   160			return nil, fmt.Errorf("error statting bucket %q: %v", gs.bucket, err)
   161		}
   162		hash := sha1.New()
   163		fmt.Fprintf(hash, "%v%v", ba.Created, ba.MetaGeneration)
   164		gs.genRandom = fmt.Sprintf("%x", hash.Sum(nil))
   165		gs.genTime = ba.Created
   166	
   167		return gs, nil
   168	}
   169	
   170	// TODO(mpl, bradfitz): use a *storage.Client in EnumerateBlobs, instead of hitting the
   171	// XML API, once we have an efficient replacement for the "marker" from the XML API. See
   172	// https://github.com/GoogleCloudPlatform/gcloud-golang/issues/197
   173	
   174	func (s *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   175		defer close(dest)
   176		ectx := context.WithValue(ctx, ctxutil.HTTPClient, s.baseHTTPClient)
   177		objs, err := gcsutil.EnumerateObjects(ectx, s.bucket, s.dirPrefix+after, limit)
   178		if err != nil {
   179			log.Printf("gstorage EnumerateObjects: %v", err)
   180			return err
   181		}
   182		for _, obj := range objs {
   183			dir, file := path.Split(obj.Name)
   184			if dir != s.dirPrefix {
   185				continue
   186			}
   187			br, ok := blob.Parse(file)
   188			if !ok {
   189				return fmt.Errorf("Non-Perkeep object named %q found in bucket", file)
   190			}
   191			select {
   192			case dest <- blob.SizedRef{Ref: br, Size: uint32(obj.Size)}:
   193			case <-ctx.Done():
   194				return ctx.Err()
   195			}
   196		}
   197		return nil
   198	}
   199	
   200	func (s *Storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error) {
   201		var buf bytes.Buffer
   202		size, err := io.Copy(&buf, source)
   203		if err != nil {
   204			return blob.SizedRef{}, err
   205		}
   206	
   207		w := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewWriter(ctx)
   208		if _, err := io.Copy(w, bytes.NewReader(buf.Bytes())); err != nil {
   209			return blob.SizedRef{}, err
   210		}
   211		if err := w.Close(); err != nil {
   212			return blob.SizedRef{}, err
   213		}
   214	
   215		if s.cache != nil {
   216			// NoHash because it's already verified if we read it
   217			// without errors on the io.Copy above.
   218			blobserver.ReceiveNoHash(ctx, s.cache, br, bytes.NewReader(buf.Bytes()))
   219		}
   220		return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
   221	}
   222	
   223	var statGate = syncutil.NewGate(20) // arbitrary cap
   224	
   225	func (s *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   226		// TODO: use cache
   227		return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
   228			attrs, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Attrs(ctx)
   229			if err == storage.ErrObjectNotExist {
   230				return sb, nil
   231			}
   232			if err != nil {
   233				return sb, err
   234			}
   235			size := attrs.Size
   236			if size > constants.MaxBlobSize {
   237				return sb, fmt.Errorf("blob %s stat size too large (%d)", br, size)
   238			}
   239			return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
   240		})
   241	}
   242	
   243	func (s *Storage) Fetch(ctx context.Context, br blob.Ref) (rc io.ReadCloser, size uint32, err error) {
   244		if s.cache != nil {
   245			if rc, size, err = s.cache.Fetch(ctx, br); err == nil {
   246				return
   247			}
   248		}
   249		r, err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).NewReader(ctx)
   250		if err == storage.ErrObjectNotExist {
   251			return nil, 0, os.ErrNotExist
   252		}
   253		if err != nil {
   254			return nil, 0, err
   255		}
   256		if r.Size() >= 1<<32 {
   257			r.Close()
   258			return nil, 0, errors.New("object larger than a uint32")
   259		}
   260		size = uint32(r.Size())
   261		if size > constants.MaxBlobSize {
   262			r.Close()
   263			return nil, size, errors.New("object too big")
   264		}
   265		return r, size, nil
   266	}
   267	
   268	func (s *Storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, err error) {
   269		if offset < 0 || length < 0 {
   270			return nil, blob.ErrNegativeSubFetch
   271		}
   272		ctx = context.WithValue(ctx, ctxutil.HTTPClient, s.baseHTTPClient)
   273		rc, err = gcsutil.GetPartialObject(ctx, gcsutil.Object{Bucket: s.bucket, Key: s.dirPrefix + br.String()}, offset, length)
   274		if err == gcsutil.ErrInvalidRange {
   275			return nil, blob.ErrOutOfRangeOffsetSubFetch
   276		}
   277		if err == storage.ErrObjectNotExist {
   278			return nil, os.ErrNotExist
   279		}
   280		return rc, err
   281	}
   282	
   283	func (s *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   284		if s.cache != nil {
   285			s.cache.RemoveBlobs(ctx, blobs)
   286		}
   287		gate := syncutil.NewGate(50) // arbitrary
   288		var grp syncutil.Group
   289		for i := range blobs {
   290			gate.Start()
   291			br := blobs[i]
   292			grp.Go(func() error {
   293				defer gate.Done()
   294				err := s.client.Bucket(s.bucket).Object(s.dirPrefix + br.String()).Delete(ctx)
   295				if err == storage.ErrObjectNotExist {
   296					return nil
   297				}
   298				return err
   299			})
   300		}
   301		return grp.Err()
   302	}
   303	
   304	func init() {
   305		blobserver.RegisterStorageConstructor("googlecloudstorage", blobserver.StorageConstructor(newFromConfig))
   306	}
Website layout inspired by memcached.
Content by the authors.