1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package client
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "math"
26 "net/http"
27 "os"
28 "regexp"
29
30 "perkeep.org/pkg/blob"
31 "perkeep.org/pkg/blobserver"
32 "perkeep.org/pkg/constants"
33 "perkeep.org/pkg/schema"
34
35 "go4.org/readerutil"
36 "go4.org/types"
37 )
38
39 func (c *Client) FetchSchemaBlob(ctx context.Context, b blob.Ref) (*schema.Blob, error) {
40 rc, _, err := c.Fetch(ctx, b)
41 if err != nil {
42 return nil, err
43 }
44 defer rc.Close()
45 return schema.BlobFromReader(b, rc)
46 }
47
48 func (c *Client) Fetch(ctx context.Context, b blob.Ref) (io.ReadCloser, uint32, error) {
49 return c.fetchVia(ctx, b, c.viaPathTo(b))
50 }
51
52 func (c *Client) viaPathTo(b blob.Ref) (path []blob.Ref) {
53 c.viaMu.RLock()
54 defer c.viaMu.RUnlock()
55
56 key := b
57 for {
58 v, ok := c.via[key]
59 if !ok {
60 break
61 }
62 key = v
63 path = append(path, key)
64 }
65
66 for i := 0; i < len(path)/2; i++ {
67 path[i], path[len(path)-i-1] = path[len(path)-i-1], path[i]
68 }
69 return
70 }
71
72 var blobsRx = regexp.MustCompile(blob.Pattern)
73
74 func (c *Client) fetchVia(ctx context.Context, b blob.Ref, v []blob.Ref) (body io.ReadCloser, size uint32, err error) {
75 if c.sto != nil {
76 if len(v) > 0 {
77 return nil, 0, errors.New("FetchVia not supported in non-HTTP mode")
78 }
79 return c.sto.Fetch(ctx, b)
80 }
81 pfx, err := c.blobPrefix()
82 if err != nil {
83 return nil, 0, err
84 }
85 url := fmt.Sprintf("%s/%s", pfx, b)
86
87 if len(v) > 0 {
88 buf := bytes.NewBufferString(url)
89 buf.WriteString("?via=")
90 for i, br := range v {
91 if i != 0 {
92 buf.WriteString(",")
93 }
94 buf.WriteString(br.String())
95 }
96 url = buf.String()
97 }
98
99 req := c.newRequest(ctx, "GET", url)
100 resp, err := c.httpClient.Do(req)
101 if err != nil {
102 return nil, 0, err
103 }
104 defer func() {
105 if err != nil {
106 resp.Body.Close()
107 }
108 }()
109 if resp.StatusCode == http.StatusNotFound {
110
111 return nil, 0, os.ErrNotExist
112 }
113 if resp.StatusCode != http.StatusOK {
114 return nil, 0, fmt.Errorf("Got status code %d from blobserver for %s", resp.StatusCode, b)
115 }
116
117 var reader io.Reader = resp.Body
118 var closer io.Closer = resp.Body
119 if resp.ContentLength > 0 {
120 if resp.ContentLength > math.MaxUint32 {
121 return nil, 0, fmt.Errorf("Blob %s over %d bytes", b, uint32(math.MaxUint32))
122 }
123 size = uint32(resp.ContentLength)
124 } else {
125 var buf bytes.Buffer
126 size = 0
127
128 n, err := io.CopyN(&buf, resp.Body, constants.MaxBlobSize+1)
129 if n > blobserver.MaxBlobSize {
130 return nil, 0, fmt.Errorf("Blob %s over %d bytes; not reading more", b, blobserver.MaxBlobSize)
131 }
132 if err == nil {
133 panic("unexpected")
134 } else if err == io.EOF {
135 size = uint32(n)
136 reader, closer = &buf, types.NopCloser
137 } else {
138 return nil, 0, fmt.Errorf("Error reading %s: %w", b, err)
139 }
140 }
141
142 var buf bytes.Buffer
143 if err := c.UpdateShareChain(b, io.TeeReader(reader, &buf)); err != nil {
144 if err != ErrNotSharing {
145 return nil, 0, err
146 }
147 }
148 mr := io.MultiReader(&buf, reader)
149 var rc io.ReadCloser = struct {
150 io.Reader
151 io.Closer
152 }{mr, closer}
153
154 return rc, size, nil
155 }
156
157
158
// ErrNotSharing is returned by UpdateShareChain when the client has no via
// map (c.via is nil), i.e. it was not set up to follow shared blobs.
var ErrNotSharing = errors.New("client can not deal with shared blobs. Create it with NewFromShareRoot")
160
161
162
163
164
165 func (c *Client) UpdateShareChain(b blob.Ref, r io.Reader) error {
166 c.viaMu.Lock()
167 defer c.viaMu.Unlock()
168 if c.via == nil {
169
170 return ErrNotSharing
171 }
172
173 var buf bytes.Buffer
174 const maxSlurp = 1 << 20
175 if _, err := io.Copy(&buf, io.LimitReader(r, maxSlurp)); err != nil {
176 return err
177 }
178
179 if schema.LikelySchemaBlob(buf.Bytes()) {
180 for _, blobstr := range blobsRx.FindAllString(buf.String(), -1) {
181 br, ok := blob.Parse(blobstr)
182 if !ok {
183 c.printf("Invalid blob ref %q noticed in schema of %v", blobstr, b)
184 continue
185 }
186 c.via[br] = b
187 }
188 }
189 return nil
190 }
191
192 func (c *Client) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error) {
193 if c.sto != nil {
194 return blobserver.Receive(ctx, c.sto, br, source)
195 }
196 size, ok := readerutil.Size(source)
197 if !ok {
198 size = 0
199 }
200 h := &UploadHandle{
201 BlobRef: br,
202 Size: uint32(size),
203 Contents: source,
204 SkipStat: true,
205 }
206 pr, err := c.Upload(ctx, h)
207 if err != nil {
208 return blob.SizedRef{}, err
209 }
210 return pr.SizedBlobRef(), nil
211 }