Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     8	     http://www.apache.org/licenses/LICENSE-2.0
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    17	// Package importer imports content from third-party websites.
    18	package importer // import "perkeep.org/pkg/importer"
    20	import (
    21		"context"
    22		"errors"
    23		"fmt"
    24		"html/template"
    25		"log"
    26		"net/http"
    27		"net/url"
    28		"os"
    29		"sort"
    30		"strconv"
    31		"strings"
    32		"sync"
    33		"time"
    35		"perkeep.org/internal/httputil"
    36		"perkeep.org/pkg/blob"
    37		"perkeep.org/pkg/blobserver"
    38		"perkeep.org/pkg/jsonsign/signhandler"
    39		"perkeep.org/pkg/schema"
    40		"perkeep.org/pkg/search"
    41		"perkeep.org/pkg/server"
    42		"perkeep.org/pkg/types/camtypes"
    44		"go4.org/ctxutil"
    45		"go4.org/jsonconfig"
    46		"go4.org/syncutil"
    47	)
const (
	// attrNodeType is the permanode attribute holding the node's kind.
	// nodeTypeImporterAccount marks importer account permanodes (checked
	// by importer.account and startPeriodicImporters). NOTE(review):
	// nodeTypeImporter is presumably the kind of the per-importer node;
	// its use is not visible in this part of the file.
	attrNodeType            = "camliNodeType"
	nodeTypeImporter        = "importer"
	nodeTypeImporterAccount = "importerAccount"

	// Attributes stored on importer permanodes (attrClientID and
	// attrClientSecret, when credentials are not statically configured)
	// and on importer account permanodes (attrImporterType,
	// attrImportRoot holding the blob ref of the account's root
	// permanode, and attrImportAuto parsed with time.ParseDuration).
	attrImporterType = "importerType" // => "twitter", "foursquare", etc
	attrClientID     = "authClientID"
	attrClientSecret = "authClientSecret"
	attrImportRoot   = "importRoot"
	attrImportAuto   = "importAuto" // => time.Duration value ("30m") or "" for off
)
// An Importer imports from a third-party site.
type Importer interface {
	// Run runs a full or incremental import.
	//
	// The importer should continually or periodically monitor the
	// RunContext.Context()'s Done channel to exit early if
	// requested. The return value should be ctx.Err() if the
	// importer exits for that reason.
	Run(*RunContext) error

	// Properties returns properties of this importer type.
	Properties() Properties

	// IsAccountReady reports whether the provided account node
	// is configured.
	IsAccountReady(acctNode *Object) (ok bool, err error)
	// SummarizeAccount returns a short human-readable summary of the
	// account stored in acctNode. NOTE(review): its callers are not
	// visible here; presumably it is shown in the web UI.
	SummarizeAccount(acctNode *Object) string

	// ServeSetup serves the setup flow for the account described by ctx.
	// NOTE(review): the code that calls it is not visible in this section.
	ServeSetup(w http.ResponseWriter, r *http.Request, ctx *SetupContext) error
	// ServeCallback handles GET requests to
	// http://host/importer/TYPE/callback (for example, the redirect back
	// from an OAuth provider). ctx describes the account identified by
	// CallbackRequestAccount.
	ServeCallback(w http.ResponseWriter, r *http.Request, ctx *SetupContext)

	// CallbackRequestAccount extracts the blobref of the importer account from
	// the callback URL parameters of r. For example, it will be encoded as:
	// For Twitter (OAuth1), in its own URL parameter: "acct=sha1-f2b0b7da718b97ce8c31591d8ed4645c777f3ef4"
	// For Picasa: (OAuth2), in the OAuth2 "state" parameter: "state=acct:sha1-97911b1a5887eb5862d1c81666ba839fc1363ea1"
	CallbackRequestAccount(r *http.Request) (acctRef blob.Ref, err error)

	// CallbackURLParameters uses the input importer account blobRef to build
	// and return the URL parameters, that will be appended to the callback URL.
	// SetupContext.CallbackURL appends them as the query string.
	CallbackURLParameters(acctRef blob.Ref) url.Values
}
// Properties contains the properties of an importer type.
type Properties struct {
	// Title is the importer's display title; when it is empty, the name
	// the importer was registered under is used instead. Description is a
	// human-readable description of the importer.
	Title       string
	Description string

	// TODOIssue, if non-zero, marks the importer as invalid, but the UI
	// will link to a tracking bug for implementing it.
	TODOIssue int

	// NeedsAPIKey reports whether this importer requires an API key
	// (OAuth2 client_id & client_secret, or equivalent).
	// If the API only requires a username & password, or a flow to get
	// an auth token per-account without an overall API key, importers
	// can return false here.
	NeedsAPIKey bool

	// SupportsIncremental reports whether this importer has been optimized
	// to run efficiently in regular incremental runs. (e.g. every 5 minutes
	// or half hour). Eventually all importers might support this and we'll
	// make it required, in which case we might delete this option.
	// For now, some importers (e.g. Flickr) don't yet support this.
	SupportsIncremental bool

	// PermanodeImporterType optionally specifies the "importerType"
	// permanode attribute value that should be stored for
	// accounts of this type. By default, it is the string that it
	// was registered with. This should only be specified for
	// products that have been rebranded, and then this should be
	// the old branding, to not break people who have been
	// importing the account since before the rebranding.
	// For example, this is "foursquare" for "swarm", so "swarm" shows
	// in the UI and URLs, but it's "foursquare" in permanodes.
	// Register reserves this value so no other importer can use it.
	PermanodeImporterType string
}
// LongPoller is optionally implemented by importers which can long
// poll efficiently to wait for new content.
// For example, Twitter uses this to subscribe to the user's stream.
//
// When an account with automatic importing last ran too recently to be
// due again, the Host calls LongPoll with a RunContext whose context
// expires when the next periodic run becomes due. A nil return makes the
// Host start a run right away.
type LongPoller interface {
	Importer

	// LongPoll waits and returns nil when there's new content.
	// It does not fetch the content itself.
	// It returns a non-nil error if it failed to long poll.
	LongPoll(*RunContext) error
}
// TestDataMaker is an optional interface that may be implemented by Importers to
// generate test data locally. The returned RoundTripper will be used as the
// transport of the HTTPClient, in the RunContext that will be passed to Run
// during tests and devcam server --makethings.
// (See http://perkeep.org/issue/417).
type TestDataMaker interface {
	// MakeTestData returns the transport that serves the fake test data.
	MakeTestData() http.RoundTripper
	// SetTestAccount allows an importer to set some needed attributes on the importer
	// account node before a run is started.
	SetTestAccount(acctNode *Object) error
}
// ImporterSetupHTMLer is an optional interface that may be implemented by
// Importers to return some HTML to be included on the importer setup page.
//
// The HTML is only requested once the importer's node exists, and it is
// inserted into the page as template.HTML, i.e. without escaping, so it
// must be trusted markup.
type ImporterSetupHTMLer interface {
	AccountSetupHTML(*Host) string
}
// Package-level importer registry, populated by Register (normally from
// init functions). importers maps each registered name to its
// implementation; reservedImporterKey holds the PermanodeImporterType
// values claimed by registered importers, which Register refuses to
// reuse as names or as another importer's PermanodeImporterType.
var (
	importers           = map[string]Importer{}
	reservedImporterKey = map[string]bool{}
)
   163	// All returns the map of importer implementation name to implementation. This
   164	// map should not be mutated.
   165	func All() map[string]Importer {
   166		return importers
   167	}
   169	// Register registers a site-specific importer. It should only be called from init,
   170	// and not from concurrent goroutines.
   171	func Register(name string, im Importer) {
   172		if _, dup := importers[name]; dup {
   173			panic("Dup registration of importer " + name)
   174		}
   175		if _, dup := reservedImporterKey[name]; dup {
   176			panic("Dup registration of importer " + name)
   177		}
   178		if pt := im.Properties().PermanodeImporterType; pt != "" {
   179			if _, dup := importers[pt]; dup {
   180				panic("Dup registration of importer " + pt)
   181			}
   182			if _, dup := reservedImporterKey[pt]; dup {
   183				panic("Dup registration of importer " + pt)
   184			}
   185			reservedImporterKey[pt] = true
   186		}
   187		importers[name] = im
   188	}
   190	func RegisterTODO(name string, p Properties) {
   191		Register(name, &todoImp{Props: p})
   192	}
// init registers newFromConfig as the constructor for server handlers of
// type "importer". That one handler (a Host) serves every importer linked
// into the binary.
func init() {
	// Register the meta "importer" handler, which handles all other handlers.
	blobserver.RegisterHandlerConstructor("importer", newFromConfig)
}
// HostConfig holds the parameters to set up a Host.
type HostConfig struct {
	// BaseURL is the server's root URL, and BaseURL+Prefix is where the
	// importer handler is mounted. When the Host runs as a server handler,
	// InitHandler replaces Target, BlobSource, Search and Signer with
	// values from the "root" and "jsonsign" handlers. NewHost rejects an
	// importer whose ClientSecret entry is set without a ClientId entry.
	BaseURL      string
	Prefix       string                  // URL prefix for the importer handler
	Target       blobserver.StatReceiver // storage for the imported object blobs
	BlobSource   blob.Fetcher            // for additional resources, such as twitter zip file
	Signer       *schema.Signer
	Search       search.QueryDescriber
	ClientId     map[string]string // optionally maps importer impl name to a clientId credential
	ClientSecret map[string]string // optionally maps importer impl name to a clientSecret credential

	// HTTPClient optionally specifies how to fetch external network
	// resources. The Host will use http.DefaultClient otherwise.
	HTTPClient *http.Client
	// TODO: add more if/when needed
}
   216	func NewHost(hc HostConfig) (*Host, error) {
   217		h := &Host{
   218			baseURL:      hc.BaseURL,
   219			importerBase: hc.BaseURL + hc.Prefix,
   220			imp:          make(map[string]*importer),
   221		}
   222		var err error
   223		h.tmpl, err = tmpl.Clone()
   224		if err != nil {
   225			return nil, err
   226		}
   227		h.tmpl = h.tmpl.Funcs(map[string]interface{}{
   228			"bloblink": func(br blob.Ref) template.HTML {
   229				if h.uiPrefix == "" {
   230					return template.HTML(br.String())
   231				}
   232				return template.HTML(fmt.Sprintf("<a href=\"%s/%s\">%s</a>", h.uiPrefix, br, br))
   233			},
   234		})
   235		for k, impl := range importers {
   236			h.importers = append(h.importers, k)
   237			clientId, clientSecret := hc.ClientId[k], hc.ClientSecret[k]
   238			if clientSecret != "" && clientId == "" {
   239				return nil, fmt.Errorf("Invalid static configuration for importer %q: clientSecret specified without clientId", k)
   240			}
   241			props := impl.Properties()
   242			imp := &importer{
   243				host:         h,
   244				name:         k,
   245				impl:         impl,
   246				props:        &props,
   247				clientID:     clientId,
   248				clientSecret: clientSecret,
   249			}
   250			h.imp[k] = imp
   251		}
   253		sort.Strings(h.importers)
   255		h.target = hc.Target
   256		h.blobSource = hc.BlobSource
   257		h.signer = hc.Signer
   258		h.search = hc.Search
   259		h.client = hc.HTTPClient
   261		return h, nil
   262	}
   264	func newFromConfig(ld blobserver.Loader, cfg jsonconfig.Obj) (http.Handler, error) {
   265		hc := HostConfig{
   266			BaseURL: ld.BaseURL(),
   267			Prefix:  ld.MyPrefix(),
   268		}
   269		ClientId := make(map[string]string)
   270		ClientSecret := make(map[string]string)
   271		for k := range importers {
   272			var clientId, clientSecret string
   273			if impConf := cfg.OptionalObject(k); impConf != nil {
   274				clientId = impConf.OptionalString("clientID", "")
   275				clientSecret = impConf.OptionalString("clientSecret", "")
   276				// Special case: allow clientSecret to be of form "clientId:clientSecret"
   277				// if the clientId is empty.
   278				if clientId == "" && strings.Contains(clientSecret, ":") {
   279					if f := strings.SplitN(clientSecret, ":", 2); len(f) == 2 {
   280						clientId, clientSecret = f[0], f[1]
   281					}
   282				}
   283				if err := impConf.Validate(); err != nil {
   284					return nil, fmt.Errorf("Invalid static configuration for importer %q: %v", k, err)
   285				}
   286				ClientId[k] = clientId
   287				ClientSecret[k] = clientSecret
   288			}
   289		}
   290		if err := cfg.Validate(); err != nil {
   291			return nil, err
   292		}
   293		hc.ClientId = ClientId
   294		hc.ClientSecret = ClientSecret
   295		host, err := NewHost(hc)
   296		if err != nil {
   297			return nil, err
   298		}
   299		host.didInit.Add(1)
   300		return host, nil
   301	}
// Host implements blobserver.HandlerIniter, so the server calls its
// InitHandler method during start-up.
var _ blobserver.HandlerIniter = (*Host)(nil)

// SetupContext is passed to an Importer's ServeSetup and ServeCallback
// methods. It embeds a context.Context and identifies the Host and the
// importer account being set up.
type SetupContext struct {
	context.Context
	Host        *Host
	AccountNode *Object

	// ia is the account being set up; its importer supplies credentials
	// and callback URL parameters.
	ia *importerAcct
}
   313	func (sc *SetupContext) Credentials() (clientID, clientSecret string, err error) {
   314		return sc.ia.im.credentials()
   315	}
   317	func (sc *SetupContext) CallbackURL() string {
   318		params := sc.ia.im.impl.CallbackURLParameters(sc.AccountNode.PermanodeRef()).Encode()
   319		if params != "" {
   320			params = "?" + params
   321		}
   322		return sc.Host.ImporterBaseURL() + sc.ia.im.name + "/callback" + params
   323	}
   325	// AccountURL returns the URL to an account of an importer
   326	// (http://host/importer/TYPE/sha1-sd8fsd7f8sdf7).
   327	func (sc *SetupContext) AccountURL() string {
   328		return sc.Host.ImporterBaseURL() + sc.ia.im.name + "/" + sc.AccountNode.PermanodeRef().String()
   329	}
// RunContext is the context provided for a given Run of an importer, importing
// a certain account on a certain importer.
type RunContext struct {
	// ctx may be nil; Context() then returns context.Background().
	ctx    context.Context
	cancel context.CancelFunc // for when we stop/pause the importing
	Host   *Host

	// ia is the account being imported.
	ia *importerAcct

	mu           sync.Mutex // guards following
	lastProgress *ProgressMessage
}
   344	// Context returns the run's context. It is always non-nil.
   345	func (rc *RunContext) Context() context.Context {
   346		if rc.ctx != nil {
   347			return rc.ctx
   348		}
   349		return context.Background()
   350	}
   352	// CreateAccount creates a new importer account for the Host h, and the importer
   353	// implementation named impl. It returns a RunContext setup with that account.
   354	func CreateAccount(h *Host, impl string) (*RunContext, error) {
   355		imp, ok := h.imp[impl]
   356		if !ok {
   357			return nil, fmt.Errorf("host does not have a %v importer", impl)
   358		}
   359		ia, err := imp.newAccount()
   360		if err != nil {
   361			return nil, fmt.Errorf("could not create new account for importer %v: %v", impl, err)
   362		}
   363		rc := &RunContext{
   364			Host: ia.im.host,
   365			ia:   ia,
   366		}
   367		rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
   368		return rc, nil
   370	}
   372	// Credentials returns the credentials for the importer. This is
   373	// typically the OAuth1, OAuth2, or equivalent client ID (api token)
   374	// and client secret (api secret).
   375	func (rc *RunContext) Credentials() (clientID, clientSecret string, err error) {
   376		return rc.ia.im.credentials()
   377	}
   379	// AccountNode returns the permanode storing account information for this permanode.
   380	// It will contain the attributes:
   381	//   - camliNodeType = "importerAccount"
   382	//   - importerType = "registered-type"
   383	//
   384	// You must not change the camliNodeType or importerType.
   385	//
   386	// You should use this permanode to store state about where your
   387	// importer left off, if it can efficiently resume later (without
   388	// missing anything).
   389	func (rc *RunContext) AccountNode() *Object { return rc.ia.acct }
   391	// RootNode returns the initially-empty permanode storing the root
   392	// of this account's data. You can change anything at will. This will
   393	// typically be modeled as a dynamic directory (with camliPath:xxxx
   394	// attributes), where each path element is either a file, object, or
   395	// another dynamic directory.
   396	func (rc *RunContext) RootNode() *Object { return rc.ia.root }
// Host is the HTTP handler and state for managing all the importers
// linked into the binary, even if they're not configured.
type Host struct {
	// importerBase is baseURL followed by the handler prefix (see NewHost).
	// target, blobSource, search and signer are set by InitHandler when the
	// Host runs as a server handler; uiPrefix is too, if there is a "ui"
	// handler.
	tmpl         *template.Template
	importers    []string // sorted; e.g. dummy flickr foursquare picasa twitter
	imp          map[string]*importer
	baseURL      string
	importerBase string
	target       blobserver.StatReceiver
	blobSource   blob.Fetcher // e.g. twitter reading zip file
	search       search.QueryDescriber
	signer       *schema.Signer
	uiPrefix     string // or empty if no UI handler

	// didInit is incremented by newFromConfig and marked done
	// after InitHandler. Any method on Host that requires Init
	// then calls didInit.Wait to guard against initialization
	// races where serverinit calls InitHandler in a random
	// order on start-up and different handlers access the
	// not-yet-initialized Host (notably from a goroutine)
	didInit sync.WaitGroup

	// client optionally specifies how to fetch external network
	// resources; NewHost copies it from HostConfig.HTTPClient.
	// NOTE(review): the http.DefaultClient fallback (HTTPClient method)
	// and the use of transport are not visible in this section.
	client    *http.Client
	transport http.RoundTripper
}
// accountStatus is the JSON representation of the status of a configured importer account.
type accountStatus struct {
	// Type is the importer's registered name and Href the account's URL
	// (see AccountsStatus).
	Name string `json:"name"` // display name
	Type string `json:"type"`
	Href string `json:"href"`

	StartedUnixSec      int64  `json:"startedUnixSec"`  // zero if not running
	LastFinishedUnixSec int64  `json:"finishedUnixSec"` // zero if no previous run
	LastError           string `json:"lastRunError"`    // empty if last run was success
}
   437	// AccountsStatus returns the currently configured accounts and their status for
   438	// inclusion in the status.json document, as rendered by the web UI.
   439	func (h *Host) AccountsStatus() (interface{}, []camtypes.StatusError) {
   440		h.didInit.Wait()
   441		var s []accountStatus
   442		var errs []camtypes.StatusError
   443		for _, impName := range h.importers {
   444			imp := h.imp[impName]
   445			accts, _ := imp.Accounts()
   446			for _, ia := range accts {
   447				as := accountStatus{
   448					Type: impName,
   449					Href: ia.AccountURL(),
   450					Name: ia.AccountLinkSummary(),
   451				}
   452				ia.mu.Lock()
   453				if ia.current != nil {
   454					as.StartedUnixSec = ia.lastRunStart.Unix()
   455				}
   456				if !ia.lastRunDone.IsZero() {
   457					as.LastFinishedUnixSec = ia.lastRunDone.Unix()
   458				}
   459				if ia.lastRunErr != nil {
   460					as.LastError = ia.lastRunErr.Error()
   461					errs = append(errs, camtypes.StatusError{
   462						Error: ia.lastRunErr.Error(),
   463						URL:   ia.AccountURL(),
   464					})
   465				}
   466				ia.mu.Unlock()
   467				s = append(s, as)
   468			}
   469		}
   470		return s, errs
   471	}
   473	// RunImporterAccount runs the importerType importer on the account described in
   474	// accountNode.
   475	func (h *Host) RunImporterAccount(importerType string, accountNode blob.Ref) error {
   476		h.didInit.Wait()
   477		imp, ok := h.imp[importerType]
   478		if !ok {
   479			return fmt.Errorf("no %q importer for this account", importerType)
   480		}
   481		accounts, err := imp.Accounts()
   482		if err != nil {
   483			return err
   484		}
   485		for _, ia := range accounts {
   486			if ia.acct.pn != accountNode {
   487				continue
   488			}
   489			return ia.run()
   490		}
   491		return fmt.Errorf("no %v account matching account in node %v", importerType, accountNode)
   492	}
   494	func (h *Host) InitHandler(hl blobserver.FindHandlerByTyper) error {
   495		if prefix, _, err := hl.FindHandlerByType("ui"); err == nil {
   496			h.uiPrefix = prefix
   497		}
   499		_, handler, err := hl.FindHandlerByType("root")
   500		if err != nil || handler == nil {
   501			return errors.New("importer requires a 'root' handler")
   502		}
   503		rh := handler.(*server.RootHandler)
   504		searchHandler, ok := rh.SearchHandler()
   505		if !ok {
   506			return errors.New("importer requires a 'root' handler with 'searchRoot' defined")
   507		}
   508		h.search = searchHandler
   509		if rh.Storage == nil {
   510			return errors.New("importer requires a 'root' handler with 'blobRoot' defined")
   511		}
   512		h.target = rh.Storage
   513		h.blobSource = rh.Storage
   515		_, handler, _ = hl.FindHandlerByType("jsonsign")
   516		if sigh, ok := handler.(*signhandler.Handler); ok {
   517			h.signer = sigh.Signer()
   518		}
   519		if h.signer == nil {
   520			return errors.New("importer requires a 'jsonsign' handler")
   521		}
   522		h.didInit.Done()
   523		go h.startPeriodicImporters()
   524		return nil
   525	}
   527	// ServeHTTP serves:
   528	//
   529	//	http://host/importer/
   530	//	http://host/importer/twitter/
   531	//	http://host/importer/twitter/callback
   532	//	http://host/importer/twitter/sha1-abcabcabcabcabc (single account)
   533	func (h *Host) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   534		suffix := httputil.PathSuffix(r)
   535		seg := strings.Split(suffix, "/")
   536		if suffix == "" || len(seg) == 0 {
   537			h.serveImportersRoot(w, r)
   538			return
   539		}
   540		impName := seg[0]
   542		imp, ok := h.imp[impName]
   543		if !ok {
   544			http.NotFound(w, r)
   545			return
   546		}
   548		if len(seg) == 1 || seg[1] == "" {
   549			h.serveImporter(w, r, imp)
   550			return
   551		}
   552		if seg[1] == "callback" {
   553			h.serveImporterAcctCallback(w, r, imp)
   554			return
   555		}
   556		acctRef, ok := blob.Parse(seg[1])
   557		if !ok {
   558			http.NotFound(w, r)
   559			return
   560		}
   561		h.serveImporterAccount(w, r, imp, acctRef)
   562	}
   564	// Serves list of importers at http://host/importer/
   565	func (h *Host) serveImportersRoot(w http.ResponseWriter, r *http.Request) {
   566		body := importersRootBody{
   567			Host:      h,
   568			Importers: make([]*importer, 0, len(h.imp)),
   569		}
   570		for _, v := range h.importers {
   571			body.Importers = append(body.Importers, h.imp[v])
   572		}
   573		sort.Slice(body.Importers, func(i, j int) bool {
   574			return body.Importers[i].Title() < body.Importers[j].Title()
   575		})
   576		h.execTemplate(w, r, importersRootPage{
   577			Title: "Importers",
   578			Body:  body,
   579		})
   580	}
   582	// Serves list of accounts at http://host/importer/twitter
   583	func (h *Host) serveImporter(w http.ResponseWriter, r *http.Request, imp *importer) {
   584		if r.Method == "POST" {
   585			h.serveImporterPost(w, r, imp)
   586			return
   587		}
   589		var setup string
   590		node, _ := imp.Node()
   591		if setuper, ok := imp.impl.(ImporterSetupHTMLer); ok && node != nil {
   592			setup = setuper.AccountSetupHTML(h)
   593		}
   595		h.execTemplate(w, r, importerPage{
   596			Title: "Importer - " + imp.Name(),
   597			Body: importerBody{
   598				Host:      h,
   599				Importer:  imp,
   600				SetupHelp: template.HTML(setup),
   601			},
   602		})
   603	}
   605	// Serves oauth callback at http://host/importer/TYPE/callback
   606	func (h *Host) serveImporterAcctCallback(w http.ResponseWriter, r *http.Request, imp *importer) {
   607		if r.Method != "GET" {
   608			http.Error(w, "invalid method", http.StatusBadRequest)
   609			return
   610		}
   611		acctRef, err := imp.impl.CallbackRequestAccount(r)
   612		if err != nil {
   613			httputil.ServeError(w, r, err)
   614			return
   615		}
   616		if !acctRef.Valid() {
   617			httputil.ServeError(w, r, errors.New("No valid blobref returned from CallbackRequestAccount(r)"))
   618			return
   619		}
   620		ia, err := imp.account(acctRef)
   621		if err != nil {
   622			http.Error(w, "invalid 'acct' param: "+err.Error(), http.StatusBadRequest)
   623			return
   624		}
   625		imp.impl.ServeCallback(w, r, &SetupContext{
   626			Context:     context.TODO(),
   627			Host:        h,
   628			AccountNode: ia.acct,
   629			ia:          ia,
   630		})
   631	}
   633	func (h *Host) serveImporterPost(w http.ResponseWriter, r *http.Request, imp *importer) {
   634		switch r.FormValue("mode") {
   635		default:
   636			http.Error(w, "Unknown mode.", http.StatusBadRequest)
   637		case "newacct":
   638			ia, err := imp.newAccount()
   639			if err != nil {
   640				http.Error(w, err.Error(), 500)
   641				return
   642			}
   643			ia.setup(w, r)
   644			return
   645		case "saveclientidsecret":
   646			n, err := imp.Node()
   647			if err != nil {
   648				http.Error(w, "Error getting node: "+err.Error(), 500)
   649				return
   650			}
   651			if err := n.SetAttrs(
   652				attrClientID, r.FormValue("clientID"),
   653				attrClientSecret, r.FormValue("clientSecret"),
   654			); err != nil {
   655				http.Error(w, "Error saving node: "+err.Error(), 500)
   656				return
   657			}
   658			http.Redirect(w, r, h.ImporterBaseURL()+imp.name, http.StatusFound)
   659		}
   660	}
   662	// Serves details of accounts at http://host/importer/twitter/sha1-23098429382934
   663	func (h *Host) serveImporterAccount(w http.ResponseWriter, r *http.Request, imp *importer, acctRef blob.Ref) {
   664		ia, err := imp.account(acctRef)
   665		if err != nil {
   666			http.Error(w, "Unknown or invalid importer account "+acctRef.String()+": "+err.Error(), http.StatusBadRequest)
   667			return
   668		}
   669		ia.ServeHTTP(w, r)
   670	}
   672	func (h *Host) startPeriodicImporters() {
   673		res, err := h.search.Query(context.TODO(), &search.SearchQuery{
   674			Sort:       search.Unsorted,
   675			Expression: "attr:camliNodeType:importerAccount",
   676			Describe: &search.DescribeRequest{
   677				Depth: 1,
   678			},
   679		})
   680		if err != nil {
   681			log.Printf("periodic importer search fail: %v", err)
   682			return
   683		}
   684		if res.Describe == nil {
   685			log.Printf("No describe response in search result")
   686			return
   687		}
   688		for _, resBlob := range res.Blobs {
   689			blob := resBlob.Blob
   690			desBlob, ok := res.Describe.Meta[blob.String()]
   691			if !ok || desBlob.Permanode == nil {
   692				continue
   693			}
   694			attrs := desBlob.Permanode.Attr
   695			if attrs.Get(attrNodeType) != nodeTypeImporterAccount {
   696				panic("Search result returned non-importerAccount")
   697			}
   698			impType := attrs.Get("importerType")
   699			imp, ok := h.imp[impType]
   700			if !ok {
   701				continue
   702			}
   703			ia, err := imp.account(blob)
   704			if err != nil {
   705				log.Printf("Can't load importer account %v for regular importing: %v", blob, err)
   706				continue
   707			}
   708			go ia.maybeStart()
   709		}
   710	}
   712	var disableImporters, _ = strconv.ParseBool(os.Getenv("CAMLI_DISABLE_IMPORTERS"))
   714	func (ia *importerAcct) maybeStart() {
   715		if disableImporters {
   716			log.Printf("Importers disabled, per environment.")
   717			return
   718		}
   719		acctObj, err := ia.im.host.ObjectFromRef(ia.acct.PermanodeRef())
   720		if err != nil {
   721			log.Printf("Error maybe starting %v: %v", ia.acct.PermanodeRef(), err)
   722			return
   723		}
   724		duration, err := time.ParseDuration(acctObj.Attr(attrImportAuto))
   725		if duration == 0 || err != nil {
   726			return
   727		}
   728		ia.mu.Lock()
   729		defer ia.mu.Unlock()
   730		if ia.current != nil {
   731			return
   732		}
   733		if ia.lastRunDone.After(time.Now().Add(-duration)) {
   734			// Kick off long poller wait if supported.
   735			if lp, ok := ia.im.impl.(LongPoller); ok {
   736				sleepFor := time.Until(ia.lastRunDone.Add(duration))
   737				sleepCtx, cancel := context.WithTimeout(context.Background(), sleepFor)
   738				log.Printf("%v ran recently enough. Sleeping for %v.", ia, sleepFor)
   739				timer := time.AfterFunc(sleepFor, ia.maybeStart)
   741				rc := &RunContext{
   742					ctx:  sleepCtx,
   743					Host: ia.im.host,
   744					ia:   ia,
   745				}
   746				go func() {
   747					defer cancel()
   749					if err := lp.LongPoll(rc); err == nil {
   750						log.Printf("importer: long poll for %s found an update. Starting run...", ia)
   751						timer.Stop()
   752						ia.start()
   753					} else {
   754						log.Printf("failed to long poll %s: %v", ia, err)
   755					}
   756				}()
   757			}
   758			return
   759		}
   761		log.Printf("importer: starting periodic import for %v", ia)
   762		go ia.start()
   763	}
   765	// BaseURL returns the root of the whole server, without trailing
   766	// slash.
   767	func (h *Host) BaseURL() string {
   768		return h.baseURL
   769	}
   771	// ImporterBaseURL returns the URL base of the importer handler,
   772	// including trailing slash.
   773	func (h *Host) ImporterBaseURL() string {
   774		return h.importerBase
   775	}
   777	func (h *Host) Target() blobserver.StatReceiver {
   778		return h.target
   779	}
   781	func (h *Host) BlobSource() blob.Fetcher {
   782		return h.blobSource
   783	}
   785	func (h *Host) Searcher() search.QueryDescriber { return h.search }
// importer is an importer for a certain site, but not a specific account on that site.
type importer struct {
	// host is the owning Host and name the registered name; impl is the
	// site-specific implementation.
	host  *Host
	name  string // importer name e.g. "twitter"
	impl  Importer
	props *Properties // impl.Properties; pointer so we crash on & find uninitialized callers

	// If statically configured in config file, else
	// they come from the importer node's attributes.
	clientID     string
	clientSecret string

	nodemu    sync.Mutex // guards nodeCache
	nodeCache *Object    // or nil if unset

	// acctmu guards acct, the cache of loaded accounts. NOTE(review):
	// allLoaded is not used in this section; presumably it records whether
	// every account has been loaded into acct and is also guarded by acctmu.
	acctmu    sync.Mutex
	acct      map[blob.Ref]*importerAcct // key: account permanode
	allLoaded bool
}
   807	func (im *importer) Name() string { return im.name }
   809	func (im *importer) Title() string {
   810		if im.props.Title != "" {
   811			return im.props.Title
   812		}
   813		return im.name
   814	}
   816	func (im *importer) TODOIssue() int {
   817		return im.props.TODOIssue
   818	}
   820	func (im *importer) Description() string {
   821		return im.props.Description
   822	}
   824	// ImporterType returns the account permanode's attrImporterType
   825	// value. This is almost always the same as the Name, except in cases
   826	// where a product gets rebranded. (e.g. "foursquare" to "swarm", in
   827	// which case the importer type remains the old branding)
   828	func (im *importer) ImporterType() string {
   829		if im.props.PermanodeImporterType != "" {
   830			return im.props.PermanodeImporterType
   831		}
   832		return im.name
   833	}
   835	func (im *importer) StaticConfig() bool { return im.clientSecret != "" }
   837	// URL returns the importer's URL without trailing slash.
   838	func (im *importer) URL() string { return im.host.ImporterBaseURL() + im.name }
   840	func (im *importer) ShowClientAuthEditForm() bool {
   841		if im.StaticConfig() {
   842			// Don't expose the server's statically-configured client secret
   843			// to the user. (e.g. a hosted multi-user configuration)
   844			return false
   845		}
   846		return im.props.NeedsAPIKey
   847	}
   849	func (im *importer) InsecureForm() bool {
   850		return !strings.HasPrefix(im.host.ImporterBaseURL(), "https://")
   851	}
   853	func (im *importer) CanAddNewAccount() bool {
   854		if !im.props.NeedsAPIKey {
   855			return true
   856		}
   857		id, sec, err := im.credentials()
   858		return id != "" && sec != "" && err == nil
   859	}
   861	func (im *importer) ClientID() (v string, err error) {
   862		v, _, err = im.credentials()
   863		return
   864	}
   866	func (im *importer) ClientSecret() (v string, err error) {
   867		_, v, err = im.credentials()
   868		return
   869	}
   871	func (im *importer) Status() (status string, err error) {
   872		if !im.props.NeedsAPIKey {
   873			return "no configuration required", nil
   874		}
   875		if im.StaticConfig() {
   876			return "API key configured on server", nil
   877		}
   878		n, err := im.Node()
   879		if err != nil {
   880			return
   881		}
   882		if n.Attr(attrClientID) != "" && n.Attr(attrClientSecret) != "" {
   883			return "API key configured on node", nil
   884		}
   885		return "API key (client ID & Secret) not configured", nil
   886	}
   888	func (im *importer) credentials() (clientID, clientSecret string, err error) {
   889		if im.StaticConfig() {
   890			return im.clientID, im.clientSecret, nil
   891		}
   892		n, err := im.Node()
   893		if err != nil {
   894			return
   895		}
   896		return n.Attr(attrClientID), n.Attr(attrClientSecret), nil
   897	}
   899	func (im *importer) deleteAccount(acctRef blob.Ref) {
   900		im.acctmu.Lock()
   901		delete(im.acct, acctRef)
   902		im.acctmu.Unlock()
   903	}
   905	func (im *importer) account(nodeRef blob.Ref) (*importerAcct, error) {
   906		im.acctmu.Lock()
   907		ia, ok := im.acct[nodeRef]
   908		im.acctmu.Unlock()
   909		if ok {
   910			return ia, nil
   911		}
   913		acct, err := im.host.ObjectFromRef(nodeRef)
   914		if err != nil {
   915			return nil, err
   916		}
   917		if acct.Attr(attrNodeType) != nodeTypeImporterAccount {
   918			return nil, errors.New("account has wrong node type")
   919		}
   920		if acct.Attr(attrImporterType) != im.ImporterType() {
   921			return nil, errors.New("account has wrong importer type")
   922		}
   923		var root *Object
   924		if v := acct.Attr(attrImportRoot); v != "" {
   925			rootRef, ok := blob.Parse(v)
   926			if !ok {
   927				return nil, errors.New("invalid import root attribute")
   928			}
   929			root, err = im.host.ObjectFromRef(rootRef)
   930			if err != nil {
   931				return nil, err
   932			}
   933		} else {
   934			root, err = im.host.NewObject()
   935			if err != nil {
   936				return nil, err
   937			}
   938			if err := acct.SetAttr(attrImportRoot, root.PermanodeRef().String()); err != nil {
   939				return nil, err
   940			}
   941		}
   942		ia = &importerAcct{
   943			im:   im,
   944			acct: acct,
   945			root: root,
   946		}
   947		im.acctmu.Lock()
   948		defer im.acctmu.Unlock()
   949		im.addAccountLocked(ia)
   950		return ia, nil
   951	}
   953	func (im *importer) newAccount() (*importerAcct, error) {
   955		acct, err := im.host.NewObject()
   956		if err != nil {
   957			return nil, err
   958		}
   959		root, err := im.host.NewObject()
   960		if err != nil {
   961			return nil, err
   962		}
   963		if err := acct.SetAttrs(
   964			"title", fmt.Sprintf("%s account", im.name),
   965			attrNodeType, nodeTypeImporterAccount,
   966			attrImporterType, im.ImporterType(),
   967			attrImportRoot, root.PermanodeRef().String(),
   968		); err != nil {
   969			return nil, err
   970		}
   972		ia := &importerAcct{
   973			im:   im,
   974			acct: acct,
   975			root: root,
   976		}
   977		im.acctmu.Lock()
   978		defer im.acctmu.Unlock()
   979		im.addAccountLocked(ia)
   980		return ia, nil
   981	}
   983	func (im *importer) addAccountLocked(ia *importerAcct) {
   984		if im.acct == nil {
   985			im.acct = make(map[blob.Ref]*importerAcct)
   986		}
   987		im.acct[ia.acct.PermanodeRef()] = ia
   988	}
   990	func (im *importer) Accounts() ([]*importerAcct, error) {
   991		im.acctmu.Lock()
   992		needQuery := !im.allLoaded
   993		im.acctmu.Unlock()
   995		if needQuery {
   996			res, err := im.host.search.Query(context.TODO(), &search.SearchQuery{
   997				Sort: search.Unsorted,
   998				Expression: fmt.Sprintf("attr:%s:%s attr:%s:%s",
   999					attrNodeType, nodeTypeImporterAccount,
  1000					attrImporterType, im.ImporterType(),
  1001				),
  1002			})
  1003			if err != nil {
  1004				return nil, err
  1005			}
  1006			for _, res := range res.Blobs {
  1007				_, err := im.account(res.Blob) // caches account
  1008				if err != nil {
  1009					return nil, err
  1010				}
  1011			}
  1012		}
  1014		im.acctmu.Lock()
  1015		defer im.acctmu.Unlock()
  1016		im.allLoaded = true
  1018		accts := make([]*importerAcct, 0, len(im.acct))
  1019		for _, ia := range im.acct {
  1020			accts = append(accts, ia)
  1021		}
  1022		sort.Sort(byImporterAccountPermanode(accts))
  1023		return accts, nil
  1024	}
  1026	// node returns the importer node. (not specific to a certain account
  1027	// on that importer site)
  1028	//
  1029	// It is a permanode with:
  1030	//
  1031	//	camliNodeType: "importer"
  1032	//	importerType: "twitter"
  1033	//
  1034	// And optionally:
  1035	//
  1036	//	authClientID:     "xxx"    // e.g. api token
  1037	//	authClientSecret: "sdkojfsldfjlsdkf"
  1038	func (im *importer) Node() (*Object, error) {
  1039		im.nodemu.Lock()
  1040		defer im.nodemu.Unlock()
  1041		if im.nodeCache != nil {
  1042			return im.nodeCache, nil
  1043		}
  1045		expr := fmt.Sprintf("attr:%s:%s attr:%s:%s",
  1046			attrNodeType, nodeTypeImporter,
  1047			attrImporterType, im.ImporterType(),
  1048		)
  1049		res, err := im.host.search.Query(context.TODO(), &search.SearchQuery{
  1050			Sort:       search.Unsorted,
  1051			Limit:      10, // might be more than one because of multiple blob hash types
  1052			Expression: expr,
  1053		})
  1054		if err != nil {
  1055			return nil, err
  1056		}
  1057		best, err := im.bestNode(res)
  1058		if err != nil {
  1059			return nil, err
  1060		}
  1061		if best != nil {
  1062			return best, nil
  1063		}
  1065		o, err := im.host.NewObject()
  1066		if err != nil {
  1067			return nil, err
  1068		}
  1069		if err := o.SetAttrs(
  1070			"title", fmt.Sprintf("%s importer", im.name),
  1071			attrNodeType, nodeTypeImporter,
  1072			attrImporterType, im.ImporterType(),
  1073		); err != nil {
  1074			return nil, err
  1075		}
  1077		im.nodeCache = o
  1078		return o, nil
  1079	}
  1081	// bestNode picks the most populated importer node amongst res.Blobs, or nil if
  1082	// len(res.Blobs) == 0
  1083	// It is the caller's responsibility to create the correct request so that
  1084	// res.Blobs are all importer nodes.
  1085	func (im *importer) bestNode(res *search.SearchResult) (*Object, error) {
  1086		if len(res.Blobs) == 0 {
  1087			// let the caller create a fresh node
  1088			return nil, nil
  1089		}
  1090		var best *Object
  1091		for _, desb := range res.Blobs {
  1092			o, err := im.host.ObjectFromRef(desb.Blob)
  1093			if err != nil {
  1094				return nil, err
  1095			}
  1096			if best == nil {
  1097				best = o
  1098				continue
  1099			}
  1100			if best.Attr(attrClientID) == "" && o.Attr(attrClientID) != "" {
  1101				best = o
  1102				continue
  1103			}
  1104			if best.Attr(attrClientSecret) == "" && o.Attr(attrClientSecret) != "" {
  1105				best = o
  1106				continue
  1107			}
  1108			if best.Attr("title") == "" && o.Attr("title") != "" {
  1109				best = o
  1110				continue
  1111			}
  1112			// all other things being equal, pick the most recent one, as it's more likely
  1113			// to be a sha224 than a sha1
  1114			if best.modtime.Before(o.modtime) {
  1115				best = o
  1116				continue
  1117			}
  1118		}
  1119		return best, nil
  1120	}
// importerAcct is a long-lived type representing one importer account
// permanode, together with the state of its import runs.
type importerAcct struct {
	im   *importer // the importer this account belongs to
	acct *Object   // the account permanode (camliNodeType "importerAccount")
	root *Object   // the import root permanode, referenced by the account's importRoot attribute

	// mu guards the run-state fields below.
	mu           sync.Mutex
	current      *RunContext // or nil if not running
	stopped      bool        // stop requested (context canceled)
	lastRunErr   error       // error returned by the most recent finished run
	lastRunStart time.Time   // when the current or most recent run started
	lastRunDone  time.Time   // when the most recent run finished; zero if none has finished yet
}
  1136	func (ia *importerAcct) String() string {
  1137		return fmt.Sprintf("%v importer account, %v", ia.im.name, ia.acct.PermanodeRef())
  1138	}
  1140	func (ia *importerAcct) delete() error {
  1141		if err := ia.acct.SetAttrs(
  1142			attrNodeType, nodeTypeImporterAccount+"-deleted",
  1143		); err != nil {
  1144			return err
  1145		}
  1146		ia.im.deleteAccount(ia.acct.PermanodeRef())
  1147		return nil
  1148	}
  1150	func (ia *importerAcct) toggleAuto() error {
  1151		old := ia.acct.Attr(attrImportAuto)
  1152		if old == "" && !ia.im.props.SupportsIncremental {
  1153			return fmt.Errorf("Importer %q doesn't support automatic mode", ia.im.name)
  1154		}
  1155		var new string
  1156		if old == "" {
  1157			new = "30m" // TODO: configurable?
  1158		}
  1159		return ia.acct.SetAttrs(attrImportAuto, new)
  1160	}
  1162	func (ia *importerAcct) IsAccountReady() (bool, error) {
  1163		return ia.im.impl.IsAccountReady(ia.acct)
  1164	}
  1166	func (ia *importerAcct) AccountObject() *Object { return ia.acct }
  1167	func (ia *importerAcct) RootObject() *Object    { return ia.root }
  1169	func (ia *importerAcct) AccountURL() string {
  1170		return ia.im.URL() + "/" + ia.acct.PermanodeRef().String()
  1171	}
  1173	func (ia *importerAcct) AccountLinkText() string {
  1174		return ia.acct.PermanodeRef().String()
  1175	}
  1177	func (ia *importerAcct) AccountLinkSummary() string {
  1178		return ia.im.impl.SummarizeAccount(ia.acct)
  1179	}
  1181	func (ia *importerAcct) RefreshInterval() time.Duration {
  1182		ds := ia.acct.Attr(attrImportAuto)
  1183		if ds == "" {
  1184			return 0
  1185		}
  1186		d, _ := time.ParseDuration(ds)
  1187		return d
  1188	}
  1190	func (ia *importerAcct) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  1191		if r.Method == "POST" {
  1192			ia.serveHTTPPost(w, r)
  1193			return
  1194		}
  1195		ia.mu.Lock()
  1196		defer ia.mu.Unlock()
  1197		body := acctBody{
  1198			Acct:     ia,
  1199			AcctType: fmt.Sprintf("%T", ia.im.impl),
  1200		}
  1201		if run := ia.current; run != nil {
  1202			body.Running = true
  1203			body.StartedAgo = time.Since(ia.lastRunStart)
  1204			run.mu.Lock()
  1205			body.LastStatus = fmt.Sprintf("%+v", run.lastProgress)
  1206			run.mu.Unlock()
  1207		} else if !ia.lastRunDone.IsZero() {
  1208			body.LastAgo = time.Since(ia.lastRunDone)
  1209			if ia.lastRunErr != nil {
  1210				body.LastError = ia.lastRunErr.Error()
  1211			}
  1212		}
  1213		title := fmt.Sprintf("%s account: ", ia.im.name)
  1214		if summary := ia.im.impl.SummarizeAccount(ia.acct); summary != "" {
  1215			title += summary
  1216		} else {
  1217			title += ia.acct.PermanodeRef().String()
  1218		}
  1219		ia.im.host.execTemplate(w, r, acctPage{
  1220			Title: title,
  1221			Body:  body,
  1222		})
  1223	}
  1225	func (ia *importerAcct) serveHTTPPost(w http.ResponseWriter, r *http.Request) {
  1226		// TODO: XSRF token
  1228		switch r.FormValue("mode") {
  1229		case "":
  1230			// Nothing.
  1231		case "start":
  1232			ia.start()
  1233		case "stop":
  1234			ia.stop()
  1235		case "login":
  1236			ia.setup(w, r)
  1237			return
  1238		case "toggleauto":
  1239			if err := ia.toggleAuto(); err != nil {
  1240				http.Error(w, err.Error(), 500)
  1241				return
  1242			}
  1243		case "delete":
  1244			ia.stop() // can't hurt
  1245			if err := ia.delete(); err != nil {
  1246				http.Error(w, err.Error(), 500)
  1247				return
  1248			}
  1249			http.Redirect(w, r, ia.im.URL(), http.StatusFound)
  1250			return
  1251		default:
  1252			http.Error(w, "Unknown mode", http.StatusBadRequest)
  1253			return
  1254		}
  1255		http.Redirect(w, r, ia.AccountURL(), http.StatusFound)
  1256	}
  1258	func (ia *importerAcct) setup(w http.ResponseWriter, r *http.Request) {
  1259		if err := ia.im.impl.ServeSetup(w, r, &SetupContext{
  1260			Context:     context.TODO(),
  1261			Host:        ia.im.host,
  1262			AccountNode: ia.acct,
  1263			ia:          ia,
  1264		}); err != nil {
  1265			log.Printf("%v", err)
  1266		}
  1267	}
  1269	func (ia *importerAcct) start() {
  1270		ia.mu.Lock()
  1271		defer ia.mu.Unlock()
  1272		if ia.current != nil {
  1273			return
  1274		}
  1275		rc := &RunContext{
  1276			Host: ia.im.host,
  1277			ia:   ia,
  1278		}
  1279		rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
  1280		ia.current = rc
  1281		ia.stopped = false
  1282		ia.lastRunStart = time.Now()
  1283		go func() {
  1284			log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
  1285			err := ia.im.impl.Run(rc)
  1286			if err != nil {
  1287				log.Printf("%v error: %v", ia, err)
  1288			} else {
  1289				log.Printf("%v finished.", ia)
  1290			}
  1291			ia.mu.Lock()
  1292			defer ia.mu.Unlock()
  1293			ia.current = nil
  1294			ia.stopped = false
  1295			ia.lastRunDone = time.Now()
  1296			ia.lastRunErr = err
  1297			go ia.maybeStart()
  1298		}()
  1299	}
  1301	// TODO(mpl): review this code more carefully. mostly copied from start().
  1302	func (ia *importerAcct) run() error {
  1303		ia.mu.Lock()
  1304		defer ia.mu.Unlock()
  1305		if ia.current != nil {
  1306			return errors.New("whatever")
  1307		}
  1308		rc := &RunContext{
  1309			Host: ia.im.host,
  1310			ia:   ia,
  1311		}
  1312		rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
  1313		ia.current = rc
  1314		ia.stopped = false
  1315		ia.lastRunStart = time.Now()
  1316		log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
  1317		err := ia.im.impl.Run(rc)
  1318		if err != nil {
  1319			return err
  1320		}
  1321		log.Printf("%v finished.", ia)
  1322		ia.current = nil
  1323		ia.stopped = false
  1324		ia.lastRunDone = time.Now()
  1325		return ia.lastRunErr
  1326	}
  1328	func (ia *importerAcct) stop() {
  1329		ia.mu.Lock()
  1330		defer ia.mu.Unlock()
  1331		if ia.current == nil || ia.stopped {
  1332			return
  1333		}
  1334		ia.current.cancel()
  1335		ia.stopped = true
  1336	}
  1338	// HTTPClient returns the HTTP client to use.
  1339	func (h *Host) HTTPClient() *http.Client {
  1340		if h.client == nil {
  1341			return http.DefaultClient
  1342		}
  1343		return h.client
  1344	}
  1346	// HTTPTransport returns the HTTP transport to use.
  1347	func (h *Host) HTTPTransport() http.RoundTripper {
  1348		if h.transport == nil {
  1349			return http.DefaultTransport
  1350		}
  1351		return h.transport
  1352	}
  1354	type ProgressMessage struct {
  1355		ItemsDone, ItemsTotal int
  1356		BytesDone, BytesTotal int64
  1357	}
  1359	func (h *Host) upload(ctx context.Context, bb *schema.Builder) (br blob.Ref, err error) {
  1360		signed, err := bb.Sign(ctx, h.signer)
  1361		if err != nil {
  1362			return
  1363		}
  1364		sb, err := blobserver.ReceiveString(ctx, h.target, signed)
  1365		if err != nil {
  1366			return
  1367		}
  1368		return sb.Ref, nil
  1369	}
  1371	// NewObject creates a new permanode and returns its Object wrapper.
  1372	func (h *Host) NewObject() (*Object, error) {
  1373		ctx := context.TODO() // TODO: move the NewObject method to some "ImportRun" type with a context field?
  1374		pn, err := h.upload(ctx, schema.NewUnsignedPermanode())
  1375		if err != nil {
  1376			return nil, err
  1377		}
  1378		// No need to do a describe query against it: we know it's
  1379		// empty (has no claims against it yet).
  1380		return &Object{h: h, pn: pn}, nil
  1381	}
// An Object is a wrapper around a permanode that the importer uses
// to synchronize.
type Object struct {
	h  *Host    // host used to upload claims against pn
	pn blob.Ref // permanode ref

	mu      sync.RWMutex        // guards attr
	attr    map[string][]string // cached attribute values by name; updated by the Set/Del methods after a successful upload
	modtime time.Time           // permanode ModTime from the describe response; zero for an Object made by NewObject
}
  1394	// PermanodeRef returns the permanode that this object wraps.
  1395	func (o *Object) PermanodeRef() blob.Ref {
  1396		return o.pn
  1397	}
  1399	// Attr returns the object's attribute value for the provided attr,
  1400	// or the empty string if unset.  To distinguish between unset,
  1401	// an empty string, or multiple attribute values, use Attrs.
  1402	func (o *Object) Attr(attr string) string {
  1403		o.mu.RLock()
  1404		defer o.mu.RUnlock()
  1405		if v := o.attr[attr]; len(v) > 0 {
  1406			return v[0]
  1407		}
  1408		return ""
  1409	}
  1411	// Attrs returns the attribute values for the provided attr.
  1412	func (o *Object) Attrs(attr string) []string {
  1413		o.mu.RLock()
  1414		defer o.mu.RUnlock()
  1415		return o.attr[attr]
  1416	}
  1418	// ForeachAttr runs fn for each of the object's attributes & values.
  1419	// There might be multiple values for the same attribute.
  1420	// The internal lock is held while running, so no mutations should be
  1421	// made or it will deadlock.
  1422	func (o *Object) ForeachAttr(fn func(key, value string)) {
  1423		o.mu.RLock()
  1424		defer o.mu.RUnlock()
  1425		for k, vv := range o.attr {
  1426			for _, v := range vv {
  1427				fn(k, v)
  1428			}
  1429		}
  1430	}
  1432	// DelAttr removes value from the values set for the attribute attr of
  1433	// permaNode. If value is empty then all the values for attribute are cleared.
  1434	func (o *Object) DelAttr(key, value string) error {
  1435		ctx := context.TODO()
  1436		o.mu.Lock()
  1437		defer o.mu.Unlock()
  1438		_, err := o.h.upload(ctx, schema.NewDelAttributeClaim(o.pn, key, value))
  1439		if err != nil {
  1440			return err
  1441		}
  1442		if o.attr == nil {
  1443			o.attr = make(map[string][]string)
  1444			return nil
  1445		}
  1446		if value == "" {
  1447			delete(o.attr, key)
  1448			return nil
  1449		}
  1450		var values []string
  1451		for _, v := range o.attr[key] {
  1452			if v != value {
  1453				values = append(values, v)
  1454			}
  1455		}
  1456		o.attr[key] = values
  1457		return nil
  1458	}
  1460	// SetAttr sets the attribute key to value.
  1461	func (o *Object) SetAttr(key, value string) error {
  1462		ctx := context.TODO() // TODO: make it possible to get a context via Object; either new context field, or via some "ImportRun" field?
  1463		if o.Attr(key) == value {
  1464			return nil
  1465		}
  1466		_, err := o.h.upload(ctx, schema.NewSetAttributeClaim(o.pn, key, value))
  1467		if err != nil {
  1468			return err
  1469		}
  1470		o.mu.Lock()
  1471		defer o.mu.Unlock()
  1472		if o.attr == nil {
  1473			o.attr = make(map[string][]string)
  1474		}
  1475		o.attr[key] = []string{value}
  1476		return nil
  1477	}
  1479	// SetAttrs sets multiple attributes. The provided keyval should be an
  1480	// even number of alternating key/value pairs to set.
  1481	func (o *Object) SetAttrs(keyval ...string) error {
  1482		_, err := o.SetAttrs2(keyval...)
  1483		return err
  1484	}
  1486	// SetAttrs2 sets multiple attributes and returns whether there were
  1487	// any changes. The provided keyval should be an even number of
  1488	// alternating key/value pairs to set.
  1489	func (o *Object) SetAttrs2(keyval ...string) (changes bool, err error) {
  1490		if len(keyval)%2 == 1 {
  1491			panic("importer.SetAttrs: odd argument count")
  1492		}
  1494		g := syncutil.Group{}
  1495		for i := 0; i < len(keyval); i += 2 {
  1496			key, val := keyval[i], keyval[i+1]
  1497			if val != o.Attr(key) {
  1498				changes = true
  1499				g.Go(func() error {
  1500					return o.SetAttr(key, val)
  1501				})
  1502			}
  1503		}
  1504		return changes, g.Err()
  1505	}
  1507	// SetAttrValues sets multi-valued attribute.
  1508	func (o *Object) SetAttrValues(key string, attrs []string) error {
  1509		ctx := context.TODO() // TODO: make it possible to get a context via Object; either new context field, or via some "ImportRun" field?
  1511		exists := asSet(o.Attrs(key))
  1512		actual := asSet(attrs)
  1513		o.mu.Lock()
  1514		defer o.mu.Unlock()
  1515		// add new values
  1516		for v := range actual {
  1517			if exists[v] {
  1518				delete(exists, v)
  1519				continue
  1520			}
  1521			_, err := o.h.upload(ctx, schema.NewAddAttributeClaim(o.pn, key, v))
  1522			if err != nil {
  1523				return err
  1524			}
  1525		}
  1526		// delete unneeded values
  1527		for v := range exists {
  1528			_, err := o.h.upload(ctx, schema.NewDelAttributeClaim(o.pn, key, v))
  1529			if err != nil {
  1530				return err
  1531			}
  1532		}
  1533		if o.attr == nil {
  1534			o.attr = make(map[string][]string)
  1535		}
  1536		o.attr[key] = attrs
  1537		return nil
  1538	}
  1540	func asSet(elts []string) map[string]bool {
  1541		if len(elts) == 0 {
  1542			return nil
  1543		}
  1544		set := make(map[string]bool, len(elts))
  1545		for _, elt := range elts {
  1546			set[elt] = true
  1547		}
  1548		return set
  1549	}
  1551	// ChildPathObject returns (creating if necessary) the child object
  1552	// from the permanode o, given by the "camliPath:xxxx" attribute,
  1553	// where xxx is the provided path.
  1554	func (o *Object) ChildPathObject(path string) (*Object, error) {
  1555		return o.ChildPathObjectOrFunc(path, o.h.NewObject)
  1556	}
  1558	// ChildPathObjectOrFunc returns the child object from the permanode o,
  1559	// given by the "camliPath:xxxx" attribute, where xxx is the provided
  1560	// path. If the path doesn't exist, the provided func should return an
  1561	// appropriate object. If the func fails, the return error is
  1562	// returned directly without any attempt to make a permanode.
  1563	func (o *Object) ChildPathObjectOrFunc(path string, fn func() (*Object, error)) (*Object, error) {
  1564		attrName := "camliPath:" + path
  1565		if v := o.Attr(attrName); v != "" {
  1566			br, ok := blob.Parse(v)
  1567			if !ok {
  1568				return nil, fmt.Errorf("invalid blobref %q already stored at camliPath %q", br, path)
  1569			}
  1570			return o.h.ObjectFromRef(br)
  1571		}
  1572		newObj, err := fn()
  1573		if err != nil {
  1574			return nil, err
  1575		}
  1576		if err := o.SetAttr(attrName, newObj.PermanodeRef().String()); err != nil {
  1577			return nil, err
  1578		}
  1579		return newObj, nil
  1580	}
  1582	// ObjectFromRef returns the object given by the named permanode
  1583	func (h *Host) ObjectFromRef(permanodeRef blob.Ref) (*Object, error) {
  1584		ctx := context.TODO()
  1585		res, err := h.search.Describe(ctx, &search.DescribeRequest{
  1586			BlobRef: permanodeRef,
  1587			Depth:   1,
  1588		})
  1589		if err != nil {
  1590			return nil, err
  1591		}
  1592		db, ok := res.Meta[permanodeRef.String()]
  1593		if !ok {
  1594			return nil, fmt.Errorf("permanode %v wasn't in Describe response", permanodeRef)
  1595		}
  1596		if db.Permanode == nil {
  1597			return nil, fmt.Errorf("permanode %v had no DescribedPermanode in Describe response", permanodeRef)
  1598		}
  1599		return &Object{
  1600			h:       h,
  1601			pn:      permanodeRef,
  1602			attr:    map[string][]string(db.Permanode.Attr),
  1603			modtime: db.Permanode.ModTime,
  1604		}, nil
  1605	}
  1607	type byImporterAccountPermanode []*importerAcct
  1609	func (s byImporterAccountPermanode) Len() int { return len(s) }
  1610	func (s byImporterAccountPermanode) Less(i, j int) bool {
  1611		return s[i].acct.PermanodeRef().Less(s[j].acct.PermanodeRef())
  1612	}
  1613	func (s byImporterAccountPermanode) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
Website layout inspired by memcached.
Content by the authors.