1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package client
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math"
24 "net/url"
25 "time"
26
27 "perkeep.org/pkg/blob"
28 )
29
30
// EnumerateOpts holds the options accepted by Client.EnumerateBlobsOpts.
type EnumerateOpts struct {
	// After is sent as the "after" query parameter of the first
	// enumerate-blobs request. It may not be combined with MaxWait.
	After string

	// MaxWait, when positive, is sent (in whole seconds, with sub-second
	// values raised to one second) as the "maxwaitsec" query parameter
	// of requests made while no "after" value is set.
	MaxWait time.Duration

	// Limit is the number of blobs after which enumeration stops.
	// Zero never matches the running count, so it means "no limit".
	Limit int
}
36
37
38
39 func (c *Client) SimpleEnumerateBlobs(ctx context.Context, ch chan<- blob.SizedRef) error {
40 return c.EnumerateBlobsOpts(ctx, ch, EnumerateOpts{})
41 }
42
43 func (c *Client) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
44 if c.sto != nil {
45 return c.sto.EnumerateBlobs(ctx, dest, after, limit)
46 }
47 if limit == 0 {
48 c.printf("Warning: Client.EnumerateBlobs called with a limit of zero")
49 close(dest)
50 return nil
51 }
52 return c.EnumerateBlobsOpts(ctx, dest, EnumerateOpts{
53 After: after,
54 Limit: limit,
55 })
56 }
57
58 const enumerateBatchSize = 1000
59
60
61
62 func (c *Client) EnumerateBlobsOpts(ctx context.Context, ch chan<- blob.SizedRef, opts EnumerateOpts) error {
63 defer close(ch)
64 if opts.After != "" && opts.MaxWait != 0 {
65 return errors.New("client error: it's invalid to use enumerate After and MaxWaitSec together")
66 }
67 pfx, err := c.prefix()
68 if err != nil {
69 return err
70 }
71
72 error := func(msg string, e error) error {
73 err := fmt.Errorf("client enumerate error: %s: %v", msg, e)
74 c.printf("%v", err)
75 return err
76 }
77
78 nSent := 0
79 keepGoing := true
80 after := opts.After
81 for keepGoing {
82 waitSec := 0
83 if after == "" {
84 if opts.MaxWait > 0 {
85 waitSec = int(opts.MaxWait.Seconds())
86 if waitSec == 0 {
87 waitSec = 1
88 }
89 }
90 }
91 url_ := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d",
92 pfx, url.QueryEscape(after), enumerateBatchSize, waitSec)
93 req := c.newRequest(ctx, "GET", url_)
94 resp, err := c.httpClient.Do(req)
95 if err != nil {
96 return error("http request", err)
97 }
98
99 json, err := c.responseJSONMap("enumerate-blobs", resp)
100 if err != nil {
101 return error("stat json parse error", err)
102 }
103
104 blobs, ok := getJSONMapArray(json, "blobs")
105 if !ok {
106 return error("response JSON didn't contain 'blobs' array", nil)
107 }
108 for _, v := range blobs {
109 itemJSON, ok := v.(map[string]interface{})
110 if !ok {
111 return error("item in 'blobs' was malformed", nil)
112 }
113 blobrefStr, ok := getJSONMapString(itemJSON, "blobRef")
114 if !ok {
115 return error("item in 'blobs' was missing string 'blobRef'", nil)
116 }
117 size, ok := getJSONMapUint32(itemJSON, "size")
118 if !ok {
119 return error("item in 'blobs' was missing numeric 'size'", nil)
120 }
121 br, ok := blob.Parse(blobrefStr)
122 if !ok {
123 return error("item in 'blobs' had invalid blobref.", nil)
124 }
125 select {
126 case ch <- blob.SizedRef{Ref: br, Size: uint32(size)}:
127 case <-ctx.Done():
128 return ctx.Err()
129 }
130 nSent++
131 if opts.Limit == nSent {
132
133
134 return nil
135 }
136 }
137
138 after, keepGoing = getJSONMapString(json, "continueAfter")
139 }
140 return nil
141 }
142
// getJSONMapString returns the string stored under key in m.
// The boolean is false, and the string empty, when key is absent
// or its value is not a string.
func getJSONMapString(m map[string]interface{}, key string) (string, bool) {
	// A missing key yields a nil interface, which fails the assertion
	// exactly like a value of the wrong type.
	str, isString := m[key].(string)
	return str, isString
}
151
// getJSONMapInt64 returns the number stored under key in m as an int64.
//
// The value must be a float64 (the only numeric type this function
// accepts). In-range values are truncated toward zero. The boolean is
// false, and the result zero, when key is absent, the value is not a
// float64, or the value is NaN or lies outside the int64 range.
func getJSONMapInt64(m map[string]interface{}, key string) (int64, bool) {
	n, ok := m[key].(float64)
	if !ok {
		return 0, false
	}
	// Converting a float64 that int64 cannot represent gives an
	// implementation-dependent result, so reject such values instead
	// of reporting garbage as success. The bounds are -2^63 (inclusive)
	// and 2^63 (exclusive), both exactly representable as float64; NaN
	// fails both comparisons and is rejected as well.
	if !(n >= -(1<<63) && n < 1<<63) {
		return 0, false
	}
	return int64(n), true
}
160
161 func getJSONMapUint32(m map[string]interface{}, key string) (uint32, bool) {
162 u, ok := getJSONMapInt64(m, key)
163 if !ok {
164 return 0, false
165 }
166 if u < 0 || u > math.MaxUint32 {
167 return 0, false
168 }
169 return uint32(u), true
170 }
171
// getJSONMapArray returns the array stored under key in m.
// The boolean is false, and the slice nil, when key is absent or its
// value is not a []interface{}.
func getJSONMapArray(m map[string]interface{}, key string) ([]interface{}, bool) {
	// A missing key yields a nil interface, which fails the assertion
	// exactly like a value of the wrong type.
	arr, isArray := m[key].([]interface{})
	return arr, isArray
}