1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blobserver
18
19 import (
20 "context"
21
22 "perkeep.org/pkg/blob"
23 )
24
// buffered is the capacity of the intermediate channel that
// mergedEnumerate creates for each source enumerator.
const (
	buffered = 8
)
26
27
28
29 func MergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, sources []BlobEnumerator, after string, limit int) error {
30 return mergedEnumerate(ctx, dest, len(sources), func(i int) BlobEnumerator { return sources[i] }, after, limit)
31 }
32
33
34
35
36
37
38 func MergedEnumerateStorage(ctx context.Context, dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error {
39 return mergedEnumerate(ctx, dest, len(sources), func(i int) BlobEnumerator { return sources[i] }, after, limit)
40 }
41
42 func mergedEnumerate(ctx context.Context, dest chan<- blob.SizedRef, nsrc int, getSource func(int) BlobEnumerator, after string, limit int) error {
43 defer close(dest)
44
45 subctx, cancel := context.WithCancel(ctx)
46 defer cancel()
47
48 errch := make(chan error, nsrc+1)
49 startEnum := func(source BlobEnumerator) *blob.ChanPeeker {
50 ch := make(chan blob.SizedRef, buffered)
51 go func() {
52 err := source.EnumerateBlobs(subctx, ch, after, limit)
53 if err != nil {
54 errch <- err
55 }
56 }()
57 return &blob.ChanPeeker{Ch: ch}
58 }
59
60 peekers := make([]*blob.ChanPeeker, 0, nsrc)
61 for i := 0; i < nsrc; i++ {
62 peekers = append(peekers, startEnum(getSource(i)))
63 }
64
65 nSent := 0
66 var lastSent blob.Ref
67 tooLow := func(br blob.Ref) bool { return lastSent.Valid() && (br == lastSent || br.Less(lastSent)) }
68 for nSent < limit {
69 lowestIdx := -1
70 var lowest blob.SizedRef
71 for idx, peeker := range peekers {
72 for !peeker.Closed() && tooLow(peeker.MustPeek().Ref) {
73 peeker.Take()
74 }
75 if peeker.Closed() {
76 continue
77 }
78 sb := peeker.MustPeek()
79 if lowestIdx == -1 || sb.Ref.Less(lowest.Ref) {
80 lowestIdx = idx
81 lowest = sb
82 }
83 }
84 if lowestIdx == -1 {
85
86 break
87 }
88
89 select {
90 case dest <- lowest:
91 nSent++
92 lastSent = lowest.Ref
93 case <-ctx.Done():
94 return ctx.Err()
95 case err := <-errch:
96 return err
97 }
98 }
99
100
101 errch <- nil
102 return <-errch
103 }