1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package handlers
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "log"
26 "mime"
27 "net/http"
28 "path"
29 "strings"
30
31 "perkeep.org/internal/httputil"
32 "perkeep.org/pkg/blob"
33 "perkeep.org/pkg/blobserver"
34 "perkeep.org/pkg/blobserver/protocol"
35 "perkeep.org/pkg/jsonsign/signhandler"
36 "perkeep.org/pkg/schema"
37
38 "go4.org/readerutil"
39 )
40
41
42
43 func CreateBatchUploadHandler(storage blobserver.BlobReceiveConfiger) http.Handler {
44 return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
45 handleMultiPartUpload(rw, req, storage)
46 })
47 }
48
49
50
51
52 func CreatePutUploadHandler(storage blobserver.BlobReceiver) http.Handler {
53 return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
54 ctx := req.Context()
55 if req.Method != "PUT" {
56 log.Printf("Inconfigured upload handler.")
57 httputil.BadRequestError(rw, "Inconfigured handler.")
58 return
59 }
60
61
62 if req.ContentLength > blobserver.MaxBlobSize {
63 httputil.BadRequestError(rw, "blob too big")
64 return
65 }
66 blobrefStr := path.Base(req.URL.Path)
67 br, ok := blob.Parse(blobrefStr)
68 if !ok {
69 log.Printf("Invalid PUT request to %q", req.URL.Path)
70 httputil.BadRequestError(rw, "Bad path")
71 return
72 }
73 if !br.IsSupported() {
74 httputil.BadRequestError(rw, "unsupported object hash function")
75 return
76 }
77 _, err := blobserver.Receive(ctx, storage, br, req.Body)
78 if err == blobserver.ErrCorruptBlob {
79 httputil.BadRequestError(rw, "data doesn't match declared digest")
80 return
81 }
82 if err != nil {
83 httputil.ServeError(rw, req, err)
84 return
85 }
86 rw.WriteHeader(http.StatusNoContent)
87 })
88 }
89
90
91
92
93 func vivify(ctx context.Context, blobReceiver blobserver.BlobReceiveConfiger, fileblob blob.SizedRef) error {
94 sf, ok := blobReceiver.(blob.Fetcher)
95 if !ok {
96 return fmt.Errorf("BlobReceiver is not a Fetcher")
97 }
98 fr, err := schema.NewFileReader(ctx, sf, fileblob.Ref)
99 if err != nil {
100 return fmt.Errorf("Filereader error for blobref %v: %v", fileblob.Ref.String(), err)
101 }
102 defer fr.Close()
103
104 h := blob.NewHash()
105 n, err := io.Copy(h, fr)
106 if err != nil {
107 return fmt.Errorf("Could not read all file of blobref %v: %v", fileblob.Ref.String(), err)
108 }
109 if n != fr.Size() {
110 return fmt.Errorf("Could not read all file of blobref %v. Wanted %v, got %v", fileblob.Ref.String(), fr.Size(), n)
111 }
112
113 config := blobReceiver.Config()
114 if config == nil {
115 return errors.New("blobReceiver has no config")
116 }
117 hf := config.HandlerFinder
118 if hf == nil {
119 return errors.New("blobReceiver config has no HandlerFinder")
120 }
121 JSONSignRoot, sh, err := hf.FindHandlerByType("jsonsign")
122 if err != nil || sh == nil {
123 return errors.New("jsonsign handler not found")
124 }
125 sigHelper, ok := sh.(*signhandler.Handler)
126 if !ok {
127 return errors.New("handler is not a JSON signhandler")
128 }
129 publicKeyBlobRef := sigHelper.Discovery(JSONSignRoot).PublicKeyBlobRef
130 if !publicKeyBlobRef.Valid() {
131 return fmt.Errorf("invalid publicKeyBlobRef %v in sign discovery", publicKeyBlobRef)
132 }
133
134
135
136
137
138 claimDate := fr.UnixMtime()
139 if claimDate.IsZero() {
140 return fmt.Errorf("While parsing modtime for file %v: %v", fr.FileName(), err)
141 }
142
143 permanodeBB := schema.NewHashPlannedPermanode(h)
144 permanodeBB.SetSigner(publicKeyBlobRef)
145 permanodeBB.SetClaimDate(claimDate)
146 permanodeSigned, err := sigHelper.Sign(ctx, permanodeBB)
147 if err != nil {
148 return fmt.Errorf("signing permanode %v: %v", permanodeSigned, err)
149 }
150 permanodeRef := blob.RefFromString(permanodeSigned)
151 _, err = blobserver.ReceiveNoHash(ctx, blobReceiver, permanodeRef, strings.NewReader(permanodeSigned))
152 if err != nil {
153 return fmt.Errorf("while uploading signed permanode %v, %v: %v", permanodeRef, permanodeSigned, err)
154 }
155
156 contentClaimBB := schema.NewSetAttributeClaim(permanodeRef, "camliContent", fileblob.Ref.String())
157 contentClaimBB.SetSigner(publicKeyBlobRef)
158 contentClaimBB.SetClaimDate(claimDate)
159 contentClaimSigned, err := sigHelper.Sign(ctx, contentClaimBB)
160 if err != nil {
161 return fmt.Errorf("signing camliContent claim: %v", err)
162 }
163 contentClaimRef := blob.RefFromString(contentClaimSigned)
164 _, err = blobserver.ReceiveNoHash(ctx, blobReceiver, contentClaimRef, strings.NewReader(contentClaimSigned))
165 if err != nil {
166 return fmt.Errorf("while uploading signed camliContent claim %v, %v: %v", contentClaimRef, contentClaimSigned, err)
167 }
168 return nil
169 }
170
171 func handleMultiPartUpload(rw http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiveConfiger) {
172 ctx := req.Context()
173 res := new(protocol.UploadResponse)
174
175 if !(req.Method == "POST" && strings.Contains(req.URL.Path, "/camli/upload")) {
176 log.Printf("Inconfigured handler upload handler")
177 httputil.BadRequestError(rw, "Inconfigured handler.")
178 return
179 }
180
181 receivedBlobs := make([]blob.SizedRef, 0, 10)
182
183 multipart, err := req.MultipartReader()
184 if multipart == nil {
185 httputil.BadRequestError(rw, fmt.Sprintf(
186 "Expected multipart/form-data POST request; %v", err))
187 return
188 }
189
190 var errBuf bytes.Buffer
191 addError := func(s string) {
192 log.Printf("Client error: %s", s)
193 if errBuf.Len() > 0 {
194 errBuf.WriteByte('\n')
195 }
196 errBuf.WriteString(s)
197 }
198
199 for {
200 mimePart, err := multipart.NextPart()
201 if err == io.EOF {
202 break
203 }
204 if err != nil {
205 addError(fmt.Sprintf("Error reading multipart section: %v", err))
206 break
207 }
208
209 contentDisposition, params, err := mime.ParseMediaType(mimePart.Header.Get("Content-Disposition"))
210 if err != nil {
211 addError("invalid Content-Disposition")
212 break
213 }
214
215 if contentDisposition != "form-data" {
216 addError(fmt.Sprintf("Expected Content-Disposition of \"form-data\"; got %q", contentDisposition))
217 break
218 }
219
220 formName := params["name"]
221 ref, ok := blob.Parse(formName)
222 if !ok {
223 addError(fmt.Sprintf("Ignoring form key %q", formName))
224 continue
225 }
226
227 var tooBig int64 = blobserver.MaxBlobSize + 1
228 var readBytes int64
229 blobGot, err := blobserver.Receive(ctx, blobReceiver, ref, &readerutil.CountingReader{
230 Reader: io.LimitReader(mimePart, tooBig),
231 N: &readBytes,
232 })
233 if readBytes == tooBig {
234 err = fmt.Errorf("blob over the limit of %d bytes", blobserver.MaxBlobSize)
235 }
236 if err != nil {
237 addError(fmt.Sprintf("Error receiving blob %v: %v\n", ref, err))
238 break
239 }
240 log.Printf("Received blob %v\n", blobGot)
241 receivedBlobs = append(receivedBlobs, blobGot)
242 }
243
244 res.Received = receivedBlobs
245
246 if req.Header.Get("X-Camlistore-Vivify") == "1" {
247
248
249
250
251
252
253 for _, got := range receivedBlobs {
254 err := vivify(ctx, blobReceiver, got)
255 if err != nil {
256 addError(fmt.Sprintf("Error vivifying blob %v: %v\n", got.Ref.String(), err))
257 } else {
258 rw.Header().Add("X-Camlistore-Vivified", got.Ref.String())
259 }
260 }
261 }
262
263 res.ErrorText = errBuf.String()
264
265 httputil.ReturnJSON(rw, res)
266 }