1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package server
18
19 import (
20 "bytes"
21 "errors"
22 "fmt"
23 "io"
24 "log"
25 "net/http"
26 "strconv"
27 "strings"
28 "time"
29
30 "go4.org/jsonconfig"
31 "perkeep.org/internal/httputil"
32 "perkeep.org/pkg/auth"
33 "perkeep.org/pkg/blob"
34 "perkeep.org/pkg/blobserver"
35 "perkeep.org/pkg/blobserver/gethandler"
36 "perkeep.org/pkg/index"
37 "perkeep.org/pkg/schema"
38 )
39
40 type responseType int
41
42 const (
43 badRequest responseType = 400
44 unauthorizedRequest responseType = 401
45 )
46
47 type errorCode int
48
49 const (
50 noError errorCode = iota
51 assembleNonTransitive
52 invalidMethod
53 invalidURL
54 invalidVia
55 shareBlobInvalid
56 shareBlobTooLarge
57 shareExpired
58 shareDeleted
59 shareFetchFailed
60 shareReadFailed
61 shareTargetInvalid
62 shareNotTransitive
63 viaChainFetchFailed
64 viaChainInvalidLink
65 viaChainReadFailed
66 )
67
68 var errorCodeStr = [...]string{
69 noError: "noError",
70 assembleNonTransitive: "assembleNonTransitive",
71 invalidMethod: "invalidMethod",
72 invalidURL: "invalidURL",
73 invalidVia: "invalidVia",
74 shareBlobInvalid: "shareBlobInvalid",
75 shareBlobTooLarge: "shareBlobTooLarge",
76 shareExpired: "shareExpired",
77 shareDeleted: "shareDeleted",
78 shareFetchFailed: "shareFetchFailed",
79 shareReadFailed: "shareReadFailed",
80 shareTargetInvalid: "shareTargetInvalid",
81 shareNotTransitive: "shareNotTransitive",
82 viaChainFetchFailed: "viaChainFetchFailed",
83 viaChainInvalidLink: "viaChainInvalidLink",
84 viaChainReadFailed: "viaChainReadFailed",
85 }
86
87 func (ec errorCode) String() string {
88 if ec < 0 || int(ec) >= len(errorCodeStr) || errorCodeStr[ec] == "" {
89 return fmt.Sprintf("ErrCode#%d", int(ec))
90 }
91 return errorCodeStr[ec]
92 }
93
94 type shareError struct {
95 code errorCode
96 response responseType
97 message string
98 }
99
100 func (e *shareError) Error() string {
101 return fmt.Sprintf("share: %v (code=%v, type=%v)", e.message, e.code, e.response)
102 }
103
104 func unauthorized(code errorCode, format string, args ...interface{}) *shareError {
105 return &shareError{
106 code: code, response: unauthorizedRequest, message: fmt.Sprintf(format, args...),
107 }
108 }
109
110 const fetchFailureDelay = 200 * time.Millisecond
111
112
113 type shareHandler struct {
114 fetcher blob.Fetcher
115 idx *index.Index
116 log bool
117 }
118
119 func init() {
120 blobserver.RegisterHandlerConstructor("share", newShareFromConfig)
121 }
122
123 func newShareFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) {
124 blobRoot := conf.RequiredString("blobRoot")
125 if blobRoot == "" {
126 return nil, errors.New("No blobRoot defined for share handler")
127 }
128 indexPrefix := conf.RequiredString("index")
129 if err := conf.Validate(); err != nil {
130 return nil, err
131 }
132
133 bs, err := ld.GetStorage(blobRoot)
134 if err != nil {
135 return nil, fmt.Errorf("failed to get share handler's storage at %q: %v", blobRoot, err)
136 }
137 fetcher, ok := bs.(blob.Fetcher)
138 if !ok {
139 return nil, errors.New("share handler's storage not a Fetcher")
140 }
141
142
143
144
145 indexHandler, err := ld.GetHandler(indexPrefix)
146 if err != nil {
147 return nil, fmt.Errorf("share handler config references unknown handler %q", indexPrefix)
148 }
149 indexer, ok := indexHandler.(*index.Index)
150 if !ok {
151 return nil, fmt.Errorf("share handler config references invalid indexer %q (actually a %T)", indexPrefix, indexHandler)
152 }
153
154 sh := &shareHandler{
155 fetcher: fetcher,
156 idx: indexer,
157 log: true,
158 }
159 return sh, nil
160 }
161
162 var timeSleep = time.Sleep
163
164
165 func (h *shareHandler) handleGetViaSharing(rw http.ResponseWriter, req *http.Request,
166 blobRef blob.Ref) error {
167 ctx := req.Context()
168 if !httputil.IsGet(req) {
169 return &shareError{code: invalidMethod, response: badRequest, message: "Invalid method"}
170 }
171
172 rw.Header().Set("Access-Control-Allow-Origin", "*")
173
174 viaPathOkay := false
175 startTime := time.Now()
176 defer func() {
177 if !viaPathOkay {
178
179
180 sleep := fetchFailureDelay - time.Since(startTime)
181 timeSleep(sleep)
182 }
183 }()
184 viaBlobs := make([]blob.Ref, 0)
185 if via := req.FormValue("via"); via != "" {
186 for _, vs := range strings.Split(via, ",") {
187 if br, ok := blob.Parse(vs); ok {
188 viaBlobs = append(viaBlobs, br)
189 } else {
190 return &shareError{code: invalidVia, response: badRequest, message: "Malformed blobref in via param"}
191 }
192 }
193 }
194
195 fetchChain := make([]blob.Ref, 0)
196 fetchChain = append(fetchChain, viaBlobs...)
197 fetchChain = append(fetchChain, blobRef)
198 isTransitive := false
199 for i, br := range fetchChain {
200 switch i {
201 case 0:
202 if h.idx != nil {
203 h.idx.RLock()
204 isDeleted := h.idx.IsDeleted(br)
205 h.idx.RUnlock()
206 if isDeleted {
207 return unauthorized(shareDeleted, "Share was deleted")
208 }
209 }
210 file, size, err := h.fetcher.Fetch(ctx, br)
211 if err != nil {
212 return unauthorized(shareFetchFailed, "Fetch chain 0 of %s failed: %v", br, err)
213 }
214 defer file.Close()
215 if size > schema.MaxSchemaBlobSize {
216 return unauthorized(shareBlobTooLarge, "Fetch chain 0 of %s too large", br)
217 }
218 blob, err := schema.BlobFromReader(br, file)
219 if err != nil {
220 return unauthorized(shareReadFailed, "Can't create a blob from %v: %v", br, err)
221 }
222 share, ok := blob.AsShare()
223 if !ok {
224 return unauthorized(shareBlobInvalid, "Fetch chain 0 of %s wasn't a valid Share (is %q)", br, blob.Type())
225 }
226 if share.IsExpired() {
227 return unauthorized(shareExpired, "Share is expired")
228 }
229 if len(fetchChain) > 1 && fetchChain[1].String() != share.Target().String() {
230 return unauthorized(shareTargetInvalid,
231 "Fetch chain 0->1 (%s -> %q) unauthorized, expected hop to %q",
232 br, fetchChain[1], share.Target())
233 }
234 isTransitive = share.IsTransitive()
235 if len(fetchChain) > 2 && !isTransitive {
236 return unauthorized(shareNotTransitive, "Share is not transitive")
237 }
238 case len(fetchChain) - 1:
239
240
241 continue
242 default:
243 rc, _, err := h.fetcher.Fetch(ctx, br)
244 if err != nil {
245 return unauthorized(viaChainFetchFailed, "Fetch chain %d of %s failed: %v", i, br, err)
246 }
247 defer rc.Close()
248 lr := io.LimitReader(rc, schema.MaxSchemaBlobSize)
249 slurpBytes, err := io.ReadAll(lr)
250 if err != nil {
251 return unauthorized(viaChainReadFailed,
252 "Fetch chain %d of %s failed in slurp: %v", i, br, err)
253 }
254 sought := fetchChain[i+1]
255 if !bytesHaveSchemaLink(br, slurpBytes, sought) {
256 return unauthorized(viaChainInvalidLink,
257 "Fetch chain %d of %s failed; no reference to %s", i, br, sought)
258 }
259 }
260 }
261
262 if assemble, _ := strconv.ParseBool(req.FormValue("assemble")); assemble {
263 if !isTransitive {
264 return unauthorized(assembleNonTransitive, "Cannot assemble non-transitive share")
265 }
266 dh := &DownloadHandler{
267 Fetcher: h.fetcher,
268 forceInline: true,
269
270 }
271 dh.ServeFile(rw, req, blobRef)
272 } else {
273 gethandler.ServeBlobRef(rw, req, blobRef, h.fetcher)
274 }
275 viaPathOkay = true
276 return nil
277 }
278
279 func (h *shareHandler) serveHTTP(rw http.ResponseWriter, req *http.Request) error {
280 var err error
281 pathSuffix := httputil.PathSuffix(req)
282 if len(pathSuffix) == 0 {
283
284 pathSuffix = strings.TrimLeft(req.URL.Path, "/")
285 }
286 pathParts := strings.SplitN(pathSuffix, "/", 2)
287 blobRef, ok := blob.Parse(pathParts[0])
288 if !ok {
289 err = &shareError{code: invalidURL, response: badRequest,
290 message: fmt.Sprintf("Malformed share pathSuffix: %s", pathSuffix)}
291 } else {
292 err = h.handleGetViaSharing(rw, req, blobRef)
293 }
294 if se, ok := err.(*shareError); ok {
295 switch se.response {
296 case badRequest:
297 httputil.BadRequestError(rw, err.Error())
298 case unauthorizedRequest:
299 if h.log {
300 log.Print(err)
301 }
302 auth.SendUnauthorized(rw, req)
303 }
304 }
305 return err
306 }
307
308 func (h *shareHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
309 h.serveHTTP(rw, req)
310 }
311
312
313
314
315 func bytesHaveSchemaLink(br blob.Ref, bb []byte, target blob.Ref) bool {
316
317 if !bytes.Contains(bb, []byte(target.String())) {
318 return false
319 }
320 b, err := schema.BlobFromReader(br, bytes.NewReader(bb))
321 if err != nil {
322 return false
323 }
324 typ := b.Type()
325 switch typ {
326 case schema.TypeFile, schema.TypeBytes:
327 for _, bp := range b.ByteParts() {
328 if bp.BlobRef.Valid() {
329 if bp.BlobRef == target {
330 return true
331 }
332 }
333 if bp.BytesRef.Valid() {
334 if bp.BytesRef == target {
335 return true
336 }
337 }
338 }
339 case schema.TypeDirectory:
340 if d, ok := b.DirectoryEntries(); ok {
341 return d == target
342 }
343 case schema.TypeStaticSet:
344 for _, m := range b.StaticSetMembers() {
345 if m == target {
346 return true
347 }
348 }
349 }
350 return false
351 }