package schema

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"go4.org/rollsum"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"

	"go4.org/syncutil"
)

const (
	// maxBlobSize is the largest blob we ever make when cutting up
	// a file.
	maxBlobSize = 1 << 20

	// firstChunkSize is the ideal size of the first chunk of a
	// file. It's kept smaller than maxBlobSize so that tools which
	// only inspect the start of a file (magic-number sniffers,
	// EXIF/ID3 readers, thumbnailers) can do so by fetching a
	// single small blob.
	firstChunkSize = 256 << 10

	// bufioReaderSize is an explicit size for our bufio.Reader,
	// so we don't rely on NewReader's implicit size. Reading
	// through a buffer this large lets us notice the underlying
	// reader's EOF up to bufioReaderSize bytes ahead, so the
	// final chunk can be planned for.
	bufioReaderSize = 32 << 10

	// tooSmallThreshold is the threshold at which rolling checksum
	// boundaries are ignored if the current chunk being built is
	// smaller than this.
	tooSmallThreshold = 64 << 10
)

// WriteFileFromReaderWithModTime creates and uploads a "file" JSON schema
// composed of chunks of r, also uploading the chunks. The returned
// BlobRef is of the JSON file schema blob. filename must not contain a
// slash; modTime is optional, and the zero value means no modtime is set.
func WriteFileFromReaderWithModTime(ctx context.Context, bs blobserver.StatReceiver, filename string, modTime time.Time, r io.Reader) (blob.Ref, error) {
	if strings.Contains(filename, "/") {
		return blob.Ref{}, fmt.Errorf("schema.WriteFileFromReader: filename %q shouldn't contain a slash", filename)
	}

	m := NewFileMap(filename)
	if !modTime.IsZero() {
		m.SetModTime(modTime)
	}
	return WriteFileMap(ctx, bs, m, r)
}

// WriteFileFromReader creates and uploads a "file" JSON schema
// composed of chunks of r, also uploading the chunks. The returned
// BlobRef is of the JSON file schema blob. No modification time is set.
func WriteFileFromReader(ctx context.Context, bs blobserver.StatReceiver, filename string, r io.Reader) (blob.Ref, error) {
	return WriteFileFromReaderWithModTime(ctx, bs, filename, time.Time{}, r)
}

// WriteFileMap uploads chunks of r to bs while populating file and
// finally uploads file's blob. The returned blobref is of file's
// JSON blob.
func WriteFileMap(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) (blob.Ref, error) {
	return writeFileMapRolling(ctx, bs, file, r)
}
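
// writeFileFromPathExample is an illustrative sketch, not part of the
// original file: it shows how a caller might feed an *os.File into
// WriteFileFromReaderWithModTime. The function name and the "path"
// parameter are hypothetical; dst is whatever blobserver.StatReceiver
// the caller already has.
func writeFileFromPathExample(ctx context.Context, dst blobserver.StatReceiver, path string) (blob.Ref, error) {
	f, err := os.Open(path)
	if err != nil {
		return blob.Ref{}, err
	}
	defer f.Close()
	fi, err := f.Stat()
	if err != nil {
		return blob.Ref{}, err
	}
	// fi.Name() is the base name, so it satisfies the no-slash requirement above.
	return WriteFileFromReaderWithModTime(ctx, dst, fi.Name(), fi.ModTime(), f)
}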

func serverHasBlob(ctx context.Context, bs blobserver.BlobStatter, br blob.Ref) (have bool, err error) {
	_, err = blobserver.StatBlob(ctx, bs, br)
	if err == nil {
		have = true
	} else if err == os.ErrNotExist {
		err = nil
	}
	return
}

type span struct {
	from, to int64
	bits     int
	br       blob.Ref
	children []span
}

func (s *span) isSingleBlob() bool {
	return len(s.children) == 0
}

func (s *span) size() int64 {
	size := s.to - s.from
	for _, cs := range s.children {
		size += cs.size()
	}
	return size
}

// noteEOFReader notes whether its underlying reader has returned EOF,
// letting the chunker see an impending end of input ahead of time.
type noteEOFReader struct {
	r      io.Reader
	sawEOF bool
}

func (r *noteEOFReader) Read(p []byte) (n int, err error) {
	n, err = r.r.Read(p)
	if err == io.EOF {
		r.sawEOF = true
	}
	return
}

func uploadString(ctx context.Context, bs blobserver.StatReceiver, br blob.Ref, s string) (blob.Ref, error) {
	if !br.Valid() {
		panic("invalid blobref")
	}
	hasIt, err := serverHasBlob(ctx, bs, br)
	if err != nil {
		return blob.Ref{}, err
	}
	if hasIt {
		return br, nil
	}
	_, err = blobserver.ReceiveNoHash(ctx, bs, br, strings.NewReader(s))
	if err != nil {
		return blob.Ref{}, err
	}
	return br, nil
}

// uploadBytes populates bb (a builder of either type "bytes" or
// "file", which is a superset of "bytes") with the provided size and
// spans, then uploads bb's schema blob if the server doesn't already
// have it. The returned future yields the schema blob's ref once the
// upload has completed.
func uploadBytes(ctx context.Context, bs blobserver.StatReceiver, bb *Builder, size int64, s []span) *uploadBytesFuture {
	future := newUploadBytesFuture()
	parts := []BytesPart{}
	addBytesParts(ctx, bs, &parts, s, future)

	if err := bb.PopulateParts(size, parts); err != nil {
		future.errc <- err
		return future
	}

	// For "file" blobs, wait for all the parts to finish uploading
	// before uploading the "file" schema blob itself, so a reader never
	// sees a file blob whose parts aren't yet available.
	if bb.Type() == TypeFile {
		future.errc <- nil
		_, err := future.Get()
		future = newUploadBytesFuture()
		if err != nil {
			future.errc <- err
			return future
		}
	}

	json := bb.Blob().JSON()
	br := blob.RefFromString(json)
	future.br = br
	go func() {
		_, err := uploadString(ctx, bs, br, json)
		future.errc <- err
	}()
	return future
}

func newUploadBytesFuture() *uploadBytesFuture {
	return &uploadBytesFuture{
		errc: make(chan error, 1),
	}
}

// uploadBytesFuture is an in-flight (or completed) upload of a
// "bytes" or "file" schema blob created by uploadBytes.
type uploadBytesFuture struct {
	br       blob.Ref
	errc     chan error // receives exactly one error (possibly nil)
	children []*uploadBytesFuture
}

// BlobRef returns the blobref the upload will have if it succeeds.
func (f *uploadBytesFuture) BlobRef() blob.Ref {
	return f.br
}

// Get blocks until this upload and all child uploads complete, and
// returns the blobref and any error.
func (f *uploadBytesFuture) Get() (blob.Ref, error) {
	for _, f := range f.children {
		if _, err := f.Get(); err != nil {
			return blob.Ref{}, err
		}
	}
	return f.br, <-f.errc
}

// addBytesParts appends to dst a BytesPart for each span in spans,
// uploading "bytes" schema blobs for spans that have children. Those
// uploads are registered on parent so the caller can wait on them.
func addBytesParts(ctx context.Context, bs blobserver.StatReceiver, dst *[]BytesPart, spans []span, parent *uploadBytesFuture) {
	for _, sp := range spans {
		if len(sp.children) == 1 && sp.children[0].isSingleBlob() {
			// Remove an occasional useless indirection of
			// what would become a bytes schema blob
			// pointing to a single blobref. Just promote
			// the blobref child instead.
			child := sp.children[0]
			*dst = append(*dst, BytesPart{
				BlobRef: child.br,
				Size:    uint64(child.size()),
			})
			sp.children = nil
		}
		if len(sp.children) > 0 {
			childrenSize := int64(0)
			for _, cs := range sp.children {
				childrenSize += cs.size()
			}
			future := uploadBytes(ctx, bs, newBytes(), childrenSize, sp.children)
			parent.children = append(parent.children, future)
			*dst = append(*dst, BytesPart{
				BytesRef: future.BlobRef(),
				Size:     uint64(childrenSize),
			})
		}
		if sp.from == sp.to {
			panic("Shouldn't happen. " + fmt.Sprintf("weird span with same from & to: %#v", sp))
		}
		*dst = append(*dst, BytesPart{
			BlobRef: sp.br,
			Size:    uint64(sp.to - sp.from),
		})
	}
}

// writeFileMapRolling uploads chunks of r to bs while populating file,
// cutting chunks with a rolling checksum, and finally uploads file's
// blob. The returned blobref is of file's JSON blob.
func writeFileMapRolling(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) (blob.Ref, error) {
	n, spans, err := writeFileChunks(ctx, bs, file, r)
	if err != nil {
		return blob.Ref{}, err
	}

	return uploadBytes(ctx, bs, file, n, spans).Get()
}

// WriteFileChunks uploads chunks of r to bs while populating file.
// It does not upload file's blob itself; that is left to the caller.
func WriteFileChunks(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) error {
	size, spans, err := writeFileChunks(ctx, bs, file, r)
	if err != nil {
		return err
	}
	parts := []BytesPart{}
	future := newUploadBytesFuture()
	addBytesParts(ctx, bs, &parts, spans, future)
	future.errc <- nil // not an error; unblocks Get, which then waits on the children
	if _, err := future.Get(); err != nil {
		return err
	}
	return file.PopulateParts(size, parts)
}

func writeFileChunks(ctx context.Context, bs blobserver.StatReceiver, file *Builder, r io.Reader) (n int64, spans []span, outerr error) {
	src := &noteEOFReader{r: r}
	bufr := bufio.NewReaderSize(src, bufioReaderSize)
	spans = []span{} // the tree of spans, cut on interesting rollsum boundaries
	rs := rollsum.New()
	var last int64
	var buf bytes.Buffer
	blobSize := 0 // of the next blob being built; tracks buf.Len()

	const chunksInFlight = 32 // bounds the number of concurrent chunk uploads
	gatec := syncutil.NewGate(chunksInFlight)
	firsterrc := make(chan error, 1)

	// uploadLastSpan runs in the same goroutine as the loop below and is
	// responsible for starting the upload of the contents of buf.
	uploadLastSpan := func() bool {
		chunk := buf.String()
		buf.Reset()
		br := blob.RefFromString(chunk)
		spans[len(spans)-1].br = br
		select {
		case outerr = <-firsterrc:
			return false
		default:
			// No error seen yet; start the upload below.
		}
		gatec.Start()
		go func() {
			defer gatec.Done()
			if _, err := uploadString(ctx, bs, br, chunk); err != nil {
				select {
				case firsterrc <- err:
				default:
				}
			}
		}()
		return true
	}

	for {
		c, err := bufr.ReadByte()
		if err == io.EOF {
			if n != last {
				spans = append(spans, span{from: last, to: n})
				if !uploadLastSpan() {
					return
				}
			}
			break
		}
		if err != nil {
			return 0, nil, err
		}

		buf.WriteByte(c)
		n++
		blobSize++
		rs.Roll(c)

		var bits int
		onRollSplit := rs.OnSplit()
		switch {
		case blobSize == maxBlobSize:
			bits = 20 // arbitrary node weight; 1 << 20 == maxBlobSize
		case src.sawEOF:
			// Don't split. The end is coming soon enough.
			continue
		case onRollSplit && n > firstChunkSize && blobSize > tooSmallThreshold:
			bits = rs.Bits()
		case n == firstChunkSize:
			bits = 18 // 1 << 18 == 256 KB == firstChunkSize
		default:
			// Don't split.
			continue
		}
		blobSize = 0

		// Take any spans from the end of the spans slice that
		// have a smaller 'bits' score and make them children
		// of this node.
		var children []span
		childrenFrom := len(spans)
		for childrenFrom > 0 && spans[childrenFrom-1].bits < bits {
			childrenFrom--
		}
		if nCopy := len(spans) - childrenFrom; nCopy > 0 {
			children = make([]span, nCopy)
			copy(children, spans[childrenFrom:])
			spans = spans[:childrenFrom]
		}

		spans = append(spans, span{from: last, to: n, bits: bits, children: children})
		last = n
		if !uploadLastSpan() {
			return
		}
	}

	if outerr != nil {
		return 0, nil, outerr
	}

	// Wait for all uploads to finish, one way or another, and then
	// see if any of them failed. Once this loop is done, we own all
	// the tokens in gatec, so no upload goroutine can still be running.
	for i := 0; i < chunksInFlight; i++ {
		gatec.Start()
	}
	select {
	case err := <-firsterrc:
		return 0, nil, err
	default:
	}

	return n, spans, nil
}
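
// writeFileChunksExample is an illustrative sketch, not part of the
// original file: it shows one way a caller could use WriteFileChunks
// when it wants to keep control of the final "file" schema blob, for
// example to adjust the Builder before uploading it. The function name
// is hypothetical; bs and r come from the caller.
func writeFileChunksExample(ctx context.Context, bs blobserver.StatReceiver, name string, r io.Reader) (blob.Ref, error) {
	bb := NewFileMap(name)
	if err := WriteFileChunks(ctx, bs, bb, r); err != nil {
		return blob.Ref{}, err
	}
	// WriteFileChunks uploaded the data chunks and populated bb's parts,
	// but not the "file" blob itself; upload it here.
	json := bb.Blob().JSON()
	return uploadString(ctx, bs, blob.RefFromString(json), json)
}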