Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package client
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"math"
    26		"net/http"
    27		"os"
    28		"regexp"
    29	
    30		"perkeep.org/pkg/blob"
    31		"perkeep.org/pkg/blobserver"
    32		"perkeep.org/pkg/constants"
    33		"perkeep.org/pkg/schema"
    34	
    35		"go4.org/readerutil"
    36		"go4.org/types"
    37	)
    38	
    39	func (c *Client) FetchSchemaBlob(ctx context.Context, b blob.Ref) (*schema.Blob, error) {
    40		rc, _, err := c.Fetch(ctx, b)
    41		if err != nil {
    42			return nil, err
    43		}
    44		defer rc.Close()
    45		return schema.BlobFromReader(b, rc)
    46	}
    47	
    48	func (c *Client) Fetch(ctx context.Context, b blob.Ref) (io.ReadCloser, uint32, error) {
    49		return c.fetchVia(ctx, b, c.viaPathTo(b))
    50	}
    51	
    52	func (c *Client) viaPathTo(b blob.Ref) (path []blob.Ref) {
    53		c.viaMu.RLock()
    54		defer c.viaMu.RUnlock()
    55		// Append path backwards first,
    56		key := b
    57		for {
    58			v, ok := c.via[key]
    59			if !ok {
    60				break
    61			}
    62			key = v
    63			path = append(path, key)
    64		}
    65		// Then reverse it
    66		for i := 0; i < len(path)/2; i++ {
    67			path[i], path[len(path)-i-1] = path[len(path)-i-1], path[i]
    68		}
    69		return
    70	}
    71	
    72	var blobsRx = regexp.MustCompile(blob.Pattern)
    73	
    74	func (c *Client) fetchVia(ctx context.Context, b blob.Ref, v []blob.Ref) (body io.ReadCloser, size uint32, err error) {
    75		if c.sto != nil {
    76			if len(v) > 0 {
    77				return nil, 0, errors.New("FetchVia not supported in non-HTTP mode")
    78			}
    79			return c.sto.Fetch(ctx, b)
    80		}
    81		pfx, err := c.blobPrefix()
    82		if err != nil {
    83			return nil, 0, err
    84		}
    85		url := fmt.Sprintf("%s/%s", pfx, b)
    86	
    87		if len(v) > 0 {
    88			buf := bytes.NewBufferString(url)
    89			buf.WriteString("?via=")
    90			for i, br := range v {
    91				if i != 0 {
    92					buf.WriteString(",")
    93				}
    94				buf.WriteString(br.String())
    95			}
    96			url = buf.String()
    97		}
    98	
    99		req := c.newRequest(ctx, "GET", url)
   100		resp, err := c.httpClient.Do(req)
   101		if err != nil {
   102			return nil, 0, err
   103		}
   104		defer func() {
   105			if err != nil {
   106				resp.Body.Close()
   107			}
   108		}()
   109		if resp.StatusCode == http.StatusNotFound {
   110			// Per blob.Fetcher contract:
   111			return nil, 0, os.ErrNotExist
   112		}
   113		if resp.StatusCode != http.StatusOK {
   114			return nil, 0, fmt.Errorf("Got status code %d from blobserver for %s", resp.StatusCode, b)
   115		}
   116	
   117		var reader io.Reader = resp.Body
   118		var closer io.Closer = resp.Body
   119		if resp.ContentLength > 0 {
   120			if resp.ContentLength > math.MaxUint32 {
   121				return nil, 0, fmt.Errorf("Blob %s over %d bytes", b, uint32(math.MaxUint32))
   122			}
   123			size = uint32(resp.ContentLength)
   124		} else {
   125			var buf bytes.Buffer
   126			size = 0
   127			// Might be compressed. Slurp it to memory.
   128			n, err := io.CopyN(&buf, resp.Body, constants.MaxBlobSize+1)
   129			if n > blobserver.MaxBlobSize {
   130				return nil, 0, fmt.Errorf("Blob %s over %d bytes; not reading more", b, blobserver.MaxBlobSize)
   131			}
   132			if err == nil {
   133				panic("unexpected")
   134			} else if err == io.EOF {
   135				size = uint32(n)
   136				reader, closer = &buf, types.NopCloser
   137			} else {
   138				return nil, 0, fmt.Errorf("Error reading %s: %w", b, err)
   139			}
   140		}
   141	
   142		var buf bytes.Buffer
   143		if err := c.UpdateShareChain(b, io.TeeReader(reader, &buf)); err != nil {
   144			if err != ErrNotSharing {
   145				return nil, 0, err
   146			}
   147		}
   148		mr := io.MultiReader(&buf, reader)
   149		var rc io.ReadCloser = struct {
   150			io.Reader
   151			io.Closer
   152		}{mr, closer}
   153	
   154		return rc, size, nil
   155	}
   156	
   157	// ErrNotSharing is returned when a client that was not created with
   158	// NewFromShareRoot tries to access shared blobs.
   159	var ErrNotSharing = errors.New("client can not deal with shared blobs. Create it with NewFromShareRoot")
   160	
   161	// UpdateShareChain reads the schema of b from r, and instructs the client that
   162	// all blob refs found in this schema should use b as a preceding chain link, in
   163	// all subsequent shared blobs fetches. If the client was not created with
   164	// NewFromShareRoot, ErrNotSharing is returned.
   165	func (c *Client) UpdateShareChain(b blob.Ref, r io.Reader) error {
   166		c.viaMu.Lock()
   167		defer c.viaMu.Unlock()
   168		if c.via == nil {
   169			// Not in sharing mode, so return immediately.
   170			return ErrNotSharing
   171		}
   172		// Slurp 1 MB to find references to other blobrefs for the via path.
   173		var buf bytes.Buffer
   174		const maxSlurp = 1 << 20
   175		if _, err := io.Copy(&buf, io.LimitReader(r, maxSlurp)); err != nil {
   176			return err
   177		}
   178		// If it looks like a JSON schema blob (starts with '{')
   179		if schema.LikelySchemaBlob(buf.Bytes()) {
   180			for _, blobstr := range blobsRx.FindAllString(buf.String(), -1) {
   181				br, ok := blob.Parse(blobstr)
   182				if !ok {
   183					c.printf("Invalid blob ref %q noticed in schema of %v", blobstr, b)
   184					continue
   185				}
   186				c.via[br] = b
   187			}
   188		}
   189		return nil
   190	}
   191	
   192	func (c *Client) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error) {
   193		if c.sto != nil {
   194			return blobserver.Receive(ctx, c.sto, br, source)
   195		}
   196		size, ok := readerutil.Size(source)
   197		if !ok {
   198			size = 0
   199		}
   200		h := &UploadHandle{
   201			BlobRef:  br,
   202			Size:     uint32(size), // 0 if we don't know
   203			Contents: source,
   204			SkipStat: true,
   205		}
   206		pr, err := c.Upload(ctx, h)
   207		if err != nil {
   208			return blob.SizedRef{}, err
   209		}
   210		return pr.SizedBlobRef(), nil
   211	}
Website layout inspired by memcached.
Content by the authors.