Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package client
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"io/ioutil"
    26		"log"
    27		"mime/multipart"
    28		"net/http"
    29		"net/url"
    30		"os"
    31		"strings"
    32		"sync"
    33		"time"
    34	
    35		"perkeep.org/internal/hashutil"
    36		"perkeep.org/internal/httputil"
    37		"perkeep.org/pkg/blob"
    38		"perkeep.org/pkg/blobserver"
    39		"perkeep.org/pkg/blobserver/protocol"
    40		"perkeep.org/pkg/constants"
    41		"perkeep.org/pkg/env"
    42		"perkeep.org/pkg/schema"
    43	)
    44	
// UploadHandle contains the parameters of a request to upload a blob.
type UploadHandle struct {
	// BlobRef is the required blobref of the blob to upload.
	BlobRef blob.Ref

	// Contents is the blob data.
	Contents io.Reader

	// Size optionally specifies the size of Contents.
	// If zero, the Contents are slurped into memory to count the size.
	Size uint32

	// Vivify optionally instructs the server to create a
	// permanode for this blob. If used, the blob should be a
	// "file" schema blob. This is typically used by
	// lesser-trusted clients (such as mobile phones) which don't
	// have rights to do signing directly.
	Vivify bool

	// SkipStat indicates whether the stat check (checking whether
	// the server already has the blob) will be skipped and the
	// blob should be uploaded immediately. This is useful for
	// small blobs that the server is unlikely to already have
	// (e.g. new claims).
	SkipStat bool
}
    71	
// PutResult describes the outcome of an upload of a single blob.
type PutResult struct {
	BlobRef blob.Ref // ref of the blob the upload was for
	Size    uint32   // size of the blob, in bytes
	Skipped bool     // already present on blobserver
}
    77	
    78	func (pr *PutResult) SizedBlobRef() blob.SizedRef {
    79		return blob.SizedRef{Ref: pr.BlobRef, Size: pr.Size}
    80	}
    81	
// statResponse is the client-side view of a parsed stat response.
//
// TODO: ditch this type and use protocol.StatResponse directly?
// Or at least make HaveMap keyed by a blob.Ref instead of a string.
type statResponse struct {
	// HaveMap holds the blobs listed in the stat response, keyed by the
	// string form of their blob ref.
	HaveMap map[string]blob.SizedRef
	// canLongPoll mirrors the CanLongPoll field of the decoded response.
	canLongPoll bool
}
    88	
// ResponseFormatError is the error type used to report a server response
// that could not be decoded (see parseStatResponse).
type ResponseFormatError error
    90	
    91	var (
    92		multipartOnce     sync.Once
    93		multipartOverhead int64
    94	)
    95	
    96	// multipartOverhead is how many extra bytes mime/multipart's
    97	// Writer adds around content
    98	func getMultipartOverhead() int64 {
    99		multipartOnce.Do(func() {
   100			var b bytes.Buffer
   101			w := multipart.NewWriter(&b)
   102			part, _ := w.CreateFormFile("0", "0")
   103	
   104			dummyContents := []byte("0")
   105			part.Write(dummyContents)
   106	
   107			w.Close()
   108			multipartOverhead = int64(b.Len()) - 3 // remove what was added
   109		})
   110		return multipartOverhead
   111	}
   112	
   113	func parseStatResponse(res *http.Response) (*statResponse, error) {
   114		var s = &statResponse{HaveMap: make(map[string]blob.SizedRef)}
   115		var pres protocol.StatResponse
   116		if err := httputil.DecodeJSON(res, &pres); err != nil {
   117			return nil, ResponseFormatError(err)
   118		}
   119	
   120		s.canLongPoll = pres.CanLongPoll
   121		for _, statItem := range pres.Stat {
   122			br := statItem.Ref
   123			if !br.Valid() {
   124				continue
   125			}
   126			s.HaveMap[br.String()] = blob.SizedRef{Ref: br, Size: uint32(statItem.Size)}
   127		}
   128		return s, nil
   129	}
   130	
   131	// NewUploadHandleFromString returns an upload handle
   132	func NewUploadHandleFromString(data string) *UploadHandle {
   133		bref := blob.RefFromString(data)
   134		r := strings.NewReader(data)
   135		return &UploadHandle{BlobRef: bref, Size: uint32(len(data)), Contents: r}
   136	}
   137	
   138	// TODO(bradfitz): delete most of this. use new camlistore.org/pkg/blobserver/protocol types instead
   139	// of a map[string]interface{}.
   140	func (c *Client) responseJSONMap(requestName string, resp *http.Response) (map[string]interface{}, error) {
   141		if resp.StatusCode != 200 {
   142			c.printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode)
   143			io.Copy(os.Stderr, resp.Body)
   144			return nil, fmt.Errorf("after %s request, HTTP response code is %d; no JSON to parse", requestName, resp.StatusCode)
   145		}
   146		jmap := make(map[string]interface{})
   147		if err := httputil.DecodeJSON(resp, &jmap); err != nil {
   148			return nil, err
   149		}
   150		return jmap, nil
   151	}
   152	
   153	func (c *Client) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   154		if c.sto != nil {
   155			return c.sto.StatBlobs(ctx, blobs, fn)
   156		}
   157		var needStat []blob.Ref
   158		for _, br := range blobs {
   159			if !br.Valid() {
   160				panic("invalid blob")
   161			}
   162			if size, ok := c.haveCache.StatBlobCache(br); ok {
   163				if err := fn(blob.SizedRef{Ref: br, Size: size}); err != nil {
   164					return err
   165				}
   166			} else {
   167				needStat = append(needStat, br)
   168			}
   169		}
   170		if len(needStat) == 0 {
   171			return nil
   172		}
   173		return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, c.httpGate, func(br blob.Ref) (workerSB blob.SizedRef, err error) {
   174			err = c.doStat(ctx, []blob.Ref{br}, 0, false, func(sb blob.SizedRef) error {
   175				workerSB = sb
   176				c.haveCache.NoteBlobExists(sb.Ref, sb.Size)
   177				return fn(sb)
   178			})
   179			return
   180		})
   181	}
   182	
   183	// doStat does an HTTP request for the stat. the number of blobs is used verbatim. No extra splitting
   184	// or batching is done at this layer.
   185	// The semantics are the same as blobserver.BlobStatter.
   186	// gate controls whether it uses httpGate to pause on requests.
   187	func (c *Client) doStat(ctx context.Context, blobs []blob.Ref, wait time.Duration, gated bool, fn func(blob.SizedRef) error) error {
   188		var buf bytes.Buffer
   189		fmt.Fprintf(&buf, "camliversion=1")
   190		if wait > 0 {
   191			secs := int(wait.Seconds())
   192			if secs == 0 {
   193				secs = 1
   194			}
   195			fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
   196		}
   197		for i, blob := range blobs {
   198			fmt.Fprintf(&buf, "&blob%d=%s", i+1, blob)
   199		}
   200	
   201		pfx, err := c.prefix()
   202		if err != nil {
   203			return err
   204		}
   205		req := c.newRequest(ctx, "POST", fmt.Sprintf("%s/camli/stat", pfx), &buf)
   206		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
   207	
   208		var resp *http.Response
   209		if gated {
   210			resp, err = c.doReqGated(req)
   211		} else {
   212			resp, err = c.httpClient.Do(req)
   213		}
   214		if err != nil {
   215			return fmt.Errorf("stat HTTP error: %v", err)
   216		}
   217		if resp.Body != nil {
   218			defer resp.Body.Close()
   219		}
   220	
   221		if resp.StatusCode != 200 {
   222			return fmt.Errorf("stat response had http status %d", resp.StatusCode)
   223		}
   224	
   225		stat, err := parseStatResponse(resp)
   226		if err != nil {
   227			return err
   228		}
   229		for _, sb := range stat.HaveMap {
   230			if err := fn(sb); err != nil {
   231				return err
   232			}
   233		}
   234		return nil
   235	}
   236	
   237	// Figure out the size of the contents.
   238	// If the size was provided, trust it.
   239	func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) {
   240		if h.Size > 0 {
   241			return h.Contents, int64(h.Size), nil
   242		}
   243		var b bytes.Buffer
   244		n, err := io.Copy(&b, h.Contents)
   245		if err != nil {
   246			return nil, 0, err
   247		}
   248		return &b, n, nil
   249	}
   250	
   251	// Upload uploads a blob, as described by the provided UploadHandle parameters.
   252	func (c *Client) Upload(ctx context.Context, h *UploadHandle) (*PutResult, error) {
   253		errorf := func(msg string, arg ...interface{}) (*PutResult, error) {
   254			err := fmt.Errorf(msg, arg...)
   255			c.printf("%v", err)
   256			return nil, err
   257		}
   258	
   259		bodyReader, bodySize, err := h.readerAndSize()
   260		if err != nil {
   261			return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err)
   262		}
   263		if bodySize > constants.MaxBlobSize {
   264			return nil, errors.New("client: body is bigger then max blob size")
   265		}
   266	
   267		c.statsMutex.Lock()
   268		c.stats.UploadRequests.Blobs++
   269		c.stats.UploadRequests.Bytes += bodySize
   270		c.statsMutex.Unlock()
   271	
   272		pr := &PutResult{BlobRef: h.BlobRef, Size: uint32(bodySize)}
   273	
   274		if c.sto != nil {
   275			// TODO: stat first so we can show skipped?
   276			_, err := blobserver.Receive(ctx, c.sto, h.BlobRef, bodyReader)
   277			if err != nil {
   278				return nil, err
   279			}
   280			return pr, nil
   281		}
   282	
   283		if !h.Vivify {
   284			if _, ok := c.haveCache.StatBlobCache(h.BlobRef); ok {
   285				pr.Skipped = true
   286				return pr, nil
   287			}
   288		}
   289	
   290		blobrefStr := h.BlobRef.String()
   291	
   292		// Pre-upload. Check whether the blob already exists on the
   293		// server and if not, the URL to upload it to.
   294		pfx, err := c.prefix()
   295		if err != nil {
   296			return nil, err
   297		}
   298	
   299		if !h.SkipStat {
   300			url_ := fmt.Sprintf("%s/camli/stat", pfx)
   301			req := c.newRequest(ctx, "POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr))
   302			req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
   303	
   304			resp, err := c.doReqGated(req)
   305			if err != nil {
   306				return errorf("stat http error: %v", err)
   307			}
   308			defer resp.Body.Close()
   309	
   310			if resp.StatusCode != 200 {
   311				return errorf("stat response had http status %d", resp.StatusCode)
   312			}
   313	
   314			stat, err := parseStatResponse(resp)
   315			if err != nil {
   316				return nil, err
   317			}
   318			for _, sbr := range stat.HaveMap {
   319				c.haveCache.NoteBlobExists(sbr.Ref, uint32(sbr.Size))
   320			}
   321			_, serverHasIt := stat.HaveMap[blobrefStr]
   322			if env.DebugUploads() {
   323				log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt)
   324			}
   325			if !h.Vivify && serverHasIt {
   326				pr.Skipped = true
   327				if closer, ok := h.Contents.(io.Closer); ok {
   328					// TODO(bradfitz): I did this
   329					// Close-if-possible thing early on, before I
   330					// knew better.  Fix the callers instead, and
   331					// fix the docs.
   332					closer.Close()
   333				}
   334				c.haveCache.NoteBlobExists(h.BlobRef, uint32(bodySize))
   335				return pr, nil
   336			}
   337		}
   338	
   339		if env.DebugUploads() {
   340			log.Printf("Uploading: %s (%d bytes)", blobrefStr, bodySize)
   341		}
   342	
   343		pipeReader, pipeWriter := io.Pipe()
   344		multipartWriter := multipart.NewWriter(pipeWriter)
   345	
   346		copyResult := make(chan error, 1)
   347		go func() {
   348			defer pipeWriter.Close()
   349			part, err := multipartWriter.CreateFormFile(blobrefStr, blobrefStr)
   350			if err != nil {
   351				copyResult <- err
   352				return
   353			}
   354			_, err = io.Copy(part, bodyReader)
   355			if err == nil {
   356				err = multipartWriter.Close()
   357			}
   358			copyResult <- err
   359		}()
   360	
   361		uploadURL := fmt.Sprintf("%s/camli/upload", pfx)
   362		req := c.newRequest(ctx, "POST", uploadURL)
   363		req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
   364		if h.Vivify {
   365			req.Header.Add("X-Camlistore-Vivify", "1")
   366		}
   367		req.Body = ioutil.NopCloser(pipeReader)
   368		req.ContentLength = getMultipartOverhead() + bodySize + int64(len(blobrefStr))*2
   369		resp, err := c.doReqGated(req)
   370		if err != nil {
   371			return errorf("upload http error: %v", err)
   372		}
   373		defer resp.Body.Close()
   374	
   375		// check error from earlier copy
   376		if err := <-copyResult; err != nil {
   377			return errorf("failed to copy contents into multipart writer: %v", err)
   378		}
   379	
   380		// The only valid HTTP responses are 200 and 303.
   381		if resp.StatusCode != 200 && resp.StatusCode != 303 {
   382			return errorf("invalid http response %d in upload response", resp.StatusCode)
   383		}
   384	
   385		if resp.StatusCode == 303 {
   386			otherLocation := resp.Header.Get("Location")
   387			if otherLocation == "" {
   388				return errorf("303 without a Location")
   389			}
   390			baseURL, _ := url.Parse(uploadURL)
   391			absURL, err := baseURL.Parse(otherLocation)
   392			if err != nil {
   393				return errorf("303 Location URL relative resolve error: %v", err)
   394			}
   395			otherLocation = absURL.String()
   396			resp, err = http.Get(otherLocation)
   397			if err != nil {
   398				return errorf("error following 303 redirect after upload: %v", err)
   399			}
   400		}
   401	
   402		var ures protocol.UploadResponse
   403		if err := httputil.DecodeJSON(resp, &ures); err != nil {
   404			return errorf("error in upload response: %v", err)
   405		}
   406	
   407		if ures.ErrorText != "" {
   408			c.printf("Blob server reports error: %s", ures.ErrorText)
   409		}
   410	
   411		expectedSize := uint32(bodySize)
   412	
   413		for _, sb := range ures.Received {
   414			if sb.Ref != h.BlobRef {
   415				continue
   416			}
   417			if sb.Size != expectedSize {
   418				return errorf("Server got blob %v, but reports wrong length (%v; we sent %d)",
   419					sb.Ref, sb.Size, expectedSize)
   420			}
   421			c.statsMutex.Lock()
   422			c.stats.Uploads.Blobs++
   423			c.stats.Uploads.Bytes += bodySize
   424			c.statsMutex.Unlock()
   425			if pr.Size <= 0 {
   426				pr.Size = sb.Size
   427			}
   428			c.haveCache.NoteBlobExists(pr.BlobRef, pr.Size)
   429			return pr, nil
   430		}
   431	
   432		return nil, errors.New("server didn't receive blob")
   433	}
   434	
// FileUploadOptions is optionally provided to UploadFile.
type FileUploadOptions struct {
	// FileInfo optionally specifies the FileInfo to populate the schema of the file blob.
	// When set, its modification time (if non-zero) is recorded in the schema too.
	FileInfo os.FileInfo
	// WholeRef optionally specifies the digest of the uploaded file contents, which
	// allows UploadFile to skip computing the digest (needed to check if the contents
	// are already on the server). It is only used when it is a valid blob.Ref.
	WholeRef blob.Ref
}
   444	
   445	// UploadFile uploads the contents of the file, as well as a file blob with
   446	// filename for these contents. If the contents or the file blob are found on
   447	// the server, they're not uploaded.
   448	//
   449	// Note: this method is still a work in progress, and might change to accommodate
   450	// the needs of pk-put file.
   451	func (c *Client) UploadFile(ctx context.Context, filename string, contents io.Reader, opts *FileUploadOptions) (blob.Ref, error) {
   452		fileMap := schema.NewFileMap(filename)
   453		if opts != nil && opts.FileInfo != nil {
   454			fileMap = schema.NewCommonFileMap(filename, opts.FileInfo)
   455			modTime := opts.FileInfo.ModTime()
   456			if !modTime.IsZero() {
   457				fileMap.SetModTime(modTime)
   458			}
   459		}
   460		fileMap.SetType(schema.TypeFile)
   461	
   462		var wholeRef []blob.Ref
   463		if opts != nil && opts.WholeRef.Valid() {
   464			wholeRef = append(wholeRef, opts.WholeRef)
   465		} else {
   466			var buf bytes.Buffer
   467			var err error
   468			wholeRef, err = c.wholeRef(io.TeeReader(contents, &buf))
   469			if err != nil {
   470				return blob.Ref{}, err
   471			}
   472			contents = io.MultiReader(&buf, contents)
   473		}
   474	
   475		fileRef, err := c.fileMapFromDuplicate(ctx, fileMap, wholeRef)
   476		if err != nil {
   477			return blob.Ref{}, err
   478		}
   479		if fileRef.Valid() {
   480			return fileRef, nil
   481		}
   482	
   483		return schema.WriteFileMap(ctx, c, fileMap, contents)
   484	}
   485	
   486	// TODO(mpl): replace up.wholeFileDigest in pk-put with c.wholeRef maybe.
   487	
   488	// wholeRef returns the blob ref(s) of the regular file's contents
   489	// as if it were one entire blob (ignoring blob size limits).
   490	// By default, only one ref is returned, unless the server has advertised
   491	// that it has indexes calculated for other hash functions.
   492	func (c *Client) wholeRef(contents io.Reader) ([]blob.Ref, error) {
   493		hasLegacySHA1, err := c.HasLegacySHA1()
   494		if err != nil {
   495			return nil, fmt.Errorf("cannot discover if server has legacy sha1: %v", err)
   496		}
   497		td := hashutil.NewTrackDigestReader(contents)
   498		td.DoLegacySHA1 = hasLegacySHA1
   499		if _, err := io.Copy(ioutil.Discard, td); err != nil {
   500			return nil, err
   501		}
   502		refs := []blob.Ref{blob.RefFromHash(td.Hash())}
   503		if td.DoLegacySHA1 {
   504			refs = append(refs, blob.RefFromHash(td.LegacySHA1Hash()))
   505		}
   506		return refs, nil
   507	}
   508	
   509	// fileMapFromDuplicate queries the server's search interface for an
   510	// existing file blob for the file contents any of wholeRef.
   511	// If the server has it, it's validated, and then fileMap (which must
   512	// already be partially populated) has its "parts" field populated,
   513	// and then fileMap is uploaded (if necessary).
   514	// If no file blob is found, a zero blob.Ref (and no error) is returned.
   515	func (c *Client) fileMapFromDuplicate(ctx context.Context, fileMap *schema.Builder, wholeRef []blob.Ref) (blob.Ref, error) {
   516		dupFileRef, err := c.SearchExistingFileSchema(ctx, wholeRef...)
   517		if err != nil {
   518			return blob.Ref{}, err
   519		}
   520		if !dupFileRef.Valid() {
   521			// because SearchExistingFileSchema returns blob.Ref{}, nil when file is not found.
   522			return blob.Ref{}, nil
   523		}
   524		dupMap, err := c.FetchSchemaBlob(ctx, dupFileRef)
   525		if err != nil {
   526			return blob.Ref{}, fmt.Errorf("could not find existing file blob for wholeRef %q: %v", wholeRef, err)
   527		}
   528		fileMap.PopulateParts(dupMap.PartsSize(), dupMap.ByteParts())
   529		json, err := fileMap.JSON()
   530		if err != nil {
   531			return blob.Ref{}, fmt.Errorf("could not write file map for wholeRef %q: %v", wholeRef, err)
   532		}
   533		bref := blob.RefFromString(json)
   534		if bref == dupFileRef {
   535			// Unchanged (same filename, modtime, JSON serialization, etc)
   536			// Different signer (e.g. existing file has a sha1 signer, and
   537			// we're now using a sha224 signer) means we upload a new file schema.
   538			return dupFileRef, nil
   539		}
   540		sbr, err := c.ReceiveBlob(ctx, bref, strings.NewReader(json))
   541		if err != nil {
   542			return blob.Ref{}, err
   543		}
   544		return sbr.Ref, nil
   545	}
Website layout inspired by memcached.
Content by the authors.