1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
38 package cond
39
40 import (
41 "bytes"
42 "context"
43 "errors"
44 "fmt"
45 "io"
46 "time"
47
48 "go4.org/jsonconfig"
49 "perkeep.org/pkg/blob"
50 "perkeep.org/pkg/blobserver"
51 "perkeep.org/pkg/schema"
52 )
53
54
55
56 type storageFunc func(br blob.Ref, src io.Reader) (dest blobserver.Storage, newSrc io.Reader, err error)
57
58 type condStorage struct {
59 storageForReceive storageFunc
60 read blobserver.Storage
61 remove blobserver.Storage
62 }
63
64 func (sto *condStorage) StorageGeneration() (initTime time.Time, random string, err error) {
65 if gener, ok := sto.read.(blobserver.Generationer); ok {
66 return gener.StorageGeneration()
67 }
68 err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read))
69 return
70 }
71
72 func (sto *condStorage) ResetStorageGeneration() error {
73 if gener, ok := sto.read.(blobserver.Generationer); ok {
74 return gener.ResetStorageGeneration()
75 }
76 return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read))
77 }
78
79 func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (storage blobserver.Storage, err error) {
80 sto := &condStorage{}
81
82 receive := conf.OptionalStringOrObject("write")
83 read := conf.RequiredString("read")
84 remove := conf.OptionalString("remove", "")
85 if err := conf.Validate(); err != nil {
86 return nil, err
87 }
88
89 if receive != nil {
90 sto.storageForReceive, err = buildStorageForReceive(ld, receive)
91 if err != nil {
92 return
93 }
94 }
95
96 sto.read, err = ld.GetStorage(read)
97 if err != nil {
98 return
99 }
100
101 if remove != "" {
102 sto.remove, err = ld.GetStorage(remove)
103 if err != nil {
104 return
105 }
106 }
107 return sto, nil
108 }
109
110 func buildStorageForReceive(ld blobserver.Loader, confOrString interface{}) (storageFunc, error) {
111
112 if s, ok := confOrString.(string); ok {
113 sto, err := ld.GetStorage(s)
114 if err != nil {
115 return nil, err
116 }
117 f := func(br blob.Ref, src io.Reader) (blobserver.Storage, io.Reader, error) {
118 return sto, src, nil
119 }
120 return f, nil
121 }
122
123 conf := jsonconfig.Obj(confOrString.(map[string]interface{}))
124
125 ifStr := conf.RequiredString("if")
126
127
128
129 thenTarget := conf.RequiredString("then")
130 elseTarget := conf.RequiredString("else")
131 if err := conf.Validate(); err != nil {
132 return nil, err
133 }
134 thenSto, err := ld.GetStorage(thenTarget)
135 if err != nil {
136 return nil, err
137 }
138 elseSto, err := ld.GetStorage(elseTarget)
139 if err != nil {
140 return nil, err
141 }
142
143 switch ifStr {
144 case "isSchema":
145 return isSchemaPicker(thenSto, elseSto), nil
146 }
147 return nil, fmt.Errorf("cond: unsupported 'if' type of %q", ifStr)
148 }
149
150 func isSchemaPicker(thenSto, elseSto blobserver.Storage) storageFunc {
151 return func(br blob.Ref, src io.Reader) (dest blobserver.Storage, newSrc io.Reader, err error) {
152 var buf bytes.Buffer
153 blob, err := schema.BlobFromReader(br, io.TeeReader(src, &buf))
154 newSrc = io.MultiReader(bytes.NewReader(buf.Bytes()), src)
155 if err != nil || blob.Type() == "" {
156 return elseSto, newSrc, nil
157 }
158 return thenSto, newSrc, nil
159 }
160 }
161
162 func (sto *condStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
163 destSto, src, err := sto.storageForReceive(br, src)
164 if err != nil {
165 return
166 }
167 return blobserver.Receive(ctx, destSto, br, src)
168 }
169
170 func (sto *condStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
171 if sto.remove != nil {
172 return sto.remove.RemoveBlobs(ctx, blobs)
173 }
174 return errors.New("cond: Remove not configured")
175 }
176
177 func (sto *condStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
178 if sto.read != nil {
179 return sto.read.Fetch(ctx, b)
180 }
181 err = errors.New("cond: Read not configured")
182 return
183 }
184
185 func (sto *condStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
186 if sto.read != nil {
187 return sto.read.StatBlobs(ctx, blobs, fn)
188 }
189 return errors.New("cond: Read not configured")
190 }
191
192 func (sto *condStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
193 if sto.read != nil {
194 return sto.read.EnumerateBlobs(ctx, dest, after, limit)
195 }
196 return errors.New("cond: Read not configured")
197 }
198
199 func init() {
200 blobserver.RegisterStorageConstructor("cond", blobserver.StorageConstructor(newFromConfig))
201 }