Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package server
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"crypto/rand"
    23		"errors"
    24		"fmt"
    25		"html"
    26		"io"
    27		"log"
    28		"net/http"
    29		"os"
    30		"sort"
    31		"strconv"
    32		"strings"
    33		"sync"
    34		"time"
    35	
    36		"go4.org/jsonconfig"
    37		"golang.org/x/net/xsrftoken"
    38		"perkeep.org/pkg/auth"
    39		"perkeep.org/pkg/blob"
    40		"perkeep.org/pkg/blobserver"
    41		"perkeep.org/pkg/constants"
    42		"perkeep.org/pkg/index"
    43		"perkeep.org/pkg/sorted"
    44		"perkeep.org/pkg/types/camtypes"
    45	
    46		"go4.org/syncutil"
    47	)
    48	
const (
	// maxRecentErrors caps how many failed blob refs are kept in
	// SyncHandler.recentErrors for display on the status page.
	maxRecentErrors = 20
	// queueSyncInterval is how long syncLoop sleeps between polls of
	// the pending-blob set when no wakeup (wakec) arrives.
	queueSyncInterval = 5 * time.Second
)
    53	
// blobReceiverEnumerator is the subset of blobserver functionality the
// sync destination must provide: accepting blobs and enumerating the
// blobs it already has.
type blobReceiverEnumerator interface {
	blobserver.BlobReceiver
	blobserver.BlobEnumerator
}
    58	
// The SyncHandler handles async replication in one direction between
// a pair of storage targets, a source and target.
//
// SyncHandler is a BlobReceiver but doesn't actually store incoming
// blobs; instead, it records blobs it has received and queues them
// for async replication soon, or whenever it can.
type SyncHandler struct {
	// TODO: rate control tunables
	fromName, toName string                 // handler names of source and destination, for logs and the UI
	from             blobserver.Storage     // source of blobs to copy
	to               blobReceiverEnumerator // destination for copied blobs
	queue            sorted.KeyValue        // persistent queue of pending copies: blobref string => size
	toIndex          bool                   // whether this sync is from a blob storage to an index
	idle             bool                   // if true, the handler does nothing other than providing the discovery.
	copierPoolSize   int                    // max number of concurrent copyWorker goroutines per runSync batch

	// wakec wakes up the blob syncer loop when a blob is received.
	wakec chan bool

	mu             sync.Mutex               // protects following
	status         string                   // human-readable status line, set via setStatusf
	copying        map[blob.Ref]*copyStatus // to start time
	needCopy       map[blob.Ref]uint32      // blobs needing to be copied. some might be in lastFail too.
	lastFail       map[blob.Ref]failDetail  // subset of needCopy that previously failed, and why
	bytesRemain    int64                    // sum of needCopy values
	recentErrors   []blob.Ref               // up to maxRecentErrors, recent first. valid if still in lastFail.
	recentCopyTime time.Time                // when the most recent successful copy finished
	totalCopies    int64                    // lifetime count of successful copies
	totalCopyBytes int64                    // lifetime sum of successfully copied bytes
	totalErrors    int64                    // lifetime count of copy errors
	vshards        []string                 // validation shards. if 0, validation not running
	vshardDone     int                      // shards already processed (incl. errors)
	vshardErrs     []string                 // errors reported by validation shards
	vmissing       int64                    // missing blobs found during validate
	vdestCount     int                      // number of blobs seen on dest during validate
	vdestBytes     int64                    // number of blob bytes seen on dest during validate
	vsrcCount      int                      // number of blobs seen on src during validate
	vsrcBytes      int64                    // number of blob bytes seen on src during validate
	comparedBlobs  int                      // total number of blobs compared by hourly runs
	comparedBytes  uint64                   // total number of bytes compared by hourly runs
	comparedRounds int                      // total number of hourly compare runs
	compareErrors  []string                 // all errors encountered by hourly runs
	compLastBlob   string                   // last blob compared by hourly runs

	// syncLoop tries to send on alarmIdlec each time we've slept for a full
	// queueSyncInterval. Initialized as a synchronous chan if we're not an
	// idle sync handler, otherwise nil.
	alarmIdlec chan struct{}
}
   108	
// Compile-time checks that SyncHandler implements the interfaces it claims.
var (
	_ blobserver.Storage       = (*SyncHandler)(nil)
	_ blobserver.HandlerIniter = (*SyncHandler)(nil)
)
   113	
   114	func (sh *SyncHandler) String() string {
   115		return fmt.Sprintf("[SyncHandler %v -> %v]", sh.fromName, sh.toName)
   116	}
   117	
   118	func (sh *SyncHandler) fromToString() string {
   119		return fmt.Sprintf("%v -> %v", sh.fromName, sh.toName)
   120	}
   121	
// logf logs a formatted message prefixed with "sync:" and this
// handler's source -> destination names.
func (sh *SyncHandler) logf(format string, args ...interface{}) {
	log.Printf("sync: "+sh.fromToString()+": "+format, args...)
}
   125	
// init registers the "sync" handler type with the blobserver loader so
// it can be constructed from JSON configuration.
func init() {
	blobserver.RegisterHandlerConstructor("sync", newSyncFromConfig)
}
   129	
// validateOnStartDefault is the default for the "validateOnStart" config
// option, controlled by the CAMLI_SYNC_VALIDATE environment variable.
// TODO: this is temporary. should delete, or decide when it's on by default (probably always).
// Then need genconfig option to disable it.
var validateOnStartDefault, _ = strconv.ParseBool(os.Getenv("CAMLI_SYNC_VALIDATE"))
   133	
// newSyncFromConfig constructs a sync handler from its JSON config.
// Depending on the options it may return an idle (discovery-only)
// handler, kick off a full sync (optionally blocking startup until it
// finishes), start background validation, and/or start hourly
// byte-compare runs.
func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) {
	var (
		from           = conf.RequiredString("from")
		to             = conf.RequiredString("to")
		fullSync       = conf.OptionalBool("fullSyncOnStart", false)
		blockFullSync  = conf.OptionalBool("blockingFullSyncOnStart", false)
		idle           = conf.OptionalBool("idle", false)
		queueConf      = conf.OptionalObject("queue")
		copierPoolSize = conf.OptionalInt("copierPoolSize", 5)
		validate       = conf.OptionalBool("validateOnStart", validateOnStartDefault)
		hourlyCompare  = conf.OptionalInt("hourlyCompareBytes", 0)
	)
	if err := conf.Validate(); err != nil {
		return nil, err
	}
	// An idle handler only participates in discovery; it needs no queue.
	if idle {
		return newIdleSyncHandler(from, to), nil
	}
	if len(queueConf) == 0 {
		return nil, errors.New(`Missing required "queue" object`)
	}
	q, err := sorted.NewKeyValueMaybeWipe(queueConf)
	if err != nil {
		return nil, err
	}

	// Detect a blobstore -> index sync so discovery can advertise it.
	isToIndex := false
	fromBs, err := ld.GetStorage(from)
	if err != nil {
		return nil, err
	}
	toBs, err := ld.GetStorage(to)
	if err != nil {
		return nil, err
	}
	if _, ok := fromBs.(*index.Index); !ok {
		if _, ok := toBs.(*index.Index); ok {
			isToIndex = true
		}
	}

	sh := newSyncHandler(from, to, fromBs, toBs, q)
	sh.toIndex = isToIndex
	sh.copierPoolSize = copierPoolSize
	// Load the persisted queue into the in-memory pending set.
	if err := sh.readQueueToMemory(); err != nil {
		return nil, fmt.Errorf("Error reading sync queue to memory: %v", err)
	}

	if fullSync || blockFullSync {
		sh.logf("Doing full sync")
		didFullSync := make(chan bool, 1)
		go func() {
			// Drain the pending queue first, then enumerate the
			// whole source for a full pass, then settle into the
			// regular sync loop.
			for {
				n := sh.runSync("pending blobs queue", sh.enumeratePendingBlobs)
				if n > 0 {
					sh.logf("Queue sync copied %d blobs", n)
					continue
				}
				break
			}
			n := sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs))
			sh.logf("Full sync copied %d blobs", n)
			didFullSync <- true
			sh.syncLoop()
		}()
		if blockFullSync {
			sh.logf("Blocking startup, waiting for full sync from %q to %q", from, to)
			<-didFullSync
			sh.logf("Full sync complete.")
		}
	} else {
		go sh.syncLoop()
	}

	if validate {
		go sh.startFullValidation()
	}

	if hourlyCompare != 0 {
		// Hourly compares fetch blobs back from the destination,
		// so it must implement blob.Fetcher.
		if _, ok := sh.to.(blob.Fetcher); !ok {
			return nil, errors.New(`can't specify "hourlyCompareBytes" if destination is not a Fetcher`)
		}
		go sh.hourlyCompare(uint64(hourlyCompare))
	}

	// Enqueue every blob the source receives from now on.
	blobserver.GetHub(fromBs).AddReceiveHook(sh.enqueue)
	return sh, nil
}
   222	
// InitHandler implements blobserver.HandlerIniter: it registers this
// sync handler with the "root" handler, if one is configured.
func (sh *SyncHandler) InitHandler(hl blobserver.FindHandlerByTyper) error {
	_, h, err := hl.FindHandlerByType("root")
	if err == blobserver.ErrHandlerTypeNotFound {
		// It's optional. We register ourselves if it's there.
		return nil
	}
	if err != nil {
		return err
	}
	h.(*RootHandler).registerSyncHandler(sh)
	return nil
}
   235	
   236	func newSyncHandler(fromName, toName string,
   237		from blobserver.Storage, to blobReceiverEnumerator,
   238		queue sorted.KeyValue) *SyncHandler {
   239		return &SyncHandler{
   240			copierPoolSize: 5,
   241			from:           from,
   242			to:             to,
   243			fromName:       fromName,
   244			toName:         toName,
   245			queue:          queue,
   246			wakec:          make(chan bool),
   247			status:         "not started",
   248			needCopy:       make(map[blob.Ref]uint32),
   249			lastFail:       make(map[blob.Ref]failDetail),
   250			copying:        make(map[blob.Ref]*copyStatus),
   251			alarmIdlec:     make(chan struct{}),
   252		}
   253	}
   254	
// NewSyncHandler returns a handler that will asynchronously and continuously
// copy blobs from src to dest, if missing on dest.
// Blobs waiting to be copied are stored on pendingQueue. srcName and destName are
// only used for status and debugging messages.
// N.B: blobs should be added to src with a method that notifies the blob hub,
// such as blobserver.Receive.
func NewSyncHandler(srcName, destName string,
	src blobserver.Storage, dest blobReceiverEnumerator,
	pendingQueue sorted.KeyValue) *SyncHandler {
	sh := newSyncHandler(srcName, destName, src, dest, pendingQueue)
	// Start the copy loop and have the source's hub feed us new blobs.
	go sh.syncLoop()
	blobserver.GetHub(sh.from).AddReceiveHook(sh.enqueue)
	return sh
}
   269	
// IdleWait waits until the sync handler has finished processing the currently
// queued blobs.
// It returns immediately for idle handlers; otherwise it blocks until
// syncLoop signals (via alarmIdlec) that a full queueSyncInterval
// elapsed without work.
func (sh *SyncHandler) IdleWait() {
	if sh.idle {
		return
	}
	<-sh.alarmIdlec
}
   278	
// signalIdle releases at most one IdleWait caller with a non-blocking
// send on alarmIdlec. Called by syncLoop after sleeping a full
// queueSyncInterval.
func (sh *SyncHandler) signalIdle() {
	select {
	case sh.alarmIdlec <- struct{}{}:
	default:
	}
}
   285	
   286	func newIdleSyncHandler(fromName, toName string) *SyncHandler {
   287		return &SyncHandler{
   288			fromName: fromName,
   289			toName:   toName,
   290			idle:     true,
   291			status:   "disabled",
   292		}
   293	}
   294	
   295	func (sh *SyncHandler) discovery() camtypes.SyncHandlerDiscovery {
   296		return camtypes.SyncHandlerDiscovery{
   297			From:    sh.fromName,
   298			To:      sh.toName,
   299			ToIndex: sh.toIndex,
   300		}
   301	}
   302	
// syncStatus is a snapshot of the current status, for display by the
// status handler (status.go) in both JSON and HTML forms.
type syncStatus struct {
	sh *SyncHandler // handler this snapshot was taken from; not serialized

	From           string `json:"from"`
	FromDesc       string `json:"fromDesc"`
	To             string `json:"to"`
	ToDesc         string `json:"toDesc"`
	DestIsIndex    bool   `json:"destIsIndex,omitempty"`
	BlobsToCopy    int    `json:"blobsToCopy"`
	BytesToCopy    int64  `json:"bytesToCopy"`
	LastCopySecAgo int    `json:"lastCopySecondsAgo,omitempty"`
}
   317	
// currentStatus snapshots the handler's state under sh.mu for rendering
// by the status handler.
func (sh *SyncHandler) currentStatus() syncStatus {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	ago := 0 // seconds since last successful copy; 0 means none yet (omitted in JSON)
	if !sh.recentCopyTime.IsZero() {
		ago = int(time.Since(sh.recentCopyTime) / time.Second)
	}
	return syncStatus{
		sh:             sh,
		From:           sh.fromName,
		FromDesc:       storageDesc(sh.from),
		To:             sh.toName,
		ToDesc:         storageDesc(sh.to),
		DestIsIndex:    sh.toIndex,
		BlobsToCopy:    len(sh.needCopy),
		BytesToCopy:    sh.bytesRemain,
		LastCopySecAgo: ago,
	}
}
   337	
// readQueueToMemory slurps in the pending queue from disk (or
// wherever) to memory.  Even with millions of blobs, it's not much
// memory. The point of the persistent queue is to survive restarts if
// the "fullSyncOnStart" option is off. With "fullSyncOnStart" set to
// true, this is a little pointless (we'd figure out what's missing
// eventually), but this might save us a few minutes (let us start
// syncing missing blobs a few minutes earlier) since we won't have to
// wait to figure out what the destination is missing.
func (sh *SyncHandler) readQueueToMemory() error {
	errc := make(chan error, 1)
	blobs := make(chan blob.SizedRef, 16)
	intr := make(chan struct{})
	defer close(intr)
	// Enumerate the persistent queue in a goroutine; we consume here.
	go func() {
		errc <- sh.enumerateQueuedBlobs(blobs, intr)
	}()
	n := 0
	for sb := range blobs {
		sh.addBlobToCopy(sb)
		n++
	}
	sh.logf("added %d pending blobs from sync queue to pending list", n)
	return <-errc
}
   362	
   363	func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
   364		if req.Method == "POST" {
   365			if req.FormValue("mode") == "validate" {
   366				token := req.FormValue("token")
   367				if xsrftoken.Valid(token, auth.Token(), "user", "runFullValidate") {
   368					sh.startFullValidation()
   369					http.Redirect(rw, req, "./", http.StatusFound)
   370					return
   371				}
   372			}
   373			http.Error(rw, "Bad POST request", http.StatusBadRequest)
   374			return
   375		}
   376	
   377		// TODO: remove this lock and instead just call currentStatus,
   378		// and transition to using that here.
   379		sh.mu.Lock()
   380		defer sh.mu.Unlock()
   381		f := func(p string, a ...interface{}) {
   382			fmt.Fprintf(rw, p, a...)
   383		}
   384		now := time.Now()
   385		f("<h1>Sync Status (for %s to %s)</h1>", sh.fromName, sh.toName)
   386		f("<p><b>Current status: </b>%s</p>", html.EscapeString(sh.status))
   387		if sh.idle {
   388			return
   389		}
   390	
   391		f("<h2>Stats:</h2><ul>")
   392		f("<li>Source: %s</li>", html.EscapeString(storageDesc(sh.from)))
   393		f("<li>Target: %s</li>", html.EscapeString(storageDesc(sh.to)))
   394		f("<li>Blobs synced: %d</li>", sh.totalCopies)
   395		f("<li>Bytes synced: %d</li>", sh.totalCopyBytes)
   396		f("<li>Blobs yet to copy: %d</li>", len(sh.needCopy))
   397		f("<li>Bytes yet to copy: %d</li>", sh.bytesRemain)
   398		if !sh.recentCopyTime.IsZero() {
   399			f("<li>Most recent copy: %s (%v ago)</li>", sh.recentCopyTime.Format(time.RFC3339), now.Sub(sh.recentCopyTime))
   400		}
   401		clarification := ""
   402		if len(sh.needCopy) == 0 && sh.totalErrors > 0 {
   403			clarification = "(all since resolved)"
   404		}
   405		f("<li>Previous copy errors: %d %s</li>", sh.totalErrors, clarification)
   406		f("</ul>")
   407	
   408		if sh.comparedRounds > 0 || sh.comparedBlobs > 0 {
   409			f("<h2>Hourly compares</h2><ul>")
   410			f("<li>Hourly rounds: %d</li>", sh.comparedRounds)
   411			f("<li>Compared blobs: %d</li>", sh.comparedBlobs)
   412			f("<li>Compared bytes: %d</li>", sh.comparedBytes)
   413			f("<li>Latest blob: %s</li>", sh.compLastBlob)
   414			f("</ul>")
   415			if len(sh.compareErrors) > 0 {
   416				f("<h3>Compare failures</h3><ul>")
   417				for _, err := range sh.compareErrors {
   418					f("<li><strong>%s</strong></li>", err)
   419				}
   420				f("</ul>")
   421			}
   422		}
   423	
   424		f("<h2>Validation</h2>")
   425		f("<p>Background scan of source and destination to ensure that the destination has everything the source does, or is at least enqueued to sync.</p>")
   426		if len(sh.vshards) == 0 || sh.vshardDone == len(sh.vshards) {
   427			token := xsrftoken.Generate(auth.Token(), "user", "runFullValidate")
   428			f("<form method='POST'><input type='hidden' name='mode' value='validate'><input type='hidden' name='token' value='%s'><input type='submit' value='Start validation'></form>", token)
   429		}
   430		if len(sh.vshards) != 0 {
   431			f("<ul>")
   432			f("<li>Shards processed: %d/%d (%.1f%%)</li>",
   433				sh.vshardDone,
   434				len(sh.vshards),
   435				100*float64(sh.vshardDone)/float64(len(sh.vshards)))
   436			f("<li>Source blobs seen: %d</li>", sh.vsrcCount)
   437			f("<li>Source bytes seen: %d</li>", sh.vsrcBytes)
   438			f("<li>Dest blobs seen: %d</li>", sh.vdestCount)
   439			f("<li>Dest bytes seen: %d</li>", sh.vdestBytes)
   440			f("<li>Blobs found missing &amp; enqueued: %d</li>", sh.vmissing)
   441			if len(sh.vshardErrs) > 0 {
   442				f("<li>Validation errors:<ul>\n")
   443				for _, e := range sh.vshardErrs {
   444					f("  <li>%s</li>\n", html.EscapeString(e))
   445				}
   446				f("</li>\n")
   447			}
   448			f("</ul>")
   449		}
   450	
   451		if len(sh.copying) > 0 {
   452			f("<h2>Currently Copying</h2><ul>")
   453			copying := make([]blob.Ref, 0, len(sh.copying))
   454			for br := range sh.copying {
   455				copying = append(copying, br)
   456			}
   457			sort.Sort(blob.ByRef(copying))
   458			for _, br := range copying {
   459				f("<li>%s</li>\n", sh.copying[br])
   460			}
   461			f("</ul>")
   462		}
   463	
   464		recentErrors := make([]blob.Ref, 0, len(sh.recentErrors))
   465		for _, br := range sh.recentErrors {
   466			if _, ok := sh.needCopy[br]; ok {
   467				// Only show it in the web UI if it's still a problem. Blobs that
   468				// have since succeeded just confused people.
   469				recentErrors = append(recentErrors, br)
   470			}
   471		}
   472		if len(recentErrors) > 0 {
   473			f("<h2>Recent Errors</h2><p>Blobs that haven't successfully copied over yet, and their last errors:</p><ul>")
   474			for _, br := range recentErrors {
   475				fail := sh.lastFail[br]
   476				f("<li>%s: %s: %s</li>\n",
   477					br,
   478					fail.when.Format(time.RFC3339),
   479					html.EscapeString(fail.err.Error()))
   480			}
   481			f("</ul>")
   482		}
   483	}
   484	
   485	func (sh *SyncHandler) setStatusf(s string, args ...interface{}) {
   486		s = time.Now().UTC().Format(time.RFC3339) + ": " + fmt.Sprintf(s, args...)
   487		sh.mu.Lock()
   488		defer sh.mu.Unlock()
   489		sh.status = s
   490	}
   491	
// copyResult is a copyWorker's report for one blob: the blob attempted
// and the error from copyBlob (nil on success).
type copyResult struct {
	sb  blob.SizedRef
	err error
}
   496	
// blobserverEnumerator adapts a BlobEnumerator into the enumerator
// function shape runSync consumes: the returned func streams every blob
// of src to dst, stopping with an error once intr is closed.
func blobserverEnumerator(ctx context.Context, src blobserver.BlobEnumerator) func(chan<- blob.SizedRef, <-chan struct{}) error {
	return func(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
		return blobserver.EnumerateAll(ctx, src, func(sb blob.SizedRef) error {
			select {
			case dst <- sb:
			case <-intr:
				return errors.New("interrupted")
			}
			return nil
		})
	}
}
   509	
// enumeratePendingBlobs yields blobs from the in-memory pending list (needCopy).
// This differs from enumerateQueuedBlobs, which pulls in the on-disk sorted.KeyValue store.
// It sends at most one batch (1000 blobs) per call and closes dst when done.
func (sh *SyncHandler) enumeratePendingBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
	defer close(dst)
	sh.mu.Lock()
	var toSend []blob.SizedRef
	{
		// Copy out up to maxBatch entries while holding the lock,
		// then send them below without it.
		n := len(sh.needCopy)
		const maxBatch = 1000
		if n > maxBatch {
			n = maxBatch
		}
		toSend = make([]blob.SizedRef, 0, n)
		for br, size := range sh.needCopy {
			toSend = append(toSend, blob.SizedRef{Ref: br, Size: size})
			if len(toSend) == n {
				break
			}
		}
	}
	sh.mu.Unlock()
	for _, sb := range toSend {
		select {
		case dst <- sb:
		case <-intr:
			return nil
		}
	}
	return nil
}
   540	
// enumerateQueuedBlobs yields blobs from the on-disk sorted.KeyValue store.
// This differs from enumeratePendingBlobs, which sends from the in-memory pending list.
// Malformed queue entries are logged and skipped. dst is closed when done.
func (sh *SyncHandler) enumerateQueuedBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
	defer close(dst)
	it := sh.queue.Find("", "")
	for it.Next() {
		br, ok := blob.Parse(it.Key())
		size, err := strconv.ParseUint(it.Value(), 10, 32)
		if !ok || err != nil {
			// Skip (but log) bogus entries rather than aborting the whole enumeration.
			sh.logf("ERROR: bogus sync queue entry: %q => %q", it.Key(), it.Value())
			continue
		}
		select {
		case dst <- blob.SizedRef{Ref: br, Size: uint32(size)}:
		case <-intr:
			return it.Close()
		}
	}
	return it.Close()
}
   561	
// runSync copies blobs yielded by enumSrc using a lazily grown pool of
// at most copierPoolSize workers, and returns the number of blobs
// successfully copied. It processes at most one batch per call (bounded
// by workch's buffer); callers loop until it returns 0. syncType is
// used only for log messages.
func (sh *SyncHandler) runSync(syncType string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error) int {
	enumch := make(chan blob.SizedRef, 8)
	errch := make(chan error, 1)
	intr := make(chan struct{})
	defer close(intr)
	go func() { errch <- enumSrc(enumch, intr) }()

	nCopied := 0
	toCopy := 0

	workch := make(chan blob.SizedRef, 1000)
	resch := make(chan copyResult, 8)
FeedWork:
	for sb := range enumch {
		// Spin up workers lazily, one per queued blob, capped at copierPoolSize.
		if toCopy < sh.copierPoolSize {
			go sh.copyWorker(resch, workch)
		}
		select {
		case workch <- sb:
			toCopy++
		default:
			// Buffer full. Enough for this batch. Will get it later.
			break FeedWork
		}
	}
	close(workch)
	// Collect exactly one result per queued blob.
	for i := 0; i < toCopy; i++ {
		sh.setStatusf("Copying blobs")
		res := <-resch
		if res.err == nil {
			nCopied++
		}
	}

	if err := <-errch; err != nil {
		sh.logf("error enumerating for %v sync: %v", syncType, err)
	}
	return nCopied
}
   601	
// syncLoop runs forever: it drains the pending set, then sleeps until
// either the remainder of queueSyncInterval elapses (also signaling any
// IdleWait caller) or a newly received blob wakes it via wakec.
func (sh *SyncHandler) syncLoop() {
	for {
		t0 := time.Now()

		for sh.runSync(sh.fromName, sh.enumeratePendingBlobs) > 0 {
			// Loop, before sleeping.
		}
		sh.setStatusf("Sleeping briefly before next long poll.")

		d := queueSyncInterval - time.Since(t0)
		select {
		case <-time.After(d):
			sh.signalIdle()
		case <-sh.wakec:
		}
	}
}
   619	
   620	func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blob.SizedRef) {
   621		for sb := range work {
   622			res <- copyResult{sb, sh.copyBlob(context.TODO(), sb)}
   623		}
   624	}
   625	
// copyBlob copies one blob from the source to the destination, verifying
// its size and digest along the way. The deferred cs.setError records
// the outcome (and updates handler stats) whether we succeed or fail.
func (sh *SyncHandler) copyBlob(ctx context.Context, sb blob.SizedRef) (err error) {
	cs := sh.newCopyStatus(sb)
	defer func() { cs.setError(err) }()
	br := sb.Ref

	// Publish this in-flight copy for the status page.
	sh.mu.Lock()
	sh.copying[br] = cs
	sh.mu.Unlock()

	if sb.Size > constants.MaxBlobSize {
		return fmt.Errorf("blob size %d too large; max blob size is %d", sb.Size, constants.MaxBlobSize)
	}

	cs.setStatus(statusFetching)
	rc, fromSize, err := sh.from.Fetch(ctx, br)
	if err != nil {
		return fmt.Errorf("source fetch: %v", err)
	}
	if fromSize != sb.Size {
		rc.Close()
		return fmt.Errorf("source fetch size mismatch: get=%d, enumerate=%d", fromSize, sb.Size)
	}

	// Read the whole blob into memory, tallying progress and hashing as
	// we go so the digest can be checked before writing to the dest.
	buf := make([]byte, fromSize)
	hash := br.Hash()
	cs.setStatus(statusReading)
	n, err := io.ReadFull(io.TeeReader(rc,
		io.MultiWriter(
			incrWriter{cs, &cs.nread},
			hash,
		)), buf)
	rc.Close()
	if err != nil {
		return fmt.Errorf("Read error after %d/%d bytes: %v", n, fromSize, err)
	}
	if !br.HashMatches(hash) {
		return fmt.Errorf("Read data has unexpected digest %x", hash.Sum(nil))
	}

	cs.setStatus(statusWriting)
	newsb, err := sh.to.ReceiveBlob(ctx, br, io.TeeReader(bytes.NewReader(buf), incrWriter{cs, &cs.nwrite}))
	if err != nil {
		return fmt.Errorf("dest write: %v", err)
	}
	if newsb.Size != sb.Size {
		return fmt.Errorf("write size mismatch: source_read=%d but dest_write=%d", sb.Size, newsb.Size)
	}
	return nil
}
   675	
   676	func (sh *SyncHandler) ReceiveBlob(ctx context.Context, br blob.Ref, r io.Reader) (sb blob.SizedRef, err error) {
   677		// TODO: use ctx?
   678		n, err := io.Copy(io.Discard, r)
   679		if err != nil {
   680			return
   681		}
   682		sb = blob.SizedRef{Ref: br, Size: uint32(n)}
   683		return sb, sh.enqueue(sb)
   684	}
   685	
// addBlobToCopy adds a blob to copy to memory (not to disk: that's enqueue).
// It returns true if it was added, or false if it was a duplicate.
// On success it also wakes the sync loop if it's sleeping.
func (sh *SyncHandler) addBlobToCopy(sb blob.SizedRef) bool {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if _, dup := sh.needCopy[sb.Ref]; dup {
		return false
	}

	sh.needCopy[sb.Ref] = sb.Size
	sh.bytesRemain += int64(sb.Size)

	// Non-blocking send to wake up looping goroutine if it's
	// sleeping...
	select {
	case sh.wakec <- true:
	default:
	}
	return true
}
   706	
   707	func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
   708		if !sh.addBlobToCopy(sb) {
   709			// Dup
   710			return nil
   711		}
   712		// TODO: include current time in encoded value, to attempt to
   713		// do in-order delivery to remote side later? Possible
   714		// friendly optimization later. Might help peer's indexer have
   715		// less missing deps.
   716		if err := sh.queue.Set(sb.Ref.String(), fmt.Sprint(sb.Size)); err != nil {
   717			return err
   718		}
   719		return nil
   720	}
   721	
// startFullValidation kicks off a background validation pass comparing
// source and destination and enqueueing blobs missing on the
// destination. It's a no-op if a run is already in progress; stats of a
// finished run are reset before a new one starts.
func (sh *SyncHandler) startFullValidation() {
	sh.mu.Lock()
	// A previous run finished (or none ever ran): clear its stats.
	if sh.vshardDone == len(sh.vshards) {
		sh.vshards, sh.vshardErrs = nil, nil
		sh.vshardDone, sh.vmissing = 0, 0
		sh.vdestCount, sh.vdestBytes = 0, 0
		sh.vsrcCount, sh.vsrcBytes = 0, 0
	}
	// Still running: leave it alone.
	if len(sh.vshards) != 0 {
		sh.mu.Unlock()
		return
	}
	sh.mu.Unlock()

	sh.logf("running full validation; determining validation shards...")
	shards := sh.shardPrefixes()

	// Re-check under the lock in case another caller raced us while
	// shardPrefixes ran unlocked.
	sh.mu.Lock()
	if len(sh.vshards) != 0 {
		sh.mu.Unlock()
		return
	}
	sh.vshards = shards
	sh.mu.Unlock()

	go sh.runFullValidation()
}
   749	
// runFullValidation validates every shard in sh.vshards, running at
// most maxShardWorkers concurrently, and blocks until all are done.
func (sh *SyncHandler) runFullValidation() {
	var wg sync.WaitGroup

	sh.mu.Lock()
	shards := sh.vshards
	wg.Add(len(shards))
	sh.mu.Unlock()

	sh.logf("full validation beginning with %d shards...", len(shards))

	const maxShardWorkers = 30 // arbitrary
	gate := syncutil.NewGate(maxShardWorkers)

	for _, pfx := range shards {
		pfx := pfx // capture per-iteration copy for the goroutine
		gate.Start()
		go func() {
			defer wg.Done()
			defer gate.Done()
			sh.validateShardPrefix(pfx)
		}()
	}
	wg.Wait()
	sh.logf("validation complete")
}
   775	
// validateShardPrefix enumerates source and destination in parallel for
// blobs whose refs begin with pfx, enqueues any blobs missing from the
// destination, and records the shard's completion (and any error) in
// the handler's validation stats.
func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) {
	defer func() {
		// Record completion and any error for the status page.
		sh.mu.Lock()
		if err != nil {
			errs := fmt.Sprintf("Failed to validate prefix %s: %v", pfx, err)
			sh.logf("%s", errs)
			sh.vshardErrs = append(sh.vshardErrs, errs)
		}
		sh.vshardDone++
		sh.mu.Unlock()
	}()
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()
	src, serrc := sh.startValidatePrefix(ctx, pfx, false)
	dst, derrc := sh.startValidatePrefix(ctx, pfx, true)
	srcErr := &chanError{
		C: serrc,
		Wrap: func(err error) error {
			return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err)
		},
	}
	dstErr := &chanError{
		C: derrc,
		Wrap: func(err error) error {
			return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err)
		},
	}

	missingc := make(chan blob.SizedRef, 8)
	go blobserver.ListMissingDestinationBlobs(missingc, func(blob.Ref) {}, src, dst)

	var missing []blob.SizedRef
	for sb := range missingc {
		missing = append(missing, sb)
	}

	// Surface enumeration errors before acting on a possibly
	// incomplete missing list.
	if err := srcErr.Get(); err != nil {
		return err
	}
	if err := dstErr.Get(); err != nil {
		return err
	}

	for _, sb := range missing {
		if enqErr := sh.enqueue(sb); enqErr != nil {
			// Keep enqueueing the rest; report the first error.
			if err == nil {
				err = enqErr
			}
		} else {
			sh.mu.Lock()
			sh.vmissing++
			sh.mu.Unlock()
		}
	}
	return err
}
   832	
// errNotPrefix is a sentinel returned by the enumeration callback in
// startValidatePrefix to stop once a blob beyond the shard's prefix is seen.
var errNotPrefix = errors.New("sentinel error: hit blob into the next shard")
   834	
// startValidatePrefix starts a goroutine enumerating, from either the
// source (doDest == false) or the destination (doDest == true), all
// blobs whose refs begin with pfx. Blobs are sent on the returned
// channel, which is closed when enumeration finishes; the final error
// (or nil) is then sent on the returned error channel.
func (sh *SyncHandler) startValidatePrefix(ctx context.Context, pfx string, doDest bool) (<-chan blob.SizedRef, <-chan error) {
	var e blobserver.BlobEnumerator
	if doDest {
		e = sh.to
	} else {
		e = sh.from
	}
	c := make(chan blob.SizedRef, 64)
	errc := make(chan error, 1)
	go func() {
		defer close(c)
		var last string // last blobref seen; to double check storage's enumeration works correctly.
		err := blobserver.EnumerateAllFrom(ctx, e, pfx, func(sb blob.SizedRef) error {
			// Just double-check that the storage target is returning sorted results correctly.
			brStr := sb.Ref.String()
			if brStr < pfx {
				log.Fatalf("Storage target %T enumerate not behaving: %q < requested prefix %q", e, brStr, pfx)
			}
			if last != "" && last >= brStr {
				log.Fatalf("Storage target %T enumerate not behaving: previous %q >= current %q", e, last, brStr)
			}
			last = brStr

			// TODO: could add a more efficient method on blob.Ref to do this,
			// that doesn't involve call String().
			if !strings.HasPrefix(brStr, pfx) {
				return errNotPrefix
			}
			select {
			case c <- sb:
				// Tally per-side stats for the status page.
				sh.mu.Lock()
				if doDest {
					sh.vdestCount++
					sh.vdestBytes += int64(sb.Size)
				} else {
					sh.vsrcCount++
					sh.vsrcBytes += int64(sb.Size)
				}
				sh.mu.Unlock()
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
		// errNotPrefix just means we walked past the end of the shard.
		if err == errNotPrefix {
			err = nil
		}
		if err != nil {
			// Send a zero value to shut down ListMissingDestinationBlobs.
			c <- blob.SizedRef{}
		}
		errc <- err
	}()
	return c, errc
}
   891	
   892	func (sh *SyncHandler) shardPrefixes() []string {
   893		var pfx []string
   894		// TODO(bradfitz): do limit=1 enumerates against sh.from and sh.to with varying
   895		// "after" values to determine all the blobref types on both sides.
   896		// For now, be lazy and assume only sha1 and sha224:
   897		for i := 0; i < 256; i++ {
   898			pfx = append(pfx, fmt.Sprintf("sha1-%02x", i))
   899			pfx = append(pfx, fmt.Sprintf("sha224-%02x", i))
   900		}
   901		return pfx
   902	}
   903	
   904	func (sh *SyncHandler) newCopyStatus(sb blob.SizedRef) *copyStatus {
   905		now := time.Now()
   906		return &copyStatus{
   907			sh:    sh,
   908			sb:    sb,
   909			state: statusStarting,
   910			start: now,
   911			t:     now,
   912		}
   913	}
   914	
// copyStatus is an in-progress copy.
type copyStatus struct {
	sh    *SyncHandler  // the handler performing this copy
	sb    blob.SizedRef // the blob being copied
	start time.Time     // when the copy began

	mu     sync.Mutex
	state  string    // one of statusFoo, below
	t      time.Time // last status update time
	nread  uint32    // bytes read so far (shown in the statusReading display); guarded by mu
	nwrite uint32    // bytes written so far (shown in the statusWriting display); guarded by mu
}
   927	
// States a copyStatus can be in, as rendered by (*copyStatus).String.
const (
	statusStarting = "starting"
	statusFetching = "fetching source"
	statusReading  = "reading"
	statusWriting  = "writing"
)
   934	
   935	func (cs *copyStatus) setStatus(s string) {
   936		now := time.Now()
   937		cs.mu.Lock()
   938		defer cs.mu.Unlock()
   939		cs.state = s
   940		cs.t = now
   941	}
   942	
// setError finalizes the copy of cs.sb. On success (err == nil) it
// removes the blob from the persistent queue and the in-memory
// needCopy set and credits the copy counters; on failure it records
// the error in lastFail and the recentErrors list.
func (cs *copyStatus) setError(err error) {
	now := time.Now()
	sh := cs.sh
	br := cs.sb.Ref
	if err == nil {
		// This is somewhat slow, so do it before we acquire the lock.
		// The queue is thread-safe.
		if derr := sh.queue.Delete(br.String()); derr != nil {
			sh.logf("queue delete of %v error: %v", cs.sb.Ref, derr)
		}
	}

	sh.mu.Lock()
	defer sh.mu.Unlock()
	// If the blob is no longer in needCopy, its copy was already
	// accounted for; don't double-count the stats below.
	if _, needCopy := sh.needCopy[br]; !needCopy {
		sh.logf("IGNORING DUPLICATE UPLOAD of %v = %v", br, err)
		return
	}
	delete(sh.copying, br)
	if err == nil {
		// Success: drop all bookkeeping for this blob and update totals.
		delete(sh.needCopy, br)
		delete(sh.lastFail, br)
		sh.recentCopyTime = now
		sh.totalCopies++
		sh.totalCopyBytes += int64(cs.sb.Size)
		sh.bytesRemain -= int64(cs.sb.Size)
		return
	}

	// Failure: count it and remember when/why this blob last failed.
	sh.totalErrors++
	sh.logf("error copying %v: %v", br, err)
	sh.lastFail[br] = failDetail{
		when: now,
		err:  err,
	}

	// Keep at most maxRecentErrors entries, evicting the oldest.
	// Kinda lame. TODO: use a ring buffer or container/list instead.
	if len(sh.recentErrors) == maxRecentErrors {
		copy(sh.recentErrors, sh.recentErrors[1:])
		sh.recentErrors = sh.recentErrors[:maxRecentErrors-1]
	}
	sh.recentErrors = append(sh.recentErrors, br)
}
   986	
   987	func (cs *copyStatus) String() string {
   988		var buf bytes.Buffer
   989		now := time.Now()
   990		buf.WriteString(cs.sb.Ref.String())
   991		buf.WriteString(": ")
   992	
   993		cs.mu.Lock()
   994		defer cs.mu.Unlock()
   995		sinceStart := now.Sub(cs.start)
   996		sinceLast := now.Sub(cs.t)
   997	
   998		switch cs.state {
   999		case statusReading:
  1000			buf.WriteString(cs.state)
  1001			fmt.Fprintf(&buf, " (%d/%dB)", cs.nread, cs.sb.Size)
  1002		case statusWriting:
  1003			if cs.nwrite == cs.sb.Size {
  1004				buf.WriteString("wrote all, waiting ack")
  1005			} else {
  1006				buf.WriteString(cs.state)
  1007				fmt.Fprintf(&buf, " (%d/%dB)", cs.nwrite, cs.sb.Size)
  1008			}
  1009		default:
  1010			buf.WriteString(cs.state)
  1011	
  1012		}
  1013		if sinceLast > 5*time.Second {
  1014			fmt.Fprintf(&buf, ", last change %v ago (total elapsed %v)", sinceLast, sinceStart)
  1015		}
  1016		return buf.String()
  1017	}
  1018	
// failDetail records when the most recent copy attempt of a blob
// failed and with what error.
type failDetail struct {
	when time.Time // time of the failure
	err  error     // the error that caused it
}
  1023	
// incrWriter is an io.Writer that locks mu and increments *n.
// It counts bytes rather than storing them, and also bumps cs.t
// (the last-activity time) on each Write.
type incrWriter struct {
	cs *copyStatus // cs.mu guards *n; cs.t is refreshed per Write
	n  *uint32     // counter to bump — presumably &cs.nread or &cs.nwrite; set by callers
}
  1029	
  1030	func (w incrWriter) Write(p []byte) (n int, err error) {
  1031		w.cs.mu.Lock()
  1032		*w.n += uint32(len(p))
  1033		w.cs.t = time.Now()
  1034		w.cs.mu.Unlock()
  1035		return len(p), nil
  1036	}
  1037	
  1038	func storageDesc(v interface{}) string {
  1039		if s, ok := v.(fmt.Stringer); ok {
  1040			return s.String()
  1041		}
  1042		return fmt.Sprintf("%T", v)
  1043	}
  1044	
  1045	// TODO(bradfitz): implement these? what do they mean? possibilities:
  1046	// a) proxy to sh.from
  1047	// b) proxy to sh.to
  1048	// c) merge intersection of sh.from, sh.to, and sh.queue: that is, a blob this pair
  1049	//    currently or eventually will have. The only missing blob would be one that
  1050	//    sh.from has, sh.to doesn't have, and isn't in the queue to be replicated.
  1051	//
  1052	// For now, don't implement them. Wait until we need them.
  1053	
// Fetch is unimplemented: a SyncHandler only receives blobs to queue
// them for replication; it doesn't serve them. See the TODO above for
// possible future semantics.
func (sh *SyncHandler) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error) {
	panic("unimplemented blobserver.Fetch called")
}
  1057	
// StatBlobs is not expected to be called on a SyncHandler; it logs
// the surprise and reports no blobs (fn is never invoked).
func (sh *SyncHandler) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	sh.logf("unexpected StatBlobs call")
	return nil
}
  1062	
// EnumerateBlobs is not expected to be called on a SyncHandler; it
// logs the surprise and reports an empty enumeration. dest is always
// closed so receivers don't block.
func (sh *SyncHandler) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
	defer close(dest)
	sh.logf("unexpected EnumerateBlobs call")
	return nil
}
  1068	
// RemoveBlobs is unimplemented; see the TODO above Fetch for the
// possible meanings it could be given.
func (sh *SyncHandler) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
	panic("unimplemented RemoveBlobs")
}
  1072	
// errStopEnumerating is returned by hourlyCompare's enumeration
// callback to end a round once the hourly byte quota is reached.
var errStopEnumerating = errors.New("sentinel error: reached the hourly compare quota")
  1074	
  1075	// Every hour, hourlyCompare picks blob names from a random point in the source,
  1076	// downloads up to hourlyBytes from the destination, and verifies them.
  1077	func (sh *SyncHandler) hourlyCompare(hourlyBytes uint64) {
  1078		ctx := context.TODO()
  1079		ticker := time.NewTicker(time.Hour).C
  1080		for {
  1081			content := make([]byte, 16)
  1082			if _, err := rand.Read(content); err != nil {
  1083				panic(err)
  1084			}
  1085			after := blob.RefFromBytes(content).String()
  1086			var roundBytes uint64
  1087			var roundBlobs int
  1088			err := blobserver.EnumerateAllFrom(ctx, sh.from, after, func(sr blob.SizedRef) error {
  1089				sh.mu.Lock()
  1090				if _, ok := sh.needCopy[sr.Ref]; ok {
  1091					sh.mu.Unlock()
  1092					return nil // skip blobs in the copy queue
  1093				}
  1094				sh.mu.Unlock()
  1095	
  1096				if roundBytes+uint64(sr.Size) > hourlyBytes {
  1097					return errStopEnumerating
  1098				}
  1099				blob, size, err := sh.to.(blob.Fetcher).Fetch(ctx, sr.Ref)
  1100				if err != nil {
  1101					return fmt.Errorf("error fetching %s: %v", sr.Ref, err)
  1102				}
  1103				if size != sr.Size {
  1104					return fmt.Errorf("%s: expected size %d, got %d", sr.Ref, sr.Size, size)
  1105				}
  1106				h := sr.Ref.Hash()
  1107				if _, err := io.Copy(h, blob); err != nil {
  1108					return fmt.Errorf("error while reading %s: %v", sr.Ref, err)
  1109				}
  1110				if !sr.HashMatches(h) {
  1111					return fmt.Errorf("expected %s, got %x", sr.Ref, h.Sum(nil))
  1112				}
  1113	
  1114				sh.mu.Lock()
  1115				sh.comparedBlobs++
  1116				sh.comparedBytes += uint64(size)
  1117				sh.compLastBlob = sr.Ref.String()
  1118				sh.mu.Unlock()
  1119				roundBlobs++
  1120				roundBytes += uint64(size)
  1121				return nil
  1122			})
  1123			sh.mu.Lock()
  1124			if err != nil && err != errStopEnumerating {
  1125				sh.compareErrors = append(sh.compareErrors, fmt.Sprintf("%s %v", time.Now(), err))
  1126				sh.logf("!! hourly compare error !!: %v", err)
  1127			}
  1128			sh.comparedRounds++
  1129			sh.mu.Unlock()
  1130			sh.logf("compared %d blobs (%d bytes)", roundBlobs, roundBytes)
  1131			<-ticker
  1132		}
  1133	}
  1134	
// chanError is a Future around an incoming error channel of one item.
// It can also wrap its error in something more descriptive.
type chanError struct {
	C        <-chan error      // the one-item channel Get receives from
	Wrap     func(error) error // optional; applied to non-nil errors by Set
	err      error             // cached result, valid once received is true
	received bool              // whether err has been set
}
  1143	
  1144	func (ce *chanError) Set(err error) {
  1145		if ce.Wrap != nil && err != nil {
  1146			err = ce.Wrap(err)
  1147		}
  1148		ce.err = err
  1149		ce.received = true
  1150	}
  1151	
  1152	func (ce *chanError) Get() error {
  1153		if ce.received {
  1154			return ce.err
  1155		}
  1156		ce.Set(<-ce.C)
  1157		return ce.err
  1158	}
Website layout inspired by memcached.
Content by the authors.