Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package replica registers the "replica" blobserver storage type,
    19	providing synchronous replication to one or more backends.
    20	
    21	Writes wait for minWritesForSuccess (default: all). Reads are
    22	attempted in order and not load-balanced, randomized, or raced by
    23	default.
    24	
    25	Example config:
    26	
    27		"/repl/": {
    28		    "handler": "storage-replica",
    29		    "handlerArgs": {
    30		        "backends": ["/b1/", "/b2/", "/b3/"],
    31		        "minWritesForSuccess": 2
    32		    }
    33		},
    34	*/
    35	package replica // import "perkeep.org/pkg/blobserver/replica"
    36	
    37	import (
    38		"bytes"
    39		"context"
    40		"errors"
    41		"fmt"
    42		"io"
    43		"log"
    44		"os"
    45		"sync"
    46		"time"
    47	
    48		"golang.org/x/sync/errgroup"
    49	
    50		"go4.org/jsonconfig"
    51		"perkeep.org/pkg/blob"
    52		"perkeep.org/pkg/blobserver"
    53	)
    54	
// Compile-time assertions that *replicaStorage implements the optional
// blobserver interfaces it provides methods for.
var (
	_ blobserver.Generationer    = (*replicaStorage)(nil)
	_ blobserver.WholeRefFetcher = (*replicaStorage)(nil)
)

// buffered is the capacity of the error channel on which RemoveBlobs
// collects the per-replica results.
const buffered = 8
    61	
// replicaStorage is the storage implementation registered by this
// package. Writes and removals go to replicas; fetches, stats and
// enumeration are served from readReplicas.
type replicaStorage struct {
	// Replicas for writing. replicaPrefixes[i] is the prefix that
	// replicas[i] was loaded from; it names the backend in log output.
	replicaPrefixes []string
	replicas        []blobserver.Storage

	// Replicas for reading. When the config lists no "readBackends",
	// the write prefixes are used instead.
	readPrefixes []string
	readReplicas []blobserver.Storage

	// Minimum number of writes that must succeed before
	// acknowledging success to the client.
	minWritesForSuccess int
}
    75	
    76	// NewForTest returns a replicated storage that writes, reads, and
    77	// deletes from all the provided storages.
    78	func NewForTest(sto []blobserver.Storage) blobserver.Storage {
    79		sto = append([]blobserver.Storage(nil), sto...) // clone
    80		names := make([]string, len(sto))
    81		for i := range names {
    82			names[i] = "/unknown-prefix/"
    83		}
    84		return &replicaStorage{
    85			replicaPrefixes:     names,
    86			replicas:            sto,
    87			readPrefixes:        names,
    88			readReplicas:        sto,
    89			minWritesForSuccess: len(sto),
    90		}
    91	}
    92	
    93	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
    94		sto := &replicaStorage{
    95			replicaPrefixes: config.RequiredList("backends"),
    96			readPrefixes:    config.OptionalList("readBackends"),
    97		}
    98		nReplicas := len(sto.replicaPrefixes)
    99		sto.minWritesForSuccess = config.OptionalInt("minWritesForSuccess", nReplicas)
   100		if err := config.Validate(); err != nil {
   101			return nil, err
   102		}
   103		if nReplicas == 0 {
   104			return nil, errors.New("replica: need at least one replica")
   105		}
   106		if sto.minWritesForSuccess == 0 {
   107			sto.minWritesForSuccess = nReplicas
   108		}
   109		// readPrefixes defaults to the write prefixes.
   110		if len(sto.readPrefixes) == 0 {
   111			sto.readPrefixes = sto.replicaPrefixes
   112		}
   113	
   114		for _, prefix := range sto.replicaPrefixes {
   115			s, err := ld.GetStorage(prefix)
   116			if err != nil {
   117				// If it's not a storage interface, it might be an http Handler
   118				// that also supports being a target (e.g. a sync handler).
   119				h, _ := ld.GetHandler(prefix)
   120				var ok bool
   121				if s, ok = h.(blobserver.Storage); !ok {
   122					return nil, err
   123				}
   124			}
   125			sto.replicas = append(sto.replicas, s)
   126		}
   127		for _, prefix := range sto.readPrefixes {
   128			s, err := ld.GetStorage(prefix)
   129			if err != nil {
   130				return nil, err
   131			}
   132			sto.readReplicas = append(sto.readReplicas, s)
   133		}
   134		return sto, nil
   135	}
   136	
   137	func (sto *replicaStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
   138		// TODO: race these? first to respond?
   139		for _, replica := range sto.readReplicas {
   140			file, size, err = replica.Fetch(ctx, b)
   141			if err == nil {
   142				return
   143			}
   144		}
   145		return
   146	}
   147	
   148	// StatBlobs stats all read replicas.
   149	func (sto *replicaStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   150		var (
   151			mu     sync.Mutex // serializes calls to fn, guards need
   152			need   = make(map[blob.Ref]bool)
   153			failed bool
   154		)
   155		for _, br := range blobs {
   156			need[br] = true
   157		}
   158	
   159		group, ctx := errgroup.WithContext(ctx)
   160	
   161		for _, replica := range sto.readReplicas {
   162			replica := replica
   163			group.Go(func() error {
   164				return replica.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
   165					mu.Lock()
   166					defer mu.Unlock()
   167					if failed {
   168						return nil
   169					}
   170					if !need[sb.Ref] {
   171						// dup, lost race from other replica
   172						return nil
   173					}
   174					delete(need, sb.Ref)
   175					if err := fn(sb); err != nil {
   176						failed = true
   177						return err
   178					}
   179					return nil
   180				})
   181			})
   182		}
   183	
   184		return group.Wait()
   185	}
   186	
// sizedBlobAndError is the outcome of uploading one blob to one
// replica, as sent back to ReceiveBlob by its upload goroutines.
type sizedBlobAndError struct {
	// idx is the replica's position in replicaStorage.replicas (and so
	// in replicaPrefixes).
	idx int
	// sb is the sized ref returned by the replica's receive call.
	sb  blob.SizedRef
	// err is the upload error, or nil on success.
	err error
}
   192	
   193	func (sto *replicaStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (_ blob.SizedRef, err error) {
   194		// Slurp the whole blob before replicating. Bounded by 16 MB anyway.
   195		var buf bytes.Buffer
   196		size, err := io.Copy(&buf, src)
   197		if err != nil {
   198			return
   199		}
   200	
   201		nReplicas := len(sto.replicas)
   202		resc := make(chan sizedBlobAndError, nReplicas)
   203		uploadToReplica := func(idx int, dst blobserver.BlobReceiver) {
   204			// Using ReceiveNoHash because it's already been
   205			// verified implicitly by the io.Copy above:
   206			sb, err := blobserver.ReceiveNoHash(ctx, dst, br, bytes.NewReader(buf.Bytes()))
   207			resc <- sizedBlobAndError{idx, sb, err}
   208		}
   209		for idx, replica := range sto.replicas {
   210			go uploadToReplica(idx, replica)
   211		}
   212	
   213		nSuccess := 0
   214		var fails []sizedBlobAndError
   215		for range sto.replicas {
   216			res := <-resc
   217			switch {
   218			case res.err == nil && int64(res.sb.Size) == size:
   219				nSuccess++
   220				if nSuccess == sto.minWritesForSuccess {
   221					return res.sb, nil
   222				}
   223			case res.err == nil:
   224				err = fmt.Errorf("replica: upload shard reported size %d, expected %d", res.sb.Size, size)
   225				res.err = err
   226				fails = append(fails, res)
   227			default:
   228				err = res.err
   229				fails = append(fails, res)
   230			}
   231		}
   232		for _, res := range fails {
   233			log.Printf("replica: receiving blob %v, %d successes, %d failures; backend %s reported: %v",
   234				br,
   235				nSuccess, len(fails),
   236				sto.replicaPrefixes[res.idx], res.err)
   237		}
   238		return
   239	}
   240	
   241	func (sto *replicaStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   242		errch := make(chan error, buffered)
   243		removeFrom := func(s blobserver.Storage) {
   244			errch <- s.RemoveBlobs(ctx, blobs)
   245		}
   246		for _, replica := range sto.replicas {
   247			go removeFrom(replica)
   248		}
   249		var reterr error
   250		nSuccess := 0
   251		for range sto.replicas {
   252			if err := <-errch; err != nil {
   253				reterr = err
   254			} else {
   255				nSuccess++
   256			}
   257		}
   258		if nSuccess > 0 {
   259			// TODO: decide on the return value. for now this is best
   260			// effort and we return nil if any of the blobservers said
   261			// success.  maybe a bit weird, though.
   262			return nil
   263		}
   264		return reterr
   265	}
   266	
   267	func (sto *replicaStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   268		return blobserver.MergedEnumerateStorage(ctx, dest, sto.readReplicas, after, limit)
   269	}
   270	
   271	func (sto *replicaStorage) ResetStorageGeneration() error {
   272		var ret error
   273		n := 0
   274		for _, replica := range sto.replicas {
   275			if g, ok := replica.(blobserver.Generationer); ok {
   276				n++
   277				if err := g.ResetStorageGeneration(); err != nil && ret == nil {
   278					ret = err
   279				}
   280			}
   281		}
   282		if n == 0 {
   283			return errors.New("ResetStorageGeneration not supported")
   284		}
   285		return ret
   286	}
   287	
   288	func (sto *replicaStorage) StorageGeneration() (initTime time.Time, random string, err error) {
   289		var buf bytes.Buffer
   290		n := 0
   291		for _, replica := range sto.replicas {
   292			if g, ok := replica.(blobserver.Generationer); ok {
   293				n++
   294				rt, rrand, rerr := g.StorageGeneration()
   295				if rerr != nil {
   296					err = rerr
   297				} else {
   298					if rt.After(initTime) {
   299						// Returning the max of all initialization times.
   300						// TODO: not sure whether max or min makes more sense.
   301						initTime = rt
   302					}
   303					if buf.Len() != 0 {
   304						buf.WriteByte('/')
   305					}
   306					buf.WriteString(rrand)
   307				}
   308			}
   309		}
   310		if n == 0 {
   311			err = errors.New("No replicas support StorageGeneration")
   312		}
   313		return initTime, buf.String(), err
   314	}
   315	
   316	func (sto *replicaStorage) OpenWholeRef(wholeRef blob.Ref, offset int64) (rc io.ReadCloser, wholeSize int64, err error) {
   317		// TODO: race these? first to respond?
   318		for _, replica := range sto.readReplicas {
   319			if v, ok := replica.(blobserver.WholeRefFetcher); ok {
   320				rc, wholeSize, err = v.OpenWholeRef(wholeRef, offset)
   321				if err == nil {
   322					return
   323				}
   324			}
   325		}
   326		if err == nil {
   327			err = os.ErrNotExist
   328		}
   329		return
   330	}
   331	
   332	func init() {
   333		blobserver.RegisterStorageConstructor("replica", blobserver.StorageConstructor(newFromConfig))
   334	}
Website layout inspired by memcached.
Content by the authors.