Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package client
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"io/ioutil"
    26		"log"
    27		"mime/multipart"
    28		"net/http"
    29		"net/url"
    30		"os"
    31		"strings"
    32		"sync"
    33		"time"
    34	
    35		"perkeep.org/internal/hashutil"
    36		"perkeep.org/internal/httputil"
    37		"perkeep.org/pkg/blob"
    38		"perkeep.org/pkg/blobserver"
    39		"perkeep.org/pkg/blobserver/protocol"
    40		"perkeep.org/pkg/constants"
    41		"perkeep.org/pkg/env"
    42		"perkeep.org/pkg/schema"
    43	)
    44	
// UploadHandle contains the parameters of a request to upload a blob.
type UploadHandle struct {
	// BlobRef is the required blobref of the blob to upload.
	BlobRef blob.Ref

	// Contents is the blob data.
	Contents io.Reader

	// Size optionally specifies the size of Contents.
	// If <= 0, the Contents are slurped into memory to count the size.
	Size uint32

	// Vivify optionally instructs the server to create a
	// permanode for this blob. If used, the blob should be a
	// "file" schema blob. This is typically used by
	// lesser-trusted clients (such as mobile phones) which don't
	// have rights to do signing directly.
	Vivify bool

	// SkipStat indicates whether the stat check (checking whether
	// the server already has the blob) will be skipped and the
	// blob should be uploaded immediately. This is useful for
	// small blobs that the server is unlikely to already have
	// (e.g. new claims).
	SkipStat bool
}
    71	
// PutResult is the result of an upload (or local receive) of a blob.
type PutResult struct {
	BlobRef blob.Ref // ref of the blob that was uploaded
	Size    uint32   // size of the blob, in bytes
	Skipped bool     // already present on blobserver
}
    77	
    78	func (pr *PutResult) SizedBlobRef() blob.SizedRef {
    79		return blob.SizedRef{pr.BlobRef, pr.Size}
    80	}
    81	
// TODO: ditch this type and use protocol.StatResponse directly?
// Or at least make HaveMap keyed by a blob.Ref instead of a string.

// statResponse is the parsed result of a blobserver stat request.
type statResponse struct {
	HaveMap     map[string]blob.SizedRef // blobs the server already has, keyed by blobref string
	canLongPoll bool                     // whether the server advertised CanLongPoll in its response
}
    88	
    89	type ResponseFormatError error
    90	
    91	var (
    92		multipartOnce     sync.Once
    93		multipartOverhead int64
    94	)
    95	
    96	// multipartOverhead is how many extra bytes mime/multipart's
    97	// Writer adds around content
    98	func getMultipartOverhead() int64 {
    99		multipartOnce.Do(func() {
   100			var b bytes.Buffer
   101			w := multipart.NewWriter(&b)
   102			part, _ := w.CreateFormFile("0", "0")
   103	
   104			dummyContents := []byte("0")
   105			part.Write(dummyContents)
   106	
   107			w.Close()
   108			multipartOverhead = int64(b.Len()) - 3 // remove what was added
   109		})
   110		return multipartOverhead
   111	}
   112	
   113	func parseStatResponse(res *http.Response) (*statResponse, error) {
   114		var s = &statResponse{HaveMap: make(map[string]blob.SizedRef)}
   115		var pres protocol.StatResponse
   116		if err := httputil.DecodeJSON(res, &pres); err != nil {
   117			return nil, ResponseFormatError(err)
   118		}
   119	
   120		s.canLongPoll = pres.CanLongPoll
   121		for _, statItem := range pres.Stat {
   122			br := statItem.Ref
   123			if !br.Valid() {
   124				continue
   125			}
   126			s.HaveMap[br.String()] = blob.SizedRef{br, uint32(statItem.Size)}
   127		}
   128		return s, nil
   129	}
   130	
   131	// NewUploadHandleFromString returns an upload handle
   132	func NewUploadHandleFromString(data string) *UploadHandle {
   133		bref := blob.RefFromString(data)
   134		r := strings.NewReader(data)
   135		return &UploadHandle{BlobRef: bref, Size: uint32(len(data)), Contents: r}
   136	}
   137	
   138	// TODO(bradfitz): delete most of this. use new camlistore.org/pkg/blobserver/protocol types instead
   139	// of a map[string]interface{}.
   140	func (c *Client) responseJSONMap(requestName string, resp *http.Response) (map[string]interface{}, error) {
   141		if resp.StatusCode != 200 {
   142			c.printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode)
   143			io.Copy(os.Stderr, resp.Body)
   144			return nil, fmt.Errorf("after %s request, HTTP response code is %d; no JSON to parse", requestName, resp.StatusCode)
   145		}
   146		jmap := make(map[string]interface{})
   147		if err := httputil.DecodeJSON(resp, &jmap); err != nil {
   148			return nil, err
   149		}
   150		return jmap, nil
   151	}
   152	
// statReq is a request to stat a blob. Results are delivered on the
// dest and errc channels supplied by the requester.
type statReq struct {
	br   blob.Ref             // blob to stat
	dest chan<- blob.SizedRef // written to on success
	errc chan<- error         // written to on both failure and success (after any dest)
}
   159	
   160	func (c *Client) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   161		if c.sto != nil {
   162			return c.sto.StatBlobs(ctx, blobs, fn)
   163		}
   164		var needStat []blob.Ref
   165		for _, br := range blobs {
   166			if !br.Valid() {
   167				panic("invalid blob")
   168			}
   169			if size, ok := c.haveCache.StatBlobCache(br); ok {
   170				if err := fn(blob.SizedRef{br, size}); err != nil {
   171					return err
   172				}
   173			} else {
   174				needStat = append(needStat, br)
   175			}
   176		}
   177		if len(needStat) == 0 {
   178			return nil
   179		}
   180		return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, c.httpGate, func(br blob.Ref) (workerSB blob.SizedRef, err error) {
   181			err = c.doStat(ctx, []blob.Ref{br}, 0, false, func(sb blob.SizedRef) error {
   182				workerSB = sb
   183				c.haveCache.NoteBlobExists(sb.Ref, sb.Size)
   184				return fn(sb)
   185			})
   186			return
   187		})
   188	}
   189	
   190	// doStat does an HTTP request for the stat. the number of blobs is used verbatim. No extra splitting
   191	// or batching is done at this layer.
   192	// The semantics are the same as blobserver.BlobStatter.
   193	// gate controls whether it uses httpGate to pause on requests.
   194	func (c *Client) doStat(ctx context.Context, blobs []blob.Ref, wait time.Duration, gated bool, fn func(blob.SizedRef) error) error {
   195		var buf bytes.Buffer
   196		fmt.Fprintf(&buf, "camliversion=1")
   197		if wait > 0 {
   198			secs := int(wait.Seconds())
   199			if secs == 0 {
   200				secs = 1
   201			}
   202			fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
   203		}
   204		for i, blob := range blobs {
   205			fmt.Fprintf(&buf, "&blob%d=%s", i+1, blob)
   206		}
   207	
   208		pfx, err := c.prefix()
   209		if err != nil {
   210			return err
   211		}
   212		req := c.newRequest(ctx, "POST", fmt.Sprintf("%s/camli/stat", pfx), &buf)
   213		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
   214	
   215		var resp *http.Response
   216		if gated {
   217			resp, err = c.doReqGated(req)
   218		} else {
   219			resp, err = c.httpClient.Do(req)
   220		}
   221		if err != nil {
   222			return fmt.Errorf("stat HTTP error: %v", err)
   223		}
   224		if resp.Body != nil {
   225			defer resp.Body.Close()
   226		}
   227	
   228		if resp.StatusCode != 200 {
   229			return fmt.Errorf("stat response had http status %d", resp.StatusCode)
   230		}
   231	
   232		stat, err := parseStatResponse(resp)
   233		if err != nil {
   234			return err
   235		}
   236		for _, sb := range stat.HaveMap {
   237			if err := fn(sb); err != nil {
   238				return err
   239			}
   240		}
   241		return nil
   242	}
   243	
   244	// Figure out the size of the contents.
   245	// If the size was provided, trust it.
   246	func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) {
   247		if h.Size > 0 {
   248			return h.Contents, int64(h.Size), nil
   249		}
   250		var b bytes.Buffer
   251		n, err := io.Copy(&b, h.Contents)
   252		if err != nil {
   253			return nil, 0, err
   254		}
   255		return &b, n, nil
   256	}
   257	
   258	// Upload uploads a blob, as described by the provided UploadHandle parameters.
   259	func (c *Client) Upload(ctx context.Context, h *UploadHandle) (*PutResult, error) {
   260		errorf := func(msg string, arg ...interface{}) (*PutResult, error) {
   261			err := fmt.Errorf(msg, arg...)
   262			c.printf("%v", err)
   263			return nil, err
   264		}
   265	
   266		bodyReader, bodySize, err := h.readerAndSize()
   267		if err != nil {
   268			return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err)
   269		}
   270		if bodySize > constants.MaxBlobSize {
   271			return nil, errors.New("client: body is bigger then max blob size")
   272		}
   273	
   274		c.statsMutex.Lock()
   275		c.stats.UploadRequests.Blobs++
   276		c.stats.UploadRequests.Bytes += bodySize
   277		c.statsMutex.Unlock()
   278	
   279		pr := &PutResult{BlobRef: h.BlobRef, Size: uint32(bodySize)}
   280	
   281		if c.sto != nil {
   282			// TODO: stat first so we can show skipped?
   283			_, err := blobserver.Receive(ctx, c.sto, h.BlobRef, bodyReader)
   284			if err != nil {
   285				return nil, err
   286			}
   287			return pr, nil
   288		}
   289	
   290		if !h.Vivify {
   291			if _, ok := c.haveCache.StatBlobCache(h.BlobRef); ok {
   292				pr.Skipped = true
   293				return pr, nil
   294			}
   295		}
   296	
   297		blobrefStr := h.BlobRef.String()
   298	
   299		// Pre-upload. Check whether the blob already exists on the
   300		// server and if not, the URL to upload it to.
   301		pfx, err := c.prefix()
   302		if err != nil {
   303			return nil, err
   304		}
   305	
   306		if !h.SkipStat {
   307			url_ := fmt.Sprintf("%s/camli/stat", pfx)
   308			req := c.newRequest(ctx, "POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr))
   309			req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
   310	
   311			resp, err := c.doReqGated(req)
   312			if err != nil {
   313				return errorf("stat http error: %v", err)
   314			}
   315			defer resp.Body.Close()
   316	
   317			if resp.StatusCode != 200 {
   318				return errorf("stat response had http status %d", resp.StatusCode)
   319			}
   320	
   321			stat, err := parseStatResponse(resp)
   322			if err != nil {
   323				return nil, err
   324			}
   325			for _, sbr := range stat.HaveMap {
   326				c.haveCache.NoteBlobExists(sbr.Ref, uint32(sbr.Size))
   327			}
   328			_, serverHasIt := stat.HaveMap[blobrefStr]
   329			if env.DebugUploads() {
   330				log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt)
   331			}
   332			if !h.Vivify && serverHasIt {
   333				pr.Skipped = true
   334				if closer, ok := h.Contents.(io.Closer); ok {
   335					// TODO(bradfitz): I did this
   336					// Close-if-possible thing early on, before I
   337					// knew better.  Fix the callers instead, and
   338					// fix the docs.
   339					closer.Close()
   340				}
   341				c.haveCache.NoteBlobExists(h.BlobRef, uint32(bodySize))
   342				return pr, nil
   343			}
   344		}
   345	
   346		if env.DebugUploads() {
   347			log.Printf("Uploading: %s (%d bytes)", blobrefStr, bodySize)
   348		}
   349	
   350		pipeReader, pipeWriter := io.Pipe()
   351		multipartWriter := multipart.NewWriter(pipeWriter)
   352	
   353		copyResult := make(chan error, 1)
   354		go func() {
   355			defer pipeWriter.Close()
   356			part, err := multipartWriter.CreateFormFile(blobrefStr, blobrefStr)
   357			if err != nil {
   358				copyResult <- err
   359				return
   360			}
   361			_, err = io.Copy(part, bodyReader)
   362			if err == nil {
   363				err = multipartWriter.Close()
   364			}
   365			copyResult <- err
   366		}()
   367	
   368		uploadURL := fmt.Sprintf("%s/camli/upload", pfx)
   369		req := c.newRequest(ctx, "POST", uploadURL)
   370		req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
   371		if h.Vivify {
   372			req.Header.Add("X-Camlistore-Vivify", "1")
   373		}
   374		req.Body = ioutil.NopCloser(pipeReader)
   375		req.ContentLength = getMultipartOverhead() + bodySize + int64(len(blobrefStr))*2
   376		resp, err := c.doReqGated(req)
   377		if err != nil {
   378			return errorf("upload http error: %v", err)
   379		}
   380		defer resp.Body.Close()
   381	
   382		// check error from earlier copy
   383		if err := <-copyResult; err != nil {
   384			return errorf("failed to copy contents into multipart writer: %v", err)
   385		}
   386	
   387		// The only valid HTTP responses are 200 and 303.
   388		if resp.StatusCode != 200 && resp.StatusCode != 303 {
   389			return errorf("invalid http response %d in upload response", resp.StatusCode)
   390		}
   391	
   392		if resp.StatusCode == 303 {
   393			otherLocation := resp.Header.Get("Location")
   394			if otherLocation == "" {
   395				return errorf("303 without a Location")
   396			}
   397			baseURL, _ := url.Parse(uploadURL)
   398			absURL, err := baseURL.Parse(otherLocation)
   399			if err != nil {
   400				return errorf("303 Location URL relative resolve error: %v", err)
   401			}
   402			otherLocation = absURL.String()
   403			resp, err = http.Get(otherLocation)
   404			if err != nil {
   405				return errorf("error following 303 redirect after upload: %v", err)
   406			}
   407		}
   408	
   409		var ures protocol.UploadResponse
   410		if err := httputil.DecodeJSON(resp, &ures); err != nil {
   411			return errorf("error in upload response: %v", err)
   412		}
   413	
   414		if ures.ErrorText != "" {
   415			c.printf("Blob server reports error: %s", ures.ErrorText)
   416		}
   417	
   418		expectedSize := uint32(bodySize)
   419	
   420		for _, sb := range ures.Received {
   421			if sb.Ref != h.BlobRef {
   422				continue
   423			}
   424			if sb.Size != expectedSize {
   425				return errorf("Server got blob %v, but reports wrong length (%v; we sent %d)",
   426					sb.Ref, sb.Size, expectedSize)
   427			}
   428			c.statsMutex.Lock()
   429			c.stats.Uploads.Blobs++
   430			c.stats.Uploads.Bytes += bodySize
   431			c.statsMutex.Unlock()
   432			if pr.Size <= 0 {
   433				pr.Size = sb.Size
   434			}
   435			c.haveCache.NoteBlobExists(pr.BlobRef, pr.Size)
   436			return pr, nil
   437		}
   438	
   439		return nil, errors.New("server didn't receive blob")
   440	}
   441	
// FileUploadOptions is optionally provided to UploadFile.
// A nil *FileUploadOptions is valid and means no options.
type FileUploadOptions struct {
	// FileInfo optionally specifies the FileInfo to populate the schema of the file blob.
	FileInfo os.FileInfo
	// WholeRef optionally specifies the digest of the uploaded file contents, which
	// allows UploadFile to skip computing the digest (needed to check if the contents
	// are already on the server).
	WholeRef blob.Ref
}
   451	
// UploadFile uploads the contents of the file, as well as a file blob with
// filename for these contents. If the contents or the file blob are found on
// the server, they're not uploaded.
//
// Note: this method is still a work in progress, and might change to accommodate
// the needs of pk-put file.
func (c *Client) UploadFile(ctx context.Context, filename string, contents io.Reader, opts *FileUploadOptions) (blob.Ref, error) {
	fileMap := schema.NewFileMap(filename)
	if opts != nil && opts.FileInfo != nil {
		fileMap = schema.NewCommonFileMap(filename, opts.FileInfo)
		modTime := opts.FileInfo.ModTime()
		if !modTime.IsZero() {
			fileMap.SetModTime(modTime)
		}
	}
	fileMap.SetType("file")

	// wholeRef holds the digest(s) of the full file contents, used to
	// search the server for an existing file schema blob.
	var wholeRef []blob.Ref
	if opts != nil && opts.WholeRef.Valid() {
		wholeRef = append(wholeRef, opts.WholeRef)
	} else {
		// Compute the digest ourselves. The bytes consumed by
		// c.wholeRef are teed into buf, and contents is rebuilt so the
		// full stream can still be uploaded below.
		var buf bytes.Buffer
		var err error
		wholeRef, err = c.wholeRef(io.TeeReader(contents, &buf))
		if err != nil {
			return blob.Ref{}, err
		}
		contents = io.MultiReader(&buf, contents)
	}

	// If the server already has a file schema for these contents,
	// reuse it (possibly re-serialized) instead of re-uploading.
	fileRef, err := c.fileMapFromDuplicate(ctx, fileMap, wholeRef)
	if err != nil {
		return blob.Ref{}, err
	}
	if fileRef.Valid() {
		return fileRef, nil
	}

	return schema.WriteFileMap(ctx, c, fileMap, contents)
}
   492	
   493	// TODO(mpl): replace up.wholeFileDigest in pk-put with c.wholeRef maybe.
   494	
   495	// wholeRef returns the blob ref(s) of the regular file's contents
   496	// as if it were one entire blob (ignoring blob size limits).
   497	// By default, only one ref is returned, unless the server has advertised
   498	// that it has indexes calculated for other hash functions.
   499	func (c *Client) wholeRef(contents io.Reader) ([]blob.Ref, error) {
   500		hasLegacySHA1, err := c.HasLegacySHA1()
   501		if err != nil {
   502			return nil, fmt.Errorf("cannot discover if server has legacy sha1: %v", err)
   503		}
   504		td := hashutil.NewTrackDigestReader(contents)
   505		td.DoLegacySHA1 = hasLegacySHA1
   506		if _, err := io.Copy(ioutil.Discard, td); err != nil {
   507			return nil, err
   508		}
   509		refs := []blob.Ref{blob.RefFromHash(td.Hash())}
   510		if td.DoLegacySHA1 {
   511			refs = append(refs, blob.RefFromHash(td.LegacySHA1Hash()))
   512		}
   513		return refs, nil
   514	}
   515	
// fileMapFromDuplicate queries the server's search interface for an
// existing file blob for the file contents of any of wholeRef.
// If the server has it, it's validated, and then fileMap (which must
// already be partially populated) has its "parts" field populated,
// and then fileMap is uploaded (if necessary).
// If no file blob is found, a zero blob.Ref (and no error) is returned.
func (c *Client) fileMapFromDuplicate(ctx context.Context, fileMap *schema.Builder, wholeRef []blob.Ref) (blob.Ref, error) {
	dupFileRef, err := c.SearchExistingFileSchema(ctx, wholeRef...)
	if err != nil {
		return blob.Ref{}, err
	}
	if !dupFileRef.Valid() {
		// because SearchExistingFileSchema returns blob.Ref{}, nil when file is not found.
		return blob.Ref{}, nil
	}
	dupMap, err := c.FetchSchemaBlob(ctx, dupFileRef)
	if err != nil {
		return blob.Ref{}, fmt.Errorf("could not find existing file blob for wholeRef %q: %v", wholeRef, err)
	}
	// Adopt the existing file blob's parts list and total size.
	fileMap.PopulateParts(dupMap.PartsSize(), dupMap.ByteParts())
	json, err := fileMap.JSON()
	if err != nil {
		return blob.Ref{}, fmt.Errorf("could not write file map for wholeRef %q: %v", wholeRef, err)
	}
	bref := blob.RefFromString(json)
	if bref == dupFileRef {
		// Unchanged (same filename, modtime, JSON serialization, etc)
		// Different signer (e.g. existing file has a sha1 signer, and
		// we're now using a sha224 signer) means we upload a new file schema.
		return dupFileRef, nil
	}
	// Serialization differs (new filename, modtime, ...): upload the
	// new file schema blob.
	sbr, err := c.ReceiveBlob(ctx, bref, strings.NewReader(json))
	if err != nil {
		return blob.Ref{}, err
	}
	return sbr.Ref, nil
}
Website layout inspired by memcached.
Content by the authors.