1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18 package importer
19
20 import (
21 "context"
22 "errors"
23 "fmt"
24 "html/template"
25 "log"
26 "net/http"
27 "net/url"
28 "os"
29 "sort"
30 "strconv"
31 "strings"
32 "sync"
33 "time"
34
35 "perkeep.org/internal/httputil"
36 "perkeep.org/pkg/blob"
37 "perkeep.org/pkg/blobserver"
38 "perkeep.org/pkg/jsonsign/signhandler"
39 "perkeep.org/pkg/schema"
40 "perkeep.org/pkg/search"
41 "perkeep.org/pkg/server"
42 "perkeep.org/pkg/types/camtypes"
43
44 "go4.org/ctxutil"
45 "go4.org/jsonconfig"
46 "go4.org/syncutil"
47 )
48
// Permanode attribute names and values used by this package to tag and
// configure importer nodes and importer account nodes.
const (
	// attrNodeType is the attribute holding the node's type; this file sets
	// it to nodeTypeImporter or nodeTypeImporterAccount.
	attrNodeType            = "camliNodeType"
	nodeTypeImporter        = "importer"
	nodeTypeImporterAccount = "importerAccount"

	// attrImporterType holds the importer's ImporterType() value.
	attrImporterType = "importerType"
	// attrClientID and attrClientSecret hold the credentials saved on the
	// importer node when no static configuration is present.
	attrClientID     = "authClientID"
	attrClientSecret = "authClientSecret"
	// attrImportRoot holds, on an account node, the permanode ref of the
	// account's import root.
	attrImportRoot = "importRoot"
	// attrImportAuto holds, on an account node, a time.ParseDuration string;
	// an empty or unparsable value disables automatic runs.
	attrImportAuto = "importAuto"
)
60
61
// Importer is the interface an importer implementation provides in order to
// be registered with Register and driven by a Host.
type Importer interface {
	// Run performs an import for the account described by the RunContext.
	// In this file it is called by importerAcct.start (in its own
	// goroutine) and by importerAcct.run (synchronously).
	Run(*RunContext) error

	// Properties returns the importer's static properties.
	Properties() Properties

	// IsAccountReady reports whether the given account node is ready.
	// NOTE(review): presumably "ready" means sufficiently configured to
	// run an import — confirm against implementations.
	IsAccountReady(acctNode *Object) (ok bool, err error)
	// SummarizeAccount returns a short description of the account; this
	// file uses it in status output, page titles and log messages.
	SummarizeAccount(acctNode *Object) string

	// ServeSetup handles the "newacct" and "login" POST requests (see
	// importerAcct.setup). A returned error is only logged.
	ServeSetup(w http.ResponseWriter, r *http.Request, ctx *SetupContext) error
	// ServeCallback handles GET requests on the importer's "callback"
	// path, once the target account has been resolved.
	ServeCallback(w http.ResponseWriter, r *http.Request, ctx *SetupContext)

	// CallbackRequestAccount extracts the account permanode ref from a
	// callback request. The caller rejects an error or an invalid ref.
	CallbackRequestAccount(r *http.Request) (acctRef blob.Ref, err error)

	// CallbackURLParameters returns the query parameters that
	// SetupContext.CallbackURL appends to the callback URL for acctRef.
	CallbackURLParameters(acctRef blob.Ref) url.Values
}
92
93
// Properties describes an importer; it is returned by Importer.Properties.
type Properties struct {
	// Title is the display title. When empty, the registered importer name
	// is used instead (see importer.Title).
	Title       string
	Description string

	// TODOIssue is exposed through importer.TODOIssue.
	// NOTE(review): presumably an issue-tracker number for importers that
	// are not implemented yet — confirm.
	TODOIssue int

	// NeedsAPIKey reports whether the importer requires a client ID and
	// secret. When false, new accounts can always be added and Status
	// reports that no configuration is required.
	NeedsAPIKey bool

	// SupportsIncremental must be true for automatic mode to be enabled on
	// an account (see importerAcct.toggleAuto).
	SupportsIncremental bool

	// PermanodeImporterType, when non-empty, replaces the registered name
	// as the value stored in the "importerType" attribute (see
	// importer.ImporterType). Register reserves it so that no other
	// importer can use it as a name or type.
	PermanodeImporterType string
}
127
128
129
130
// LongPoller is optionally implemented by importers. When an account ran
// recently, importerAcct.maybeStart calls LongPoll in a goroutine while
// waiting for the next scheduled run.
type LongPoller interface {
	Importer

	// LongPoll is given a RunContext whose context expires at the next
	// scheduled run. A nil return triggers an immediate run; a non-nil
	// error is logged.
	LongPoll(*RunContext) error
}
139
140
141
142
143
144
// TestDataMaker is an optional interface for importers.
// NOTE(review): it is not used in the visible part of this file; presumably
// it lets an importer supply fake data for tests — confirm against callers.
type TestDataMaker interface {
	// MakeTestData returns an http.RoundTripper.
	// NOTE(review): presumably one that serves canned responses — confirm.
	MakeTestData() http.RoundTripper

	// SetTestAccount is given an account node to configure.
	// NOTE(review): presumably it populates the node for test use — confirm.
	SetTestAccount(acctNode *Object) error
}
151
152
153
// ImporterSetupHTMLer is optionally implemented by importers. The returned
// string is converted to template.HTML without escaping and shown as setup
// help on the importer's page (see Host.serveImporter).
type ImporterSetupHTMLer interface {
	AccountSetupHTML(*Host) string
}
157
var (
	// importers maps a registered importer name to its implementation.
	importers = map[string]Importer{}
	// reservedImporterKey holds the PermanodeImporterType values claimed by
	// registered importers, so they cannot be reused as a name or type.
	reservedImporterKey = map[string]bool{}
)
162
163
164
165 func All() map[string]Importer {
166 return importers
167 }
168
169
170
171 func Register(name string, im Importer) {
172 if _, dup := importers[name]; dup {
173 panic("Dup registration of importer " + name)
174 }
175 if _, dup := reservedImporterKey[name]; dup {
176 panic("Dup registration of importer " + name)
177 }
178 if pt := im.Properties().PermanodeImporterType; pt != "" {
179 if _, dup := importers[pt]; dup {
180 panic("Dup registration of importer " + pt)
181 }
182 if _, dup := reservedImporterKey[pt]; dup {
183 panic("Dup registration of importer " + pt)
184 }
185 reservedImporterKey[pt] = true
186 }
187 importers[name] = im
188 }
189
190 func RegisterTODO(name string, p Properties) {
191 Register(name, &todoImp{Props: p})
192 }
193
// init registers newFromConfig as the constructor for "importer" handlers.
func init() {
	blobserver.RegisterHandlerConstructor("importer", newFromConfig)
}
198
199
// HostConfig holds the parameters NewHost uses to build a Host.
type HostConfig struct {
	BaseURL      string                  // base URL; also the first part of the importer base URL
	Prefix       string                  // appended to BaseURL to form the importer base URL
	Target       blobserver.StatReceiver // becomes Host.Target()
	BlobSource   blob.Fetcher            // becomes Host.BlobSource()
	Signer       *schema.Signer          // used to sign schema blobs before upload
	Search       search.QueryDescriber   // becomes Host.Searcher()
	ClientId     map[string]string       // static client ID per importer name
	ClientSecret map[string]string       // static client secret per importer name

	// HTTPClient is returned by Host.HTTPClient; when nil,
	// http.DefaultClient is returned instead.
	HTTPClient *http.Client
}
215
216 func NewHost(hc HostConfig) (*Host, error) {
217 h := &Host{
218 baseURL: hc.BaseURL,
219 importerBase: hc.BaseURL + hc.Prefix,
220 imp: make(map[string]*importer),
221 }
222 var err error
223 h.tmpl, err = tmpl.Clone()
224 if err != nil {
225 return nil, err
226 }
227 h.tmpl = h.tmpl.Funcs(map[string]interface{}{
228 "bloblink": func(br blob.Ref) template.HTML {
229 if h.uiPrefix == "" {
230 return template.HTML(br.String())
231 }
232 return template.HTML(fmt.Sprintf("<a href=\"%s/%s\">%s</a>", h.uiPrefix, br, br))
233 },
234 })
235 for k, impl := range importers {
236 h.importers = append(h.importers, k)
237 clientId, clientSecret := hc.ClientId[k], hc.ClientSecret[k]
238 if clientSecret != "" && clientId == "" {
239 return nil, fmt.Errorf("Invalid static configuration for importer %q: clientSecret specified without clientId", k)
240 }
241 props := impl.Properties()
242 imp := &importer{
243 host: h,
244 name: k,
245 impl: impl,
246 props: &props,
247 clientID: clientId,
248 clientSecret: clientSecret,
249 }
250 h.imp[k] = imp
251 }
252
253 sort.Strings(h.importers)
254
255 h.target = hc.Target
256 h.blobSource = hc.BlobSource
257 h.signer = hc.Signer
258 h.search = hc.Search
259 h.client = hc.HTTPClient
260
261 return h, nil
262 }
263
264 func newFromConfig(ld blobserver.Loader, cfg jsonconfig.Obj) (http.Handler, error) {
265 hc := HostConfig{
266 BaseURL: ld.BaseURL(),
267 Prefix: ld.MyPrefix(),
268 }
269 ClientId := make(map[string]string)
270 ClientSecret := make(map[string]string)
271 for k := range importers {
272 var clientId, clientSecret string
273 if impConf := cfg.OptionalObject(k); impConf != nil {
274 clientId = impConf.OptionalString("clientID", "")
275 clientSecret = impConf.OptionalString("clientSecret", "")
276
277
278 if clientId == "" && strings.Contains(clientSecret, ":") {
279 if f := strings.SplitN(clientSecret, ":", 2); len(f) == 2 {
280 clientId, clientSecret = f[0], f[1]
281 }
282 }
283 if err := impConf.Validate(); err != nil {
284 return nil, fmt.Errorf("Invalid static configuration for importer %q: %v", k, err)
285 }
286 ClientId[k] = clientId
287 ClientSecret[k] = clientSecret
288 }
289 }
290 if err := cfg.Validate(); err != nil {
291 return nil, err
292 }
293 hc.ClientId = ClientId
294 hc.ClientSecret = ClientSecret
295 host, err := NewHost(hc)
296 if err != nil {
297 return nil, err
298 }
299 host.didInit.Add(1)
300 return host, nil
301 }
302
// Compile-time check that *Host implements blobserver.HandlerIniter.
var _ blobserver.HandlerIniter = (*Host)(nil)
304
// SetupContext is passed to Importer.ServeSetup and Importer.ServeCallback.
// It embeds a context.Context, which this file sets to context.TODO().
type SetupContext struct {
	context.Context
	Host        *Host
	AccountNode *Object // the account's permanode object

	ia *importerAcct // the account being set up
}
312
313 func (sc *SetupContext) Credentials() (clientID, clientSecret string, err error) {
314 return sc.ia.im.credentials()
315 }
316
317 func (sc *SetupContext) CallbackURL() string {
318 params := sc.ia.im.impl.CallbackURLParameters(sc.AccountNode.PermanodeRef()).Encode()
319 if params != "" {
320 params = "?" + params
321 }
322 return sc.Host.ImporterBaseURL() + sc.ia.im.name + "/callback" + params
323 }
324
325
326
327 func (sc *SetupContext) AccountURL() string {
328 return sc.Host.ImporterBaseURL() + sc.ia.im.name + "/" + sc.AccountNode.PermanodeRef().String()
329 }
330
331
332
// RunContext is the state handed to Importer.Run and LongPoller.LongPoll for
// one run on one account.
type RunContext struct {
	ctx    context.Context    // returned by Context(); nil means context.Background()
	cancel context.CancelFunc // called by importerAcct.stop; not set for long-poll contexts
	Host   *Host

	ia *importerAcct // the account being imported

	mu           sync.Mutex       // guards lastProgress
	lastProgress *ProgressMessage // shown on the account page while running
}
343
344
345 func (rc *RunContext) Context() context.Context {
346 if rc.ctx != nil {
347 return rc.ctx
348 }
349 return context.Background()
350 }
351
352
353
354 func CreateAccount(h *Host, impl string) (*RunContext, error) {
355 imp, ok := h.imp[impl]
356 if !ok {
357 return nil, fmt.Errorf("host does not have a %v importer", impl)
358 }
359 ia, err := imp.newAccount()
360 if err != nil {
361 return nil, fmt.Errorf("could not create new account for importer %v: %v", impl, err)
362 }
363 rc := &RunContext{
364 Host: ia.im.host,
365 ia: ia,
366 }
367 rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
368 return rc, nil
369
370 }
371
372
373
374
375 func (rc *RunContext) Credentials() (clientID, clientSecret string, err error) {
376 return rc.ia.im.credentials()
377 }
378
379
380
381
382
383
384
385
386
387
388
389 func (rc *RunContext) AccountNode() *Object { return rc.ia.acct }
390
391
392
393
394
395
396 func (rc *RunContext) RootNode() *Object { return rc.ia.root }
397
398
399
// Host is the HTTP handler for the importer pages. It holds one importer
// entry per registered Importer, plus the storage, search and signing
// facilities they use.
type Host struct {
	tmpl         *template.Template   // clone of the package template, with "bloblink" added
	importers    []string             // sorted importer names
	imp          map[string]*importer // importer entries, keyed by registered name
	baseURL      string
	importerBase string // baseURL followed by the handler prefix
	target       blobserver.StatReceiver
	blobSource   blob.Fetcher
	search       search.QueryDescriber
	signer       *schema.Signer
	uiPrefix     string // prefix of the "ui" handler, if found; used by "bloblink"

	// didInit is incremented by newFromConfig and released by InitHandler.
	// AccountsStatus and RunImporterAccount wait on it before proceeding.
	didInit sync.WaitGroup

	// client and transport back HTTPClient and HTTPTransport; when nil,
	// the net/http defaults are returned. transport is not assigned in the
	// visible part of this file.
	client    *http.Client
	transport http.RoundTripper
}
425
426
// accountStatus is the JSON-serializable status of one importer account, as
// returned by Host.AccountsStatus.
type accountStatus struct {
	Name string `json:"name"` // Importer.SummarizeAccount output
	Type string `json:"type"` // registered importer name
	Href string `json:"href"` // account page URL

	StartedUnixSec      int64  `json:"startedUnixSec"`  // set only while a run is in progress
	LastFinishedUnixSec int64  `json:"finishedUnixSec"` // zero if no run has finished
	LastError           string `json:"lastRunError"`    // empty if the last run recorded no error
}
436
437
438
439 func (h *Host) AccountsStatus() (interface{}, []camtypes.StatusError) {
440 h.didInit.Wait()
441 var s []accountStatus
442 var errs []camtypes.StatusError
443 for _, impName := range h.importers {
444 imp := h.imp[impName]
445 accts, _ := imp.Accounts()
446 for _, ia := range accts {
447 as := accountStatus{
448 Type: impName,
449 Href: ia.AccountURL(),
450 Name: ia.AccountLinkSummary(),
451 }
452 ia.mu.Lock()
453 if ia.current != nil {
454 as.StartedUnixSec = ia.lastRunStart.Unix()
455 }
456 if !ia.lastRunDone.IsZero() {
457 as.LastFinishedUnixSec = ia.lastRunDone.Unix()
458 }
459 if ia.lastRunErr != nil {
460 as.LastError = ia.lastRunErr.Error()
461 errs = append(errs, camtypes.StatusError{
462 Error: ia.lastRunErr.Error(),
463 URL: ia.AccountURL(),
464 })
465 }
466 ia.mu.Unlock()
467 s = append(s, as)
468 }
469 }
470 return s, errs
471 }
472
473
474
475 func (h *Host) RunImporterAccount(importerType string, accountNode blob.Ref) error {
476 h.didInit.Wait()
477 imp, ok := h.imp[importerType]
478 if !ok {
479 return fmt.Errorf("no %q importer for this account", importerType)
480 }
481 accounts, err := imp.Accounts()
482 if err != nil {
483 return err
484 }
485 for _, ia := range accounts {
486 if ia.acct.pn != accountNode {
487 continue
488 }
489 return ia.run()
490 }
491 return fmt.Errorf("no %v account matching account in node %v", importerType, accountNode)
492 }
493
494 func (h *Host) InitHandler(hl blobserver.FindHandlerByTyper) error {
495 if prefix, _, err := hl.FindHandlerByType("ui"); err == nil {
496 h.uiPrefix = prefix
497 }
498
499 _, handler, err := hl.FindHandlerByType("root")
500 if err != nil || handler == nil {
501 return errors.New("importer requires a 'root' handler")
502 }
503 rh := handler.(*server.RootHandler)
504 searchHandler, ok := rh.SearchHandler()
505 if !ok {
506 return errors.New("importer requires a 'root' handler with 'searchRoot' defined")
507 }
508 h.search = searchHandler
509 if rh.Storage == nil {
510 return errors.New("importer requires a 'root' handler with 'blobRoot' defined")
511 }
512 h.target = rh.Storage
513 h.blobSource = rh.Storage
514
515 _, handler, _ = hl.FindHandlerByType("jsonsign")
516 if sigh, ok := handler.(*signhandler.Handler); ok {
517 h.signer = sigh.Signer()
518 }
519 if h.signer == nil {
520 return errors.New("importer requires a 'jsonsign' handler")
521 }
522 h.didInit.Done()
523 go h.startPeriodicImporters()
524 return nil
525 }
526
527
528
529
530
531
532
533 func (h *Host) ServeHTTP(w http.ResponseWriter, r *http.Request) {
534 suffix := httputil.PathSuffix(r)
535 seg := strings.Split(suffix, "/")
536 if suffix == "" || len(seg) == 0 {
537 h.serveImportersRoot(w, r)
538 return
539 }
540 impName := seg[0]
541
542 imp, ok := h.imp[impName]
543 if !ok {
544 http.NotFound(w, r)
545 return
546 }
547
548 if len(seg) == 1 || seg[1] == "" {
549 h.serveImporter(w, r, imp)
550 return
551 }
552 if seg[1] == "callback" {
553 h.serveImporterAcctCallback(w, r, imp)
554 return
555 }
556 acctRef, ok := blob.Parse(seg[1])
557 if !ok {
558 http.NotFound(w, r)
559 return
560 }
561 h.serveImporterAccount(w, r, imp, acctRef)
562 }
563
564
565 func (h *Host) serveImportersRoot(w http.ResponseWriter, r *http.Request) {
566 body := importersRootBody{
567 Host: h,
568 Importers: make([]*importer, 0, len(h.imp)),
569 }
570 for _, v := range h.importers {
571 body.Importers = append(body.Importers, h.imp[v])
572 }
573 sort.Slice(body.Importers, func(i, j int) bool {
574 return body.Importers[i].Title() < body.Importers[j].Title()
575 })
576 h.execTemplate(w, r, importersRootPage{
577 Title: "Importers",
578 Body: body,
579 })
580 }
581
582
583 func (h *Host) serveImporter(w http.ResponseWriter, r *http.Request, imp *importer) {
584 if r.Method == "POST" {
585 h.serveImporterPost(w, r, imp)
586 return
587 }
588
589 var setup string
590 node, _ := imp.Node()
591 if setuper, ok := imp.impl.(ImporterSetupHTMLer); ok && node != nil {
592 setup = setuper.AccountSetupHTML(h)
593 }
594
595 h.execTemplate(w, r, importerPage{
596 Title: "Importer - " + imp.Name(),
597 Body: importerBody{
598 Host: h,
599 Importer: imp,
600 SetupHelp: template.HTML(setup),
601 },
602 })
603 }
604
605
606 func (h *Host) serveImporterAcctCallback(w http.ResponseWriter, r *http.Request, imp *importer) {
607 if r.Method != "GET" {
608 http.Error(w, "invalid method", http.StatusBadRequest)
609 return
610 }
611 acctRef, err := imp.impl.CallbackRequestAccount(r)
612 if err != nil {
613 httputil.ServeError(w, r, err)
614 return
615 }
616 if !acctRef.Valid() {
617 httputil.ServeError(w, r, errors.New("No valid blobref returned from CallbackRequestAccount(r)"))
618 return
619 }
620 ia, err := imp.account(acctRef)
621 if err != nil {
622 http.Error(w, "invalid 'acct' param: "+err.Error(), http.StatusBadRequest)
623 return
624 }
625 imp.impl.ServeCallback(w, r, &SetupContext{
626 Context: context.TODO(),
627 Host: h,
628 AccountNode: ia.acct,
629 ia: ia,
630 })
631 }
632
633 func (h *Host) serveImporterPost(w http.ResponseWriter, r *http.Request, imp *importer) {
634 switch r.FormValue("mode") {
635 default:
636 http.Error(w, "Unknown mode.", http.StatusBadRequest)
637 case "newacct":
638 ia, err := imp.newAccount()
639 if err != nil {
640 http.Error(w, err.Error(), 500)
641 return
642 }
643 ia.setup(w, r)
644 return
645 case "saveclientidsecret":
646 n, err := imp.Node()
647 if err != nil {
648 http.Error(w, "Error getting node: "+err.Error(), 500)
649 return
650 }
651 if err := n.SetAttrs(
652 attrClientID, r.FormValue("clientID"),
653 attrClientSecret, r.FormValue("clientSecret"),
654 ); err != nil {
655 http.Error(w, "Error saving node: "+err.Error(), 500)
656 return
657 }
658 http.Redirect(w, r, h.ImporterBaseURL()+imp.name, http.StatusFound)
659 }
660 }
661
662
663 func (h *Host) serveImporterAccount(w http.ResponseWriter, r *http.Request, imp *importer, acctRef blob.Ref) {
664 ia, err := imp.account(acctRef)
665 if err != nil {
666 http.Error(w, "Unknown or invalid importer account "+acctRef.String()+": "+err.Error(), http.StatusBadRequest)
667 return
668 }
669 ia.ServeHTTP(w, r)
670 }
671
672 func (h *Host) startPeriodicImporters() {
673 res, err := h.search.Query(context.TODO(), &search.SearchQuery{
674 Sort: search.Unsorted,
675 Expression: "attr:camliNodeType:importerAccount",
676 Describe: &search.DescribeRequest{
677 Depth: 1,
678 },
679 })
680 if err != nil {
681 log.Printf("periodic importer search fail: %v", err)
682 return
683 }
684 if res.Describe == nil {
685 log.Printf("No describe response in search result")
686 return
687 }
688 for _, resBlob := range res.Blobs {
689 blob := resBlob.Blob
690 desBlob, ok := res.Describe.Meta[blob.String()]
691 if !ok || desBlob.Permanode == nil {
692 continue
693 }
694 attrs := desBlob.Permanode.Attr
695 if attrs.Get(attrNodeType) != nodeTypeImporterAccount {
696 panic("Search result returned non-importerAccount")
697 }
698 impType := attrs.Get("importerType")
699 imp, ok := h.imp[impType]
700 if !ok {
701 continue
702 }
703 ia, err := imp.account(blob)
704 if err != nil {
705 log.Printf("Can't load importer account %v for regular importing: %v", blob, err)
706 continue
707 }
708 go ia.maybeStart()
709 }
710 }
711
// disableImporters is the boolean value of the CAMLI_DISABLE_IMPORTERS
// environment variable; a parse error leaves it false. When true,
// maybeStart does nothing.
var disableImporters, _ = strconv.ParseBool(os.Getenv("CAMLI_DISABLE_IMPORTERS"))
713
714 func (ia *importerAcct) maybeStart() {
715 if disableImporters {
716 log.Printf("Importers disabled, per environment.")
717 return
718 }
719 acctObj, err := ia.im.host.ObjectFromRef(ia.acct.PermanodeRef())
720 if err != nil {
721 log.Printf("Error maybe starting %v: %v", ia.acct.PermanodeRef(), err)
722 return
723 }
724 duration, err := time.ParseDuration(acctObj.Attr(attrImportAuto))
725 if duration == 0 || err != nil {
726 return
727 }
728 ia.mu.Lock()
729 defer ia.mu.Unlock()
730 if ia.current != nil {
731 return
732 }
733 if ia.lastRunDone.After(time.Now().Add(-duration)) {
734
735 if lp, ok := ia.im.impl.(LongPoller); ok {
736 sleepFor := time.Until(ia.lastRunDone.Add(duration))
737 sleepCtx, cancel := context.WithTimeout(context.Background(), sleepFor)
738 log.Printf("%v ran recently enough. Sleeping for %v.", ia, sleepFor)
739 timer := time.AfterFunc(sleepFor, ia.maybeStart)
740
741 rc := &RunContext{
742 ctx: sleepCtx,
743 Host: ia.im.host,
744 ia: ia,
745 }
746 go func() {
747 defer cancel()
748
749 if err := lp.LongPoll(rc); err == nil {
750 log.Printf("importer: long poll for %s found an update. Starting run...", ia)
751 timer.Stop()
752 ia.start()
753 } else {
754 log.Printf("failed to long poll %s: %v", ia, err)
755 }
756 }()
757 }
758 return
759 }
760
761 log.Printf("importer: starting periodic import for %v", ia)
762 go ia.start()
763 }
764
765
766
767 func (h *Host) BaseURL() string {
768 return h.baseURL
769 }
770
771
772
773 func (h *Host) ImporterBaseURL() string {
774 return h.importerBase
775 }
776
777 func (h *Host) Target() blobserver.StatReceiver {
778 return h.target
779 }
780
781 func (h *Host) BlobSource() blob.Fetcher {
782 return h.blobSource
783 }
784
785 func (h *Host) Searcher() search.QueryDescriber { return h.search }
786
787
// importer is the Host's per-importer state: the implementation, its
// properties, any static credentials, and caches of its node and accounts.
type importer struct {
	host  *Host
	name  string      // registered name
	impl  Importer    // the registered implementation
	props *Properties // copy of impl.Properties() taken in NewHost

	// clientID and clientSecret come from the static configuration. A
	// non-empty clientSecret marks the importer as statically configured
	// (see StaticConfig).
	clientID     string
	clientSecret string

	nodemu    sync.Mutex // guards nodeCache
	nodeCache *Object    // the importer node, once Node has created it

	acctmu    sync.Mutex // guards acct and allLoaded
	acct      map[blob.Ref]*importerAcct // loaded accounts, by account permanode
	allLoaded bool                       // whether Accounts has completed at least once
}
806
807 func (im *importer) Name() string { return im.name }
808
809 func (im *importer) Title() string {
810 if im.props.Title != "" {
811 return im.props.Title
812 }
813 return im.name
814 }
815
816 func (im *importer) TODOIssue() int {
817 return im.props.TODOIssue
818 }
819
820 func (im *importer) Description() string {
821 return im.props.Description
822 }
823
824
825
826
827
828 func (im *importer) ImporterType() string {
829 if im.props.PermanodeImporterType != "" {
830 return im.props.PermanodeImporterType
831 }
832 return im.name
833 }
834
835 func (im *importer) StaticConfig() bool { return im.clientSecret != "" }
836
837
838 func (im *importer) URL() string { return im.host.ImporterBaseURL() + im.name }
839
840 func (im *importer) ShowClientAuthEditForm() bool {
841 if im.StaticConfig() {
842
843
844 return false
845 }
846 return im.props.NeedsAPIKey
847 }
848
849 func (im *importer) InsecureForm() bool {
850 return !strings.HasPrefix(im.host.ImporterBaseURL(), "https://")
851 }
852
853 func (im *importer) CanAddNewAccount() bool {
854 if !im.props.NeedsAPIKey {
855 return true
856 }
857 id, sec, err := im.credentials()
858 return id != "" && sec != "" && err == nil
859 }
860
861 func (im *importer) ClientID() (v string, err error) {
862 v, _, err = im.credentials()
863 return
864 }
865
866 func (im *importer) ClientSecret() (v string, err error) {
867 _, v, err = im.credentials()
868 return
869 }
870
871 func (im *importer) Status() (status string, err error) {
872 if !im.props.NeedsAPIKey {
873 return "no configuration required", nil
874 }
875 if im.StaticConfig() {
876 return "API key configured on server", nil
877 }
878 n, err := im.Node()
879 if err != nil {
880 return
881 }
882 if n.Attr(attrClientID) != "" && n.Attr(attrClientSecret) != "" {
883 return "API key configured on node", nil
884 }
885 return "API key (client ID & Secret) not configured", nil
886 }
887
888 func (im *importer) credentials() (clientID, clientSecret string, err error) {
889 if im.StaticConfig() {
890 return im.clientID, im.clientSecret, nil
891 }
892 n, err := im.Node()
893 if err != nil {
894 return
895 }
896 return n.Attr(attrClientID), n.Attr(attrClientSecret), nil
897 }
898
899 func (im *importer) deleteAccount(acctRef blob.Ref) {
900 im.acctmu.Lock()
901 delete(im.acct, acctRef)
902 im.acctmu.Unlock()
903 }
904
905 func (im *importer) account(nodeRef blob.Ref) (*importerAcct, error) {
906 im.acctmu.Lock()
907 ia, ok := im.acct[nodeRef]
908 im.acctmu.Unlock()
909 if ok {
910 return ia, nil
911 }
912
913 acct, err := im.host.ObjectFromRef(nodeRef)
914 if err != nil {
915 return nil, err
916 }
917 if acct.Attr(attrNodeType) != nodeTypeImporterAccount {
918 return nil, errors.New("account has wrong node type")
919 }
920 if acct.Attr(attrImporterType) != im.ImporterType() {
921 return nil, errors.New("account has wrong importer type")
922 }
923 var root *Object
924 if v := acct.Attr(attrImportRoot); v != "" {
925 rootRef, ok := blob.Parse(v)
926 if !ok {
927 return nil, errors.New("invalid import root attribute")
928 }
929 root, err = im.host.ObjectFromRef(rootRef)
930 if err != nil {
931 return nil, err
932 }
933 } else {
934 root, err = im.host.NewObject()
935 if err != nil {
936 return nil, err
937 }
938 if err := acct.SetAttr(attrImportRoot, root.PermanodeRef().String()); err != nil {
939 return nil, err
940 }
941 }
942 ia = &importerAcct{
943 im: im,
944 acct: acct,
945 root: root,
946 }
947 im.acctmu.Lock()
948 defer im.acctmu.Unlock()
949 im.addAccountLocked(ia)
950 return ia, nil
951 }
952
953 func (im *importer) newAccount() (*importerAcct, error) {
954
955 acct, err := im.host.NewObject()
956 if err != nil {
957 return nil, err
958 }
959 root, err := im.host.NewObject()
960 if err != nil {
961 return nil, err
962 }
963 if err := acct.SetAttrs(
964 "title", fmt.Sprintf("%s account", im.name),
965 attrNodeType, nodeTypeImporterAccount,
966 attrImporterType, im.ImporterType(),
967 attrImportRoot, root.PermanodeRef().String(),
968 ); err != nil {
969 return nil, err
970 }
971
972 ia := &importerAcct{
973 im: im,
974 acct: acct,
975 root: root,
976 }
977 im.acctmu.Lock()
978 defer im.acctmu.Unlock()
979 im.addAccountLocked(ia)
980 return ia, nil
981 }
982
983 func (im *importer) addAccountLocked(ia *importerAcct) {
984 if im.acct == nil {
985 im.acct = make(map[blob.Ref]*importerAcct)
986 }
987 im.acct[ia.acct.PermanodeRef()] = ia
988 }
989
990 func (im *importer) Accounts() ([]*importerAcct, error) {
991 im.acctmu.Lock()
992 needQuery := !im.allLoaded
993 im.acctmu.Unlock()
994
995 if needQuery {
996 res, err := im.host.search.Query(context.TODO(), &search.SearchQuery{
997 Sort: search.Unsorted,
998 Expression: fmt.Sprintf("attr:%s:%s attr:%s:%s",
999 attrNodeType, nodeTypeImporterAccount,
1000 attrImporterType, im.ImporterType(),
1001 ),
1002 })
1003 if err != nil {
1004 return nil, err
1005 }
1006 for _, res := range res.Blobs {
1007 _, err := im.account(res.Blob)
1008 if err != nil {
1009 return nil, err
1010 }
1011 }
1012 }
1013
1014 im.acctmu.Lock()
1015 defer im.acctmu.Unlock()
1016 im.allLoaded = true
1017
1018 accts := make([]*importerAcct, 0, len(im.acct))
1019 for _, ia := range im.acct {
1020 accts = append(accts, ia)
1021 }
1022 sort.Sort(byImporterAccountPermanode(accts))
1023 return accts, nil
1024 }
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038 func (im *importer) Node() (*Object, error) {
1039 im.nodemu.Lock()
1040 defer im.nodemu.Unlock()
1041 if im.nodeCache != nil {
1042 return im.nodeCache, nil
1043 }
1044
1045 expr := fmt.Sprintf("attr:%s:%s attr:%s:%s",
1046 attrNodeType, nodeTypeImporter,
1047 attrImporterType, im.ImporterType(),
1048 )
1049 res, err := im.host.search.Query(context.TODO(), &search.SearchQuery{
1050 Sort: search.Unsorted,
1051 Limit: 10,
1052 Expression: expr,
1053 })
1054 if err != nil {
1055 return nil, err
1056 }
1057 best, err := im.bestNode(res)
1058 if err != nil {
1059 return nil, err
1060 }
1061 if best != nil {
1062 return best, nil
1063 }
1064
1065 o, err := im.host.NewObject()
1066 if err != nil {
1067 return nil, err
1068 }
1069 if err := o.SetAttrs(
1070 "title", fmt.Sprintf("%s importer", im.name),
1071 attrNodeType, nodeTypeImporter,
1072 attrImporterType, im.ImporterType(),
1073 ); err != nil {
1074 return nil, err
1075 }
1076
1077 im.nodeCache = o
1078 return o, nil
1079 }
1080
1081
1082
1083
1084
1085 func (im *importer) bestNode(res *search.SearchResult) (*Object, error) {
1086 if len(res.Blobs) == 0 {
1087
1088 return nil, nil
1089 }
1090 var best *Object
1091 for _, desb := range res.Blobs {
1092 o, err := im.host.ObjectFromRef(desb.Blob)
1093 if err != nil {
1094 return nil, err
1095 }
1096 if best == nil {
1097 best = o
1098 continue
1099 }
1100 if best.Attr(attrClientID) == "" && o.Attr(attrClientID) != "" {
1101 best = o
1102 continue
1103 }
1104 if best.Attr(attrClientSecret) == "" && o.Attr(attrClientSecret) != "" {
1105 best = o
1106 continue
1107 }
1108 if best.Attr("title") == "" && o.Attr("title") != "" {
1109 best = o
1110 continue
1111 }
1112
1113
1114 if best.modtime.Before(o.modtime) {
1115 best = o
1116 continue
1117 }
1118 }
1119 return best, nil
1120 }
1121
1122
// importerAcct is one account of an importer, with the state of its current
// and most recent run.
type importerAcct struct {
	im   *importer
	acct *Object // the account permanode object
	root *Object // the import root object

	mu           sync.Mutex  // guards the fields below
	current      *RunContext // non-nil while a run is in progress
	stopped      bool        // whether stop was called on the current run
	lastRunErr   error       // result of the last finished run
	lastRunStart time.Time
	lastRunDone  time.Time
}
1135
1136 func (ia *importerAcct) String() string {
1137 return fmt.Sprintf("%v importer account, %v", ia.im.name, ia.acct.PermanodeRef())
1138 }
1139
1140 func (ia *importerAcct) delete() error {
1141 if err := ia.acct.SetAttrs(
1142 attrNodeType, nodeTypeImporterAccount+"-deleted",
1143 ); err != nil {
1144 return err
1145 }
1146 ia.im.deleteAccount(ia.acct.PermanodeRef())
1147 return nil
1148 }
1149
1150 func (ia *importerAcct) toggleAuto() error {
1151 old := ia.acct.Attr(attrImportAuto)
1152 if old == "" && !ia.im.props.SupportsIncremental {
1153 return fmt.Errorf("Importer %q doesn't support automatic mode", ia.im.name)
1154 }
1155 var new string
1156 if old == "" {
1157 new = "30m"
1158 }
1159 return ia.acct.SetAttrs(attrImportAuto, new)
1160 }
1161
1162 func (ia *importerAcct) IsAccountReady() (bool, error) {
1163 return ia.im.impl.IsAccountReady(ia.acct)
1164 }
1165
1166 func (ia *importerAcct) AccountObject() *Object { return ia.acct }
1167 func (ia *importerAcct) RootObject() *Object { return ia.root }
1168
1169 func (ia *importerAcct) AccountURL() string {
1170 return ia.im.URL() + "/" + ia.acct.PermanodeRef().String()
1171 }
1172
1173 func (ia *importerAcct) AccountLinkText() string {
1174 return ia.acct.PermanodeRef().String()
1175 }
1176
1177 func (ia *importerAcct) AccountLinkSummary() string {
1178 return ia.im.impl.SummarizeAccount(ia.acct)
1179 }
1180
1181 func (ia *importerAcct) RefreshInterval() time.Duration {
1182 ds := ia.acct.Attr(attrImportAuto)
1183 if ds == "" {
1184 return 0
1185 }
1186 d, _ := time.ParseDuration(ds)
1187 return d
1188 }
1189
1190 func (ia *importerAcct) ServeHTTP(w http.ResponseWriter, r *http.Request) {
1191 if r.Method == "POST" {
1192 ia.serveHTTPPost(w, r)
1193 return
1194 }
1195 ia.mu.Lock()
1196 defer ia.mu.Unlock()
1197 body := acctBody{
1198 Acct: ia,
1199 AcctType: fmt.Sprintf("%T", ia.im.impl),
1200 }
1201 if run := ia.current; run != nil {
1202 body.Running = true
1203 body.StartedAgo = time.Since(ia.lastRunStart)
1204 run.mu.Lock()
1205 body.LastStatus = fmt.Sprintf("%+v", run.lastProgress)
1206 run.mu.Unlock()
1207 } else if !ia.lastRunDone.IsZero() {
1208 body.LastAgo = time.Since(ia.lastRunDone)
1209 if ia.lastRunErr != nil {
1210 body.LastError = ia.lastRunErr.Error()
1211 }
1212 }
1213 title := fmt.Sprintf("%s account: ", ia.im.name)
1214 if summary := ia.im.impl.SummarizeAccount(ia.acct); summary != "" {
1215 title += summary
1216 } else {
1217 title += ia.acct.PermanodeRef().String()
1218 }
1219 ia.im.host.execTemplate(w, r, acctPage{
1220 Title: title,
1221 Body: body,
1222 })
1223 }
1224
1225 func (ia *importerAcct) serveHTTPPost(w http.ResponseWriter, r *http.Request) {
1226
1227
1228 switch r.FormValue("mode") {
1229 case "":
1230
1231 case "start":
1232 ia.start()
1233 case "stop":
1234 ia.stop()
1235 case "login":
1236 ia.setup(w, r)
1237 return
1238 case "toggleauto":
1239 if err := ia.toggleAuto(); err != nil {
1240 http.Error(w, err.Error(), 500)
1241 return
1242 }
1243 case "delete":
1244 ia.stop()
1245 if err := ia.delete(); err != nil {
1246 http.Error(w, err.Error(), 500)
1247 return
1248 }
1249 http.Redirect(w, r, ia.im.URL(), http.StatusFound)
1250 return
1251 default:
1252 http.Error(w, "Unknown mode", http.StatusBadRequest)
1253 return
1254 }
1255 http.Redirect(w, r, ia.AccountURL(), http.StatusFound)
1256 }
1257
1258 func (ia *importerAcct) setup(w http.ResponseWriter, r *http.Request) {
1259 if err := ia.im.impl.ServeSetup(w, r, &SetupContext{
1260 Context: context.TODO(),
1261 Host: ia.im.host,
1262 AccountNode: ia.acct,
1263 ia: ia,
1264 }); err != nil {
1265 log.Printf("%v", err)
1266 }
1267 }
1268
1269 func (ia *importerAcct) start() {
1270 ia.mu.Lock()
1271 defer ia.mu.Unlock()
1272 if ia.current != nil {
1273 return
1274 }
1275 rc := &RunContext{
1276 Host: ia.im.host,
1277 ia: ia,
1278 }
1279 rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
1280 ia.current = rc
1281 ia.stopped = false
1282 ia.lastRunStart = time.Now()
1283 go func() {
1284 log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
1285 err := ia.im.impl.Run(rc)
1286 if err != nil {
1287 log.Printf("%v error: %v", ia, err)
1288 } else {
1289 log.Printf("%v finished.", ia)
1290 }
1291 ia.mu.Lock()
1292 defer ia.mu.Unlock()
1293 ia.current = nil
1294 ia.stopped = false
1295 ia.lastRunDone = time.Now()
1296 ia.lastRunErr = err
1297 go ia.maybeStart()
1298 }()
1299 }
1300
1301
1302 func (ia *importerAcct) run() error {
1303 ia.mu.Lock()
1304 defer ia.mu.Unlock()
1305 if ia.current != nil {
1306 return errors.New("whatever")
1307 }
1308 rc := &RunContext{
1309 Host: ia.im.host,
1310 ia: ia,
1311 }
1312 rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
1313 ia.current = rc
1314 ia.stopped = false
1315 ia.lastRunStart = time.Now()
1316 log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
1317 err := ia.im.impl.Run(rc)
1318 if err != nil {
1319 return err
1320 }
1321 log.Printf("%v finished.", ia)
1322 ia.current = nil
1323 ia.stopped = false
1324 ia.lastRunDone = time.Now()
1325 return ia.lastRunErr
1326 }
1327
1328 func (ia *importerAcct) stop() {
1329 ia.mu.Lock()
1330 defer ia.mu.Unlock()
1331 if ia.current == nil || ia.stopped {
1332 return
1333 }
1334 ia.current.cancel()
1335 ia.stopped = true
1336 }
1337
1338
1339 func (h *Host) HTTPClient() *http.Client {
1340 if h.client == nil {
1341 return http.DefaultClient
1342 }
1343 return h.client
1344 }
1345
1346
1347 func (h *Host) HTTPTransport() http.RoundTripper {
1348 if h.transport == nil {
1349 return http.DefaultTransport
1350 }
1351 return h.transport
1352 }
1353
// ProgressMessage holds item and byte counters for a run. The account page
// prints the RunContext's last one with "%+v" while a run is in progress.
// NOTE(review): presumably the Done/Total pairs express progress as
// done-out-of-total — confirm against the code that fills them in.
type ProgressMessage struct {
	ItemsDone, ItemsTotal int
	BytesDone, BytesTotal int64
}
1358
1359 func (h *Host) upload(ctx context.Context, bb *schema.Builder) (br blob.Ref, err error) {
1360 signed, err := bb.Sign(ctx, h.signer)
1361 if err != nil {
1362 return
1363 }
1364 sb, err := blobserver.ReceiveString(ctx, h.target, signed)
1365 if err != nil {
1366 return
1367 }
1368 return sb.Ref, nil
1369 }
1370
1371
1372 func (h *Host) NewObject() (*Object, error) {
1373 ctx := context.TODO()
1374 pn, err := h.upload(ctx, schema.NewUnsignedPermanode())
1375 if err != nil {
1376 return nil, err
1377 }
1378
1379
1380 return &Object{h: h, pn: pn}, nil
1381 }
1382
1383
1384
// Object is a permanode together with an in-memory copy of its attributes.
// Attribute changes made through its methods are uploaded as claims and
// mirrored in the in-memory copy.
type Object struct {
	h  *Host
	pn blob.Ref // the permanode ref

	mu      sync.RWMutex        // guards attr
	attr    map[string][]string // attribute name to values
	modtime time.Time           // compared by importer.bestNode to prefer later nodes
}
1393
1394
1395 func (o *Object) PermanodeRef() blob.Ref {
1396 return o.pn
1397 }
1398
1399
1400
1401
1402 func (o *Object) Attr(attr string) string {
1403 o.mu.RLock()
1404 defer o.mu.RUnlock()
1405 if v := o.attr[attr]; len(v) > 0 {
1406 return v[0]
1407 }
1408 return ""
1409 }
1410
1411
1412 func (o *Object) Attrs(attr string) []string {
1413 o.mu.RLock()
1414 defer o.mu.RUnlock()
1415 return o.attr[attr]
1416 }
1417
1418
1419
1420
1421
1422 func (o *Object) ForeachAttr(fn func(key, value string)) {
1423 o.mu.RLock()
1424 defer o.mu.RUnlock()
1425 for k, vv := range o.attr {
1426 for _, v := range vv {
1427 fn(k, v)
1428 }
1429 }
1430 }
1431
1432
1433
1434 func (o *Object) DelAttr(key, value string) error {
1435 ctx := context.TODO()
1436 o.mu.Lock()
1437 defer o.mu.Unlock()
1438 _, err := o.h.upload(ctx, schema.NewDelAttributeClaim(o.pn, key, value))
1439 if err != nil {
1440 return err
1441 }
1442 if o.attr == nil {
1443 o.attr = make(map[string][]string)
1444 return nil
1445 }
1446 if value == "" {
1447 delete(o.attr, key)
1448 return nil
1449 }
1450 var values []string
1451 for _, v := range o.attr[key] {
1452 if v != value {
1453 values = append(values, v)
1454 }
1455 }
1456 o.attr[key] = values
1457 return nil
1458 }
1459
1460
1461 func (o *Object) SetAttr(key, value string) error {
1462 ctx := context.TODO()
1463 if o.Attr(key) == value {
1464 return nil
1465 }
1466 _, err := o.h.upload(ctx, schema.NewSetAttributeClaim(o.pn, key, value))
1467 if err != nil {
1468 return err
1469 }
1470 o.mu.Lock()
1471 defer o.mu.Unlock()
1472 if o.attr == nil {
1473 o.attr = make(map[string][]string)
1474 }
1475 o.attr[key] = []string{value}
1476 return nil
1477 }
1478
1479
1480
1481 func (o *Object) SetAttrs(keyval ...string) error {
1482 _, err := o.SetAttrs2(keyval...)
1483 return err
1484 }
1485
1486
1487
1488
1489 func (o *Object) SetAttrs2(keyval ...string) (changes bool, err error) {
1490 if len(keyval)%2 == 1 {
1491 panic("importer.SetAttrs: odd argument count")
1492 }
1493
1494 g := syncutil.Group{}
1495 for i := 0; i < len(keyval); i += 2 {
1496 key, val := keyval[i], keyval[i+1]
1497 if val != o.Attr(key) {
1498 changes = true
1499 g.Go(func() error {
1500 return o.SetAttr(key, val)
1501 })
1502 }
1503 }
1504 return changes, g.Err()
1505 }
1506
1507
1508 func (o *Object) SetAttrValues(key string, attrs []string) error {
1509 ctx := context.TODO()
1510
1511 exists := asSet(o.Attrs(key))
1512 actual := asSet(attrs)
1513 o.mu.Lock()
1514 defer o.mu.Unlock()
1515
1516 for v := range actual {
1517 if exists[v] {
1518 delete(exists, v)
1519 continue
1520 }
1521 _, err := o.h.upload(ctx, schema.NewAddAttributeClaim(o.pn, key, v))
1522 if err != nil {
1523 return err
1524 }
1525 }
1526
1527 for v := range exists {
1528 _, err := o.h.upload(ctx, schema.NewDelAttributeClaim(o.pn, key, v))
1529 if err != nil {
1530 return err
1531 }
1532 }
1533 if o.attr == nil {
1534 o.attr = make(map[string][]string)
1535 }
1536 o.attr[key] = attrs
1537 return nil
1538 }
1539
// asSet returns the distinct elements of elts as the keys of a map whose
// values are all true. It returns nil when elts is empty.
func asSet(elts []string) map[string]bool {
	n := len(elts)
	if n == 0 {
		return nil
	}
	members := make(map[string]bool, n)
	for i := 0; i < n; i++ {
		members[elts[i]] = true
	}
	return members
}
1550
1551
1552
1553
1554 func (o *Object) ChildPathObject(path string) (*Object, error) {
1555 return o.ChildPathObjectOrFunc(path, o.h.NewObject)
1556 }
1557
1558
1559
1560
1561
1562
1563 func (o *Object) ChildPathObjectOrFunc(path string, fn func() (*Object, error)) (*Object, error) {
1564 attrName := "camliPath:" + path
1565 if v := o.Attr(attrName); v != "" {
1566 br, ok := blob.Parse(v)
1567 if !ok {
1568 return nil, fmt.Errorf("invalid blobref %q already stored at camliPath %q", br, path)
1569 }
1570 return o.h.ObjectFromRef(br)
1571 }
1572 newObj, err := fn()
1573 if err != nil {
1574 return nil, err
1575 }
1576 if err := o.SetAttr(attrName, newObj.PermanodeRef().String()); err != nil {
1577 return nil, err
1578 }
1579 return newObj, nil
1580 }
1581
1582
1583 func (h *Host) ObjectFromRef(permanodeRef blob.Ref) (*Object, error) {
1584 ctx := context.TODO()
1585 res, err := h.search.Describe(ctx, &search.DescribeRequest{
1586 BlobRef: permanodeRef,
1587 Depth: 1,
1588 })
1589 if err != nil {
1590 return nil, err
1591 }
1592 db, ok := res.Meta[permanodeRef.String()]
1593 if !ok {
1594 return nil, fmt.Errorf("permanode %v wasn't in Describe response", permanodeRef)
1595 }
1596 if db.Permanode == nil {
1597 return nil, fmt.Errorf("permanode %v had no DescribedPermanode in Describe response", permanodeRef)
1598 }
1599 return &Object{
1600 h: h,
1601 pn: permanodeRef,
1602 attr: map[string][]string(db.Permanode.Attr),
1603 modtime: db.Permanode.ModTime,
1604 }, nil
1605 }
1606
1607 type byImporterAccountPermanode []*importerAcct
1608
1609 func (s byImporterAccountPermanode) Len() int { return len(s) }
1610 func (s byImporterAccountPermanode) Less(i, j int) bool {
1611 return s[i].acct.PermanodeRef().Less(s[j].acct.PermanodeRef())
1612 }
1613 func (s byImporterAccountPermanode) Swap(i, j int) { s[i], s[j] = s[j], s[i] }