package server

import (
	"bytes"
	"context"
	"crypto/rand"
	"errors"
	"fmt"
	"html"
	"io"
	"log"
	"net/http"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"go4.org/jsonconfig"
	"golang.org/x/net/xsrftoken"
	"perkeep.org/pkg/auth"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	"perkeep.org/pkg/constants"
	"perkeep.org/pkg/index"
	"perkeep.org/pkg/sorted"
	"perkeep.org/pkg/types/camtypes"

	"go4.org/syncutil"
)

const (
	maxRecentErrors   = 20
	queueSyncInterval = 5 * time.Second
)

type blobReceiverEnumerator interface {
	blobserver.BlobReceiver
	blobserver.BlobEnumerator
}

// SyncHandler handles asynchronous replication in one direction between a
// pair of storage targets, a source and a destination.
//
// It implements blobserver.Storage, but it doesn't actually store incoming
// blobs itself; ReceiveBlob only records the blob and queues it to be copied
// from the source to the destination.
type SyncHandler struct {
	fromName, toName string
	from             blobserver.Storage
	to               blobReceiverEnumerator
	queue            sorted.KeyValue // persistent queue of blobs to copy
	toIndex          bool            // whether the destination is an index
	idle             bool            // if true, do nothing but report status
	copierPoolSize   int

	// wakec wakes up the blob syncer loop when a blob is received.
	wakec chan bool

	mu             sync.Mutex // guards the following fields
	status         string
	copying        map[blob.Ref]*copyStatus // in-flight copies
	needCopy       map[blob.Ref]uint32      // blobs needing to be copied, and their sizes
	lastFail       map[blob.Ref]failDetail  // subset of needCopy that previously failed, and why
	bytesRemain    int64                    // sum of needCopy values
	recentErrors   []blob.Ref               // up to maxRecentErrors; only reported while still in needCopy
	recentCopyTime time.Time
	totalCopies    int64
	totalCopyBytes int64
	totalErrors    int64
	vshards        []string // validation shards; empty if no validation has started
	vshardDone     int      // number of vshards completed
	vshardErrs     []string
	vmissing       int64  // missing blobs found (and enqueued) during validation
	vdestCount     int    // number of blobs seen on dest during validation
	vdestBytes     int64  // number of blob bytes seen on dest during validation
	vsrcCount      int    // number of blobs seen on src during validation
	vsrcBytes      int64  // number of blob bytes seen on src during validation
	comparedBlobs  int    // total number of blobs compared by hourly runs
	comparedBytes  uint64 // total number of bytes compared by hourly runs
	comparedRounds int    // total number of hourly compare rounds

	compareErrors []string // errors encountered during hourly compare runs
	compLastBlob  string   // last blob compared by an hourly run

	// alarmIdlec is signaled by the sync loop each time a poll interval
	// elapses with no pending work; IdleWait receives from it.
	alarmIdlec chan struct{}
}

var (
	_ blobserver.Storage       = (*SyncHandler)(nil)
	_ blobserver.HandlerIniter = (*SyncHandler)(nil)
)

func (sh *SyncHandler) String() string {
	return fmt.Sprintf("[SyncHandler %v -> %v]", sh.fromName, sh.toName)
}

func (sh *SyncHandler) fromToString() string {
	return fmt.Sprintf("%v -> %v", sh.fromName, sh.toName)
}

func (sh *SyncHandler) logf(format string, args ...interface{}) {
	log.Printf("sync: "+sh.fromToString()+": "+format, args...)
}

func init() {
	blobserver.RegisterHandlerConstructor("sync", newSyncFromConfig)
}

// validateOnStartDefault is the default value of the "validateOnStart" config
// option. It can be enabled with the CAMLI_SYNC_VALIDATE environment variable.
var validateOnStartDefault, _ = strconv.ParseBool(os.Getenv("CAMLI_SYNC_VALIDATE"))

func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) {
	var (
		from           = conf.RequiredString("from")
		to             = conf.RequiredString("to")
		fullSync       = conf.OptionalBool("fullSyncOnStart", false)
		blockFullSync  = conf.OptionalBool("blockingFullSyncOnStart", false)
		idle           = conf.OptionalBool("idle", false)
		queueConf      = conf.OptionalObject("queue")
		copierPoolSize = conf.OptionalInt("copierPoolSize", 5)
		validate       = conf.OptionalBool("validateOnStart", validateOnStartDefault)
		hourlyCompare  = conf.OptionalInt("hourlyCompareBytes", 0)
	)
	if err := conf.Validate(); err != nil {
		return nil, err
	}
	if idle {
		return newIdleSyncHandler(from, to), nil
	}
	if len(queueConf) == 0 {
		return nil, errors.New(`Missing required "queue" object`)
	}
	q, err := sorted.NewKeyValueMaybeWipe(queueConf)
	if err != nil {
		return nil, err
	}

	// Note whether the destination (and not the source) is an index;
	// this only affects status and discovery reporting.
	isToIndex := false
	fromBs, err := ld.GetStorage(from)
	if err != nil {
		return nil, err
	}
	toBs, err := ld.GetStorage(to)
	if err != nil {
		return nil, err
	}
	if _, ok := fromBs.(*index.Index); !ok {
		if _, ok := toBs.(*index.Index); ok {
			isToIndex = true
		}
	}

	sh := newSyncHandler(from, to, fromBs, toBs, q)
	sh.toIndex = isToIndex
	sh.copierPoolSize = copierPoolSize
	if err := sh.readQueueToMemory(); err != nil {
		return nil, fmt.Errorf("Error reading sync queue to memory: %v", err)
	}

	if fullSync || blockFullSync {
		sh.logf("Doing full sync")
		didFullSync := make(chan bool, 1)
		go func() {
			for {
				n := sh.runSync("pending blobs queue", sh.enumeratePendingBlobs)
				if n > 0 {
					sh.logf("Queue sync copied %d blobs", n)
					continue
				}
				break
			}
			n := sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs))
			sh.logf("Full sync copied %d blobs", n)
			didFullSync <- true
			sh.syncLoop()
		}()
		if blockFullSync {
			sh.logf("Blocking startup, waiting for full sync from %q to %q", from, to)
			<-didFullSync
			sh.logf("Full sync complete.")
		}
	} else {
		go sh.syncLoop()
	}

	if validate {
		go sh.startFullValidation()
	}

	if hourlyCompare != 0 {
		if _, ok := sh.to.(blob.Fetcher); !ok {
			return nil, errors.New(`can't specify "hourlyCompareBytes" if destination is not a Fetcher`)
		}
		go sh.hourlyCompare(uint64(hourlyCompare))
	}

	blobserver.GetHub(fromBs).AddReceiveHook(sh.enqueue)
	return sh, nil
}
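
// The following is an illustrative sketch (not taken from any shipped config)
// of how a "sync" handler might be declared in a Perkeep server config file.
// The prefix names, file path, and queue settings are assumptions chosen for
// the example, not canonical values:
//
//	"/sync-to-backup/": {
//		"handler": "sync",
//		"handlerArgs": {
//			"from": "/bs/",
//			"to": "/backup/",
//			"queue": {
//				"type": "kv",
//				"file": "/home/example/var/perkeep/sync-to-backup-queue.kv"
//			},
//			"fullSyncOnStart": false,
//			"copierPoolSize": 5
//		}
//	}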

func (sh *SyncHandler) InitHandler(hl blobserver.FindHandlerByTyper) error {
	_, h, err := hl.FindHandlerByType("root")
	if err == blobserver.ErrHandlerTypeNotFound {
		// The "root" handler is optional; only register with it if it exists.
		return nil
	}
	if err != nil {
		return err
	}
	h.(*RootHandler).registerSyncHandler(sh)
	return nil
}

func newSyncHandler(fromName, toName string,
	from blobserver.Storage, to blobReceiverEnumerator,
	queue sorted.KeyValue) *SyncHandler {
	return &SyncHandler{
		copierPoolSize: 5,
		from:           from,
		to:             to,
		fromName:       fromName,
		toName:         toName,
		queue:          queue,
		wakec:          make(chan bool),
		status:         "not started",
		needCopy:       make(map[blob.Ref]uint32),
		lastFail:       make(map[blob.Ref]failDetail),
		copying:        make(map[blob.Ref]*copyStatus),
		alarmIdlec:     make(chan struct{}),
	}
}

// NewSyncHandler returns a SyncHandler that asynchronously and continuously
// copies blobs from src to dest as they arrive on src's blob hub, persisting
// not-yet-copied blobs in pendingQueue. srcName and destName are used only
// for logging and status.
func NewSyncHandler(srcName, destName string,
	src blobserver.Storage, dest blobReceiverEnumerator,
	pendingQueue sorted.KeyValue) *SyncHandler {
	sh := newSyncHandler(srcName, destName, src, dest, pendingQueue)
	go sh.syncLoop()
	blobserver.GetHub(sh.from).AddReceiveHook(sh.enqueue)
	return sh
}
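
// A minimal sketch (not from the original source) of wiring a SyncHandler up
// programmatically. It assumes in-memory source/destination storage and an
// in-memory queue, e.g. *memory.Storage from perkeep.org/pkg/blobserver/memory
// and sorted.NewMemoryKeyValue:
//
//	src := new(memory.Storage) // any blobserver.Storage with a blob hub
//	dst := new(memory.Storage) // must also be a BlobEnumerator
//	sh := NewSyncHandler("/bs/", "/backup/", src, dst, sorted.NewMemoryKeyValue())
//	// Blobs received through src's hub (e.g. via blobserver.Receive) are now
//	// enqueued and copied to dst in the background.
//	sh.IdleWait() // blocks until a sync pass finds no remaining work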

// IdleWait blocks until the sync loop has finished a pass with no pending
// work and a full queueSyncInterval has then elapsed. For an idle handler it
// returns immediately.
func (sh *SyncHandler) IdleWait() {
	if sh.idle {
		return
	}
	<-sh.alarmIdlec
}

func (sh *SyncHandler) signalIdle() {
	select {
	case sh.alarmIdlec <- struct{}{}:
	default:
	}
}

func newIdleSyncHandler(fromName, toName string) *SyncHandler {
	return &SyncHandler{
		fromName: fromName,
		toName:   toName,
		idle:     true,
		status:   "disabled",
	}
}

func (sh *SyncHandler) discovery() camtypes.SyncHandlerDiscovery {
	return camtypes.SyncHandlerDiscovery{
		From:    sh.fromName,
		To:      sh.toName,
		ToIndex: sh.toIndex,
	}
}

// syncStatus is a JSON-serializable snapshot of a SyncHandler's progress.
type syncStatus struct {
	sh *SyncHandler

	From           string `json:"from"`
	FromDesc       string `json:"fromDesc"`
	To             string `json:"to"`
	ToDesc         string `json:"toDesc"`
	DestIsIndex    bool   `json:"destIsIndex,omitempty"`
	BlobsToCopy    int    `json:"blobsToCopy"`
	BytesToCopy    int64  `json:"bytesToCopy"`
	LastCopySecAgo int    `json:"lastCopySecondsAgo,omitempty"`
}

func (sh *SyncHandler) currentStatus() syncStatus {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	ago := 0
	if !sh.recentCopyTime.IsZero() {
		ago = int(time.Since(sh.recentCopyTime) / time.Second)
	}
	return syncStatus{
		sh:             sh,
		From:           sh.fromName,
		FromDesc:       storageDesc(sh.from),
		To:             sh.toName,
		ToDesc:         storageDesc(sh.to),
		DestIsIndex:    sh.toIndex,
		BlobsToCopy:    len(sh.needCopy),
		BytesToCopy:    sh.bytesRemain,
		LastCopySecAgo: ago,
	}
}

// readQueueToMemory reads the persistent sync queue into the in-memory
// pending list (needCopy), so that work queued before a restart is not
// forgotten and can be copied without re-enumerating the source.
func (sh *SyncHandler) readQueueToMemory() error {
	errc := make(chan error, 1)
	blobs := make(chan blob.SizedRef, 16)
	intr := make(chan struct{})
	defer close(intr)
	go func() {
		errc <- sh.enumerateQueuedBlobs(blobs, intr)
	}()
	n := 0
	for sb := range blobs {
		sh.addBlobToCopy(sb)
		n++
	}
	sh.logf("added %d pending blobs from sync queue to pending list", n)
	return <-errc
}

func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	if req.Method == "POST" {
		if req.FormValue("mode") == "validate" {
			token := req.FormValue("token")
			if xsrftoken.Valid(token, auth.Token(), "user", "runFullValidate") {
				sh.startFullValidation()
				http.Redirect(rw, req, "./", http.StatusFound)
				return
			}
		}
		http.Error(rw, "Bad POST request", http.StatusBadRequest)
		return
	}

	// Render the sync status page. The lock is held for the whole render,
	// so the counters below are a consistent snapshot.
	sh.mu.Lock()
	defer sh.mu.Unlock()
	f := func(p string, a ...interface{}) {
		fmt.Fprintf(rw, p, a...)
	}
	now := time.Now()
	f("<h1>Sync Status (for %s to %s)</h1>", sh.fromName, sh.toName)
	f("<p><b>Current status: </b>%s</p>", html.EscapeString(sh.status))
	if sh.idle {
		return
	}

	f("<h2>Stats:</h2><ul>")
	f("<li>Source: %s</li>", html.EscapeString(storageDesc(sh.from)))
	f("<li>Target: %s</li>", html.EscapeString(storageDesc(sh.to)))
	f("<li>Blobs synced: %d</li>", sh.totalCopies)
	f("<li>Bytes synced: %d</li>", sh.totalCopyBytes)
	f("<li>Blobs yet to copy: %d</li>", len(sh.needCopy))
	f("<li>Bytes yet to copy: %d</li>", sh.bytesRemain)
	if !sh.recentCopyTime.IsZero() {
		f("<li>Most recent copy: %s (%v ago)</li>", sh.recentCopyTime.Format(time.RFC3339), now.Sub(sh.recentCopyTime))
	}
	clarification := ""
	if len(sh.needCopy) == 0 && sh.totalErrors > 0 {
		clarification = "(all since resolved)"
	}
	f("<li>Previous copy errors: %d %s</li>", sh.totalErrors, clarification)
	f("</ul>")

	if sh.comparedRounds > 0 || sh.comparedBlobs > 0 {
		f("<h2>Hourly compares</h2><ul>")
		f("<li>Hourly rounds: %d</li>", sh.comparedRounds)
		f("<li>Compared blobs: %d</li>", sh.comparedBlobs)
		f("<li>Compared bytes: %d</li>", sh.comparedBytes)
		f("<li>Latest blob: %s</li>", sh.compLastBlob)
		f("</ul>")
		if len(sh.compareErrors) > 0 {
			f("<h3>Compare failures</h3><ul>")
			for _, err := range sh.compareErrors {
				f("<li><strong>%s</strong></li>", err)
			}
			f("</ul>")
		}
	}

	f("<h2>Validation</h2>")
	f("<p>Background scan of source and destination to ensure that the destination has everything the source does, or is at least enqueued to sync.</p>")
	if len(sh.vshards) == 0 || sh.vshardDone == len(sh.vshards) {
		token := xsrftoken.Generate(auth.Token(), "user", "runFullValidate")
		f("<form method='POST'><input type='hidden' name='mode' value='validate'><input type='hidden' name='token' value='%s'><input type='submit' value='Start validation'></form>", token)
	}
	if len(sh.vshards) != 0 {
		f("<ul>")
		f("<li>Shards processed: %d/%d (%.1f%%)</li>",
			sh.vshardDone,
			len(sh.vshards),
			100*float64(sh.vshardDone)/float64(len(sh.vshards)))
		f("<li>Source blobs seen: %d</li>", sh.vsrcCount)
		f("<li>Source bytes seen: %d</li>", sh.vsrcBytes)
		f("<li>Dest blobs seen: %d</li>", sh.vdestCount)
		f("<li>Dest bytes seen: %d</li>", sh.vdestBytes)
		f("<li>Blobs found missing & enqueued: %d</li>", sh.vmissing)
		if len(sh.vshardErrs) > 0 {
			f("<li>Validation errors:<ul>\n")
			for _, e := range sh.vshardErrs {
				f("  <li>%s</li>\n", html.EscapeString(e))
			}
			f("</ul></li>\n")
		}
		f("</ul>")
	}

	if len(sh.copying) > 0 {
		f("<h2>Currently Copying</h2><ul>")
		copying := make([]blob.Ref, 0, len(sh.copying))
		for br := range sh.copying {
			copying = append(copying, br)
		}
		sort.Sort(blob.ByRef(copying))
		for _, br := range copying {
			f("<li>%s</li>\n", sh.copying[br])
		}
		f("</ul>")
	}

	recentErrors := make([]blob.Ref, 0, len(sh.recentErrors))
	for _, br := range sh.recentErrors {
		if _, ok := sh.needCopy[br]; ok {
			// Only show blobs that still need copying; ones that have
			// since succeeded aren't interesting anymore.
			recentErrors = append(recentErrors, br)
		}
	}
	if len(recentErrors) > 0 {
		f("<h2>Recent Errors</h2><p>Blobs that haven't successfully copied over yet, and their last errors:</p><ul>")
		for _, br := range recentErrors {
			fail := sh.lastFail[br]
			f("<li>%s: %s: %s</li>\n",
				br,
				fail.when.Format(time.RFC3339),
				html.EscapeString(fail.err.Error()))
		}
		f("</ul>")
	}
}

func (sh *SyncHandler) setStatusf(s string, args ...interface{}) {
	s = time.Now().UTC().Format(time.RFC3339) + ": " + fmt.Sprintf(s, args...)
	sh.mu.Lock()
	defer sh.mu.Unlock()
	sh.status = s
}

type copyResult struct {
	sb  blob.SizedRef
	err error
}

func blobserverEnumerator(ctx context.Context, src blobserver.BlobEnumerator) func(chan<- blob.SizedRef, <-chan struct{}) error {
	return func(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
		return blobserver.EnumerateAll(ctx, src, func(sb blob.SizedRef) error {
			select {
			case dst <- sb:
			case <-intr:
				return errors.New("interrupted")
			}
			return nil
		})
	}
}

// enumeratePendingBlobs sends blobs from the in-memory pending list (needCopy)
// to dst, at most 1000 per call, closing dst when done. It returns early if
// intr is closed.
func (sh *SyncHandler) enumeratePendingBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
	defer close(dst)
	sh.mu.Lock()
	var toSend []blob.SizedRef
	{
		n := len(sh.needCopy)
		const maxBatch = 1000
		if n > maxBatch {
			n = maxBatch
		}
		toSend = make([]blob.SizedRef, 0, n)
		for br, size := range sh.needCopy {
			toSend = append(toSend, blob.SizedRef{Ref: br, Size: size})
			if len(toSend) == n {
				break
			}
		}
	}
	sh.mu.Unlock()
	for _, sb := range toSend {
		select {
		case dst <- sb:
		case <-intr:
			return nil
		}
	}
	return nil
}

// enumerateQueuedBlobs sends blobs from the persistent sync queue to dst,
// closing dst when done. It returns early if intr is closed.
func (sh *SyncHandler) enumerateQueuedBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
	defer close(dst)
	it := sh.queue.Find("", "")
	for it.Next() {
		br, ok := blob.Parse(it.Key())
		size, err := strconv.ParseUint(it.Value(), 10, 32)
		if !ok || err != nil {
			sh.logf("ERROR: bogus sync queue entry: %q => %q", it.Key(), it.Value())
			continue
		}
		select {
		case dst <- blob.SizedRef{Ref: br, Size: uint32(size)}:
		case <-intr:
			return it.Close()
		}
	}
	return it.Close()
}

func (sh *SyncHandler) runSync(syncType string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error) int {
	enumch := make(chan blob.SizedRef, 8)
	errch := make(chan error, 1)
	intr := make(chan struct{})
	defer close(intr)
	go func() { errch <- enumSrc(enumch, intr) }()

	nCopied := 0
	toCopy := 0

	workch := make(chan blob.SizedRef, 1000)
	resch := make(chan copyResult, 8)
FeedWork:
	for sb := range enumch {
		if toCopy < sh.copierPoolSize {
			go sh.copyWorker(resch, workch)
		}
		select {
		case workch <- sb:
			toCopy++
		default:
			// Work buffer full: enough for this batch. The rest will be
			// picked up on a later pass.
			break FeedWork
		}
	}
	close(workch)
	for i := 0; i < toCopy; i++ {
		sh.setStatusf("Copying blobs")
		res := <-resch
		if res.err == nil {
			nCopied++
		}
	}

	if err := <-errch; err != nil {
		sh.logf("error enumerating for %v sync: %v", syncType, err)
	}
	return nCopied
}

func (sh *SyncHandler) syncLoop() {
	for {
		t0 := time.Now()

		for sh.runSync(sh.fromName, sh.enumeratePendingBlobs) > 0 {
			// Loop until there's nothing left to copy, then sleep.
		}
		sh.setStatusf("Sleeping briefly before next long poll.")

		d := queueSyncInterval - time.Since(t0)
		select {
		case <-time.After(d):
			sh.signalIdle()
		case <-sh.wakec:
		}
	}
}

func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blob.SizedRef) {
	for sb := range work {
		res <- copyResult{sb, sh.copyBlob(context.TODO(), sb)}
	}
}

func (sh *SyncHandler) copyBlob(ctx context.Context, sb blob.SizedRef) (err error) {
	cs := sh.newCopyStatus(sb)
	defer func() { cs.setError(err) }()
	br := sb.Ref

	sh.mu.Lock()
	sh.copying[br] = cs
	sh.mu.Unlock()

	if sb.Size > constants.MaxBlobSize {
		return fmt.Errorf("blob size %d too large; max blob size is %d", sb.Size, constants.MaxBlobSize)
	}

	cs.setStatus(statusFetching)
	rc, fromSize, err := sh.from.Fetch(ctx, br)
	if err != nil {
		return fmt.Errorf("source fetch: %v", err)
	}
	if fromSize != sb.Size {
		rc.Close()
		return fmt.Errorf("source fetch size mismatch: get=%d, enumerate=%d", fromSize, sb.Size)
	}

	buf := make([]byte, fromSize)
	hash := br.Hash()
	cs.setStatus(statusReading)
	n, err := io.ReadFull(io.TeeReader(rc,
		io.MultiWriter(
			incrWriter{cs, &cs.nread},
			hash,
		)), buf)
	rc.Close()
	if err != nil {
		return fmt.Errorf("Read error after %d/%d bytes: %v", n, fromSize, err)
	}
	if !br.HashMatches(hash) {
		return fmt.Errorf("Read data has unexpected digest %x", hash.Sum(nil))
	}

	cs.setStatus(statusWriting)
	newsb, err := sh.to.ReceiveBlob(ctx, br, io.TeeReader(bytes.NewReader(buf), incrWriter{cs, &cs.nwrite}))
	if err != nil {
		return fmt.Errorf("dest write: %v", err)
	}
	if newsb.Size != sb.Size {
		return fmt.Errorf("write size mismatch: source_read=%d but dest_write=%d", sb.Size, newsb.Size)
	}
	return nil
}

func (sh *SyncHandler) ReceiveBlob(ctx context.Context, br blob.Ref, r io.Reader) (sb blob.SizedRef, err error) {
	// The SyncHandler doesn't store the blob itself; it only measures its
	// size and enqueues the ref to be copied from the source later.
	n, err := io.Copy(io.Discard, r)
	if err != nil {
		return
	}
	sb = blob.SizedRef{Ref: br, Size: uint32(n)}
	return sb, sh.enqueue(sb)
}

// addBlobToCopy adds a blob to the in-memory pending list (but not to the
// persistent queue; that's enqueue's job). It reports whether the blob was
// added; false means it was already pending.
func (sh *SyncHandler) addBlobToCopy(sb blob.SizedRef) bool {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if _, dup := sh.needCopy[sb.Ref]; dup {
		return false
	}

	sh.needCopy[sb.Ref] = sb.Size
	sh.bytesRemain += int64(sb.Size)

	// Non-blocking wake-up of the sync loop, in case it's sleeping.
	select {
	case sh.wakec <- true:
	default:
	}
	return true
}

func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
	if !sh.addBlobToCopy(sb) {
		// Dup; already pending.
		return nil
	}
	// Persist the pending blob in the sync queue so it survives restarts.
	if err := sh.queue.Set(sb.Ref.String(), fmt.Sprint(sb.Size)); err != nil {
		return err
	}
	return nil
}

func (sh *SyncHandler) startFullValidation() {
	sh.mu.Lock()
	if sh.vshardDone == len(sh.vshards) {
		// Any previous validation has finished; reset its stats.
		sh.vshards, sh.vshardErrs = nil, nil
		sh.vshardDone, sh.vmissing = 0, 0
		sh.vdestCount, sh.vdestBytes = 0, 0
		sh.vsrcCount, sh.vsrcBytes = 0, 0
	}
	if len(sh.vshards) != 0 {
		// A validation is already in progress.
		sh.mu.Unlock()
		return
	}
	sh.mu.Unlock()

	sh.logf("running full validation; determining validation shards...")
	shards := sh.shardPrefixes()

	sh.mu.Lock()
	if len(sh.vshards) != 0 {
		// Lost a race with a concurrent startFullValidation call.
		sh.mu.Unlock()
		return
	}
	sh.vshards = shards
	sh.mu.Unlock()

	go sh.runFullValidation()
}

func (sh *SyncHandler) runFullValidation() {
	var wg sync.WaitGroup

	sh.mu.Lock()
	shards := sh.vshards
	wg.Add(len(shards))
	sh.mu.Unlock()

	sh.logf("full validation beginning with %d shards...", len(shards))

	const maxShardWorkers = 30
	gate := syncutil.NewGate(maxShardWorkers)

	for _, pfx := range shards {
		pfx := pfx
		gate.Start()
		go func() {
			defer wg.Done()
			defer gate.Done()
			sh.validateShardPrefix(pfx)
		}()
	}
	wg.Wait()
	sh.logf("validation complete")
}

func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) {
	defer func() {
		sh.mu.Lock()
		if err != nil {
			errs := fmt.Sprintf("Failed to validate prefix %s: %v", pfx, err)
			sh.logf("%s", errs)
			sh.vshardErrs = append(sh.vshardErrs, errs)
		}
		sh.vshardDone++
		sh.mu.Unlock()
	}()
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()
	src, serrc := sh.startValidatePrefix(ctx, pfx, false)
	dst, derrc := sh.startValidatePrefix(ctx, pfx, true)
	srcErr := &chanError{
		C: serrc,
		Wrap: func(err error) error {
			return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err)
		},
	}
	dstErr := &chanError{
		C: derrc,
		Wrap: func(err error) error {
			return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err)
		},
	}

	missingc := make(chan blob.SizedRef, 8)
	go blobserver.ListMissingDestinationBlobs(missingc, func(blob.Ref) {}, src, dst)

	var missing []blob.SizedRef
	for sb := range missingc {
		missing = append(missing, sb)
	}

	if err := srcErr.Get(); err != nil {
		return err
	}
	if err := dstErr.Get(); err != nil {
		return err
	}

	for _, sb := range missing {
		if enqErr := sh.enqueue(sb); enqErr != nil {
			if err == nil {
				err = enqErr
			}
		} else {
			sh.mu.Lock()
			sh.vmissing++
			sh.mu.Unlock()
		}
	}
	return err
}

var errNotPrefix = errors.New("sentinel error: hit blob into the next shard")

// startValidatePrefix enumerates the source (or the destination, if doDest)
// for blobs whose refs start with pfx, sending them on the returned channel
// and updating the validation counters. The error channel receives exactly
// one value (possibly nil) when the enumeration ends.
func (sh *SyncHandler) startValidatePrefix(ctx context.Context, pfx string, doDest bool) (<-chan blob.SizedRef, <-chan error) {
	var e blobserver.BlobEnumerator
	if doDest {
		e = sh.to
	} else {
		e = sh.from
	}
	c := make(chan blob.SizedRef, 64)
	errc := make(chan error, 1)
	go func() {
		defer close(c)
		var last string
		err := blobserver.EnumerateAllFrom(ctx, e, pfx, func(sb blob.SizedRef) error {
			// Double-check that the storage target enumerates in sorted
			// order, starting at the requested prefix.
			brStr := sb.Ref.String()
			if brStr < pfx {
				log.Fatalf("Storage target %T enumerate not behaving: %q < requested prefix %q", e, brStr, pfx)
			}
			if last != "" && last >= brStr {
				log.Fatalf("Storage target %T enumerate not behaving: previous %q >= current %q", e, last, brStr)
			}
			last = brStr

			// Stop once we've walked past this shard's prefix.
			if !strings.HasPrefix(brStr, pfx) {
				return errNotPrefix
			}
			select {
			case c <- sb:
				sh.mu.Lock()
				if doDest {
					sh.vdestCount++
					sh.vdestBytes += int64(sb.Size)
				} else {
					sh.vsrcCount++
					sh.vsrcBytes += int64(sb.Size)
				}
				sh.mu.Unlock()
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
		if err == errNotPrefix {
			err = nil
		}
		if err != nil {
			c <- blob.SizedRef{}
		}
		errc <- err
	}()
	return c, errc
}

func (sh *SyncHandler) shardPrefixes() []string {
	var pfx []string
	// Shard the validation work by the first hex byte of the blobref
	// digest, assuming only sha1 and sha224 blobrefs are in use.
	for i := 0; i < 256; i++ {
		pfx = append(pfx, fmt.Sprintf("sha1-%02x", i))
		pfx = append(pfx, fmt.Sprintf("sha224-%02x", i))
	}
	return pfx
}

func (sh *SyncHandler) newCopyStatus(sb blob.SizedRef) *copyStatus {
	now := time.Now()
	return &copyStatus{
		sh:    sh,
		sb:    sb,
		state: statusStarting,
		start: now,
		t:     now,
	}
}

// copyStatus tracks the progress of a single in-flight blob copy.
type copyStatus struct {
	sh    *SyncHandler
	sb    blob.SizedRef
	start time.Time

	mu     sync.Mutex
	state  string    // one of the status constants below
	t      time.Time // time of last state change or write progress
	nread  uint32
	nwrite uint32
}

const (
	statusStarting = "starting"
	statusFetching = "fetching source"
	statusReading  = "reading"
	statusWriting  = "writing"
)

func (cs *copyStatus) setStatus(s string) {
	now := time.Now()
	cs.mu.Lock()
	defer cs.mu.Unlock()
	cs.state = s
	cs.t = now
}

func (cs *copyStatus) setError(err error) {
	now := time.Now()
	sh := cs.sh
	br := cs.sb.Ref
	if err == nil {
		// Successful copy: remove the blob from the persistent queue.
		// This can be slow, so do it before taking the lock below.
		if derr := sh.queue.Delete(br.String()); derr != nil {
			sh.logf("queue delete of %v error: %v", cs.sb.Ref, derr)
		}
	}

	sh.mu.Lock()
	defer sh.mu.Unlock()
	if _, needCopy := sh.needCopy[br]; !needCopy {
		sh.logf("IGNORING DUPLICATE UPLOAD of %v = %v", br, err)
		return
	}
	delete(sh.copying, br)
	if err == nil {
		delete(sh.needCopy, br)
		delete(sh.lastFail, br)
		sh.recentCopyTime = now
		sh.totalCopies++
		sh.totalCopyBytes += int64(cs.sb.Size)
		sh.bytesRemain -= int64(cs.sb.Size)
		return
	}

	sh.totalErrors++
	sh.logf("error copying %v: %v", br, err)
	sh.lastFail[br] = failDetail{
		when: now,
		err:  err,
	}

	// Cap recentErrors at maxRecentErrors, dropping the oldest entry.
	if len(sh.recentErrors) == maxRecentErrors {
		copy(sh.recentErrors, sh.recentErrors[1:])
		sh.recentErrors = sh.recentErrors[:maxRecentErrors-1]
	}
	sh.recentErrors = append(sh.recentErrors, br)
}

func (cs *copyStatus) String() string {
	var buf bytes.Buffer
	now := time.Now()
	buf.WriteString(cs.sb.Ref.String())
	buf.WriteString(": ")

	cs.mu.Lock()
	defer cs.mu.Unlock()
	sinceStart := now.Sub(cs.start)
	sinceLast := now.Sub(cs.t)

	switch cs.state {
	case statusReading:
		buf.WriteString(cs.state)
		fmt.Fprintf(&buf, " (%d/%dB)", cs.nread, cs.sb.Size)
	case statusWriting:
		if cs.nwrite == cs.sb.Size {
			buf.WriteString("wrote all, waiting ack")
		} else {
			buf.WriteString(cs.state)
			fmt.Fprintf(&buf, " (%d/%dB)", cs.nwrite, cs.sb.Size)
		}
	default:
		buf.WriteString(cs.state)
	}
	if sinceLast > 5*time.Second {
		fmt.Fprintf(&buf, ", last change %v ago (total elapsed %v)", sinceLast, sinceStart)
	}
	return buf.String()
}

type failDetail struct {
	when time.Time
	err  error
}

// incrWriter is an io.Writer that, instead of writing anywhere, adds the
// number of bytes written to *n and bumps the copyStatus's last-activity time.
type incrWriter struct {
	cs *copyStatus
	n  *uint32
}

func (w incrWriter) Write(p []byte) (n int, err error) {
	w.cs.mu.Lock()
	*w.n += uint32(len(p))
	w.cs.t = time.Now()
	w.cs.mu.Unlock()
	return len(p), nil
}

func storageDesc(v interface{}) string {
	if s, ok := v.(fmt.Stringer); ok {
		return s.String()
	}
	return fmt.Sprintf("%T", v)
}

// The following methods exist only so *SyncHandler satisfies the
// blobserver.Storage interface (see the interface assertions near the top of
// this file). Other than ReceiveBlob above, they aren't expected to be called.

func (sh *SyncHandler) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error) {
	panic("unimplemented blobserver.Fetch called")
}

func (sh *SyncHandler) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	sh.logf("unexpected StatBlobs call")
	return nil
}

func (sh *SyncHandler) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
	defer close(dest)
	sh.logf("unexpected EnumerateBlobs call")
	return nil
}

func (sh *SyncHandler) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
	panic("unimplemented RemoveBlobs")
}

var errStopEnumerating = errors.New("sentinel error: reached the hourly compare quota")

// hourlyCompare runs roughly once an hour, comparing blobs from the source
// against their copies on the destination. Each round starts at a random
// blobref and reads back at most hourlyBytes of blob data; mismatches and
// fetch errors are recorded in compareErrors. It never returns.
func (sh *SyncHandler) hourlyCompare(hourlyBytes uint64) {
	ctx := context.TODO()
	ticker := time.NewTicker(time.Hour).C
	for {
		// Pick a random starting point in blobref space.
		content := make([]byte, 16)
		if _, err := rand.Read(content); err != nil {
			panic(err)
		}
		after := blob.RefFromBytes(content).String()
		var roundBytes uint64
		var roundBlobs int
		err := blobserver.EnumerateAllFrom(ctx, sh.from, after, func(sr blob.SizedRef) error {
			sh.mu.Lock()
			if _, ok := sh.needCopy[sr.Ref]; ok {
				// Not yet copied to the destination; skip it.
				sh.mu.Unlock()
				return nil
			}
			sh.mu.Unlock()

			if roundBytes+uint64(sr.Size) > hourlyBytes {
				return errStopEnumerating
			}
			rc, size, err := sh.to.(blob.Fetcher).Fetch(ctx, sr.Ref)
			if err != nil {
				return fmt.Errorf("error fetching %s: %v", sr.Ref, err)
			}
			defer rc.Close()
			if size != sr.Size {
				return fmt.Errorf("%s: expected size %d, got %d", sr.Ref, sr.Size, size)
			}
			h := sr.Ref.Hash()
			if _, err := io.Copy(h, rc); err != nil {
				return fmt.Errorf("error while reading %s: %v", sr.Ref, err)
			}
			if !sr.HashMatches(h) {
				return fmt.Errorf("expected %s, got %x", sr.Ref, h.Sum(nil))
			}

			sh.mu.Lock()
			sh.comparedBlobs++
			sh.comparedBytes += uint64(size)
			sh.compLastBlob = sr.Ref.String()
			sh.mu.Unlock()
			roundBlobs++
			roundBytes += uint64(size)
			return nil
		})
		sh.mu.Lock()
		if err != nil && err != errStopEnumerating {
			sh.compareErrors = append(sh.compareErrors, fmt.Sprintf("%s %v", time.Now(), err))
			sh.logf("!! hourly compare error !!: %v", err)
		}
		sh.comparedRounds++
		sh.mu.Unlock()
		sh.logf("compared %d blobs (%d bytes)", roundBlobs, roundBytes)
		<-ticker
	}
}

// chanError lazily receives an error from C the first time Get is called,
// optionally wrapping it with Wrap. It is not safe for concurrent use.
type chanError struct {
	C        <-chan error
	Wrap     func(error) error
	err      error
	received bool
}

func (ce *chanError) Set(err error) {
	if ce.Wrap != nil && err != nil {
		err = ce.Wrap(err)
	}
	ce.err = err
	ce.received = true
}

func (ce *chanError) Get() error {
	if ce.received {
		return ce.err
	}
	ce.Set(<-ce.C)
	return ce.err
}