1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19 package union
20
import (
	"context"
	"errors"
	"io"
	"os"
	"sync"

	"go4.org/jsonconfig"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
)
31
32 type unionStorage struct {
33 subsets []blobserver.Storage
34 }
35
36 func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
37 sto := &unionStorage{}
38
39 reads := conf.RequiredList("subsets")
40 if err := conf.Validate(); err != nil {
41 return nil, err
42 }
43
44 for _, s := range reads {
45 rs, err := ld.GetStorage(s)
46 if err != nil {
47 return nil, err
48 }
49 sto.subsets = append(sto.subsets, rs)
50 }
51
52 return sto, nil
53 }
54
55
56 func (sto *unionStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
57 return blob.SizedRef{}, blobserver.ErrReadonly
58 }
59
60
61 func (sto *unionStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
62 return blobserver.ErrReadonly
63 }
64
65
66
67 func (sto *unionStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
68 type result struct {
69 file io.ReadCloser
70 size uint32
71 err error
72 }
73 results := make(chan result, len(sto.subsets))
74 var wg sync.WaitGroup
75 for _, bs := range sto.subsets {
76 bs := bs
77 wg.Add(1)
78 go func() {
79 defer wg.Done()
80 var res result
81 res.file, res.size, res.err = bs.Fetch(ctx, b)
82 results <- res
83 }()
84 }
85 go func() {
86 wg.Wait()
87 close(results)
88 }()
89 var firstErr error
90 var firstRes result
91 for r := range results {
92 if r.err != nil {
93 if firstErr == nil {
94 firstErr = r.err
95 }
96 continue
97 }
98 if firstRes.file != nil {
99 if r.file != nil {
100 r.file.Close()
101 }
102 continue
103 }
104
105 firstRes = r
106 }
107 if firstRes.file != nil {
108 return firstRes.file, firstRes.size, nil
109 }
110 return nil, 0, firstErr
111 }
112
113
114 func (sto *unionStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
115 if err := ctx.Err(); err != nil {
116 return err
117 }
118
119 maybeDup := make(chan blob.SizedRef)
120 errCh := make(chan error, 1)
121 var wg sync.WaitGroup
122 var any bool
123 for _, s := range sto.subsets {
124 if bs, ok := s.(blobserver.BlobStatter); ok {
125 any = true
126 wg.Add(1)
127 go func() {
128 defer wg.Done()
129 if err := bs.StatBlobs(ctx, blobs, func(sr blob.SizedRef) error {
130 maybeDup <- sr
131 return nil
132 }); err != nil {
133 errCh <- err
134 }
135 }()
136 }
137 }
138 if !any {
139 return errors.New("union: No BlobStatter reader configured")
140 }
141
142 var closeChanOnce sync.Once
143 go func() {
144 wg.Wait()
145 closeChanOnce.Do(func() { close(maybeDup) })
146 }()
147
148 seen := make(map[blob.Ref]struct{}, len(blobs))
149 for {
150 select {
151 case <-ctx.Done():
152 return ctx.Err()
153 case err := <-errCh:
154 closeChanOnce.Do(func() { close(maybeDup) })
155 return err
156 case sr, ok := <-maybeDup:
157 if !ok {
158 return nil
159 }
160 if _, ok = seen[sr.Ref]; !ok {
161 seen[sr.Ref] = struct{}{}
162 if err := f(sr); err != nil {
163 return err
164 }
165 }
166 }
167 }
168 }
169
170
171 func (sto *unionStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
172 return blobserver.MergedEnumerateStorage(ctx, dest, sto.subsets, after, limit)
173 }
174
175 func init() {
176 blobserver.RegisterStorageConstructor("union", blobserver.StorageConstructor(newFromConfig))
177 }