1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
43 package pinboard
44
45 import (
46 "encoding/json"
47 "fmt"
48 "html/template"
49 "io"
50 "log"
51 "net/http"
52 "strings"
53 "sync"
54 "time"
55
56 "perkeep.org/internal/httputil"
57 "perkeep.org/pkg/importer"
58 "perkeep.org/pkg/schema"
59 "perkeep.org/pkg/schema/nodeattr"
60
61 "go4.org/ctxutil"
62 "go4.org/syncutil"
63 )
64
65 func init() {
66 importer.Register("pinboard", imp{})
67 }
68
const (
	// fetchUrl is the format string for the Pinboard "posts/all" request.
	// Its verbs are filled, in order, with the auth token, the maximum
	// number of results and the "todt" cursor timestamp.
	fetchUrl = "https://api.pinboard.in/v1/posts/all?auth_token=%s&format=json&results=%d&todt=%s"

	// runCompleteVersion is written to the account node after a successful
	// run. A later run that finds this value already stored is treated as
	// an incremental run.
	runCompleteVersion = "1"

	// timeFormat is the layout used both to format the fetch cursor and to
	// parse the timestamps of fetched posts.
	timeFormat = "2006-01-02T15:04:05Z"

	// pauseInterval is the delay scheduled between two successful batch
	// fetches, and the starting value that is doubled whenever the server
	// answers with StatusTooManyRequests.
	pauseInterval = time.Minute * 5

	// batchLimit is the number of posts requested per fetch. A batch that
	// comes back with exactly this many posts triggers another fetch.
	batchLimit = 10000

	// attrAuthToken is the account node attribute holding the API token.
	attrAuthToken = "authToken"

	// attrPostMeta is the post node attribute holding the post's meta
	// value; it is compared against the fetched value to detect posts that
	// were already imported.
	attrPostMeta = "pinboard.in:meta"

	// StatusTooManyRequests is the HTTP status code that makes the importer
	// back off and retry the fetch later.
	StatusTooManyRequests = 429
)
103
104
105
// extractUsername returns the part of authToken that precedes its first ':'.
// It returns the empty string when authToken contains no ':' at all.
func extractUsername(authToken string) string {
	sep := strings.IndexByte(authToken, ':')
	if sep < 0 {
		return ""
	}
	return authToken[:sep]
}
113
114 type imp struct {
115 importer.OAuth1
116 }
117
118 func (imp) Properties() importer.Properties {
119 return importer.Properties{
120 Title: "Pinboard",
121 Description: "import your pinboard.in posts",
122 SupportsIncremental: true,
123 NeedsAPIKey: false,
124 }
125 }
126
127 func (imp) IsAccountReady(acct *importer.Object) (ready bool, err error) {
128 ready = acct.Attr(attrAuthToken) != ""
129 return ready, nil
130 }
131
132 func (im imp) SummarizeAccount(acct *importer.Object) string {
133 ok, err := im.IsAccountReady(acct)
134 if err != nil {
135 return "Not configured; error = " + err.Error()
136 }
137 if !ok {
138 return "Not configured"
139 }
140 return fmt.Sprintf("Pinboard account for %s", extractUsername(acct.Attr(attrAuthToken)))
141 }
142
143 func (imp) ServeSetup(w http.ResponseWriter, r *http.Request, ctx *importer.SetupContext) error {
144 return tmpl.ExecuteTemplate(w, "serveSetup", ctx)
145 }
146
// tmpl holds the HTML templates served by this importer. The "serveSetup"
// template renders a form asking for the Pinboard API token; the form
// submits the token as "apiToken" and the account permanode as "acct" to the
// callback URL supplied in the template data.
var tmpl = template.Must(template.New("root").Parse(`
{{define "serveSetup"}}
<h1>Configuring Pinboard Account</h1>
<form method="get" action="{{.CallbackURL}}">
<input type="hidden" name="acct" value="{{.AccountNode.PermanodeRef}}">
<table border=0 cellpadding=3>
<tr><td align=right>API token</td><td><input name="apiToken" size=50> (You can find it <a href="https://pinboard.in/settings/password">here</a>)</td></tr>
<tr><td align=right></td><td><input type="submit" value="Add"></td></tr>
</table>
</form>
{{end}}
`))
159
160 func (im imp) ServeCallback(w http.ResponseWriter, r *http.Request, ctx *importer.SetupContext) {
161 t := r.FormValue("apiToken")
162 if t == "" {
163 http.Error(w, "Expected an API Token", http.StatusBadRequest)
164 return
165 }
166 if extractUsername(t) == "" {
167 errText := fmt.Sprintf("Unable to parse %q as an api token. We expect <username>:<somevalue>", t)
168 http.Error(w, errText, http.StatusBadRequest)
169 }
170 if err := ctx.AccountNode.SetAttrs(
171 attrAuthToken, t,
172 ); err != nil {
173 httputil.ServeError(w, r, fmt.Errorf("Error setting attribute: %v", err))
174 return
175 }
176 http.Redirect(w, r, ctx.AccountURL(), http.StatusFound)
177 }
178
179 func (im imp) Run(ctx *importer.RunContext) (err error) {
180 log.Printf("pinboard: Running importer.")
181 r := &run{
182 RunContext: ctx,
183 im: im,
184 postGate: syncutil.NewGate(3),
185 nextCursor: time.Now().Format(timeFormat),
186 nextAfter: time.Now(),
187 lastPause: pauseInterval,
188 incremental: ctx.AccountNode().Attr(importer.AcctAttrCompletedVersion) == runCompleteVersion,
189 }
190 err = r.importPosts()
191 log.Printf("pinboard: Importer returned %v.", err)
192 if err != nil {
193 return err
194 }
195 return r.AccountNode().SetAttrs(importer.AcctAttrCompletedVersion, runCompleteVersion)
196 }
197
198 func (im imp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
199 httputil.BadRequestError(w, "Unexpected path: %s", r.URL.Path)
200 }
201
202 type run struct {
203 *importer.RunContext
204 im imp
205 postGate *syncutil.Gate
206
207
208
209 nextCursor string
210
211
212 nextAfter time.Time
213
214
215
216
217
218 lastPause time.Duration
219
220 incremental bool
221 }
222
223 func (r *run) getPostsNode() (*importer.Object, error) {
224 username := extractUsername(r.AccountNode().Attr(attrAuthToken))
225 root := r.RootNode()
226 rootTitle := fmt.Sprintf("%s's Pinboard Account", username)
227 log.Printf("pinboard: root title = %q; want %q.", root.Attr(nodeattr.Title), rootTitle)
228 if err := root.SetAttr(nodeattr.Title, rootTitle); err != nil {
229 return nil, err
230 }
231 obj, err := root.ChildPathObject("posts")
232 if err != nil {
233 return nil, err
234 }
235 title := fmt.Sprintf("%s's Posts", username)
236 return obj, obj.SetAttr(nodeattr.Title, title)
237 }
238
239 func (r *run) importPosts() error {
240 authToken := r.AccountNode().Attr(attrAuthToken)
241 parent, err := r.getPostsNode()
242 if err != nil {
243 return err
244 }
245
246 keepTrying := true
247 for keepTrying {
248 keepTrying, err = r.importBatch(authToken, parent)
249 if err != nil {
250 return err
251 }
252 }
253
254 return nil
255 }
256
257
// apiPost is one element of the JSON array decoded from a fetch response.
// All values are kept as the strings the server sent.
type apiPost struct {
	// Href is stored as the post node's URL attribute.
	Href string
	// Description is stored as the post node's title.
	Description string
	// Extended is stored under "pinboard.in:extended".
	Extended string
	// Meta is compared with the stored meta attribute to detect posts that
	// were already imported.
	Meta string
	// Hash names the post's child node under the posts node.
	Hash string
	// Time is the post timestamp, parsed with timeFormat.
	Time string
	// Shared is stored under "pinboard.in:shared".
	Shared string
	// ToRead is stored under "pinboard.in:toread".
	ToRead string
	// Tags holds the post's tags, which the importer splits on spaces.
	Tags string
}
269
270 func (r *run) importBatch(authToken string, parent *importer.Object) (keepTrying bool, err error) {
271 sleepDuration := time.Until(r.nextAfter)
272
273 select {
274 case <-r.Context().Done():
275 log.Printf("pinboard: Importer interrupted.")
276 return false, r.Context().Err()
277 case <-time.After(sleepDuration):
278
279 }
280 start := time.Now()
281
282 u := fmt.Sprintf(fetchUrl, authToken, batchLimit, r.nextCursor)
283 resp, err := ctxutil.Client(r.Context()).Get(u)
284 if err != nil {
285 return false, err
286 }
287 defer resp.Body.Close()
288 switch {
289 case resp.StatusCode == StatusTooManyRequests:
290 r.lastPause = r.lastPause * 2
291 r.nextAfter = time.Now().Add(r.lastPause)
292 return true, nil
293 case resp.StatusCode != http.StatusOK:
294 return false, fmt.Errorf("Unexpected status code %v fetching %v", resp.StatusCode, u)
295 }
296
297 body, err := io.ReadAll(resp.Body)
298 if err != nil {
299 return false, err
300 }
301
302 var postBatch []apiPost
303 if err = json.Unmarshal(body, &postBatch); err != nil {
304 return false, err
305 }
306
307 if err != nil {
308 return false, err
309 }
310
311 postCount := len(postBatch)
312 if postCount == 0 {
313
314 return false, nil
315 }
316
317 log.Printf("pinboard: Importing %d posts...", postCount)
318 var (
319 allDupMu sync.Mutex
320 allDups = true
321 grp syncutil.Group
322 )
323 for _, post := range postBatch {
324 select {
325 case <-r.Context().Done():
326 log.Printf("pinboard: Importer interrupted")
327 return false, r.Context().Err()
328 default:
329 }
330
331 post := post
332 r.postGate.Start()
333 grp.Go(func() error {
334 defer r.postGate.Done()
335 dup, err := r.importPost(&post, parent)
336 if !dup {
337 allDupMu.Lock()
338 allDups = false
339 allDupMu.Unlock()
340 }
341 return err
342 })
343 }
344
345 if err := grp.Err(); err != nil {
346 return false, err
347 }
348 log.Printf("pinboard: Imported batch of %d posts in %s.", postCount, time.Since(start))
349
350 if r.incremental && allDups {
351 log.Printf("pinboard: incremental import found end batch")
352 return false, nil
353 }
354
355 r.nextCursor = postBatch[postCount-1].Time
356 r.lastPause = pauseInterval
357 r.nextAfter = time.Now().Add(pauseInterval)
358 tryAgain := postCount == batchLimit
359 return tryAgain, nil
360 }
361
362 func (r *run) importPost(post *apiPost, parent *importer.Object) (dup bool, err error) {
363 postNode, err := parent.ChildPathObject(post.Hash)
364 if err != nil {
365 return false, err
366 }
367
368
369 if post.Meta != "" && postNode.Attr(attrPostMeta) == post.Meta {
370 return true, nil
371 }
372
373 t, err := time.Parse(timeFormat, post.Time)
374 if err != nil {
375 return false, err
376 }
377
378 attrs := []string{
379 "pinboard.in:hash", post.Hash,
380 nodeattr.Type, "pinboard.in:post",
381 nodeattr.DateCreated, schema.RFC3339FromTime(t),
382 nodeattr.Title, post.Description,
383 nodeattr.URL, post.Href,
384 "pinboard.in:extended", post.Extended,
385 "pinboard.in:shared", post.Shared,
386 "pinboard.in:toread", post.ToRead,
387 }
388 if err = postNode.SetAttrs(attrs...); err != nil {
389 return false, err
390 }
391 if err = postNode.SetAttrValues("tag", strings.Split(post.Tags, " ")); err != nil {
392 return false, err
393 }
394 if err = postNode.SetAttr(attrPostMeta, post.Meta); err != nil {
395 return false, err
396 }
397
398 return false, nil
399 }