1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blobserver
18
19 import (
20 "context"
21 "errors"
22 "regexp"
23 "strconv"
24 "strings"
25 )
26
27
28 func NewMultiBlobStreamer(streamers ...BlobStreamer) BlobStreamer {
29 return multiStreamer{s: streamers}
30 }
31
32 type multiStreamer struct {
33 s []BlobStreamer
34 }
35
36 var msTokenPrefixRx = regexp.MustCompile(`^(\d+):`)
37
38 func (ms multiStreamer) StreamBlobs(ctx context.Context, dest chan<- BlobAndToken, contToken string) error {
39 defer close(dest)
40 part := 0
41 if contToken != "" {
42 pfx := msTokenPrefixRx.FindString(contToken)
43 var err error
44 part, err = strconv.Atoi(strings.TrimSuffix(pfx, ":"))
45 if pfx == "" || err != nil || part >= len(ms.s) {
46 return errors.New("invalid continuation token")
47 }
48 contToken = contToken[len(pfx):]
49 }
50 srcs := ms.s[part:]
51 for len(srcs) > 0 {
52 bs := srcs[0]
53 subDest := make(chan BlobAndToken, 16)
54 errc := make(chan error, 1)
55 go func() {
56 errc <- bs.StreamBlobs(ctx, subDest, contToken)
57 }()
58 partStr := strconv.Itoa(part)
59 for bt := range subDest {
60 select {
61 case <-ctx.Done():
62 return ctx.Err()
63 case dest <- BlobAndToken{Blob: bt.Blob, Token: partStr + ":" + bt.Token}:
64 }
65 }
66 if err := <-errc; err != nil {
67 return err
68 }
69
70 part++
71 srcs = srcs[1:]
72 contToken = ""
73 }
74 return nil
75 }