Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package importer imports content from third-party websites.
    18	package importer // import "perkeep.org/pkg/importer"
    19	
    20	import (
    21		"context"
    22		"errors"
    23		"fmt"
    24		"html/template"
    25		"log"
    26		"net/http"
    27		"net/url"
    28		"os"
    29		"sort"
    30		"strconv"
    31		"strings"
    32		"sync"
    33		"time"
    34	
    35		"perkeep.org/internal/httputil"
    36		"perkeep.org/pkg/blob"
    37		"perkeep.org/pkg/blobserver"
    38		"perkeep.org/pkg/jsonsign/signhandler"
    39		"perkeep.org/pkg/schema"
    40		"perkeep.org/pkg/search"
    41		"perkeep.org/pkg/server"
    42		"perkeep.org/pkg/types/camtypes"
    43	
    44		"go4.org/ctxutil"
    45		"go4.org/jsonconfig"
    46		"go4.org/syncutil"
    47	)
    48	
    49	const (
    50		attrNodeType            = "camliNodeType"
    51		nodeTypeImporter        = "importer"
    52		nodeTypeImporterAccount = "importerAccount"
    53	
    54		attrImporterType = "importerType" // => "twitter", "foursquare", etc
    55		attrClientID     = "authClientID"
    56		attrClientSecret = "authClientSecret"
    57		attrImportRoot   = "importRoot"
    58		attrImportAuto   = "importAuto" // => time.Duration value ("30m") or "" for off
    59	)
    60	
// An Importer imports from a third-party site.
type Importer interface {
	// Run runs a full or incremental import.
	//
	// The importer should continually or periodically monitor the
	// RunContext.Context()'s Done channel to exit early if
	// requested. The return value should be ctx.Err() if the
	// importer exits for that reason.
	Run(*RunContext) error

	// Properties returns properties of this importer type.
	Properties() Properties

	// IsAccountReady reports whether the provided account node
	// is configured.
	IsAccountReady(acctNode *Object) (ok bool, err error)
	// SummarizeAccount returns a short human-readable summary of the
	// account node. NOTE(review): presumably used when listing accounts;
	// confirm against callers.
	SummarizeAccount(acctNode *Object) string

	// ServeSetup serves the setup flow for the account described by ctx.
	// NOTE(review): presumably this starts the site's authorization flow
	// using ctx.CallbackURL(); confirm against implementations.
	ServeSetup(w http.ResponseWriter, r *http.Request, ctx *SetupContext) error
	// ServeCallback handles GET requests to
	// http://host/importer/TYPE/callback for the account identified by
	// CallbackRequestAccount; ctx.AccountNode is that account's permanode.
	ServeCallback(w http.ResponseWriter, r *http.Request, ctx *SetupContext)

	// CallbackRequestAccount extracts the blobref of the importer account from
	// the callback URL parameters of r. For example, it will be encoded as:
	//   - for Twitter (OAuth1), in its own URL parameter: "acct=sha1-f2b0b7da718b97ce8c31591d8ed4645c777f3ef4"
	//   - for Picasa (OAuth2), in the OAuth2 "state" parameter: "state=acct:sha1-97911b1a5887eb5862d1c81666ba839fc1363ea1"
	CallbackRequestAccount(r *http.Request) (acctRef blob.Ref, err error)

	// CallbackURLParameters uses the input importer account blobRef to build
	// and return the URL parameters, that will be appended to the callback URL.
	// It is the inverse of CallbackRequestAccount.
	CallbackURLParameters(acctRef blob.Ref) url.Values
}
    92	
// Properties contains the properties of an importer type.
type Properties struct {
	// Title is the human-readable name of the importer. When empty,
	// the name the importer was registered with is used instead.
	Title string
	// Description is a human-readable description of the importer.
	Description string

	// TODOIssue, if non-zero, marks the importer as invalid, but the UI
	// will link to a tracking bug for implementing it.
	TODOIssue int

	// NeedsAPIKey reports whether this importer requires an API key
	// (OAuth2 client_id & client_secret, or equivalent).
	// If the API only requires a username & password, or a flow to get
	// an auth token per-account without an overall API key, importers
	// can return false here.
	NeedsAPIKey bool

	// SupportsIncremental reports whether this importer has been optimized
	// to run efficiently in regular incremental runs. (e.g. every 5 minutes
	// or half hour). Eventually all importers might support this and we'll
	// make it required, in which case we might delete this option.
	// For now, some importers (e.g. Flickr) don't yet support this.
	SupportsIncremental bool

	// PermanodeImporterType optionally specifies the "importerType"
	// permanode attribute value that should be stored for
	// accounts of this type. By default, it is the string that it
	// was registered with. This should only be specified for
	// products that have been rebranded, and then this should be
	// the old branding, to not break people who have been
	// importing the account since before the rebranding.
	// For example, this is "foursquare" for "swarm", so "swarm" shows
	// in the UI and URLs, but it's "foursquare" in permanodes.
	// Register reserves this value so no other importer can use it.
	PermanodeImporterType string
}
   127	
// LongPoller is optionally implemented by importers which can long
// poll efficiently to wait for new content.
// For example, Twitter uses this to subscribe to the user's stream.
//
// It is used by automatic importing when an account ran too recently to
// run again: the RunContext's context then times out when the next
// scheduled run is due.
type LongPoller interface {
	Importer

	// LongPoll waits and returns nil when there's new content.
	// It does not fetch the content itself.
	// It returns a non-nil error if it failed to long poll.
	LongPoll(*RunContext) error
}
   139	
// TestDataMaker is an optional interface that may be implemented by Importers to
// generate test data locally. The returned RoundTripper will be used as the
// transport of the HTTP client, in the RunContext that will be passed to Run
// during tests and devcam server --makethings.
// (See http://perkeep.org/issue/417).
type TestDataMaker interface {
	// MakeTestData returns a RoundTripper that serves fake site responses.
	MakeTestData() http.RoundTripper
	// SetTestAccount allows an importer to set some needed attributes on the importer
	// account node before a run is started.
	SetTestAccount(acctNode *Object) error
}
   151	
// ImporterSetupHTMLer is an optional interface that may be implemented by
// Importers to return some HTML to be included on the importer setup page.
// The returned string is inserted as template.HTML, i.e. without escaping,
// and is only requested once the importer's node exists.
type ImporterSetupHTMLer interface {
	AccountSetupHTML(*Host) string
}
   157	
var (
	// importers maps each registered importer name to its implementation.
	importers           = map[string]Importer{}
	// reservedImporterKey records PermanodeImporterType values claimed by
	// registered importers; Register refuses any name or type already here.
	reservedImporterKey = map[string]bool{}
)

// All returns the map of importer implementation name to implementation. This
// map should not be mutated: it is the package's registry itself, not a copy.
func All() map[string]Importer {
	return importers
}
   168	
   169	// Register registers a site-specific importer. It should only be called from init,
   170	// and not from concurrent goroutines.
   171	func Register(name string, im Importer) {
   172		if _, dup := importers[name]; dup {
   173			panic("Dup registration of importer " + name)
   174		}
   175		if _, dup := reservedImporterKey[name]; dup {
   176			panic("Dup registration of importer " + name)
   177		}
   178		if pt := im.Properties().PermanodeImporterType; pt != "" {
   179			if _, dup := importers[pt]; dup {
   180				panic("Dup registration of importer " + pt)
   181			}
   182			if _, dup := reservedImporterKey[pt]; dup {
   183				panic("Dup registration of importer " + pt)
   184			}
   185			reservedImporterKey[pt] = true
   186		}
   187		importers[name] = im
   188	}
   189	
   190	func RegisterTODO(name string, p Properties) {
   191		Register(name, &todoImp{Props: p})
   192	}
   193	
// init makes "importer" available as a handler type in the server
// configuration, constructed by newFromConfig.
func init() {
	// Register the meta "importer" handler type, whose single Host serves
	// all registered importers.
	blobserver.RegisterHandlerConstructor("importer", newFromConfig)
}
   198	
// HostConfig holds the parameters to set up a Host with NewHost.
type HostConfig struct {
	// BaseURL is the root URL of the whole server; the importer handler's
	// URL base is BaseURL + Prefix.
	BaseURL      string
	Prefix       string                  // URL prefix for the importer handler
	Target       blobserver.StatReceiver // storage for the imported object blobs
	BlobSource   blob.Fetcher            // for additional resources, such as twitter zip file
	Signer       *schema.Signer
	Search       search.QueryDescriber
	ClientId     map[string]string // optionally maps importer impl name to a clientId credential
	ClientSecret map[string]string // optionally maps importer impl name to a clientSecret credential

	// HTTPClient optionally specifies how to fetch external network
	// resources. The Host will use http.DefaultClient otherwise.
	HTTPClient *http.Client
	// TODO: add more if/when needed
}
   215	
   216	func NewHost(hc HostConfig) (*Host, error) {
   217		h := &Host{
   218			baseURL:      hc.BaseURL,
   219			importerBase: hc.BaseURL + hc.Prefix,
   220			imp:          make(map[string]*importer),
   221		}
   222		var err error
   223		h.tmpl, err = tmpl.Clone()
   224		if err != nil {
   225			return nil, err
   226		}
   227		h.tmpl = h.tmpl.Funcs(map[string]interface{}{
   228			"bloblink": func(br blob.Ref) template.HTML {
   229				if h.uiPrefix == "" {
   230					return template.HTML(br.String())
   231				}
   232				return template.HTML(fmt.Sprintf("<a href=\"%s/%s\">%s</a>", h.uiPrefix, br, br))
   233			},
   234		})
   235		for k, impl := range importers {
   236			h.importers = append(h.importers, k)
   237			clientId, clientSecret := hc.ClientId[k], hc.ClientSecret[k]
   238			if clientSecret != "" && clientId == "" {
   239				return nil, fmt.Errorf("Invalid static configuration for importer %q: clientSecret specified without clientId", k)
   240			}
   241			props := impl.Properties()
   242			imp := &importer{
   243				host:         h,
   244				name:         k,
   245				impl:         impl,
   246				props:        &props,
   247				clientID:     clientId,
   248				clientSecret: clientSecret,
   249			}
   250			h.imp[k] = imp
   251		}
   252	
   253		sort.Strings(h.importers)
   254	
   255		h.target = hc.Target
   256		h.blobSource = hc.BlobSource
   257		h.signer = hc.Signer
   258		h.search = hc.Search
   259		h.client = hc.HTTPClient
   260	
   261		return h, nil
   262	}
   263	
   264	func newFromConfig(ld blobserver.Loader, cfg jsonconfig.Obj) (http.Handler, error) {
   265		hc := HostConfig{
   266			BaseURL: ld.BaseURL(),
   267			Prefix:  ld.MyPrefix(),
   268		}
   269		ClientId := make(map[string]string)
   270		ClientSecret := make(map[string]string)
   271		for k := range importers {
   272			var clientId, clientSecret string
   273			if impConf := cfg.OptionalObject(k); impConf != nil {
   274				clientId = impConf.OptionalString("clientID", "")
   275				clientSecret = impConf.OptionalString("clientSecret", "")
   276				// Special case: allow clientSecret to be of form "clientId:clientSecret"
   277				// if the clientId is empty.
   278				if clientId == "" && strings.Contains(clientSecret, ":") {
   279					if f := strings.SplitN(clientSecret, ":", 2); len(f) == 2 {
   280						clientId, clientSecret = f[0], f[1]
   281					}
   282				}
   283				if err := impConf.Validate(); err != nil {
   284					return nil, fmt.Errorf("Invalid static configuration for importer %q: %v", k, err)
   285				}
   286				ClientId[k] = clientId
   287				ClientSecret[k] = clientSecret
   288			}
   289		}
   290		if err := cfg.Validate(); err != nil {
   291			return nil, err
   292		}
   293		hc.ClientId = ClientId
   294		hc.ClientSecret = ClientSecret
   295		host, err := NewHost(hc)
   296		if err != nil {
   297			return nil, err
   298		}
   299		host.didInit.Add(1)
   300		return host, nil
   301	}
   302	
// Compile-time check that Host implements blobserver.HandlerIniter, so the
// server calls its InitHandler during start-up (see Host.didInit).
var _ blobserver.HandlerIniter = (*Host)(nil)
   304	
// SetupContext is passed to an Importer's ServeSetup and ServeCallback
// methods while one of its accounts is being set up.
type SetupContext struct {
	context.Context
	Host        *Host
	AccountNode *Object // permanode of the account being set up

	ia *importerAcct
}
   312	
   313	func (sc *SetupContext) Credentials() (clientID, clientSecret string, err error) {
   314		return sc.ia.im.credentials()
   315	}
   316	
   317	func (sc *SetupContext) CallbackURL() string {
   318		params := sc.ia.im.impl.CallbackURLParameters(sc.AccountNode.PermanodeRef()).Encode()
   319		if params != "" {
   320			params = "?" + params
   321		}
   322		return sc.Host.ImporterBaseURL() + sc.ia.im.name + "/callback" + params
   323	}
   324	
   325	// AccountURL returns the URL to an account of an importer
   326	// (http://host/importer/TYPE/sha1-sd8fsd7f8sdf7).
   327	func (sc *SetupContext) AccountURL() string {
   328		return sc.Host.ImporterBaseURL() + sc.ia.im.name + "/" + sc.AccountNode.PermanodeRef().String()
   329	}
   330	
// RunContext is the context provided for a given Run of an importer, importing
// a certain account on a certain importer.
type RunContext struct {
	// ctx governs the run; Context falls back to context.Background when nil.
	ctx    context.Context
	cancel context.CancelFunc // for when we stop/pause the importing
	Host   *Host

	ia *importerAcct

	mu           sync.Mutex // guards following
	lastProgress *ProgressMessage
}
   343	
   344	// Context returns the run's context. It is always non-nil.
   345	func (rc *RunContext) Context() context.Context {
   346		if rc.ctx != nil {
   347			return rc.ctx
   348		}
   349		return context.Background()
   350	}
   351	
   352	// CreateAccount creates a new importer account for the Host h, and the importer
   353	// implementation named impl. It returns a RunContext setup with that account.
   354	func CreateAccount(h *Host, impl string) (*RunContext, error) {
   355		imp, ok := h.imp[impl]
   356		if !ok {
   357			return nil, fmt.Errorf("host does not have a %v importer", impl)
   358		}
   359		ia, err := imp.newAccount()
   360		if err != nil {
   361			return nil, fmt.Errorf("could not create new account for importer %v: %v", impl, err)
   362		}
   363		rc := &RunContext{
   364			Host: ia.im.host,
   365			ia:   ia,
   366		}
   367		rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
   368		return rc, nil
   369	
   370	}
   371	
   372	// Credentials returns the credentials for the importer. This is
   373	// typically the OAuth1, OAuth2, or equivalent client ID (api token)
   374	// and client secret (api secret).
   375	func (rc *RunContext) Credentials() (clientID, clientSecret string, err error) {
   376		return rc.ia.im.credentials()
   377	}
   378	
   379	// AccountNode returns the permanode storing account information for this permanode.
   380	// It will contain the attributes:
   381	//   - camliNodeType = "importerAccount"
   382	//   - importerType = "registered-type"
   383	//
   384	// You must not change the camliNodeType or importerType.
   385	//
   386	// You should use this permanode to store state about where your
   387	// importer left off, if it can efficiently resume later (without
   388	// missing anything).
   389	func (rc *RunContext) AccountNode() *Object { return rc.ia.acct }
   390	
   391	// RootNode returns the initially-empty permanode storing the root
   392	// of this account's data. You can change anything at will. This will
   393	// typically be modeled as a dynamic directory (with camliPath:xxxx
   394	// attributes), where each path element is either a file, object, or
   395	// another dynamic directory.
   396	func (rc *RunContext) RootNode() *Object { return rc.ia.root }
   397	
// Host is the HTTP handler and state for managing all the importers
// linked into the binary, even if they're not configured.
type Host struct {
	// tmpl is this Host's clone of the page templates, with a "bloblink" func.
	tmpl         *template.Template
	importers    []string // sorted; e.g. dummy flickr foursquare picasa twitter
	// imp is keyed by registered importer name (not PermanodeImporterType).
	imp          map[string]*importer
	baseURL      string
	// importerBase is baseURL plus the handler's prefix; see ImporterBaseURL.
	importerBase string
	target       blobserver.StatReceiver
	blobSource   blob.Fetcher // e.g. twitter reading zip file
	search       search.QueryDescriber
	signer       *schema.Signer
	uiPrefix     string // or empty if no UI handler

	// didInit is incremented by newFromConfig and marked done
	// after InitHandler. Any method on Host that requires Init
	// then calls didInit.Wait to guard against initialization
	// races where serverinit calls InitHandler in a random
	// order on start-up and different handlers access the
	// not-yet-initialized Host (notably from a goroutine)
	didInit sync.WaitGroup

	// client optionally specifies how to fetch external network
	// resources. It is set from HostConfig.HTTPClient; when nil,
	// http.DefaultClient is presumably used (via HTTPClient) — confirm.
	client    *http.Client
	// transport is not set in this part of the file.
	// NOTE(review): presumably an optional RoundTripper override
	// (e.g. from TestDataMaker); confirm.
	transport http.RoundTripper
}
   425	
// accountStatus is the JSON representation of the status of a configured importer account.
type accountStatus struct {
	Name string `json:"name"` // display name
	Type string `json:"type"` // registered importer name
	Href string `json:"href"` // URL of the account's page

	StartedUnixSec      int64  `json:"startedUnixSec"`  // zero if not running
	LastFinishedUnixSec int64  `json:"finishedUnixSec"` // zero if no previous run
	LastError           string `json:"lastRunError"`    // empty if last run was success
}
   436	
   437	// AccountsStatus returns the currently configured accounts and their status for
   438	// inclusion in the status.json document, as rendered by the web UI.
   439	func (h *Host) AccountsStatus() (interface{}, []camtypes.StatusError) {
   440		h.didInit.Wait()
   441		var s []accountStatus
   442		var errs []camtypes.StatusError
   443		for _, impName := range h.importers {
   444			imp := h.imp[impName]
   445			accts, _ := imp.Accounts()
   446			for _, ia := range accts {
   447				as := accountStatus{
   448					Type: impName,
   449					Href: ia.AccountURL(),
   450					Name: ia.AccountLinkSummary(),
   451				}
   452				ia.mu.Lock()
   453				if ia.current != nil {
   454					as.StartedUnixSec = ia.lastRunStart.Unix()
   455				}
   456				if !ia.lastRunDone.IsZero() {
   457					as.LastFinishedUnixSec = ia.lastRunDone.Unix()
   458				}
   459				if ia.lastRunErr != nil {
   460					as.LastError = ia.lastRunErr.Error()
   461					errs = append(errs, camtypes.StatusError{
   462						Error: ia.lastRunErr.Error(),
   463						URL:   ia.AccountURL(),
   464					})
   465				}
   466				ia.mu.Unlock()
   467				s = append(s, as)
   468			}
   469		}
   470		return s, errs
   471	}
   472	
   473	// RunImporterAccount runs the importerType importer on the account described in
   474	// accountNode.
   475	func (h *Host) RunImporterAccount(importerType string, accountNode blob.Ref) error {
   476		h.didInit.Wait()
   477		imp, ok := h.imp[importerType]
   478		if !ok {
   479			return fmt.Errorf("no %q importer for this account", importerType)
   480		}
   481		accounts, err := imp.Accounts()
   482		if err != nil {
   483			return err
   484		}
   485		for _, ia := range accounts {
   486			if ia.acct.pn != accountNode {
   487				continue
   488			}
   489			return ia.run()
   490		}
   491		return fmt.Errorf("no %v account matching account in node %v", importerType, accountNode)
   492	}
   493	
   494	func (h *Host) InitHandler(hl blobserver.FindHandlerByTyper) error {
   495		if prefix, _, err := hl.FindHandlerByType("ui"); err == nil {
   496			h.uiPrefix = prefix
   497		}
   498	
   499		_, handler, err := hl.FindHandlerByType("root")
   500		if err != nil || handler == nil {
   501			return errors.New("importer requires a 'root' handler")
   502		}
   503		rh := handler.(*server.RootHandler)
   504		searchHandler, ok := rh.SearchHandler()
   505		if !ok {
   506			return errors.New("importer requires a 'root' handler with 'searchRoot' defined")
   507		}
   508		h.search = searchHandler
   509		if rh.Storage == nil {
   510			return errors.New("importer requires a 'root' handler with 'blobRoot' defined")
   511		}
   512		h.target = rh.Storage
   513		h.blobSource = rh.Storage
   514	
   515		_, handler, _ = hl.FindHandlerByType("jsonsign")
   516		if sigh, ok := handler.(*signhandler.Handler); ok {
   517			h.signer = sigh.Signer()
   518		}
   519		if h.signer == nil {
   520			return errors.New("importer requires a 'jsonsign' handler")
   521		}
   522		h.didInit.Done()
   523		go h.startPeriodicImporters()
   524		return nil
   525	}
   526	
   527	// ServeHTTP serves:
   528	//
   529	//	http://host/importer/
   530	//	http://host/importer/twitter/
   531	//	http://host/importer/twitter/callback
   532	//	http://host/importer/twitter/sha1-abcabcabcabcabc (single account)
   533	func (h *Host) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   534		suffix := httputil.PathSuffix(r)
   535		seg := strings.Split(suffix, "/")
   536		if suffix == "" || len(seg) == 0 {
   537			h.serveImportersRoot(w, r)
   538			return
   539		}
   540		impName := seg[0]
   541	
   542		imp, ok := h.imp[impName]
   543		if !ok {
   544			http.NotFound(w, r)
   545			return
   546		}
   547	
   548		if len(seg) == 1 || seg[1] == "" {
   549			h.serveImporter(w, r, imp)
   550			return
   551		}
   552		if seg[1] == "callback" {
   553			h.serveImporterAcctCallback(w, r, imp)
   554			return
   555		}
   556		acctRef, ok := blob.Parse(seg[1])
   557		if !ok {
   558			http.NotFound(w, r)
   559			return
   560		}
   561		h.serveImporterAccount(w, r, imp, acctRef)
   562	}
   563	
   564	// Serves list of importers at http://host/importer/
   565	func (h *Host) serveImportersRoot(w http.ResponseWriter, r *http.Request) {
   566		body := importersRootBody{
   567			Host:      h,
   568			Importers: make([]*importer, 0, len(h.imp)),
   569		}
   570		for _, v := range h.importers {
   571			body.Importers = append(body.Importers, h.imp[v])
   572		}
   573		sort.Slice(body.Importers, func(i, j int) bool {
   574			return body.Importers[i].Title() < body.Importers[j].Title()
   575		})
   576		h.execTemplate(w, r, importersRootPage{
   577			Title: "Importers",
   578			Body:  body,
   579		})
   580	}
   581	
   582	// Serves list of accounts at http://host/importer/twitter
   583	func (h *Host) serveImporter(w http.ResponseWriter, r *http.Request, imp *importer) {
   584		if r.Method == "POST" {
   585			h.serveImporterPost(w, r, imp)
   586			return
   587		}
   588	
   589		var setup string
   590		node, _ := imp.Node()
   591		if setuper, ok := imp.impl.(ImporterSetupHTMLer); ok && node != nil {
   592			setup = setuper.AccountSetupHTML(h)
   593		}
   594	
   595		h.execTemplate(w, r, importerPage{
   596			Title: "Importer - " + imp.Name(),
   597			Body: importerBody{
   598				Host:      h,
   599				Importer:  imp,
   600				SetupHelp: template.HTML(setup),
   601			},
   602		})
   603	}
   604	
   605	// Serves oauth callback at http://host/importer/TYPE/callback
   606	func (h *Host) serveImporterAcctCallback(w http.ResponseWriter, r *http.Request, imp *importer) {
   607		if r.Method != "GET" {
   608			http.Error(w, "invalid method", http.StatusBadRequest)
   609			return
   610		}
   611		acctRef, err := imp.impl.CallbackRequestAccount(r)
   612		if err != nil {
   613			httputil.ServeError(w, r, err)
   614			return
   615		}
   616		if !acctRef.Valid() {
   617			httputil.ServeError(w, r, errors.New("No valid blobref returned from CallbackRequestAccount(r)"))
   618			return
   619		}
   620		ia, err := imp.account(acctRef)
   621		if err != nil {
   622			http.Error(w, "invalid 'acct' param: "+err.Error(), http.StatusBadRequest)
   623			return
   624		}
   625		imp.impl.ServeCallback(w, r, &SetupContext{
   626			Context:     context.TODO(),
   627			Host:        h,
   628			AccountNode: ia.acct,
   629			ia:          ia,
   630		})
   631	}
   632	
   633	func (h *Host) serveImporterPost(w http.ResponseWriter, r *http.Request, imp *importer) {
   634		switch r.FormValue("mode") {
   635		default:
   636			http.Error(w, "Unknown mode.", http.StatusBadRequest)
   637		case "newacct":
   638			ia, err := imp.newAccount()
   639			if err != nil {
   640				http.Error(w, err.Error(), 500)
   641				return
   642			}
   643			ia.setup(w, r)
   644			return
   645		case "saveclientidsecret":
   646			n, err := imp.Node()
   647			if err != nil {
   648				http.Error(w, "Error getting node: "+err.Error(), 500)
   649				return
   650			}
   651			if err := n.SetAttrs(
   652				attrClientID, r.FormValue("clientID"),
   653				attrClientSecret, r.FormValue("clientSecret"),
   654			); err != nil {
   655				http.Error(w, "Error saving node: "+err.Error(), 500)
   656				return
   657			}
   658			http.Redirect(w, r, h.ImporterBaseURL()+imp.name, http.StatusFound)
   659		}
   660	}
   661	
   662	// Serves details of accounts at http://host/importer/twitter/sha1-23098429382934
   663	func (h *Host) serveImporterAccount(w http.ResponseWriter, r *http.Request, imp *importer, acctRef blob.Ref) {
   664		ia, err := imp.account(acctRef)
   665		if err != nil {
   666			http.Error(w, "Unknown or invalid importer account "+acctRef.String()+": "+err.Error(), http.StatusBadRequest)
   667			return
   668		}
   669		ia.ServeHTTP(w, r)
   670	}
   671	
   672	func (h *Host) startPeriodicImporters() {
   673		res, err := h.search.Query(context.TODO(), &search.SearchQuery{
   674			Sort:       search.Unsorted,
   675			Expression: "attr:camliNodeType:importerAccount",
   676			Describe: &search.DescribeRequest{
   677				Depth: 1,
   678			},
   679		})
   680		if err != nil {
   681			log.Printf("periodic importer search fail: %v", err)
   682			return
   683		}
   684		if res.Describe == nil {
   685			log.Printf("No describe response in search result")
   686			return
   687		}
   688		for _, resBlob := range res.Blobs {
   689			blob := resBlob.Blob
   690			desBlob, ok := res.Describe.Meta[blob.String()]
   691			if !ok || desBlob.Permanode == nil {
   692				continue
   693			}
   694			attrs := desBlob.Permanode.Attr
   695			if attrs.Get(attrNodeType) != nodeTypeImporterAccount {
   696				panic("Search result returned non-importerAccount")
   697			}
   698			impType := attrs.Get("importerType")
   699			imp, ok := h.imp[impType]
   700			if !ok {
   701				continue
   702			}
   703			ia, err := imp.account(blob)
   704			if err != nil {
   705				log.Printf("Can't load importer account %v for regular importing: %v", blob, err)
   706				continue
   707			}
   708			go ia.maybeStart()
   709		}
   710	}
   711	
// disableImporters is read from the CAMLI_DISABLE_IMPORTERS environment
// variable; when true, maybeStart does no automatic importing. The ParseBool
// error is deliberately ignored: an unset or unparsable value yields false,
// leaving importers enabled.
var disableImporters, _ = strconv.ParseBool(os.Getenv("CAMLI_DISABLE_IMPORTERS"))
   713	
// maybeStart starts an automatic import run for the account if one is due.
//
// It does nothing if importers are disabled via CAMLI_DISABLE_IMPORTERS, if
// the account node cannot be loaded, if its importAuto attribute is empty,
// unparsable, or zero, or if a run is already in progress (ia.current != nil).
// NOTE(review): a negative importAuto duration is not rejected and causes an
// immediate run; confirm intended.
//
// If the last run finished less than one importAuto interval ago, no run is
// started now. For importers implementing LongPoller, maybeStart is scheduled
// to run again when the interval elapses. Meanwhile it long-polls with a
// context that times out at that same moment. If the long poll reports new
// content first, the timer is stopped and a run starts. Importers that are not
// LongPollers get no timer here.
// NOTE(review): presumably the next run is scheduled elsewhere for them;
// confirm.
//
// Otherwise a run is started in a new goroutine.
func (ia *importerAcct) maybeStart() {
	if disableImporters {
		log.Printf("Importers disabled, per environment.")
		return
	}
	// Re-read the account node so the current importAuto value is used.
	acctObj, err := ia.im.host.ObjectFromRef(ia.acct.PermanodeRef())
	if err != nil {
		log.Printf("Error maybe starting %v: %v", ia.acct.PermanodeRef(), err)
		return
	}
	duration, err := time.ParseDuration(acctObj.Attr(attrImportAuto))
	if duration == 0 || err != nil {
		return
	}
	// ia.mu is held until maybeStart returns; the long-poll goroutine below
	// runs after that, and the direct run at the end uses its own goroutine.
	ia.mu.Lock()
	defer ia.mu.Unlock()
	if ia.current != nil {
		return
	}
	if ia.lastRunDone.After(time.Now().Add(-duration)) {
		// Kick off long poller wait if supported.
		if lp, ok := ia.im.impl.(LongPoller); ok {
			// Time remaining until the next scheduled run is due.
			sleepFor := time.Until(ia.lastRunDone.Add(duration))
			sleepCtx, cancel := context.WithTimeout(context.Background(), sleepFor)
			log.Printf("%v ran recently enough. Sleeping for %v.", ia, sleepFor)
			timer := time.AfterFunc(sleepFor, ia.maybeStart)

			// NOTE(review): unlike CreateAccount's RunContext, this context
			// carries no ctxutil.HTTPClient value; confirm LongPoll
			// implementations don't rely on it.
			rc := &RunContext{
				ctx:  sleepCtx,
				Host: ia.im.host,
				ia:   ia,
			}
			go func() {
				defer cancel()

				if err := lp.LongPoll(rc); err == nil {
					log.Printf("importer: long poll for %s found an update. Starting run...", ia)
					timer.Stop()
					ia.start()
				} else {
					log.Printf("failed to long poll %s: %v", ia, err)
				}
			}()
		}
		return
	}

	log.Printf("importer: starting periodic import for %v", ia)
	go ia.start()
}
   764	
   765	// BaseURL returns the root of the whole server, without trailing
   766	// slash.
   767	func (h *Host) BaseURL() string {
   768		return h.baseURL
   769	}
   770	
   771	// ImporterBaseURL returns the URL base of the importer handler,
   772	// including trailing slash.
   773	func (h *Host) ImporterBaseURL() string {
   774		return h.importerBase
   775	}
   776	
   777	func (h *Host) Target() blobserver.StatReceiver {
   778		return h.target
   779	}
   780	
   781	func (h *Host) BlobSource() blob.Fetcher {
   782		return h.blobSource
   783	}
   784	
   785	func (h *Host) Searcher() search.QueryDescriber { return h.search }
   786	
// importer is an importer for a certain site, but not a specific account on that site.
type importer struct {
	host  *Host
	name  string // importer name e.g. "twitter"
	impl  Importer
	props *Properties // impl.Properties; pointer so we crash on & find uninitialized callers

	// If statically configured in config file, else
	// they come from the importer node's attributes.
	// NewHost rejects a clientSecret without a clientID.
	clientID     string
	clientSecret string

	nodemu    sync.Mutex // guards nodeCache
	nodeCache *Object    // or nil if unset

	// acctmu guards acct. NOTE(review): presumably allLoaded too; confirm.
	acctmu    sync.Mutex
	acct      map[blob.Ref]*importerAcct // key: account permanode
	// allLoaded presumably records that every account was loaded into acct;
	// it is not used in this part of the file — confirm.
	allLoaded bool
}
   806	
   807	func (im *importer) Name() string { return im.name }
   808	
   809	func (im *importer) Title() string {
   810		if im.props.Title != "" {
   811			return im.props.Title
   812		}
   813		return im.name
   814	}
   815	
   816	func (im *importer) TODOIssue() int {
   817		return im.props.TODOIssue
   818	}
   819	
   820	func (im *importer) Description() string {
   821		return im.props.Description
   822	}
   823	
   824	// ImporterType returns the account permanode's attrImporterType
   825	// value. This is almost always the same as the Name, except in cases
   826	// where a product gets rebranded. (e.g. "foursquare" to "swarm", in
   827	// which case the importer type remains the old branding)
   828	func (im *importer) ImporterType() string {
   829		if im.props.PermanodeImporterType != "" {
   830			return im.props.PermanodeImporterType
   831		}
   832		return im.name
   833	}
   834	
   835	func (im *importer) StaticConfig() bool { return im.clientSecret != "" }
   836	
   837	// URL returns the importer's URL without trailing slash.
   838	func (im *importer) URL() string { return im.host.ImporterBaseURL() + im.name }
   839	
   840	func (im *importer) ShowClientAuthEditForm() bool {
   841		if im.StaticConfig() {
   842			// Don't expose the server's statically-configured client secret
   843			// to the user. (e.g. a hosted multi-user configuration)
   844			return false
   845		}
   846		return im.props.NeedsAPIKey
   847	}
   848	
   849	func (im *importer) InsecureForm() bool {
   850		return !strings.HasPrefix(im.host.ImporterBaseURL(), "https://")
   851	}
   852	
   853	func (im *importer) CanAddNewAccount() bool {
   854		if !im.props.NeedsAPIKey {
   855			return true
   856		}
   857		id, sec, err := im.credentials()
   858		return id != "" && sec != "" && err == nil
   859	}
   860	
   861	func (im *importer) ClientID() (v string, err error) {
   862		v, _, err = im.credentials()
   863		return
   864	}
   865	
   866	func (im *importer) ClientSecret() (v string, err error) {
   867		_, v, err = im.credentials()
   868		return
   869	}
   870	
   871	func (im *importer) Status() (status string, err error) {
   872		if !im.props.NeedsAPIKey {
   873			return "no configuration required", nil
   874		}
   875		if im.StaticConfig() {
   876			return "API key configured on server", nil
   877		}
   878		n, err := im.Node()
   879		if err != nil {
   880			return
   881		}
   882		if n.Attr(attrClientID) != "" && n.Attr(attrClientSecret) != "" {
   883			return "API key configured on node", nil
   884		}
   885		return "API key (client ID & Secret) not configured", nil
   886	}
   887	
   888	func (im *importer) credentials() (clientID, clientSecret string, err error) {
   889		if im.StaticConfig() {
   890			return im.clientID, im.clientSecret, nil
   891		}
   892		n, err := im.Node()
   893		if err != nil {
   894			return
   895		}
   896		return n.Attr(attrClientID), n.Attr(attrClientSecret), nil
   897	}
   898	
   899	func (im *importer) deleteAccount(acctRef blob.Ref) {
   900		im.acctmu.Lock()
   901		delete(im.acct, acctRef)
   902		im.acctmu.Unlock()
   903	}
   904	
   905	func (im *importer) account(nodeRef blob.Ref) (*importerAcct, error) {
   906		im.acctmu.Lock()
   907		ia, ok := im.acct[nodeRef]
   908		im.acctmu.Unlock()
   909		if ok {
   910			return ia, nil
   911		}
   912	
   913		acct, err := im.host.ObjectFromRef(nodeRef)
   914		if err != nil {
   915			return nil, err
   916		}
   917		if acct.Attr(attrNodeType) != nodeTypeImporterAccount {
   918			return nil, errors.New("account has wrong node type")
   919		}
   920		if acct.Attr(attrImporterType) != im.ImporterType() {
   921			return nil, errors.New("account has wrong importer type")
   922		}
   923		var root *Object
   924		if v := acct.Attr(attrImportRoot); v != "" {
   925			rootRef, ok := blob.Parse(v)
   926			if !ok {
   927				return nil, errors.New("invalid import root attribute")
   928			}
   929			root, err = im.host.ObjectFromRef(rootRef)
   930			if err != nil {
   931				return nil, err
   932			}
   933		} else {
   934			root, err = im.host.NewObject()
   935			if err != nil {
   936				return nil, err
   937			}
   938			if err := acct.SetAttr(attrImportRoot, root.PermanodeRef().String()); err != nil {
   939				return nil, err
   940			}
   941		}
   942		ia = &importerAcct{
   943			im:   im,
   944			acct: acct,
   945			root: root,
   946		}
   947		im.acctmu.Lock()
   948		defer im.acctmu.Unlock()
   949		im.addAccountLocked(ia)
   950		return ia, nil
   951	}
   952	
   953	func (im *importer) newAccount() (*importerAcct, error) {
   954	
   955		acct, err := im.host.NewObject()
   956		if err != nil {
   957			return nil, err
   958		}
   959		root, err := im.host.NewObject()
   960		if err != nil {
   961			return nil, err
   962		}
   963		if err := acct.SetAttrs(
   964			"title", fmt.Sprintf("%s account", im.name),
   965			attrNodeType, nodeTypeImporterAccount,
   966			attrImporterType, im.ImporterType(),
   967			attrImportRoot, root.PermanodeRef().String(),
   968		); err != nil {
   969			return nil, err
   970		}
   971	
   972		ia := &importerAcct{
   973			im:   im,
   974			acct: acct,
   975			root: root,
   976		}
   977		im.acctmu.Lock()
   978		defer im.acctmu.Unlock()
   979		im.addAccountLocked(ia)
   980		return ia, nil
   981	}
   982	
   983	func (im *importer) addAccountLocked(ia *importerAcct) {
   984		if im.acct == nil {
   985			im.acct = make(map[blob.Ref]*importerAcct)
   986		}
   987		im.acct[ia.acct.PermanodeRef()] = ia
   988	}
   989	
   990	func (im *importer) Accounts() ([]*importerAcct, error) {
   991		im.acctmu.Lock()
   992		needQuery := !im.allLoaded
   993		im.acctmu.Unlock()
   994	
   995		if needQuery {
   996			res, err := im.host.search.Query(context.TODO(), &search.SearchQuery{
   997				Sort: search.Unsorted,
   998				Expression: fmt.Sprintf("attr:%s:%s attr:%s:%s",
   999					attrNodeType, nodeTypeImporterAccount,
  1000					attrImporterType, im.ImporterType(),
  1001				),
  1002			})
  1003			if err != nil {
  1004				return nil, err
  1005			}
  1006			for _, res := range res.Blobs {
  1007				_, err := im.account(res.Blob) // caches account
  1008				if err != nil {
  1009					return nil, err
  1010				}
  1011			}
  1012		}
  1013	
  1014		im.acctmu.Lock()
  1015		defer im.acctmu.Unlock()
  1016		im.allLoaded = true
  1017	
  1018		accts := make([]*importerAcct, 0, len(im.acct))
  1019		for _, ia := range im.acct {
  1020			accts = append(accts, ia)
  1021		}
  1022		sort.Sort(byImporterAccountPermanode(accts))
  1023		return accts, nil
  1024	}
  1025	
  1026	// node returns the importer node. (not specific to a certain account
  1027	// on that importer site)
  1028	//
  1029	// It is a permanode with:
  1030	//
  1031	//	camliNodeType: "importer"
  1032	//	importerType: "twitter"
  1033	//
  1034	// And optionally:
  1035	//
  1036	//	authClientID:     "xxx"    // e.g. api token
  1037	//	authClientSecret: "sdkojfsldfjlsdkf"
  1038	func (im *importer) Node() (*Object, error) {
  1039		im.nodemu.Lock()
  1040		defer im.nodemu.Unlock()
  1041		if im.nodeCache != nil {
  1042			return im.nodeCache, nil
  1043		}
  1044	
  1045		expr := fmt.Sprintf("attr:%s:%s attr:%s:%s",
  1046			attrNodeType, nodeTypeImporter,
  1047			attrImporterType, im.ImporterType(),
  1048		)
  1049		res, err := im.host.search.Query(context.TODO(), &search.SearchQuery{
  1050			Sort:       search.Unsorted,
  1051			Limit:      10, // might be more than one because of multiple blob hash types
  1052			Expression: expr,
  1053		})
  1054		if err != nil {
  1055			return nil, err
  1056		}
  1057		best, err := im.bestNode(res)
  1058		if err != nil {
  1059			return nil, err
  1060		}
  1061		if best != nil {
  1062			return best, nil
  1063		}
  1064	
  1065		o, err := im.host.NewObject()
  1066		if err != nil {
  1067			return nil, err
  1068		}
  1069		if err := o.SetAttrs(
  1070			"title", fmt.Sprintf("%s importer", im.name),
  1071			attrNodeType, nodeTypeImporter,
  1072			attrImporterType, im.ImporterType(),
  1073		); err != nil {
  1074			return nil, err
  1075		}
  1076	
  1077		im.nodeCache = o
  1078		return o, nil
  1079	}
  1080	
  1081	// bestNode picks the most populated importer node amongst res.Blobs, or nil if
  1082	// len(res.Blobs) == 0
  1083	// It is the caller's responsibility to create the correct request so that
  1084	// res.Blobs are all importer nodes.
  1085	func (im *importer) bestNode(res *search.SearchResult) (*Object, error) {
  1086		if len(res.Blobs) == 0 {
  1087			// let the caller create a fresh node
  1088			return nil, nil
  1089		}
  1090		var best *Object
  1091		for _, desb := range res.Blobs {
  1092			o, err := im.host.ObjectFromRef(desb.Blob)
  1093			if err != nil {
  1094				return nil, err
  1095			}
  1096			if best == nil {
  1097				best = o
  1098				continue
  1099			}
  1100			if best.Attr(attrClientID) == "" && o.Attr(attrClientID) != "" {
  1101				best = o
  1102				continue
  1103			}
  1104			if best.Attr(attrClientSecret) == "" && o.Attr(attrClientSecret) != "" {
  1105				best = o
  1106				continue
  1107			}
  1108			if best.Attr("title") == "" && o.Attr("title") != "" {
  1109				best = o
  1110				continue
  1111			}
  1112			// all other things being equal, pick the most recent one, as it's more likely
  1113			// to be a sha224 than a sha1
  1114			if best.modtime.Before(o.modtime) {
  1115				best = o
  1116				continue
  1117			}
  1118		}
  1119		return best, nil
  1120	}
  1121	
// importerAcct is a long-lived type representing an importer account: its
// account permanode, its import root permanode, and the state of its current
// and most recent import runs.
type importerAcct struct {
	im   *importer // the importer this account belongs to
	acct *Object   // the account permanode
	root *Object   // the import root permanode (referenced by attrImportRoot)

	mu           sync.Mutex  // guards the fields below
	current      *RunContext // or nil if not running
	stopped      bool        // stop requested (context canceled)
	lastRunErr   error       // error returned by the most recently finished run
	lastRunStart time.Time   // when the current or most recent run started
	lastRunDone  time.Time   // when the most recent run finished; zero if none has
}
  1135	
  1136	func (ia *importerAcct) String() string {
  1137		return fmt.Sprintf("%v importer account, %v", ia.im.name, ia.acct.PermanodeRef())
  1138	}
  1139	
  1140	func (ia *importerAcct) delete() error {
  1141		if err := ia.acct.SetAttrs(
  1142			attrNodeType, nodeTypeImporterAccount+"-deleted",
  1143		); err != nil {
  1144			return err
  1145		}
  1146		ia.im.deleteAccount(ia.acct.PermanodeRef())
  1147		return nil
  1148	}
  1149	
  1150	func (ia *importerAcct) toggleAuto() error {
  1151		old := ia.acct.Attr(attrImportAuto)
  1152		if old == "" && !ia.im.props.SupportsIncremental {
  1153			return fmt.Errorf("Importer %q doesn't support automatic mode", ia.im.name)
  1154		}
  1155		var new string
  1156		if old == "" {
  1157			new = "30m" // TODO: configurable?
  1158		}
  1159		return ia.acct.SetAttrs(attrImportAuto, new)
  1160	}
  1161	
  1162	func (ia *importerAcct) IsAccountReady() (bool, error) {
  1163		return ia.im.impl.IsAccountReady(ia.acct)
  1164	}
  1165	
  1166	func (ia *importerAcct) AccountObject() *Object { return ia.acct }
  1167	func (ia *importerAcct) RootObject() *Object    { return ia.root }
  1168	
  1169	func (ia *importerAcct) AccountURL() string {
  1170		return ia.im.URL() + "/" + ia.acct.PermanodeRef().String()
  1171	}
  1172	
  1173	func (ia *importerAcct) AccountLinkText() string {
  1174		return ia.acct.PermanodeRef().String()
  1175	}
  1176	
  1177	func (ia *importerAcct) AccountLinkSummary() string {
  1178		return ia.im.impl.SummarizeAccount(ia.acct)
  1179	}
  1180	
  1181	func (ia *importerAcct) RefreshInterval() time.Duration {
  1182		ds := ia.acct.Attr(attrImportAuto)
  1183		if ds == "" {
  1184			return 0
  1185		}
  1186		d, _ := time.ParseDuration(ds)
  1187		return d
  1188	}
  1189	
  1190	func (ia *importerAcct) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  1191		if r.Method == "POST" {
  1192			ia.serveHTTPPost(w, r)
  1193			return
  1194		}
  1195		ia.mu.Lock()
  1196		defer ia.mu.Unlock()
  1197		body := acctBody{
  1198			Acct:     ia,
  1199			AcctType: fmt.Sprintf("%T", ia.im.impl),
  1200		}
  1201		if run := ia.current; run != nil {
  1202			body.Running = true
  1203			body.StartedAgo = time.Since(ia.lastRunStart)
  1204			run.mu.Lock()
  1205			body.LastStatus = fmt.Sprintf("%+v", run.lastProgress)
  1206			run.mu.Unlock()
  1207		} else if !ia.lastRunDone.IsZero() {
  1208			body.LastAgo = time.Since(ia.lastRunDone)
  1209			if ia.lastRunErr != nil {
  1210				body.LastError = ia.lastRunErr.Error()
  1211			}
  1212		}
  1213		title := fmt.Sprintf("%s account: ", ia.im.name)
  1214		if summary := ia.im.impl.SummarizeAccount(ia.acct); summary != "" {
  1215			title += summary
  1216		} else {
  1217			title += ia.acct.PermanodeRef().String()
  1218		}
  1219		ia.im.host.execTemplate(w, r, acctPage{
  1220			Title: title,
  1221			Body:  body,
  1222		})
  1223	}
  1224	
  1225	func (ia *importerAcct) serveHTTPPost(w http.ResponseWriter, r *http.Request) {
  1226		// TODO: XSRF token
  1227	
  1228		switch r.FormValue("mode") {
  1229		case "":
  1230			// Nothing.
  1231		case "start":
  1232			ia.start()
  1233		case "stop":
  1234			ia.stop()
  1235		case "login":
  1236			ia.setup(w, r)
  1237			return
  1238		case "toggleauto":
  1239			if err := ia.toggleAuto(); err != nil {
  1240				http.Error(w, err.Error(), 500)
  1241				return
  1242			}
  1243		case "delete":
  1244			ia.stop() // can't hurt
  1245			if err := ia.delete(); err != nil {
  1246				http.Error(w, err.Error(), 500)
  1247				return
  1248			}
  1249			http.Redirect(w, r, ia.im.URL(), http.StatusFound)
  1250			return
  1251		default:
  1252			http.Error(w, "Unknown mode", http.StatusBadRequest)
  1253			return
  1254		}
  1255		http.Redirect(w, r, ia.AccountURL(), http.StatusFound)
  1256	}
  1257	
  1258	func (ia *importerAcct) setup(w http.ResponseWriter, r *http.Request) {
  1259		if err := ia.im.impl.ServeSetup(w, r, &SetupContext{
  1260			Context:     context.TODO(),
  1261			Host:        ia.im.host,
  1262			AccountNode: ia.acct,
  1263			ia:          ia,
  1264		}); err != nil {
  1265			log.Printf("%v", err)
  1266		}
  1267	}
  1268	
  1269	func (ia *importerAcct) start() {
  1270		ia.mu.Lock()
  1271		defer ia.mu.Unlock()
  1272		if ia.current != nil {
  1273			return
  1274		}
  1275		rc := &RunContext{
  1276			Host: ia.im.host,
  1277			ia:   ia,
  1278		}
  1279		rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
  1280		ia.current = rc
  1281		ia.stopped = false
  1282		ia.lastRunStart = time.Now()
  1283		go func() {
  1284			log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
  1285			err := ia.im.impl.Run(rc)
  1286			if err != nil {
  1287				log.Printf("%v error: %v", ia, err)
  1288			} else {
  1289				log.Printf("%v finished.", ia)
  1290			}
  1291			ia.mu.Lock()
  1292			defer ia.mu.Unlock()
  1293			ia.current = nil
  1294			ia.stopped = false
  1295			ia.lastRunDone = time.Now()
  1296			ia.lastRunErr = err
  1297			go ia.maybeStart()
  1298		}()
  1299	}
  1300	
  1301	// TODO(mpl): review this code more carefully. mostly copied from start().
  1302	func (ia *importerAcct) run() error {
  1303		ia.mu.Lock()
  1304		defer ia.mu.Unlock()
  1305		if ia.current != nil {
  1306			return errors.New("whatever")
  1307		}
  1308		rc := &RunContext{
  1309			Host: ia.im.host,
  1310			ia:   ia,
  1311		}
  1312		rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
  1313		ia.current = rc
  1314		ia.stopped = false
  1315		ia.lastRunStart = time.Now()
  1316		log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
  1317		err := ia.im.impl.Run(rc)
  1318		if err != nil {
  1319			return err
  1320		}
  1321		log.Printf("%v finished.", ia)
  1322		ia.current = nil
  1323		ia.stopped = false
  1324		ia.lastRunDone = time.Now()
  1325		return ia.lastRunErr
  1326	}
  1327	
  1328	func (ia *importerAcct) stop() {
  1329		ia.mu.Lock()
  1330		defer ia.mu.Unlock()
  1331		if ia.current == nil || ia.stopped {
  1332			return
  1333		}
  1334		ia.current.cancel()
  1335		ia.stopped = true
  1336	}
  1337	
  1338	// HTTPClient returns the HTTP client to use.
  1339	func (h *Host) HTTPClient() *http.Client {
  1340		if h.client == nil {
  1341			return http.DefaultClient
  1342		}
  1343		return h.client
  1344	}
  1345	
  1346	// HTTPTransport returns the HTTP transport to use.
  1347	func (h *Host) HTTPTransport() http.RoundTripper {
  1348		if h.transport == nil {
  1349			return http.DefaultTransport
  1350		}
  1351		return h.transport
  1352	}
  1353	
  1354	type ProgressMessage struct {
  1355		ItemsDone, ItemsTotal int
  1356		BytesDone, BytesTotal int64
  1357	}
  1358	
  1359	func (h *Host) upload(ctx context.Context, bb *schema.Builder) (br blob.Ref, err error) {
  1360		signed, err := bb.Sign(ctx, h.signer)
  1361		if err != nil {
  1362			return
  1363		}
  1364		sb, err := blobserver.ReceiveString(ctx, h.target, signed)
  1365		if err != nil {
  1366			return
  1367		}
  1368		return sb.Ref, nil
  1369	}
  1370	
  1371	// NewObject creates a new permanode and returns its Object wrapper.
  1372	func (h *Host) NewObject() (*Object, error) {
  1373		ctx := context.TODO() // TODO: move the NewObject method to some "ImportRun" type with a context field?
  1374		pn, err := h.upload(ctx, schema.NewUnsignedPermanode())
  1375		if err != nil {
  1376			return nil, err
  1377		}
  1378		// No need to do a describe query against it: we know it's
  1379		// empty (has no claims against it yet).
  1380		return &Object{h: h, pn: pn}, nil
  1381	}
  1382	
// An Object is a wrapper around a permanode that the importer uses
// to synchronize.
type Object struct {
	h  *Host    // the Host used to upload claims for this object
	pn blob.Ref // permanode ref

	mu      sync.RWMutex        // guards attr
	attr    map[string][]string // cached attribute values, keyed by attribute name
	modtime time.Time           // permanode modtime from Describe; zero for objects made by NewObject
}
  1393	
  1394	// PermanodeRef returns the permanode that this object wraps.
  1395	func (o *Object) PermanodeRef() blob.Ref {
  1396		return o.pn
  1397	}
  1398	
  1399	// Attr returns the object's attribute value for the provided attr,
  1400	// or the empty string if unset.  To distinguish between unset,
  1401	// an empty string, or multiple attribute values, use Attrs.
  1402	func (o *Object) Attr(attr string) string {
  1403		o.mu.RLock()
  1404		defer o.mu.RUnlock()
  1405		if v := o.attr[attr]; len(v) > 0 {
  1406			return v[0]
  1407		}
  1408		return ""
  1409	}
  1410	
  1411	// Attrs returns the attribute values for the provided attr.
  1412	func (o *Object) Attrs(attr string) []string {
  1413		o.mu.RLock()
  1414		defer o.mu.RUnlock()
  1415		return o.attr[attr]
  1416	}
  1417	
  1418	// ForeachAttr runs fn for each of the object's attributes & values.
  1419	// There might be multiple values for the same attribute.
  1420	// The internal lock is held while running, so no mutations should be
  1421	// made or it will deadlock.
  1422	func (o *Object) ForeachAttr(fn func(key, value string)) {
  1423		o.mu.RLock()
  1424		defer o.mu.RUnlock()
  1425		for k, vv := range o.attr {
  1426			for _, v := range vv {
  1427				fn(k, v)
  1428			}
  1429		}
  1430	}
  1431	
  1432	// DelAttr removes value from the values set for the attribute attr of
  1433	// permaNode. If value is empty then all the values for attribute are cleared.
  1434	func (o *Object) DelAttr(key, value string) error {
  1435		ctx := context.TODO()
  1436		o.mu.Lock()
  1437		defer o.mu.Unlock()
  1438		_, err := o.h.upload(ctx, schema.NewDelAttributeClaim(o.pn, key, value))
  1439		if err != nil {
  1440			return err
  1441		}
  1442		if o.attr == nil {
  1443			o.attr = make(map[string][]string)
  1444			return nil
  1445		}
  1446		if value == "" {
  1447			delete(o.attr, key)
  1448			return nil
  1449		}
  1450		var values []string
  1451		for _, v := range o.attr[key] {
  1452			if v != value {
  1453				values = append(values, v)
  1454			}
  1455		}
  1456		o.attr[key] = values
  1457		return nil
  1458	}
  1459	
  1460	// SetAttr sets the attribute key to value.
  1461	func (o *Object) SetAttr(key, value string) error {
  1462		ctx := context.TODO() // TODO: make it possible to get a context via Object; either new context field, or via some "ImportRun" field?
  1463		if o.Attr(key) == value {
  1464			return nil
  1465		}
  1466		_, err := o.h.upload(ctx, schema.NewSetAttributeClaim(o.pn, key, value))
  1467		if err != nil {
  1468			return err
  1469		}
  1470		o.mu.Lock()
  1471		defer o.mu.Unlock()
  1472		if o.attr == nil {
  1473			o.attr = make(map[string][]string)
  1474		}
  1475		o.attr[key] = []string{value}
  1476		return nil
  1477	}
  1478	
  1479	// SetAttrs sets multiple attributes. The provided keyval should be an
  1480	// even number of alternating key/value pairs to set.
  1481	func (o *Object) SetAttrs(keyval ...string) error {
  1482		_, err := o.SetAttrs2(keyval...)
  1483		return err
  1484	}
  1485	
  1486	// SetAttrs2 sets multiple attributes and returns whether there were
  1487	// any changes. The provided keyval should be an even number of
  1488	// alternating key/value pairs to set.
  1489	func (o *Object) SetAttrs2(keyval ...string) (changes bool, err error) {
  1490		if len(keyval)%2 == 1 {
  1491			panic("importer.SetAttrs: odd argument count")
  1492		}
  1493	
  1494		g := syncutil.Group{}
  1495		for i := 0; i < len(keyval); i += 2 {
  1496			key, val := keyval[i], keyval[i+1]
  1497			if val != o.Attr(key) {
  1498				changes = true
  1499				g.Go(func() error {
  1500					return o.SetAttr(key, val)
  1501				})
  1502			}
  1503		}
  1504		return changes, g.Err()
  1505	}
  1506	
  1507	// SetAttrValues sets multi-valued attribute.
  1508	func (o *Object) SetAttrValues(key string, attrs []string) error {
  1509		ctx := context.TODO() // TODO: make it possible to get a context via Object; either new context field, or via some "ImportRun" field?
  1510	
  1511		exists := asSet(o.Attrs(key))
  1512		actual := asSet(attrs)
  1513		o.mu.Lock()
  1514		defer o.mu.Unlock()
  1515		// add new values
  1516		for v := range actual {
  1517			if exists[v] {
  1518				delete(exists, v)
  1519				continue
  1520			}
  1521			_, err := o.h.upload(ctx, schema.NewAddAttributeClaim(o.pn, key, v))
  1522			if err != nil {
  1523				return err
  1524			}
  1525		}
  1526		// delete unneeded values
  1527		for v := range exists {
  1528			_, err := o.h.upload(ctx, schema.NewDelAttributeClaim(o.pn, key, v))
  1529			if err != nil {
  1530				return err
  1531			}
  1532		}
  1533		if o.attr == nil {
  1534			o.attr = make(map[string][]string)
  1535		}
  1536		o.attr[key] = attrs
  1537		return nil
  1538	}
  1539	
  1540	func asSet(elts []string) map[string]bool {
  1541		if len(elts) == 0 {
  1542			return nil
  1543		}
  1544		set := make(map[string]bool, len(elts))
  1545		for _, elt := range elts {
  1546			set[elt] = true
  1547		}
  1548		return set
  1549	}
  1550	
  1551	// ChildPathObject returns (creating if necessary) the child object
  1552	// from the permanode o, given by the "camliPath:xxxx" attribute,
  1553	// where xxx is the provided path.
  1554	func (o *Object) ChildPathObject(path string) (*Object, error) {
  1555		return o.ChildPathObjectOrFunc(path, o.h.NewObject)
  1556	}
  1557	
  1558	// ChildPathObjectOrFunc returns the child object from the permanode o,
  1559	// given by the "camliPath:xxxx" attribute, where xxx is the provided
  1560	// path. If the path doesn't exist, the provided func should return an
  1561	// appropriate object. If the func fails, the return error is
  1562	// returned directly without any attempt to make a permanode.
  1563	func (o *Object) ChildPathObjectOrFunc(path string, fn func() (*Object, error)) (*Object, error) {
  1564		attrName := "camliPath:" + path
  1565		if v := o.Attr(attrName); v != "" {
  1566			br, ok := blob.Parse(v)
  1567			if !ok {
  1568				return nil, fmt.Errorf("invalid blobref %q already stored at camliPath %q", br, path)
  1569			}
  1570			return o.h.ObjectFromRef(br)
  1571		}
  1572		newObj, err := fn()
  1573		if err != nil {
  1574			return nil, err
  1575		}
  1576		if err := o.SetAttr(attrName, newObj.PermanodeRef().String()); err != nil {
  1577			return nil, err
  1578		}
  1579		return newObj, nil
  1580	}
  1581	
  1582	// ObjectFromRef returns the object given by the named permanode
  1583	func (h *Host) ObjectFromRef(permanodeRef blob.Ref) (*Object, error) {
  1584		ctx := context.TODO()
  1585		res, err := h.search.Describe(ctx, &search.DescribeRequest{
  1586			BlobRef: permanodeRef,
  1587			Depth:   1,
  1588		})
  1589		if err != nil {
  1590			return nil, err
  1591		}
  1592		db, ok := res.Meta[permanodeRef.String()]
  1593		if !ok {
  1594			return nil, fmt.Errorf("permanode %v wasn't in Describe response", permanodeRef)
  1595		}
  1596		if db.Permanode == nil {
  1597			return nil, fmt.Errorf("permanode %v had no DescribedPermanode in Describe response", permanodeRef)
  1598		}
  1599		return &Object{
  1600			h:       h,
  1601			pn:      permanodeRef,
  1602			attr:    map[string][]string(db.Permanode.Attr),
  1603			modtime: db.Permanode.ModTime,
  1604		}, nil
  1605	}
  1606	
  1607	type byImporterAccountPermanode []*importerAcct
  1608	
  1609	func (s byImporterAccountPermanode) Len() int { return len(s) }
  1610	func (s byImporterAccountPermanode) Less(i, j int) bool {
  1611		return s[i].acct.PermanodeRef().Less(s[j].acct.PermanodeRef())
  1612	}
  1613	func (s byImporterAccountPermanode) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
Website layout inspired by memcached.
Content by the authors.