Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package client
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"log"
    26		"mime/multipart"
    27		"net/http"
    28		"net/url"
    29		"os"
    30		"strings"
    31		"sync"
    32		"time"
    33	
    34		"perkeep.org/internal/hashutil"
    35		"perkeep.org/internal/httputil"
    36		"perkeep.org/pkg/blob"
    37		"perkeep.org/pkg/blobserver"
    38		"perkeep.org/pkg/blobserver/protocol"
    39		"perkeep.org/pkg/constants"
    40		"perkeep.org/pkg/env"
    41		"perkeep.org/pkg/schema"
    42	)
    43	
// UploadHandle contains the parameters of a request to upload a blob.
type UploadHandle struct {
	// BlobRef is the required blobref of the blob to upload.
	BlobRef blob.Ref

	// Contents is the blob data.
	Contents io.Reader

	// Size optionally specifies the size of Contents.
	// If <= 0, the Contents are slurped into memory to count the size.
	Size uint32

	// Vivify optionally instructs the server to create a
	// permanode for this blob. If used, the blob should be a
	// "file" schema blob. This is typically used by
	// lesser-trusted clients (such as mobile phones) which don't
	// have rights to do signing directly.
	Vivify bool

	// SkipStat indicates whether the stat check (checking whether
	// the server already has the blob) will be skipped and the
	// blob should be uploaded immediately. This is useful for
	// small blobs that the server is unlikely to already have
	// (e.g. new claims).
	SkipStat bool
}
    70	
// PutResult is the result of an Upload: the uploaded blob's ref and
// size, and whether the upload was skipped because the server already
// had the blob.
type PutResult struct {
	BlobRef blob.Ref
	Size    uint32
	Skipped bool // already present on blobserver
}
    76	
    77	func (pr *PutResult) SizedBlobRef() blob.SizedRef {
    78		return blob.SizedRef{Ref: pr.BlobRef, Size: pr.Size}
    79	}
    80	
// TODO: ditch this type and use protocol.StatResponse directly?
// Or at least make HaveMap keyed by a blob.Ref instead of a string.

// statResponse is the parsed form of a /camli/stat server response.
type statResponse struct {
	// HaveMap maps a blobref string to the sized ref the server
	// reports already having.
	HaveMap     map[string]blob.SizedRef
	// canLongPoll records whether the server advertised long-poll
	// support in its stat response.
	canLongPoll bool
}

// ResponseFormatError is the error type used when a server response
// cannot be decoded into the expected format.
type ResponseFormatError error
    89	
    90	var (
    91		multipartOnce     sync.Once
    92		multipartOverhead int64
    93	)
    94	
    95	// multipartOverhead is how many extra bytes mime/multipart's
    96	// Writer adds around content
    97	func getMultipartOverhead() int64 {
    98		multipartOnce.Do(func() {
    99			var b bytes.Buffer
   100			w := multipart.NewWriter(&b)
   101			part, _ := w.CreateFormFile("0", "0")
   102	
   103			dummyContents := []byte("0")
   104			part.Write(dummyContents)
   105	
   106			w.Close()
   107			multipartOverhead = int64(b.Len()) - 3 // remove what was added
   108		})
   109		return multipartOverhead
   110	}
   111	
   112	func parseStatResponse(res *http.Response) (*statResponse, error) {
   113		var s = &statResponse{HaveMap: make(map[string]blob.SizedRef)}
   114		var pres protocol.StatResponse
   115		if err := httputil.DecodeJSON(res, &pres); err != nil {
   116			return nil, ResponseFormatError(err)
   117		}
   118	
   119		s.canLongPoll = pres.CanLongPoll
   120		for _, statItem := range pres.Stat {
   121			br := statItem.Ref
   122			if !br.Valid() {
   123				continue
   124			}
   125			s.HaveMap[br.String()] = blob.SizedRef{Ref: br, Size: uint32(statItem.Size)}
   126		}
   127		return s, nil
   128	}
   129	
   130	// NewUploadHandleFromString returns an upload handle
   131	func NewUploadHandleFromString(data string) *UploadHandle {
   132		bref := blob.RefFromString(data)
   133		r := strings.NewReader(data)
   134		return &UploadHandle{BlobRef: bref, Size: uint32(len(data)), Contents: r}
   135	}
   136	
   137	// TODO(bradfitz): delete most of this. use new perkeep.org/pkg/blobserver/protocol types instead
   138	// of a map[string]interface{}.
   139	func (c *Client) responseJSONMap(requestName string, resp *http.Response) (map[string]interface{}, error) {
   140		if resp.StatusCode != 200 {
   141			c.printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode)
   142			io.Copy(os.Stderr, resp.Body)
   143			return nil, fmt.Errorf("after %s request, HTTP response code is %d; no JSON to parse", requestName, resp.StatusCode)
   144		}
   145		jmap := make(map[string]interface{})
   146		if err := httputil.DecodeJSON(resp, &jmap); err != nil {
   147			return nil, err
   148		}
   149		return jmap, nil
   150	}
   151	
   152	func (c *Client) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   153		if c.sto != nil {
   154			return c.sto.StatBlobs(ctx, blobs, fn)
   155		}
   156		var needStat []blob.Ref
   157		for _, br := range blobs {
   158			if !br.Valid() {
   159				panic("invalid blob")
   160			}
   161			if size, ok := c.haveCache.StatBlobCache(br); ok {
   162				if err := fn(blob.SizedRef{Ref: br, Size: size}); err != nil {
   163					return err
   164				}
   165			} else {
   166				needStat = append(needStat, br)
   167			}
   168		}
   169		if len(needStat) == 0 {
   170			return nil
   171		}
   172		return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, c.httpGate, func(br blob.Ref) (workerSB blob.SizedRef, err error) {
   173			err = c.doStat(ctx, []blob.Ref{br}, 0, false, func(sb blob.SizedRef) error {
   174				workerSB = sb
   175				c.haveCache.NoteBlobExists(sb.Ref, sb.Size)
   176				return fn(sb)
   177			})
   178			return
   179		})
   180	}
   181	
   182	// doStat does an HTTP request for the stat. the number of blobs is used verbatim. No extra splitting
   183	// or batching is done at this layer.
   184	// The semantics are the same as blobserver.BlobStatter.
   185	// gate controls whether it uses httpGate to pause on requests.
   186	func (c *Client) doStat(ctx context.Context, blobs []blob.Ref, wait time.Duration, gated bool, fn func(blob.SizedRef) error) error {
   187		var buf bytes.Buffer
   188		fmt.Fprintf(&buf, "camliversion=1")
   189		if wait > 0 {
   190			secs := int(wait.Seconds())
   191			if secs == 0 {
   192				secs = 1
   193			}
   194			fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
   195		}
   196		for i, blob := range blobs {
   197			fmt.Fprintf(&buf, "&blob%d=%s", i+1, blob)
   198		}
   199	
   200		pfx, err := c.prefix()
   201		if err != nil {
   202			return err
   203		}
   204		req := c.newRequest(ctx, "POST", fmt.Sprintf("%s/camli/stat", pfx), &buf)
   205		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
   206	
   207		var resp *http.Response
   208		if gated {
   209			resp, err = c.doReqGated(req)
   210		} else {
   211			resp, err = c.httpClient.Do(req)
   212		}
   213		if err != nil {
   214			return fmt.Errorf("stat HTTP error: %w", err)
   215		}
   216		if resp.Body != nil {
   217			defer resp.Body.Close()
   218		}
   219	
   220		if resp.StatusCode != 200 {
   221			return fmt.Errorf("stat response had http status %d", resp.StatusCode)
   222		}
   223	
   224		stat, err := parseStatResponse(resp)
   225		if err != nil {
   226			return err
   227		}
   228		for _, sb := range stat.HaveMap {
   229			if err := fn(sb); err != nil {
   230				return err
   231			}
   232		}
   233		return nil
   234	}
   235	
   236	// Figure out the size of the contents.
   237	// If the size was provided, trust it.
   238	func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) {
   239		if h.Size > 0 {
   240			return h.Contents, int64(h.Size), nil
   241		}
   242		var b bytes.Buffer
   243		n, err := io.Copy(&b, h.Contents)
   244		if err != nil {
   245			return nil, 0, err
   246		}
   247		return &b, n, nil
   248	}
   249	
   250	// Upload uploads a blob, as described by the provided UploadHandle parameters.
   251	func (c *Client) Upload(ctx context.Context, h *UploadHandle) (*PutResult, error) {
   252		errorf := func(msg string, arg ...interface{}) (*PutResult, error) {
   253			err := fmt.Errorf(msg, arg...)
   254			c.printf("%v", err)
   255			return nil, err
   256		}
   257	
   258		bodyReader, bodySize, err := h.readerAndSize()
   259		if err != nil {
   260			return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err)
   261		}
   262		if bodySize > constants.MaxBlobSize {
   263			return nil, errors.New("client: body is bigger then max blob size")
   264		}
   265	
   266		c.statsMutex.Lock()
   267		c.stats.UploadRequests.Blobs++
   268		c.stats.UploadRequests.Bytes += bodySize
   269		c.statsMutex.Unlock()
   270	
   271		pr := &PutResult{BlobRef: h.BlobRef, Size: uint32(bodySize)}
   272	
   273		if c.sto != nil {
   274			// TODO: stat first so we can show skipped?
   275			_, err := blobserver.Receive(ctx, c.sto, h.BlobRef, bodyReader)
   276			if err != nil {
   277				return nil, err
   278			}
   279			return pr, nil
   280		}
   281	
   282		if !h.Vivify {
   283			if _, ok := c.haveCache.StatBlobCache(h.BlobRef); ok {
   284				pr.Skipped = true
   285				return pr, nil
   286			}
   287		}
   288	
   289		blobrefStr := h.BlobRef.String()
   290	
   291		// Pre-upload. Check whether the blob already exists on the
   292		// server and if not, the URL to upload it to.
   293		pfx, err := c.prefix()
   294		if err != nil {
   295			return nil, err
   296		}
   297	
   298		if !h.SkipStat {
   299			url_ := fmt.Sprintf("%s/camli/stat", pfx)
   300			req := c.newRequest(ctx, "POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr))
   301			req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
   302	
   303			resp, err := c.doReqGated(req)
   304			if err != nil {
   305				return errorf("stat http error: %w", err)
   306			}
   307			defer resp.Body.Close()
   308	
   309			if resp.StatusCode != 200 {
   310				return errorf("stat response had http status %d", resp.StatusCode)
   311			}
   312	
   313			stat, err := parseStatResponse(resp)
   314			if err != nil {
   315				return nil, err
   316			}
   317			for _, sbr := range stat.HaveMap {
   318				c.haveCache.NoteBlobExists(sbr.Ref, uint32(sbr.Size))
   319			}
   320			_, serverHasIt := stat.HaveMap[blobrefStr]
   321			if env.DebugUploads() {
   322				log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt)
   323			}
   324			if !h.Vivify && serverHasIt {
   325				pr.Skipped = true
   326				if closer, ok := h.Contents.(io.Closer); ok {
   327					// TODO(bradfitz): I did this
   328					// Close-if-possible thing early on, before I
   329					// knew better.  Fix the callers instead, and
   330					// fix the docs.
   331					closer.Close()
   332				}
   333				c.haveCache.NoteBlobExists(h.BlobRef, uint32(bodySize))
   334				return pr, nil
   335			}
   336		}
   337	
   338		if env.DebugUploads() {
   339			log.Printf("Uploading: %s (%d bytes)", blobrefStr, bodySize)
   340		}
   341	
   342		pipeReader, pipeWriter := io.Pipe()
   343		multipartWriter := multipart.NewWriter(pipeWriter)
   344	
   345		copyResult := make(chan error, 1)
   346		go func() {
   347			defer pipeWriter.Close()
   348			part, err := multipartWriter.CreateFormFile(blobrefStr, blobrefStr)
   349			if err != nil {
   350				copyResult <- err
   351				return
   352			}
   353			_, err = io.Copy(part, bodyReader)
   354			if err == nil {
   355				err = multipartWriter.Close()
   356			}
   357			copyResult <- err
   358		}()
   359	
   360		uploadURL := fmt.Sprintf("%s/camli/upload", pfx)
   361		req := c.newRequest(ctx, "POST", uploadURL)
   362		req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
   363		if h.Vivify {
   364			req.Header.Add("X-Camlistore-Vivify", "1")
   365		}
   366		req.Body = io.NopCloser(pipeReader)
   367		req.ContentLength = getMultipartOverhead() + bodySize + int64(len(blobrefStr))*2
   368		resp, err := c.doReqGated(req)
   369		if err != nil {
   370			return errorf("upload http error: %w", err)
   371		}
   372		defer resp.Body.Close()
   373	
   374		// check error from earlier copy
   375		if err := <-copyResult; err != nil {
   376			return errorf("failed to copy contents into multipart writer: %w", err)
   377		}
   378	
   379		// The only valid HTTP responses are 200 and 303.
   380		if resp.StatusCode != 200 && resp.StatusCode != http.StatusSeeOther {
   381			return errorf("invalid http response %d in upload response", resp.StatusCode)
   382		}
   383	
   384		if resp.StatusCode == http.StatusSeeOther {
   385			otherLocation := resp.Header.Get("Location")
   386			if otherLocation == "" {
   387				return errorf("303 without a Location")
   388			}
   389			baseURL, _ := url.Parse(uploadURL)
   390			absURL, err := baseURL.Parse(otherLocation)
   391			if err != nil {
   392				return errorf("303 Location URL relative resolve error: %w", err)
   393			}
   394			otherLocation = absURL.String()
   395			resp, err = http.Get(otherLocation)
   396			if err != nil {
   397				return errorf("error following 303 redirect after upload: %w", err)
   398			}
   399		}
   400	
   401		var ures protocol.UploadResponse
   402		if err := httputil.DecodeJSON(resp, &ures); err != nil {
   403			return errorf("error in upload response: %w", err)
   404		}
   405	
   406		if ures.ErrorText != "" {
   407			c.printf("Blob server reports error: %s", ures.ErrorText)
   408		}
   409	
   410		expectedSize := uint32(bodySize)
   411	
   412		for _, sb := range ures.Received {
   413			if sb.Ref != h.BlobRef {
   414				continue
   415			}
   416			if sb.Size != expectedSize {
   417				return errorf("Server got blob %v, but reports wrong length (%v; we sent %d)",
   418					sb.Ref, sb.Size, expectedSize)
   419			}
   420			c.statsMutex.Lock()
   421			c.stats.Uploads.Blobs++
   422			c.stats.Uploads.Bytes += bodySize
   423			c.statsMutex.Unlock()
   424			if pr.Size <= 0 {
   425				pr.Size = sb.Size
   426			}
   427			c.haveCache.NoteBlobExists(pr.BlobRef, pr.Size)
   428			return pr, nil
   429		}
   430	
   431		return nil, errors.New("server didn't receive blob")
   432	}
   433	
// FileUploadOptions is optionally provided to UploadFile.
// A nil *FileUploadOptions is valid and means no options.
type FileUploadOptions struct {
	// FileInfo optionally specifies the FileInfo to populate the schema of the file blob.
	FileInfo os.FileInfo
	// WholeRef optionally specifies the digest of the uploaded file contents, which
	// allows UploadFile to skip computing the digest (needed to check if the contents
	// are already on the server).
	WholeRef blob.Ref
}
   443	
   444	// UploadFile uploads the contents of the file, as well as a file blob with
   445	// filename for these contents. If the contents or the file blob are found on
   446	// the server, they're not uploaded.
   447	//
   448	// Note: this method is still a work in progress, and might change to accommodate
   449	// the needs of pk-put file.
   450	func (c *Client) UploadFile(ctx context.Context, filename string, contents io.Reader, opts *FileUploadOptions) (blob.Ref, error) {
   451		fileMap := schema.NewFileMap(filename)
   452		if opts != nil && opts.FileInfo != nil {
   453			fileMap = schema.NewCommonFileMap(filename, opts.FileInfo)
   454			modTime := opts.FileInfo.ModTime()
   455			if !modTime.IsZero() {
   456				fileMap.SetModTime(modTime)
   457			}
   458		}
   459		fileMap.SetType(schema.TypeFile)
   460	
   461		var wholeRef []blob.Ref
   462		if opts != nil && opts.WholeRef.Valid() {
   463			wholeRef = append(wholeRef, opts.WholeRef)
   464		} else {
   465			var buf bytes.Buffer
   466			var err error
   467			wholeRef, err = c.wholeRef(io.TeeReader(contents, &buf))
   468			if err != nil {
   469				return blob.Ref{}, err
   470			}
   471			contents = io.MultiReader(&buf, contents)
   472		}
   473	
   474		fileRef, err := c.fileMapFromDuplicate(ctx, fileMap, wholeRef)
   475		if err != nil {
   476			return blob.Ref{}, err
   477		}
   478		if fileRef.Valid() {
   479			return fileRef, nil
   480		}
   481	
   482		return schema.WriteFileMap(ctx, c, fileMap, contents)
   483	}
   484	
   485	// TODO(mpl): replace up.wholeFileDigest in pk-put with c.wholeRef maybe.
   486	
   487	// wholeRef returns the blob ref(s) of the regular file's contents
   488	// as if it were one entire blob (ignoring blob size limits).
   489	// By default, only one ref is returned, unless the server has advertised
   490	// that it has indexes calculated for other hash functions.
   491	func (c *Client) wholeRef(contents io.Reader) ([]blob.Ref, error) {
   492		hasLegacySHA1, err := c.HasLegacySHA1()
   493		if err != nil {
   494			return nil, fmt.Errorf("cannot discover if server has legacy sha1: %v", err)
   495		}
   496		td := hashutil.NewTrackDigestReader(contents)
   497		td.DoLegacySHA1 = hasLegacySHA1
   498		if _, err := io.Copy(io.Discard, td); err != nil {
   499			return nil, err
   500		}
   501		refs := []blob.Ref{blob.RefFromHash(td.Hash())}
   502		if td.DoLegacySHA1 {
   503			refs = append(refs, blob.RefFromHash(td.LegacySHA1Hash()))
   504		}
   505		return refs, nil
   506	}
   507	
   508	// fileMapFromDuplicate queries the server's search interface for an
   509	// existing file blob for the file contents any of wholeRef.
   510	// If the server has it, it's validated, and then fileMap (which must
   511	// already be partially populated) has its "parts" field populated,
   512	// and then fileMap is uploaded (if necessary).
   513	// If no file blob is found, a zero blob.Ref (and no error) is returned.
   514	func (c *Client) fileMapFromDuplicate(ctx context.Context, fileMap *schema.Builder, wholeRef []blob.Ref) (blob.Ref, error) {
   515		dupFileRef, err := c.SearchExistingFileSchema(ctx, wholeRef...)
   516		if err != nil {
   517			return blob.Ref{}, err
   518		}
   519		if !dupFileRef.Valid() {
   520			// because SearchExistingFileSchema returns blob.Ref{}, nil when file is not found.
   521			return blob.Ref{}, nil
   522		}
   523		dupMap, err := c.FetchSchemaBlob(ctx, dupFileRef)
   524		if err != nil {
   525			return blob.Ref{}, fmt.Errorf("could not find existing file blob for wholeRef %q: %v", wholeRef, err)
   526		}
   527		fileMap.PopulateParts(dupMap.PartsSize(), dupMap.ByteParts())
   528		json, err := fileMap.JSON()
   529		if err != nil {
   530			return blob.Ref{}, fmt.Errorf("could not write file map for wholeRef %q: %v", wholeRef, err)
   531		}
   532		bref := blob.RefFromString(json)
   533		if bref == dupFileRef {
   534			// Unchanged (same filename, modtime, JSON serialization, etc)
   535			// Different signer (e.g. existing file has a sha1 signer, and
   536			// we're now using a sha224 signer) means we upload a new file schema.
   537			return dupFileRef, nil
   538		}
   539		sbr, err := c.ReceiveBlob(ctx, bref, strings.NewReader(json))
   540		if err != nil {
   541			return blob.Ref{}, err
   542		}
   543		return sbr.Ref, nil
   544	}
Website layout inspired by memcached.
Content by the authors.