Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package pinboard imports pinboard.in posts.
    19	
    20	This package uses the v1 api documented here:  https://pinboard.in/api.
    21	
    22	Note that the api document seems to use 'post' and 'bookmark'
    23	interchangeably.  We use 'post' everywhere in this code.
    24	
    25	Posts in pinboard are mutable; they can be edited or deleted.
    26	
    27	We handle edited posts by always reimporting everything and rewriting
    28	any nodes.  Perhaps this would become more efficient if we would first
    29	compare the meta tag from pinboard to the meta tag we have stored to
    30	only write the node if there are changes.
    31	
    32	We don't handle deleted posts.  One possible approach for this would
    33	be to import everything under a new permanode, then once it is
    34	successful, swap the new permanode and the posts node (note: I don't
    35	think I really understand the data model here, so this is sort of
    36	gibberish).
    37	
    38	I have exchanged email with Maciej Ceglowski of pinboard, who may in
    39	the future provide an api that lets us query what has changed.  We
    40	might want to switch to that when available to make the import process
    41	more light-weight.
    42	*/
    43	package pinboard // import "perkeep.org/pkg/importer/pinboard"
    44	
    45	import (
    46		"encoding/json"
    47		"fmt"
    48		"html/template"
    49		"io"
    50		"log"
    51		"net/http"
    52		"strings"
    53		"sync"
    54		"time"
    55	
    56		"perkeep.org/internal/httputil"
    57		"perkeep.org/pkg/importer"
    58		"perkeep.org/pkg/schema"
    59		"perkeep.org/pkg/schema/nodeattr"
    60	
    61		"go4.org/ctxutil"
    62		"go4.org/syncutil"
    63	)
    64	
// init registers the Pinboard importer with the importer package under the
// name "pinboard".
func init() {
	importer.Register("pinboard", imp{})
}
    68	
const (
	// fetchUrl is the format string for the posts/all request.  Its
	// verbs are filled, in order, with the auth token, the maximum
	// number of results, and the "todt" upper time bound.
	fetchUrl = "https://api.pinboard.in/v1/posts/all?auth_token=%s&format=json&results=%d&todt=%s"

	// runCompleteVersion is a cache-busting version number of the
	// importer code. It should be incremented whenever the
	// behavior of this importer is updated enough to warrant a
	// complete run.  Otherwise, if the importer runs to
	// completion, this version number is recorded on the account
	// permanode and subsequent importers can stop early.
	runCompleteVersion = "1"

	// timeFormat is the layout used both to format the fetch cursor
	// and to parse the time of each post.
	timeFormat = "2006-01-02T15:04:05Z"

	// pauseInterval is the time we wait between fetching batches (for
	// a particular user).  This time is pretty long, but is what the
	// api documentation suggests.
	pauseInterval = 5 * time.Minute

	// batchLimit is the maximum number of posts we will fetch in one batch.
	batchLimit = 10000

	// attrAuthToken is the account node attribute holding the
	// pinboard auth token (<username>:<some id>).
	attrAuthToken = "authToken"

	// attrPostMeta is the attribute to store the meta tag of a pinboard post.
	// It is used as the signal for duplicate detection, as the meta tag
	// changes whenever a post is mutated.
	attrPostMeta = "pinboard.in:meta"

	// StatusTooManyRequests is the http status code returned by
	// pinboard servers if we have made too many requests for a
	// particular user.  If we receive this status code, we should
	// double the amount of time we wait before trying again.
	StatusTooManyRequests = 429
)
   103	
   104	// We expect <username>:<some id>.  Sometimes pinboard calls this an
   105	// auth token and sometimes they call it an api token.
   106	func extractUsername(authToken string) string {
   107		split := strings.SplitN(authToken, ":", 2)
   108		if len(split) == 2 {
   109			return split[0]
   110		}
   111		return ""
   112	}
   113	
// imp is the Pinboard importer; a value of this type is what init
// registers with the importer package.
type imp struct {
	importer.OAuth1 // for CallbackRequestAccount and CallbackURLParameters
}
   117	
   118	func (imp) Properties() importer.Properties {
   119		return importer.Properties{
   120			Title:               "Pinboard",
   121			Description:         "import your pinboard.in posts",
   122			SupportsIncremental: true,
   123			NeedsAPIKey:         false,
   124		}
   125	}
   126	
   127	func (imp) IsAccountReady(acct *importer.Object) (ready bool, err error) {
   128		ready = acct.Attr(attrAuthToken) != ""
   129		return ready, nil
   130	}
   131	
   132	func (im imp) SummarizeAccount(acct *importer.Object) string {
   133		ok, err := im.IsAccountReady(acct)
   134		if err != nil {
   135			return "Not configured; error = " + err.Error()
   136		}
   137		if !ok {
   138			return "Not configured"
   139		}
   140		return fmt.Sprintf("Pinboard account for %s", extractUsername(acct.Attr(attrAuthToken)))
   141	}
   142	
// ServeSetup renders the "serveSetup" template, a form asking for the
// pinboard API token, to w using ctx as the template data.  It returns
// any error from executing the template.
func (imp) ServeSetup(w http.ResponseWriter, r *http.Request, ctx *importer.SetupContext) error {
	return tmpl.ExecuteTemplate(w, "serveSetup", ctx)
}
   146	
   147	var tmpl = template.Must(template.New("root").Parse(`
   148	{{define "serveSetup"}}
   149	<h1>Configuring Pinboad Account</h1>
   150	<form method="get" action="{{.CallbackURL}}">
   151	  <input type="hidden" name="acct" value="{{.AccountNode.PermanodeRef}}">
   152	  <table border=0 cellpadding=3>
   153	  <tr><td align=right>API token</td><td><input name="apiToken" size=50> (You can find it <a href="https://pinboard.in/settings/password">here</a>)</td></tr>
   154	  <tr><td align=right></td><td><input type="submit" value="Add"></td></tr>
   155	  </table>
   156	</form>
   157	{{end}}
   158	`))
   159	
   160	func (im imp) ServeCallback(w http.ResponseWriter, r *http.Request, ctx *importer.SetupContext) {
   161		t := r.FormValue("apiToken")
   162		if t == "" {
   163			http.Error(w, "Expected an API Token", http.StatusBadRequest)
   164			return
   165		}
   166		if extractUsername(t) == "" {
   167			errText := fmt.Sprintf("Unable to parse %q as an api token.  We expect <username>:<somevalue>", t)
   168			http.Error(w, errText, http.StatusBadRequest)
   169		}
   170		if err := ctx.AccountNode.SetAttrs(
   171			attrAuthToken, t,
   172		); err != nil {
   173			httputil.ServeError(w, r, fmt.Errorf("Error setting attribute: %v", err))
   174			return
   175		}
   176		http.Redirect(w, r, ctx.AccountURL(), http.StatusFound)
   177	}
   178	
   179	func (im imp) Run(ctx *importer.RunContext) (err error) {
   180		log.Printf("pinboard: Running importer.")
   181		r := &run{
   182			RunContext:  ctx,
   183			im:          im,
   184			postGate:    syncutil.NewGate(3),
   185			nextCursor:  time.Now().Format(timeFormat),
   186			nextAfter:   time.Now(),
   187			lastPause:   pauseInterval,
   188			incremental: ctx.AccountNode().Attr(importer.AcctAttrCompletedVersion) == runCompleteVersion,
   189		}
   190		err = r.importPosts()
   191		log.Printf("pinboard: Importer returned %v.", err)
   192		if err != nil {
   193			return err
   194		}
   195		return r.AccountNode().SetAttrs(importer.AcctAttrCompletedVersion, runCompleteVersion)
   196	}
   197	
// ServeHTTP rejects every request it receives as a bad request,
// naming the unexpected path in the message.
func (im imp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	httputil.BadRequestError(w, "Unexpected path: %s", r.URL.Path)
}
   201	
// run holds the state of a single import run: the importer's run
// context plus the paging cursor and rate-limiting bookkeeping that
// importBatch updates between fetches.
type run struct {
	*importer.RunContext
	im imp
	// postGate is the gate each post import passes through; Run
	// creates it with syncutil.NewGate(3).
	postGate *syncutil.Gate

	// nextCursor is the exclusive bound, to fetch only bookmarks created before this time.
	// The pinboard API returns most recent posts first.
	nextCursor string

	// We should not fetch the next batch until this time (exclusive bound)
	nextAfter time.Time

	// This gets set to pauseInterval at the beginning of each run and
	// after each successful fetch.  Every time we get a 429 back from
	// pinboard, it gets doubled.  It will be used to calculate the
	// next time we fetch from pinboard.
	lastPause time.Duration

	incremental bool // whether we've completed a run in the past
}
   222	
   223	func (r *run) getPostsNode() (*importer.Object, error) {
   224		username := extractUsername(r.AccountNode().Attr(attrAuthToken))
   225		root := r.RootNode()
   226		rootTitle := fmt.Sprintf("%s's Pinboard Account", username)
   227		log.Printf("pinboard: root title = %q; want %q.", root.Attr(nodeattr.Title), rootTitle)
   228		if err := root.SetAttr(nodeattr.Title, rootTitle); err != nil {
   229			return nil, err
   230		}
   231		obj, err := root.ChildPathObject("posts")
   232		if err != nil {
   233			return nil, err
   234		}
   235		title := fmt.Sprintf("%s's Posts", username)
   236		return obj, obj.SetAttr(nodeattr.Title, title)
   237	}
   238	
   239	func (r *run) importPosts() error {
   240		authToken := r.AccountNode().Attr(attrAuthToken)
   241		parent, err := r.getPostsNode()
   242		if err != nil {
   243			return err
   244		}
   245	
   246		keepTrying := true
   247		for keepTrying {
   248			keepTrying, err = r.importBatch(authToken, parent)
   249			if err != nil {
   250				return err
   251			}
   252		}
   253	
   254		return nil
   255	}
   256	
// apiPost is the shape one post is decoded into when parsing the JSON
// array returned by the posts/all request.  All fields are kept as the
// strings the API sends.
type apiPost struct {
	Href        string // stored as the post node's URL attribute
	Description string // stored as the post node's title
	Extended    string // stored as "pinboard.in:extended"
	Meta        string // compared against attrPostMeta to detect unchanged posts
	Hash        string // used as the child path of the post under the posts node
	Time        string // parsed with timeFormat; also used as the next fetch cursor
	Shared      string // stored as "pinboard.in:shared"
	ToRead      string // stored as "pinboard.in:toread"
	Tags        string // split on spaces into "tag" attribute values
}
   269	
   270	func (r *run) importBatch(authToken string, parent *importer.Object) (keepTrying bool, err error) {
   271		sleepDuration := time.Until(r.nextAfter)
   272		// block until we either get canceled or until it is time to run
   273		select {
   274		case <-r.Context().Done():
   275			log.Printf("pinboard: Importer interrupted.")
   276			return false, r.Context().Err()
   277		case <-time.After(sleepDuration):
   278			// just proceed
   279		}
   280		start := time.Now()
   281	
   282		u := fmt.Sprintf(fetchUrl, authToken, batchLimit, r.nextCursor)
   283		resp, err := ctxutil.Client(r.Context()).Get(u)
   284		if err != nil {
   285			return false, err
   286		}
   287		defer resp.Body.Close()
   288		switch {
   289		case resp.StatusCode == StatusTooManyRequests:
   290			r.lastPause = r.lastPause * 2
   291			r.nextAfter = time.Now().Add(r.lastPause)
   292			return true, nil
   293		case resp.StatusCode != http.StatusOK:
   294			return false, fmt.Errorf("Unexpected status code %v fetching %v", resp.StatusCode, u)
   295		}
   296	
   297		body, err := io.ReadAll(resp.Body)
   298		if err != nil {
   299			return false, err
   300		}
   301	
   302		var postBatch []apiPost
   303		if err = json.Unmarshal(body, &postBatch); err != nil {
   304			return false, err
   305		}
   306	
   307		if err != nil {
   308			return false, err
   309		}
   310	
   311		postCount := len(postBatch)
   312		if postCount == 0 {
   313			// we are done!
   314			return false, nil
   315		}
   316	
   317		log.Printf("pinboard: Importing %d posts...", postCount)
   318		var (
   319			allDupMu sync.Mutex
   320			allDups  = true
   321			grp      syncutil.Group
   322		)
   323		for _, post := range postBatch {
   324			select {
   325			case <-r.Context().Done():
   326				log.Printf("pinboard: Importer interrupted")
   327				return false, r.Context().Err()
   328			default:
   329			}
   330	
   331			post := post
   332			r.postGate.Start()
   333			grp.Go(func() error {
   334				defer r.postGate.Done()
   335				dup, err := r.importPost(&post, parent)
   336				if !dup {
   337					allDupMu.Lock()
   338					allDups = false
   339					allDupMu.Unlock()
   340				}
   341				return err
   342			})
   343		}
   344	
   345		if err := grp.Err(); err != nil {
   346			return false, err
   347		}
   348		log.Printf("pinboard: Imported batch of %d posts in %s.", postCount, time.Since(start))
   349	
   350		if r.incremental && allDups {
   351			log.Printf("pinboard: incremental import found end batch")
   352			return false, nil
   353		}
   354	
   355		r.nextCursor = postBatch[postCount-1].Time
   356		r.lastPause = pauseInterval
   357		r.nextAfter = time.Now().Add(pauseInterval)
   358		tryAgain := postCount == batchLimit
   359		return tryAgain, nil
   360	}
   361	
   362	func (r *run) importPost(post *apiPost, parent *importer.Object) (dup bool, err error) {
   363		postNode, err := parent.ChildPathObject(post.Hash)
   364		if err != nil {
   365			return false, err
   366		}
   367	
   368		//Check for duplicates
   369		if post.Meta != "" && postNode.Attr(attrPostMeta) == post.Meta {
   370			return true, nil
   371		}
   372	
   373		t, err := time.Parse(timeFormat, post.Time)
   374		if err != nil {
   375			return false, err
   376		}
   377	
   378		attrs := []string{
   379			"pinboard.in:hash", post.Hash,
   380			nodeattr.Type, "pinboard.in:post",
   381			nodeattr.DateCreated, schema.RFC3339FromTime(t),
   382			nodeattr.Title, post.Description,
   383			nodeattr.URL, post.Href,
   384			"pinboard.in:extended", post.Extended,
   385			"pinboard.in:shared", post.Shared,
   386			"pinboard.in:toread", post.ToRead,
   387		}
   388		if err = postNode.SetAttrs(attrs...); err != nil {
   389			return false, err
   390		}
   391		if err = postNode.SetAttrValues("tag", strings.Split(post.Tags, " ")); err != nil {
   392			return false, err
   393		}
   394		if err = postNode.SetAttr(attrPostMeta, post.Meta); err != nil {
   395			return false, err
   396		}
   397	
   398		return false, nil
   399	}
Website layout inspired by memcached.
Content by the authors.