1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package client
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "log"
26 "mime/multipart"
27 "net/http"
28 "net/url"
29 "os"
30 "strings"
31 "sync"
32 "time"
33
34 "perkeep.org/internal/hashutil"
35 "perkeep.org/internal/httputil"
36 "perkeep.org/pkg/blob"
37 "perkeep.org/pkg/blobserver"
38 "perkeep.org/pkg/blobserver/protocol"
39 "perkeep.org/pkg/constants"
40 "perkeep.org/pkg/env"
41 "perkeep.org/pkg/schema"
42 )
43
44
// UploadHandle describes a single blob to be uploaded with (*Client).Upload.
type UploadHandle struct {
	// BlobRef is the ref of the blob being uploaded. Upload uses its string
	// form as the multipart field name and file name, and for have-cache and
	// stat lookups.
	BlobRef blob.Ref

	// Contents is the blob's data. If it implements io.Closer, Upload closes
	// it when a stat request shows the server already has the blob and the
	// upload is skipped.
	Contents io.Reader

	// Size is the length of Contents in bytes. If zero, Upload reads all of
	// Contents into memory to determine its length. A non-zero Size is
	// trusted as-is and used to compute the upload's Content-Length, so it
	// must be exact.
	Size uint32

	// Vivify, when true, disables skipping the upload when the blob is
	// already known (in the have-cache or reported by the server's stat) and
	// sends an "X-Camlistore-Vivify: 1" header with the upload request.
	// NOTE(review): the server-side meaning of that header is not visible
	// here; presumably it asks the server to create a permanode — confirm.
	Vivify bool

	// SkipStat, when true, makes Upload skip the HTTP stat request it would
	// otherwise issue to check whether the server already has the blob.
	SkipStat bool
}
70
// PutResult is the outcome of (*Client).Upload.
type PutResult struct {
	// BlobRef is the ref of the uploaded (or skipped) blob.
	BlobRef blob.Ref
	// Size is the blob's size in bytes.
	Size uint32
	// Skipped reports whether the upload was skipped because the blob was
	// already known to exist (via the have-cache or a server stat).
	Skipped bool
}
76
77 func (pr *PutResult) SizedBlobRef() blob.SizedRef {
78 return blob.SizedRef{Ref: pr.BlobRef, Size: pr.Size}
79 }
80
81
82
// statResponse is the parsed form of a /camli/stat reply.
type statResponse struct {
	// HaveMap maps the string form of each valid blob ref the server
	// reported to that ref and its size.
	HaveMap map[string]blob.SizedRef
	// canLongPoll is copied from the reply's CanLongPoll field.
	canLongPoll bool
}
87
// ResponseFormatError marks an error returned because a server response
// could not be decoded.
//
// NOTE(review): this is a named interface type, so ResponseFormatError(err)
// keeps err's dynamic type, and every error value satisfies the interface.
// A type switch case or errors.As target of this type therefore matches any
// error; it cannot be used to tell decoding failures apart from others.
type ResponseFormatError error
89
var (
	// multipartOnce guards the one-time measurement of multipartOverhead.
	multipartOnce sync.Once
	// multipartOverhead caches the value returned by getMultipartOverhead.
	multipartOverhead int64
)

// getMultipartOverhead returns how many bytes a multipart/form-data body with
// exactly one file part adds on top of that part's field name, file name and
// contents. Upload adds the contents length and twice the blobref length
// (used as both field name and file name) to predict the exact
// Content-Length of its request.
//
// The value is measured once, by encoding a one-byte field name, one-byte
// file name and one-byte body and subtracting those three bytes. This works
// because multipart.Writer's random boundaries always have the same length.
func getMultipartOverhead() int64 {
	multipartOnce.Do(func() {
		const probe = "0"
		var encoded bytes.Buffer
		mw := multipart.NewWriter(&encoded)
		// Writes go to an in-memory buffer, so errors are not checked.
		fw, _ := mw.CreateFormFile(probe, probe)
		fw.Write([]byte(probe))
		mw.Close()
		// Remove the probe bytes used as field name, file name and body.
		multipartOverhead = int64(encoded.Len()) - 3*int64(len(probe))
	})
	return multipartOverhead
}
111
112 func parseStatResponse(res *http.Response) (*statResponse, error) {
113 var s = &statResponse{HaveMap: make(map[string]blob.SizedRef)}
114 var pres protocol.StatResponse
115 if err := httputil.DecodeJSON(res, &pres); err != nil {
116 return nil, ResponseFormatError(err)
117 }
118
119 s.canLongPoll = pres.CanLongPoll
120 for _, statItem := range pres.Stat {
121 br := statItem.Ref
122 if !br.Valid() {
123 continue
124 }
125 s.HaveMap[br.String()] = blob.SizedRef{Ref: br, Size: uint32(statItem.Size)}
126 }
127 return s, nil
128 }
129
130
131 func NewUploadHandleFromString(data string) *UploadHandle {
132 bref := blob.RefFromString(data)
133 r := strings.NewReader(data)
134 return &UploadHandle{BlobRef: bref, Size: uint32(len(data)), Contents: r}
135 }
136
137
138
139 func (c *Client) responseJSONMap(requestName string, resp *http.Response) (map[string]interface{}, error) {
140 if resp.StatusCode != 200 {
141 c.printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode)
142 io.Copy(os.Stderr, resp.Body)
143 return nil, fmt.Errorf("after %s request, HTTP response code is %d; no JSON to parse", requestName, resp.StatusCode)
144 }
145 jmap := make(map[string]interface{})
146 if err := httputil.DecodeJSON(resp, &jmap); err != nil {
147 return nil, err
148 }
149 return jmap, nil
150 }
151
152 func (c *Client) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
153 if c.sto != nil {
154 return c.sto.StatBlobs(ctx, blobs, fn)
155 }
156 var needStat []blob.Ref
157 for _, br := range blobs {
158 if !br.Valid() {
159 panic("invalid blob")
160 }
161 if size, ok := c.haveCache.StatBlobCache(br); ok {
162 if err := fn(blob.SizedRef{Ref: br, Size: size}); err != nil {
163 return err
164 }
165 } else {
166 needStat = append(needStat, br)
167 }
168 }
169 if len(needStat) == 0 {
170 return nil
171 }
172 return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, c.httpGate, func(br blob.Ref) (workerSB blob.SizedRef, err error) {
173 err = c.doStat(ctx, []blob.Ref{br}, 0, false, func(sb blob.SizedRef) error {
174 workerSB = sb
175 c.haveCache.NoteBlobExists(sb.Ref, sb.Size)
176 return fn(sb)
177 })
178 return
179 })
180 }
181
182
183
184
185
186 func (c *Client) doStat(ctx context.Context, blobs []blob.Ref, wait time.Duration, gated bool, fn func(blob.SizedRef) error) error {
187 var buf bytes.Buffer
188 fmt.Fprintf(&buf, "camliversion=1")
189 if wait > 0 {
190 secs := int(wait.Seconds())
191 if secs == 0 {
192 secs = 1
193 }
194 fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
195 }
196 for i, blob := range blobs {
197 fmt.Fprintf(&buf, "&blob%d=%s", i+1, blob)
198 }
199
200 pfx, err := c.prefix()
201 if err != nil {
202 return err
203 }
204 req := c.newRequest(ctx, "POST", fmt.Sprintf("%s/camli/stat", pfx), &buf)
205 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
206
207 var resp *http.Response
208 if gated {
209 resp, err = c.doReqGated(req)
210 } else {
211 resp, err = c.httpClient.Do(req)
212 }
213 if err != nil {
214 return fmt.Errorf("stat HTTP error: %w", err)
215 }
216 if resp.Body != nil {
217 defer resp.Body.Close()
218 }
219
220 if resp.StatusCode != 200 {
221 return fmt.Errorf("stat response had http status %d", resp.StatusCode)
222 }
223
224 stat, err := parseStatResponse(resp)
225 if err != nil {
226 return err
227 }
228 for _, sb := range stat.HaveMap {
229 if err := fn(sb); err != nil {
230 return err
231 }
232 }
233 return nil
234 }
235
236
237
238 func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) {
239 if h.Size > 0 {
240 return h.Contents, int64(h.Size), nil
241 }
242 var b bytes.Buffer
243 n, err := io.Copy(&b, h.Contents)
244 if err != nil {
245 return nil, 0, err
246 }
247 return &b, n, nil
248 }
249
250
251 func (c *Client) Upload(ctx context.Context, h *UploadHandle) (*PutResult, error) {
252 errorf := func(msg string, arg ...interface{}) (*PutResult, error) {
253 err := fmt.Errorf(msg, arg...)
254 c.printf("%v", err)
255 return nil, err
256 }
257
258 bodyReader, bodySize, err := h.readerAndSize()
259 if err != nil {
260 return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err)
261 }
262 if bodySize > constants.MaxBlobSize {
263 return nil, errors.New("client: body is bigger then max blob size")
264 }
265
266 c.statsMutex.Lock()
267 c.stats.UploadRequests.Blobs++
268 c.stats.UploadRequests.Bytes += bodySize
269 c.statsMutex.Unlock()
270
271 pr := &PutResult{BlobRef: h.BlobRef, Size: uint32(bodySize)}
272
273 if c.sto != nil {
274
275 _, err := blobserver.Receive(ctx, c.sto, h.BlobRef, bodyReader)
276 if err != nil {
277 return nil, err
278 }
279 return pr, nil
280 }
281
282 if !h.Vivify {
283 if _, ok := c.haveCache.StatBlobCache(h.BlobRef); ok {
284 pr.Skipped = true
285 return pr, nil
286 }
287 }
288
289 blobrefStr := h.BlobRef.String()
290
291
292
293 pfx, err := c.prefix()
294 if err != nil {
295 return nil, err
296 }
297
298 if !h.SkipStat {
299 url_ := fmt.Sprintf("%s/camli/stat", pfx)
300 req := c.newRequest(ctx, "POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr))
301 req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
302
303 resp, err := c.doReqGated(req)
304 if err != nil {
305 return errorf("stat http error: %w", err)
306 }
307 defer resp.Body.Close()
308
309 if resp.StatusCode != 200 {
310 return errorf("stat response had http status %d", resp.StatusCode)
311 }
312
313 stat, err := parseStatResponse(resp)
314 if err != nil {
315 return nil, err
316 }
317 for _, sbr := range stat.HaveMap {
318 c.haveCache.NoteBlobExists(sbr.Ref, uint32(sbr.Size))
319 }
320 _, serverHasIt := stat.HaveMap[blobrefStr]
321 if env.DebugUploads() {
322 log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt)
323 }
324 if !h.Vivify && serverHasIt {
325 pr.Skipped = true
326 if closer, ok := h.Contents.(io.Closer); ok {
327
328
329
330
331 closer.Close()
332 }
333 c.haveCache.NoteBlobExists(h.BlobRef, uint32(bodySize))
334 return pr, nil
335 }
336 }
337
338 if env.DebugUploads() {
339 log.Printf("Uploading: %s (%d bytes)", blobrefStr, bodySize)
340 }
341
342 pipeReader, pipeWriter := io.Pipe()
343 multipartWriter := multipart.NewWriter(pipeWriter)
344
345 copyResult := make(chan error, 1)
346 go func() {
347 defer pipeWriter.Close()
348 part, err := multipartWriter.CreateFormFile(blobrefStr, blobrefStr)
349 if err != nil {
350 copyResult <- err
351 return
352 }
353 _, err = io.Copy(part, bodyReader)
354 if err == nil {
355 err = multipartWriter.Close()
356 }
357 copyResult <- err
358 }()
359
360 uploadURL := fmt.Sprintf("%s/camli/upload", pfx)
361 req := c.newRequest(ctx, "POST", uploadURL)
362 req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
363 if h.Vivify {
364 req.Header.Add("X-Camlistore-Vivify", "1")
365 }
366 req.Body = io.NopCloser(pipeReader)
367 req.ContentLength = getMultipartOverhead() + bodySize + int64(len(blobrefStr))*2
368 resp, err := c.doReqGated(req)
369 if err != nil {
370 return errorf("upload http error: %w", err)
371 }
372 defer resp.Body.Close()
373
374
375 if err := <-copyResult; err != nil {
376 return errorf("failed to copy contents into multipart writer: %w", err)
377 }
378
379
380 if resp.StatusCode != 200 && resp.StatusCode != http.StatusSeeOther {
381 return errorf("invalid http response %d in upload response", resp.StatusCode)
382 }
383
384 if resp.StatusCode == http.StatusSeeOther {
385 otherLocation := resp.Header.Get("Location")
386 if otherLocation == "" {
387 return errorf("303 without a Location")
388 }
389 baseURL, _ := url.Parse(uploadURL)
390 absURL, err := baseURL.Parse(otherLocation)
391 if err != nil {
392 return errorf("303 Location URL relative resolve error: %w", err)
393 }
394 otherLocation = absURL.String()
395 resp, err = http.Get(otherLocation)
396 if err != nil {
397 return errorf("error following 303 redirect after upload: %w", err)
398 }
399 }
400
401 var ures protocol.UploadResponse
402 if err := httputil.DecodeJSON(resp, &ures); err != nil {
403 return errorf("error in upload response: %w", err)
404 }
405
406 if ures.ErrorText != "" {
407 c.printf("Blob server reports error: %s", ures.ErrorText)
408 }
409
410 expectedSize := uint32(bodySize)
411
412 for _, sb := range ures.Received {
413 if sb.Ref != h.BlobRef {
414 continue
415 }
416 if sb.Size != expectedSize {
417 return errorf("Server got blob %v, but reports wrong length (%v; we sent %d)",
418 sb.Ref, sb.Size, expectedSize)
419 }
420 c.statsMutex.Lock()
421 c.stats.Uploads.Blobs++
422 c.stats.Uploads.Bytes += bodySize
423 c.statsMutex.Unlock()
424 if pr.Size <= 0 {
425 pr.Size = sb.Size
426 }
427 c.haveCache.NoteBlobExists(pr.BlobRef, pr.Size)
428 return pr, nil
429 }
430
431 return nil, errors.New("server didn't receive blob")
432 }
433
434
// FileUploadOptions holds optional parameters for (*Client).UploadFile.
type FileUploadOptions struct {
	// FileInfo, if non-nil, is used to build the file schema with
	// schema.NewCommonFileMap. Its modification time, when non-zero, is
	// recorded in the schema.
	FileInfo os.FileInfo

	// WholeRef, if valid, is the ref of the file's entire contents. When set,
	// UploadFile uses it instead of hashing the contents itself.
	WholeRef blob.Ref
}
443
444
445
446
447
448
449
450 func (c *Client) UploadFile(ctx context.Context, filename string, contents io.Reader, opts *FileUploadOptions) (blob.Ref, error) {
451 fileMap := schema.NewFileMap(filename)
452 if opts != nil && opts.FileInfo != nil {
453 fileMap = schema.NewCommonFileMap(filename, opts.FileInfo)
454 modTime := opts.FileInfo.ModTime()
455 if !modTime.IsZero() {
456 fileMap.SetModTime(modTime)
457 }
458 }
459 fileMap.SetType(schema.TypeFile)
460
461 var wholeRef []blob.Ref
462 if opts != nil && opts.WholeRef.Valid() {
463 wholeRef = append(wholeRef, opts.WholeRef)
464 } else {
465 var buf bytes.Buffer
466 var err error
467 wholeRef, err = c.wholeRef(io.TeeReader(contents, &buf))
468 if err != nil {
469 return blob.Ref{}, err
470 }
471 contents = io.MultiReader(&buf, contents)
472 }
473
474 fileRef, err := c.fileMapFromDuplicate(ctx, fileMap, wholeRef)
475 if err != nil {
476 return blob.Ref{}, err
477 }
478 if fileRef.Valid() {
479 return fileRef, nil
480 }
481
482 return schema.WriteFileMap(ctx, c, fileMap, contents)
483 }
484
485
486
487
488
489
490
491 func (c *Client) wholeRef(contents io.Reader) ([]blob.Ref, error) {
492 hasLegacySHA1, err := c.HasLegacySHA1()
493 if err != nil {
494 return nil, fmt.Errorf("cannot discover if server has legacy sha1: %v", err)
495 }
496 td := hashutil.NewTrackDigestReader(contents)
497 td.DoLegacySHA1 = hasLegacySHA1
498 if _, err := io.Copy(io.Discard, td); err != nil {
499 return nil, err
500 }
501 refs := []blob.Ref{blob.RefFromHash(td.Hash())}
502 if td.DoLegacySHA1 {
503 refs = append(refs, blob.RefFromHash(td.LegacySHA1Hash()))
504 }
505 return refs, nil
506 }
507
508
509
510
511
512
513
514 func (c *Client) fileMapFromDuplicate(ctx context.Context, fileMap *schema.Builder, wholeRef []blob.Ref) (blob.Ref, error) {
515 dupFileRef, err := c.SearchExistingFileSchema(ctx, wholeRef...)
516 if err != nil {
517 return blob.Ref{}, err
518 }
519 if !dupFileRef.Valid() {
520
521 return blob.Ref{}, nil
522 }
523 dupMap, err := c.FetchSchemaBlob(ctx, dupFileRef)
524 if err != nil {
525 return blob.Ref{}, fmt.Errorf("could not find existing file blob for wholeRef %q: %v", wholeRef, err)
526 }
527 fileMap.PopulateParts(dupMap.PartsSize(), dupMap.ByteParts())
528 json, err := fileMap.JSON()
529 if err != nil {
530 return blob.Ref{}, fmt.Errorf("could not write file map for wholeRef %q: %v", wholeRef, err)
531 }
532 bref := blob.RefFromString(json)
533 if bref == dupFileRef {
534
535
536
537 return dupFileRef, nil
538 }
539 sbr, err := c.ReceiveBlob(ctx, bref, strings.NewReader(json))
540 if err != nil {
541 return blob.Ref{}, err
542 }
543 return sbr.Ref, nil
544 }