1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package server
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "log"
26 "net/http"
27 "net/url"
28 "strconv"
29 "sync"
30
31 "perkeep.org/internal/httputil"
32
33 "perkeep.org/pkg/blob"
34 "perkeep.org/pkg/blobserver"
35 "perkeep.org/pkg/client"
36 "perkeep.org/pkg/index"
37 "perkeep.org/pkg/schema"
38 "perkeep.org/pkg/types/camtypes"
39
40 "go4.org/types"
41 )
42
43
44
45
// numWorkers is the number of goroutines importAll starts to serve the work
// channel; that channel is buffered to twice this value.
const numWorkers = 10
47
48
49
50
51 type shareImporter struct {
52 shareURL string
53 src *client.Client
54 dest blobserver.Storage
55
56 mu sync.RWMutex
57 seen int
58 copied int
59 running bool
60 assembled bool
61 br blob.Ref
62 err error
63
64 wg sync.WaitGroup
65 workc chan work
66 }
67
68
69 type work struct {
70 br blob.Ref
71 errc chan<- error
72 }
73
74
75 func (si *shareImporter) imprt(ctx context.Context, br blob.Ref) error {
76
77 const sniffSize = 900 * 1024
78 select {
79 case <-ctx.Done():
80 return ctx.Err()
81 default:
82 }
83 src := si.src
84 dest := si.dest
85 rc, _, err := src.Fetch(ctx, br)
86 if err != nil {
87 return err
88 }
89 rcc := types.NewOnceCloser(rc)
90 defer rcc.Close()
91
92 sniffer := index.NewBlobSniffer(br)
93 _, err = io.CopyN(sniffer, rc, sniffSize)
94 if err != nil && err != io.EOF {
95 return err
96 }
97 sniffer.Parse()
98 b, ok := sniffer.SchemaBlob()
99 if !ok {
100 return fmt.Errorf("%q: not a Perkeep schema.", br)
101 }
102 body, err := sniffer.Body()
103 if err != nil {
104 return err
105 }
106 rc = io.NopCloser(io.MultiReader(bytes.NewReader(body), rc))
107
108 switch b.Type() {
109 case "directory":
110 if _, err := blobserver.Receive(ctx, dest, br, rc); err != nil {
111 return err
112 }
113 ssbr, ok := b.DirectoryEntries()
114 if !ok {
115 return fmt.Errorf("%q not actually a directory", br)
116 }
117 rcc.Close()
118 return si.imprt(ctx, ssbr)
119 case "static-set":
120 if _, err := blobserver.Receive(ctx, dest, br, rc); err != nil {
121 return err
122 }
123 rcc.Close()
124 si.wg.Add(1)
125
126 go func() {
127 defer si.wg.Done()
128 si.mu.RLock()
129
130 if si.err != nil {
131 si.mu.RUnlock()
132 return
133 }
134 si.mu.RUnlock()
135 var errcs []<-chan error
136 for _, mref := range b.StaticSetMembers() {
137 errc := make(chan error, 1)
138 errcs = append(errcs, errc)
139 si.workc <- work{mref, errc}
140 }
141 for _, errc := range errcs {
142 if err := <-errc; err != nil {
143 si.mu.Lock()
144 si.err = err
145 si.mu.Unlock()
146 }
147 }
148 }()
149 return nil
150 case "file":
151 rcc.Close()
152 fr, err := schema.NewFileReader(ctx, src, br)
153 if err != nil {
154 return fmt.Errorf("NewFileReader: %v", err)
155 }
156 defer fr.Close()
157 si.mu.Lock()
158 si.seen++
159 si.mu.Unlock()
160 if _, err := schema.WriteFileMap(ctx, dest, b.Builder(), fr); err != nil {
161 return err
162 }
163 si.mu.Lock()
164 si.copied++
165 si.mu.Unlock()
166 return nil
167
168 default:
169 return errors.New("unknown blob type: " + string(b.Type()))
170 }
171 }
172
173
174
175 func (si *shareImporter) importAll(ctx context.Context) error {
176 src, shared, err := client.NewFromShareRoot(ctx, si.shareURL, client.OptionNoExternalConfig())
177 if err != nil {
178 return err
179 }
180 si.src = src
181 si.br = shared
182 si.workc = make(chan work, 2*numWorkers)
183 defer close(si.workc)
184
185
186 for i := 0; i < numWorkers; i++ {
187 go func() {
188 for wi := range si.workc {
189 wi.errc <- si.imprt(ctx, wi.br)
190 }
191 }()
192 }
193
194 err = si.imprt(ctx, shared)
195 si.wg.Wait()
196 if err == nil {
197 si.mu.RLock()
198 err = si.err
199 si.mu.RUnlock()
200 }
201 log.Print("share importer: all workers done")
202 if err != nil {
203 return err
204 }
205 return nil
206 }
207
208
209 func (si *shareImporter) importAssembled(ctx context.Context) {
210 res, err := http.Get(si.shareURL)
211 if err != nil {
212 return
213 }
214 defer res.Body.Close()
215 br, err := schema.WriteFileFromReader(ctx, si.dest, "", res.Body)
216 if err != nil {
217 return
218 }
219 si.mu.Lock()
220 si.br = br
221 si.mu.Unlock()
222 return
223 }
224
225
226 func (si *shareImporter) isAssembled() (bool, error) {
227 u, err := url.Parse(si.shareURL)
228 if err != nil {
229 return false, err
230 }
231 isAs, _ := strconv.ParseBool(u.Query().Get("assemble"))
232 return isAs, nil
233 }
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 func (si *shareImporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
249 if si.dest == nil {
250 http.Error(w, "shareImporter without a dest", 500)
251 return
252 }
253 if r.Method == "GET" {
254 si.serveProgress(w, r)
255 return
256 }
257 if r.Method != "POST" {
258 http.Error(w, "Not a POST", http.StatusBadRequest)
259 return
260 }
261 si.mu.Lock()
262 if si.running {
263 http.Error(w, "an import is already in progress", http.StatusServiceUnavailable)
264 si.mu.Unlock()
265 return
266 }
267 si.running = true
268 si.seen = 0
269 si.copied = 0
270 si.assembled = false
271 si.br = blob.Ref{}
272 si.mu.Unlock()
273
274 shareURL := r.FormValue("shareurl")
275 if shareURL == "" {
276 http.Error(w, "No shareurl parameter", http.StatusBadRequest)
277 return
278 }
279 si.shareURL = shareURL
280 isAs, err := si.isAssembled()
281 if err != nil {
282 http.Error(w, "Could not parse shareurl", http.StatusInternalServerError)
283 return
284 }
285
286 go func() {
287 defer func() {
288 si.mu.Lock()
289 si.running = false
290 si.mu.Unlock()
291 }()
292 if isAs {
293 si.mu.Lock()
294 si.assembled = true
295 si.mu.Unlock()
296 si.importAssembled(context.Background())
297 return
298 }
299 si.importAll(context.Background())
300 }()
301 w.WriteHeader(200)
302 }
303
304
305 func (si *shareImporter) serveProgress(w http.ResponseWriter, r *http.Request) {
306 si.mu.RLock()
307 defer si.mu.RUnlock()
308 httputil.ReturnJSON(w, camtypes.ShareImportProgress{
309 FilesSeen: si.seen,
310 FilesCopied: si.copied,
311 Running: si.running,
312 Assembled: si.assembled,
313 BlobRef: si.br,
314 })
315 }