Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package client
    18	
    19	import (
    20		"context"
    21		"errors"
    22		"fmt"
    23		"math"
    24		"net/url"
    25		"time"
    26	
    27		"perkeep.org/pkg/blob"
    28	)
    29	
    30	// EnumerateOpts are the options to Client.EnumerateBlobsOpts.
    31	type EnumerateOpts struct {
    32		After   string        // last blobref seen; start with ones greater than this
    33		MaxWait time.Duration // how long to poll for (second granularity), waiting for any blob, or 0 for no limit
    34		Limit   int           // if non-zero, the max blobs to return
    35	}
    36	
    37	// SimpleEnumerateBlobs sends all blobs to the provided channel.
    38	// The channel will be closed, regardless of whether an error is returned.
    39	func (c *Client) SimpleEnumerateBlobs(ctx context.Context, ch chan<- blob.SizedRef) error {
    40		return c.EnumerateBlobsOpts(ctx, ch, EnumerateOpts{})
    41	}
    42	
    43	func (c *Client) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
    44		if c.sto != nil {
    45			return c.sto.EnumerateBlobs(ctx, dest, after, limit)
    46		}
    47		if limit == 0 {
    48			c.printf("Warning: Client.EnumerateBlobs called with a limit of zero")
    49			close(dest)
    50			return nil
    51		}
    52		return c.EnumerateBlobsOpts(ctx, dest, EnumerateOpts{
    53			After: after,
    54			Limit: limit,
    55		})
    56	}
    57	
// enumerateBatchSize is the value of the "limit" query parameter sent with
// every enumerate-blobs request made by EnumerateBlobsOpts.
const enumerateBatchSize = 1000
    59	
    60	// EnumerateBlobsOpts sends blobs to the provided channel, as directed by opts.
    61	// The channel will be closed, regardless of whether an error is returned.
    62	func (c *Client) EnumerateBlobsOpts(ctx context.Context, ch chan<- blob.SizedRef, opts EnumerateOpts) error {
    63		defer close(ch)
    64		if opts.After != "" && opts.MaxWait != 0 {
    65			return errors.New("client error: it's invalid to use enumerate After and MaxWaitSec together")
    66		}
    67		pfx, err := c.prefix()
    68		if err != nil {
    69			return err
    70		}
    71	
    72		error := func(msg string, e error) error {
    73			err := fmt.Errorf("client enumerate error: %s: %v", msg, e)
    74			c.printf("%v", err)
    75			return err
    76		}
    77	
    78		nSent := 0
    79		keepGoing := true
    80		after := opts.After
    81		for keepGoing {
    82			waitSec := 0
    83			if after == "" {
    84				if opts.MaxWait > 0 {
    85					waitSec = int(opts.MaxWait.Seconds())
    86					if waitSec == 0 {
    87						waitSec = 1
    88					}
    89				}
    90			}
    91			url_ := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d",
    92				pfx, url.QueryEscape(after), enumerateBatchSize, waitSec)
    93			req := c.newRequest(ctx, "GET", url_)
    94			resp, err := c.httpClient.Do(req)
    95			if err != nil {
    96				return error("http request", err)
    97			}
    98	
    99			json, err := c.responseJSONMap("enumerate-blobs", resp)
   100			if err != nil {
   101				return error("stat json parse error", err)
   102			}
   103	
   104			blobs, ok := getJSONMapArray(json, "blobs")
   105			if !ok {
   106				return error("response JSON didn't contain 'blobs' array", nil)
   107			}
   108			for _, v := range blobs {
   109				itemJSON, ok := v.(map[string]interface{})
   110				if !ok {
   111					return error("item in 'blobs' was malformed", nil)
   112				}
   113				blobrefStr, ok := getJSONMapString(itemJSON, "blobRef")
   114				if !ok {
   115					return error("item in 'blobs' was missing string 'blobRef'", nil)
   116				}
   117				size, ok := getJSONMapUint32(itemJSON, "size")
   118				if !ok {
   119					return error("item in 'blobs' was missing numeric 'size'", nil)
   120				}
   121				br, ok := blob.Parse(blobrefStr)
   122				if !ok {
   123					return error("item in 'blobs' had invalid blobref.", nil)
   124				}
   125				select {
   126				case ch <- blob.SizedRef{Ref: br, Size: uint32(size)}:
   127				case <-ctx.Done():
   128					return ctx.Err()
   129				}
   130				nSent++
   131				if opts.Limit == nSent {
   132					// nSent can't be zero at this point, so opts.Limit being 0
   133					// is okay.
   134					return nil
   135				}
   136			}
   137	
   138			after, keepGoing = getJSONMapString(json, "continueAfter")
   139		}
   140		return nil
   141	}
   142	
   143	func getJSONMapString(m map[string]interface{}, key string) (string, bool) {
   144		if v, ok := m[key]; ok {
   145			if s, ok := v.(string); ok {
   146				return s, true
   147			}
   148		}
   149		return "", false
   150	}
   151	
   152	func getJSONMapInt64(m map[string]interface{}, key string) (int64, bool) {
   153		if v, ok := m[key]; ok {
   154			if n, ok := v.(float64); ok {
   155				return int64(n), true
   156			}
   157		}
   158		return 0, false
   159	}
   160	
   161	func getJSONMapUint32(m map[string]interface{}, key string) (uint32, bool) {
   162		u, ok := getJSONMapInt64(m, key)
   163		if !ok {
   164			return 0, false
   165		}
   166		if u < 0 || u > math.MaxUint32 {
   167			return 0, false
   168		}
   169		return uint32(u), true
   170	}
   171	
   172	func getJSONMapArray(m map[string]interface{}, key string) ([]interface{}, bool) {
   173		if v, ok := m[key]; ok {
   174			if a, ok := v.([]interface{}); ok {
   175				return a, true
   176			}
   177		}
   178		return nil, false
   179	}
Website layout inspired by memcached.
Content by the authors.