1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package client
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "io/ioutil"
26 "log"
27 "mime/multipart"
28 "net/http"
29 "net/url"
30 "os"
31 "strings"
32 "sync"
33 "time"
34
35 "perkeep.org/internal/hashutil"
36 "perkeep.org/internal/httputil"
37 "perkeep.org/pkg/blob"
38 "perkeep.org/pkg/blobserver"
39 "perkeep.org/pkg/blobserver/protocol"
40 "perkeep.org/pkg/constants"
41 "perkeep.org/pkg/env"
42 "perkeep.org/pkg/schema"
43 )
44
45
// UploadHandle describes a blob to be sent with Client.Upload.
type UploadHandle struct {
	// BlobRef is the ref of the blob being uploaded. Upload also uses its
	// string form as the multipart field name and file name.
	BlobRef blob.Ref

	// Contents supplies the blob's bytes. If Upload skips the upload
	// because a server stat shows the blob is already present, and
	// Contents implements io.Closer, Upload closes it.
	Contents io.Reader

	// Size is the length of Contents in bytes. If non-zero it is trusted
	// as-is; if zero, Upload reads all of Contents into memory to learn
	// the length.
	Size uint32

	// Vivify makes Upload never skip the blob as already present, and adds
	// the "X-Camlistore-Vivify: 1" header to the upload request. What
	// vivification means is decided by the server.
	Vivify bool

	// SkipStat makes Upload omit the stat request it otherwise sends to
	// check whether the server already has the blob. The client's local
	// have-cache is still consulted unless Vivify is set.
	SkipStat bool
}
71
// PutResult is the outcome of Client.Upload.
type PutResult struct {
	// BlobRef is the ref of the uploaded (or skipped) blob.
	BlobRef blob.Ref

	// Size is the blob's size in bytes.
	Size uint32

	// Skipped reports that the blob was not sent because it was already
	// known to exist, either from the client's have-cache or from a stat
	// request to the server.
	Skipped bool
}
77
78 func (pr *PutResult) SizedBlobRef() blob.SizedRef {
79 return blob.SizedRef{Ref: pr.BlobRef, Size: pr.Size}
80 }
81
82
83
// statResponse is the decoded form of a /camli/stat response, as built by
// parseStatResponse.
type statResponse struct {
	// HaveMap maps the string form of each blob ref the server reported to
	// that ref and its size. Entries with invalid refs are omitted.
	HaveMap map[string]blob.SizedRef

	// canLongPoll mirrors the response's CanLongPoll field.
	canLongPoll bool
}

// ResponseFormatError is returned by parseStatResponse when the response
// body cannot be decoded.
//
// NOTE(review): because it is declared as a plain error interface type,
// converting to it does not change an error's dynamic type, so a type
// assertion to ResponseFormatError succeeds for any non-nil error and
// cannot be used to tell format errors apart.
type ResponseFormatError error
90
var (
	// multipartOnce guards the lazy computation of multipartOverhead.
	multipartOnce sync.Once

	// multipartOverhead caches the value returned by getMultipartOverhead.
	multipartOverhead int64
)

// getMultipartOverhead returns how many bytes a mime/multipart body with a
// single CreateFormFile part adds beyond the part's contents, its field
// name and its file name. Upload uses it to set the request's
// Content-Length before the body is streamed. The value is measured on
// first use and then cached. It assumes names that need no quoting or
// escaping in the Content-Disposition header.
func getMultipartOverhead() int64 {
	multipartOnce.Do(func() {
		multipartOverhead = measureMultipartOverhead()
	})
	return multipartOverhead
}

// measureMultipartOverhead encodes a one-part form whose field name, file
// name and contents are each the single byte "0", and returns the encoded
// length minus those three bytes.
func measureMultipartOverhead() int64 {
	var encoded bytes.Buffer
	mw := multipart.NewWriter(&encoded)
	// Writes into a bytes.Buffer cannot fail, so errors are not checked.
	fw, _ := mw.CreateFormFile("0", "0")
	fw.Write([]byte("0"))
	mw.Close()
	const placeholderBytes = 3 // field name + file name + contents
	return int64(encoded.Len()) - placeholderBytes
}
112
113 func parseStatResponse(res *http.Response) (*statResponse, error) {
114 var s = &statResponse{HaveMap: make(map[string]blob.SizedRef)}
115 var pres protocol.StatResponse
116 if err := httputil.DecodeJSON(res, &pres); err != nil {
117 return nil, ResponseFormatError(err)
118 }
119
120 s.canLongPoll = pres.CanLongPoll
121 for _, statItem := range pres.Stat {
122 br := statItem.Ref
123 if !br.Valid() {
124 continue
125 }
126 s.HaveMap[br.String()] = blob.SizedRef{Ref: br, Size: uint32(statItem.Size)}
127 }
128 return s, nil
129 }
130
131
132 func NewUploadHandleFromString(data string) *UploadHandle {
133 bref := blob.RefFromString(data)
134 r := strings.NewReader(data)
135 return &UploadHandle{BlobRef: bref, Size: uint32(len(data)), Contents: r}
136 }
137
138
139
140 func (c *Client) responseJSONMap(requestName string, resp *http.Response) (map[string]interface{}, error) {
141 if resp.StatusCode != 200 {
142 c.printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode)
143 io.Copy(os.Stderr, resp.Body)
144 return nil, fmt.Errorf("after %s request, HTTP response code is %d; no JSON to parse", requestName, resp.StatusCode)
145 }
146 jmap := make(map[string]interface{})
147 if err := httputil.DecodeJSON(resp, &jmap); err != nil {
148 return nil, err
149 }
150 return jmap, nil
151 }
152
153 func (c *Client) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
154 if c.sto != nil {
155 return c.sto.StatBlobs(ctx, blobs, fn)
156 }
157 var needStat []blob.Ref
158 for _, br := range blobs {
159 if !br.Valid() {
160 panic("invalid blob")
161 }
162 if size, ok := c.haveCache.StatBlobCache(br); ok {
163 if err := fn(blob.SizedRef{Ref: br, Size: size}); err != nil {
164 return err
165 }
166 } else {
167 needStat = append(needStat, br)
168 }
169 }
170 if len(needStat) == 0 {
171 return nil
172 }
173 return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, c.httpGate, func(br blob.Ref) (workerSB blob.SizedRef, err error) {
174 err = c.doStat(ctx, []blob.Ref{br}, 0, false, func(sb blob.SizedRef) error {
175 workerSB = sb
176 c.haveCache.NoteBlobExists(sb.Ref, sb.Size)
177 return fn(sb)
178 })
179 return
180 })
181 }
182
183
184
185
186
187 func (c *Client) doStat(ctx context.Context, blobs []blob.Ref, wait time.Duration, gated bool, fn func(blob.SizedRef) error) error {
188 var buf bytes.Buffer
189 fmt.Fprintf(&buf, "camliversion=1")
190 if wait > 0 {
191 secs := int(wait.Seconds())
192 if secs == 0 {
193 secs = 1
194 }
195 fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
196 }
197 for i, blob := range blobs {
198 fmt.Fprintf(&buf, "&blob%d=%s", i+1, blob)
199 }
200
201 pfx, err := c.prefix()
202 if err != nil {
203 return err
204 }
205 req := c.newRequest(ctx, "POST", fmt.Sprintf("%s/camli/stat", pfx), &buf)
206 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
207
208 var resp *http.Response
209 if gated {
210 resp, err = c.doReqGated(req)
211 } else {
212 resp, err = c.httpClient.Do(req)
213 }
214 if err != nil {
215 return fmt.Errorf("stat HTTP error: %v", err)
216 }
217 if resp.Body != nil {
218 defer resp.Body.Close()
219 }
220
221 if resp.StatusCode != 200 {
222 return fmt.Errorf("stat response had http status %d", resp.StatusCode)
223 }
224
225 stat, err := parseStatResponse(resp)
226 if err != nil {
227 return err
228 }
229 for _, sb := range stat.HaveMap {
230 if err := fn(sb); err != nil {
231 return err
232 }
233 }
234 return nil
235 }
236
237
238
239 func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) {
240 if h.Size > 0 {
241 return h.Contents, int64(h.Size), nil
242 }
243 var b bytes.Buffer
244 n, err := io.Copy(&b, h.Contents)
245 if err != nil {
246 return nil, 0, err
247 }
248 return &b, n, nil
249 }
250
251
252 func (c *Client) Upload(ctx context.Context, h *UploadHandle) (*PutResult, error) {
253 errorf := func(msg string, arg ...interface{}) (*PutResult, error) {
254 err := fmt.Errorf(msg, arg...)
255 c.printf("%v", err)
256 return nil, err
257 }
258
259 bodyReader, bodySize, err := h.readerAndSize()
260 if err != nil {
261 return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err)
262 }
263 if bodySize > constants.MaxBlobSize {
264 return nil, errors.New("client: body is bigger then max blob size")
265 }
266
267 c.statsMutex.Lock()
268 c.stats.UploadRequests.Blobs++
269 c.stats.UploadRequests.Bytes += bodySize
270 c.statsMutex.Unlock()
271
272 pr := &PutResult{BlobRef: h.BlobRef, Size: uint32(bodySize)}
273
274 if c.sto != nil {
275
276 _, err := blobserver.Receive(ctx, c.sto, h.BlobRef, bodyReader)
277 if err != nil {
278 return nil, err
279 }
280 return pr, nil
281 }
282
283 if !h.Vivify {
284 if _, ok := c.haveCache.StatBlobCache(h.BlobRef); ok {
285 pr.Skipped = true
286 return pr, nil
287 }
288 }
289
290 blobrefStr := h.BlobRef.String()
291
292
293
294 pfx, err := c.prefix()
295 if err != nil {
296 return nil, err
297 }
298
299 if !h.SkipStat {
300 url_ := fmt.Sprintf("%s/camli/stat", pfx)
301 req := c.newRequest(ctx, "POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr))
302 req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
303
304 resp, err := c.doReqGated(req)
305 if err != nil {
306 return errorf("stat http error: %v", err)
307 }
308 defer resp.Body.Close()
309
310 if resp.StatusCode != 200 {
311 return errorf("stat response had http status %d", resp.StatusCode)
312 }
313
314 stat, err := parseStatResponse(resp)
315 if err != nil {
316 return nil, err
317 }
318 for _, sbr := range stat.HaveMap {
319 c.haveCache.NoteBlobExists(sbr.Ref, uint32(sbr.Size))
320 }
321 _, serverHasIt := stat.HaveMap[blobrefStr]
322 if env.DebugUploads() {
323 log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt)
324 }
325 if !h.Vivify && serverHasIt {
326 pr.Skipped = true
327 if closer, ok := h.Contents.(io.Closer); ok {
328
329
330
331
332 closer.Close()
333 }
334 c.haveCache.NoteBlobExists(h.BlobRef, uint32(bodySize))
335 return pr, nil
336 }
337 }
338
339 if env.DebugUploads() {
340 log.Printf("Uploading: %s (%d bytes)", blobrefStr, bodySize)
341 }
342
343 pipeReader, pipeWriter := io.Pipe()
344 multipartWriter := multipart.NewWriter(pipeWriter)
345
346 copyResult := make(chan error, 1)
347 go func() {
348 defer pipeWriter.Close()
349 part, err := multipartWriter.CreateFormFile(blobrefStr, blobrefStr)
350 if err != nil {
351 copyResult <- err
352 return
353 }
354 _, err = io.Copy(part, bodyReader)
355 if err == nil {
356 err = multipartWriter.Close()
357 }
358 copyResult <- err
359 }()
360
361 uploadURL := fmt.Sprintf("%s/camli/upload", pfx)
362 req := c.newRequest(ctx, "POST", uploadURL)
363 req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
364 if h.Vivify {
365 req.Header.Add("X-Camlistore-Vivify", "1")
366 }
367 req.Body = ioutil.NopCloser(pipeReader)
368 req.ContentLength = getMultipartOverhead() + bodySize + int64(len(blobrefStr))*2
369 resp, err := c.doReqGated(req)
370 if err != nil {
371 return errorf("upload http error: %v", err)
372 }
373 defer resp.Body.Close()
374
375
376 if err := <-copyResult; err != nil {
377 return errorf("failed to copy contents into multipart writer: %v", err)
378 }
379
380
381 if resp.StatusCode != 200 && resp.StatusCode != 303 {
382 return errorf("invalid http response %d in upload response", resp.StatusCode)
383 }
384
385 if resp.StatusCode == 303 {
386 otherLocation := resp.Header.Get("Location")
387 if otherLocation == "" {
388 return errorf("303 without a Location")
389 }
390 baseURL, _ := url.Parse(uploadURL)
391 absURL, err := baseURL.Parse(otherLocation)
392 if err != nil {
393 return errorf("303 Location URL relative resolve error: %v", err)
394 }
395 otherLocation = absURL.String()
396 resp, err = http.Get(otherLocation)
397 if err != nil {
398 return errorf("error following 303 redirect after upload: %v", err)
399 }
400 }
401
402 var ures protocol.UploadResponse
403 if err := httputil.DecodeJSON(resp, &ures); err != nil {
404 return errorf("error in upload response: %v", err)
405 }
406
407 if ures.ErrorText != "" {
408 c.printf("Blob server reports error: %s", ures.ErrorText)
409 }
410
411 expectedSize := uint32(bodySize)
412
413 for _, sb := range ures.Received {
414 if sb.Ref != h.BlobRef {
415 continue
416 }
417 if sb.Size != expectedSize {
418 return errorf("Server got blob %v, but reports wrong length (%v; we sent %d)",
419 sb.Ref, sb.Size, expectedSize)
420 }
421 c.statsMutex.Lock()
422 c.stats.Uploads.Blobs++
423 c.stats.Uploads.Bytes += bodySize
424 c.statsMutex.Unlock()
425 if pr.Size <= 0 {
426 pr.Size = sb.Size
427 }
428 c.haveCache.NoteBlobExists(pr.BlobRef, pr.Size)
429 return pr, nil
430 }
431
432 return nil, errors.New("server didn't receive blob")
433 }
434
435
// FileUploadOptions holds optional parameters for Client.UploadFile.
type FileUploadOptions struct {
	// FileInfo, if non-nil, is used to build the file schema's common
	// attributes (via schema.NewCommonFileMap). When its ModTime is
	// non-zero, it also sets the schema's modification time.
	FileInfo os.FileInfo

	// WholeRef, if valid, is taken as the ref of the file's entire
	// contents. UploadFile then skips reading and hashing the contents
	// itself before looking for an existing copy.
	WholeRef blob.Ref
}
444
445
446
447
448
449
450
451 func (c *Client) UploadFile(ctx context.Context, filename string, contents io.Reader, opts *FileUploadOptions) (blob.Ref, error) {
452 fileMap := schema.NewFileMap(filename)
453 if opts != nil && opts.FileInfo != nil {
454 fileMap = schema.NewCommonFileMap(filename, opts.FileInfo)
455 modTime := opts.FileInfo.ModTime()
456 if !modTime.IsZero() {
457 fileMap.SetModTime(modTime)
458 }
459 }
460 fileMap.SetType(schema.TypeFile)
461
462 var wholeRef []blob.Ref
463 if opts != nil && opts.WholeRef.Valid() {
464 wholeRef = append(wholeRef, opts.WholeRef)
465 } else {
466 var buf bytes.Buffer
467 var err error
468 wholeRef, err = c.wholeRef(io.TeeReader(contents, &buf))
469 if err != nil {
470 return blob.Ref{}, err
471 }
472 contents = io.MultiReader(&buf, contents)
473 }
474
475 fileRef, err := c.fileMapFromDuplicate(ctx, fileMap, wholeRef)
476 if err != nil {
477 return blob.Ref{}, err
478 }
479 if fileRef.Valid() {
480 return fileRef, nil
481 }
482
483 return schema.WriteFileMap(ctx, c, fileMap, contents)
484 }
485
486
487
488
489
490
491
492 func (c *Client) wholeRef(contents io.Reader) ([]blob.Ref, error) {
493 hasLegacySHA1, err := c.HasLegacySHA1()
494 if err != nil {
495 return nil, fmt.Errorf("cannot discover if server has legacy sha1: %v", err)
496 }
497 td := hashutil.NewTrackDigestReader(contents)
498 td.DoLegacySHA1 = hasLegacySHA1
499 if _, err := io.Copy(ioutil.Discard, td); err != nil {
500 return nil, err
501 }
502 refs := []blob.Ref{blob.RefFromHash(td.Hash())}
503 if td.DoLegacySHA1 {
504 refs = append(refs, blob.RefFromHash(td.LegacySHA1Hash()))
505 }
506 return refs, nil
507 }
508
509
510
511
512
513
514
515 func (c *Client) fileMapFromDuplicate(ctx context.Context, fileMap *schema.Builder, wholeRef []blob.Ref) (blob.Ref, error) {
516 dupFileRef, err := c.SearchExistingFileSchema(ctx, wholeRef...)
517 if err != nil {
518 return blob.Ref{}, err
519 }
520 if !dupFileRef.Valid() {
521
522 return blob.Ref{}, nil
523 }
524 dupMap, err := c.FetchSchemaBlob(ctx, dupFileRef)
525 if err != nil {
526 return blob.Ref{}, fmt.Errorf("could not find existing file blob for wholeRef %q: %v", wholeRef, err)
527 }
528 fileMap.PopulateParts(dupMap.PartsSize(), dupMap.ByteParts())
529 json, err := fileMap.JSON()
530 if err != nil {
531 return blob.Ref{}, fmt.Errorf("could not write file map for wholeRef %q: %v", wholeRef, err)
532 }
533 bref := blob.RefFromString(json)
534 if bref == dupFileRef {
535
536
537
538 return dupFileRef, nil
539 }
540 sbr, err := c.ReceiveBlob(ctx, bref, strings.NewReader(json))
541 if err != nil {
542 return blob.Ref{}, err
543 }
544 return sbr.Ref, nil
545 }