1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20
21 package app
22
23 import (
24 "context"
25 "errors"
26 "fmt"
27 "log"
28 "net/http"
29 "net/http/httputil"
30 "net/url"
31 "os"
32 "os/exec"
33 "path/filepath"
34 "strconv"
35 "strings"
36 "sync"
37 "time"
38
39 camhttputil "perkeep.org/internal/httputil"
40 "perkeep.org/internal/netutil"
41 "perkeep.org/pkg/auth"
42 "perkeep.org/pkg/blob"
43 "perkeep.org/pkg/blobserver"
44 "perkeep.org/pkg/search"
45
46 "go4.org/jsonconfig"
47 )
48
49
50
51
// Handler fronts an app program: it serves the app's config and master-query
// endpoints, answers domain-restricted search requests, and reverse-proxies
// every other request to the app's backend.
type Handler struct {
	// name is the app program name; Start looks it up as an executable.
	name string
	// envVars are the environment variables Start sets for the app process.
	envVars map[string]string

	// auth holds the credentials checked on the config and master-query
	// endpoints.
	auth auth.AuthMode
	// appConfig is served as JSON at configURLPath.
	appConfig jsonconfig.Obj
	// sh is the search handler; it is nil until InitHandler sets it.
	sh *search.Handler

	// masterQueryMu guards masterQuery, domainBlobs and domainBlobsRefresh.
	masterQueryMu sync.RWMutex
	// masterQuery is the query registered through the master-query endpoint.
	// While it is nil, handleSearch refuses every search request.
	masterQuery *search.SearchQuery
	// domainBlobs is the set of blobs described by the latest run of
	// masterQuery. A search response is only forwarded when all of its blobs
	// are in this set.
	domainBlobs map[blob.Ref]bool
	// domainBlobsRefresh is when domainBlobs was last rebuilt; it is used to
	// rate-limit refreshes.
	domainBlobsRefresh time.Time

	// prefix is the configured URL prefix with any trailing slash removed.
	prefix string
	// proxy forwards requests to backendURL.
	proxy *httputil.ReverseProxy
	// backendURL is the URL the app is reverse-proxied at.
	backendURL string
	// configURLPath is the path serving appConfig; empty when there is no
	// app config.
	configURLPath string
	// masterqueryURLPath is the path handled by handleMasterQuery.
	masterqueryURLPath string

	// process is the running app process; set by Start and used by Quit.
	process *os.Process
}
81
82 func (a *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
83 if r.URL.Path == a.masterqueryURLPath {
84 a.handleMasterQuery(w, r)
85 return
86 }
87 if a.configURLPath != "" && r.URL.Path == a.configURLPath {
88 if a.auth.AllowedAccess(r)&auth.OpGet == auth.OpGet {
89 camhttputil.ReturnJSON(w, a.appConfig)
90 } else {
91 auth.SendUnauthorized(w, r)
92 }
93 return
94 }
95 trimmedPath := strings.TrimPrefix(r.URL.Path, a.prefix)
96 if strings.HasPrefix(trimmedPath, "/search") {
97 a.handleSearch(w, r)
98 return
99 }
100
101 if a.proxy == nil {
102 http.Error(w, "no proxy for the app", 500)
103 return
104 }
105 a.proxy.ServeHTTP(w, r)
106 }
107
108
109
110 func (a *Handler) handleMasterQuery(w http.ResponseWriter, r *http.Request) {
111 if !(a.auth.AllowedAccess(r)&auth.OpAll == auth.OpAll) {
112 auth.SendUnauthorized(w, r)
113 return
114 }
115 if r.Method != http.MethodPost {
116 http.Error(w, "not a POST", http.StatusMethodNotAllowed)
117 return
118 }
119 if a.sh == nil {
120 http.Error(w, "app proxy has no search handler", 500)
121 return
122 }
123 if refresh, _ := strconv.ParseBool(r.FormValue("refresh")); refresh {
124 if err := a.refreshDomainBlobs(); err != nil {
125 if err == errRefreshSuppress {
126 http.Error(w, "too many refresh requests", http.StatusTooManyRequests)
127 } else {
128 http.Error(w, fmt.Sprintf("%v", err), 500)
129 }
130 return
131 }
132 w.Write([]byte("OK"))
133 return
134 }
135 sq := new(search.SearchQuery)
136 if err := sq.FromHTTP(r); err != nil {
137 http.Error(w, fmt.Sprintf("error reading master query: %v", err), 500)
138 return
139 }
140 var masterQuery search.SearchQuery = *(sq)
141 masterQuery.Describe = masterQuery.Describe.Clone()
142 sr, err := a.sh.Query(r.Context(), sq)
143 if err != nil {
144 http.Error(w, fmt.Sprintf("error running master query: %v", err), 500)
145 return
146 }
147 a.masterQueryMu.Lock()
148 defer a.masterQueryMu.Unlock()
149 a.masterQuery = &masterQuery
150 a.domainBlobs = make(map[blob.Ref]bool, len(sr.Describe.Meta))
151 for _, v := range sr.Describe.Meta {
152 a.domainBlobs[v.BlobRef] = true
153 }
154 a.domainBlobsRefresh = time.Now()
155 w.Write([]byte("OK"))
156 }
157
// errRefreshSuppress is returned by refreshDomainBlobs when the previous
// refresh happened less than a minute ago.
var errRefreshSuppress = errors.New("refresh request suppressed")
159
160 func (a *Handler) refreshDomainBlobs() error {
161 a.masterQueryMu.Lock()
162 defer a.masterQueryMu.Unlock()
163 if time.Now().Before(a.domainBlobsRefresh.Add(time.Minute)) {
164
165 return errRefreshSuppress
166 }
167 if a.masterQuery == nil {
168 return errors.New("no master query")
169 }
170 var sq search.SearchQuery = *(a.masterQuery)
171 sq.Describe = sq.Describe.Clone()
172 sr, err := a.sh.Query(context.TODO(), &sq)
173 if err != nil {
174 return fmt.Errorf("error running master query: %v", err)
175 }
176 a.domainBlobs = make(map[blob.Ref]bool, len(sr.Describe.Meta))
177 for _, v := range sr.Describe.Meta {
178 a.domainBlobs[v.BlobRef] = true
179 }
180 a.domainBlobsRefresh = time.Now()
181 return nil
182 }
183
184
185
186
187 func (a *Handler) handleSearch(w http.ResponseWriter, r *http.Request) {
188 if r.Method != http.MethodPost {
189 camhttputil.BadRequestError(w, camhttputil.InvalidMethodError{}.Error())
190 return
191 }
192 if a.sh == nil {
193 http.Error(w, "app proxy has no search handler", 500)
194 return
195 }
196 a.masterQueryMu.RLock()
197 if a.masterQuery == nil {
198 http.Error(w, "search is not allowed", http.StatusForbidden)
199 a.masterQueryMu.RUnlock()
200 return
201 }
202 a.masterQueryMu.RUnlock()
203 var sq search.SearchQuery
204 if err := sq.FromHTTP(r); err != nil {
205 camhttputil.ServeJSONError(w, err)
206 return
207 }
208 sr, err := a.sh.Query(r.Context(), &sq)
209 if err != nil {
210 camhttputil.ServeJSONError(w, err)
211 return
212 }
213
214 if !a.allowProxySearchResponse(sr) {
215
216
217 if err := a.refreshDomainBlobs(); err != nil {
218 http.Error(w, "search scope is forbidden", http.StatusForbidden)
219 return
220 }
221 if !a.allowProxySearchResponse(sr) {
222 http.Error(w, "search scope is forbidden", http.StatusForbidden)
223 return
224 }
225 }
226 camhttputil.ReturnJSON(w, sr)
227 }
228
229
230
231
232 func (a *Handler) allowProxySearchResponse(sr *search.SearchResult) bool {
233 a.masterQueryMu.RLock()
234 defer a.masterQueryMu.RUnlock()
235 for _, v := range sr.Blobs {
236 if _, ok := a.domainBlobs[v.Blob]; !ok {
237 return false
238 }
239 }
240 return true
241 }
242
243
// randListen returns listenAddr with its port replaced by a port obtained
// from netutil.RandPort.
func randListen(listenAddr string) (string, error) {
	return randListenFn(listenAddr, netutil.RandPort)
}
247
248
249
// randListenFn returns listenAddr with everything after its last colon
// replaced by the port returned by randPortFn. The host part, including the
// brackets of an IPv6 literal, is kept as is.
//
// It returns an error if listenAddr has no port, or if randPortFn fails (that
// error is returned unchanged). An address has no port when it contains no
// colon, ends with a colon, or when its last colon is inside the brackets of
// an IPv6 literal such as "[::1]".
func randListenFn(listenAddr string, randPortFn func() (int, error)) (string, error) {
	colon := strings.LastIndex(listenAddr, ":")
	noPort := colon < 0 || colon == len(listenAddr)-1
	// A "]" after the last colon means that colon belongs to a bracketed IPv6
	// literal rather than separating host and port.
	if noPort || strings.Contains(listenAddr[colon+1:], "]") {
		return "", errors.New("invalid listen addr, no port found")
	}
	port, err := randPortFn()
	if err != nil {
		return "", err
	}
	return listenAddr[:colon+1] + strconv.Itoa(port), nil
}
261
// portMap maps a URL scheme to its default port. baseURL uses it when the
// listen address carries no port.
var portMap = map[string]string{
	"http":  "80",
	"https": "443",
}
266
267
268
269 func baseURL(serverBaseURL, listenAddr string) (string, error) {
270 backendURL, err := url.Parse(serverBaseURL)
271 if err != nil {
272 return "", fmt.Errorf("invalid baseURL %q: %v", serverBaseURL, err)
273 }
274 scheme := backendURL.Scheme
275 host := backendURL.Host
276 if netutil.HasPort(host) {
277 host = host[:strings.LastIndex(host, ":")]
278 }
279 port := portMap[scheme]
280 if netutil.HasPort(listenAddr) {
281 port = listenAddr[strings.LastIndex(listenAddr, ":")+1:]
282 }
283 return fmt.Sprintf("%s://%s:%s/", scheme, host, port), nil
284 }
285
286
287
288
289
290
291
292
293
// HandlerConfig holds the parameters NewHandler uses to build a Handler.
// Program and Prefix are required. Listen, BackendURL and APIHost may be left
// empty, in which case NewHandler derives them, mostly from ServerBaseURL.
type HandlerConfig struct {
	// Program is the name of the app program to run. Required. Start looks
	// it up as an executable.
	Program string `json:"program"`

	// Prefix is the URL path prefix of the handler. Required. NewHandler
	// appends "config.json" and "masterquery" to it directly, so it
	// presumably ends with a slash (TODO confirm against callers).
	Prefix string `json:"prefix,omitempty"`

	// Listen is the address passed to the app in CAMLI_APP_LISTEN. If empty,
	// NewHandler builds it from ServerListen, or else from the host of
	// ServerBaseURL, with the port replaced by a random one.
	Listen string `json:"listen,omitempty"`

	// ServerListen is only consulted when Listen is empty; its port is then
	// replaced by a random one to produce the listen address.
	ServerListen string `json:"serverListen,omitempty"`

	// BackendURL is the URL requests are reverse-proxied to. If empty,
	// NewHandler builds it from the scheme and host of ServerBaseURL and the
	// port of the listen address.
	BackendURL string `json:"backendURL,omitempty"`

	// ServerBaseURL is the fallback NewHandler uses to derive Listen,
	// BackendURL and APIHost when they are empty.
	ServerBaseURL string `json:"serverBaseURL,omitempty"`

	// APIHost is passed to the app in CAMLI_API_HOST and is the base of the
	// config and master-query URLs handed to the app. If empty, NewHandler
	// uses ServerBaseURL followed by "/".
	APIHost string `json:"apiHost,omitempty"`

	// AppConfig is the optional app configuration. When non-nil, it is served
	// as JSON at Prefix + "config.json".
	AppConfig jsonconfig.Obj
}
334
335
336
337 func FromJSONConfig(config jsonconfig.Obj, prefix, serverBaseURL string) (HandlerConfig, error) {
338 hc := HandlerConfig{
339 Program: config.RequiredString("program"),
340 Prefix: config.OptionalString("prefix", prefix),
341 BackendURL: config.OptionalString("backendURL", ""),
342 Listen: config.OptionalString("listen", ""),
343 APIHost: config.OptionalString("apiHost", ""),
344 ServerListen: config.OptionalString("serverListen", ""),
345 ServerBaseURL: config.OptionalString("serverBaseURL", serverBaseURL),
346 AppConfig: config.OptionalObject("appConfig"),
347 }
348 if err := config.Validate(); err != nil {
349 return HandlerConfig{}, err
350 }
351 return hc, nil
352 }
353
354
355
356
357
358 func NewHandler(cfg HandlerConfig) (*Handler, error) {
359 if cfg.Program == "" {
360 return nil, fmt.Errorf("app: could not initialize Handler: empty Program")
361 }
362 name := cfg.Program
363
364 if cfg.Prefix == "" {
365 return nil, fmt.Errorf("app: could not initialize Handler for %q: empty Prefix", name)
366 }
367
368 listen, backendURL, apiHost := cfg.Listen, cfg.BackendURL, cfg.APIHost
369 var err error
370 if listen == "" {
371 serverListen := cfg.ServerListen
372 if serverListen == "" {
373 if cfg.ServerBaseURL == "" {
374 return nil, fmt.Errorf(`app: could not initialize Handler for %q: "Listen", "ServerListen" and "ServerBaseURL" all undefined`, name)
375 }
376 parsedUrl, err := url.Parse(cfg.ServerBaseURL)
377 if err != nil {
378 return nil, fmt.Errorf("app: could not initialize Handler for %q: unparseable ServerBaseURL %q: %v", name, cfg.ServerBaseURL, err)
379 }
380 serverListen = parsedUrl.Host
381 }
382 listen, err = randListen(serverListen)
383 if err != nil {
384 return nil, err
385 }
386 }
387 if backendURL == "" {
388 if cfg.ServerBaseURL == "" {
389 return nil, fmt.Errorf(`app: could not initialize Handler for %q: neither "BackendURL" or "ServerBaseURL" defined`, name)
390 }
391 backendURL, err = baseURL(cfg.ServerBaseURL, listen)
392 if err != nil {
393 return nil, err
394 }
395 }
396 if apiHost == "" {
397 if cfg.ServerBaseURL == "" {
398 return nil, fmt.Errorf(`app: could not initialize Handler for %q: neither "APIHost" or "ServerBaseURL" defined`, name)
399 }
400 apiHost = cfg.ServerBaseURL + "/"
401 }
402
403 proxyURL, err := url.Parse(backendURL)
404 if err != nil {
405 return nil, fmt.Errorf("could not parse backendURL %q: %v", backendURL, err)
406 }
407
408 username, password := auth.RandToken(20), auth.RandToken(20)
409 camliAuth := username + ":" + password
410 basicAuth := auth.NewBasicAuth(username, password)
411 envVars := map[string]string{
412 "CAMLI_API_HOST": apiHost,
413 "CAMLI_AUTH": camliAuth,
414 "CAMLI_APP_LISTEN": listen,
415 }
416 var configURLPath string
417 if cfg.AppConfig != nil {
418 configURLPath = cfg.Prefix + "config.json"
419 envVars["CAMLI_APP_CONFIG_URL"] = apiHost + strings.TrimPrefix(configURLPath, "/")
420 }
421 masterqueryURLPath := cfg.Prefix + "masterquery"
422 envVars["CAMLI_APP_MASTERQUERY_URL"] = apiHost + strings.TrimPrefix(masterqueryURLPath, "/")
423
424 return &Handler{
425 name: name,
426 envVars: envVars,
427 auth: basicAuth,
428 appConfig: cfg.AppConfig,
429 prefix: strings.TrimSuffix(cfg.Prefix, "/"),
430 proxy: httputil.NewSingleHostReverseProxy(proxyURL),
431 backendURL: backendURL,
432 configURLPath: configURLPath,
433 masterqueryURLPath: masterqueryURLPath,
434 }, nil
435 }
436
437
438
439 func (a *Handler) InitHandler(hl blobserver.FindHandlerByTyper) error {
440 apName := a.ProgramName()
441 searchPrefix, _, err := hl.FindHandlerByType("search")
442 if err != nil {
443 return fmt.Errorf("No search handler configured, which is necessary for the %v app handler", apName)
444 }
445 var sh *search.Handler
446 _, hi := hl.AllHandlers()
447 h, ok := hi[searchPrefix]
448 if !ok {
449 return fmt.Errorf("failed to find the \"search\" handler for %v", apName)
450 }
451 sh = h.(*search.Handler)
452 a.sh = sh
453 return nil
454 }
455
456 func (a *Handler) Start() error {
457 name := a.name
458 if name == "" {
459 return fmt.Errorf("invalid app name: %q", name)
460 }
461 var binPath string
462 var err error
463 if e := os.Getenv("CAMLI_APP_BINDIR"); e != "" {
464 binPath, err = exec.LookPath(filepath.Join(e, name))
465 if err != nil {
466 log.Printf("%q executable not found in %q", name, e)
467 }
468 }
469 if binPath == "" || err != nil {
470 binPath, err = exec.LookPath(name)
471 if err != nil {
472 return fmt.Errorf("%q executable not found in PATH", name)
473 }
474 }
475
476 cmd := exec.Command(binPath)
477 cmd.Stdout = os.Stdout
478 cmd.Stderr = os.Stderr
479
480 newVars := make(map[string]string, len(a.envVars))
481 for k, v := range a.envVars {
482 newVars[k+"="] = v
483 }
484 env := os.Environ()
485 for pos, oldkv := range env {
486 for k, newVal := range newVars {
487 if strings.HasPrefix(oldkv, k) {
488 env[pos] = k + newVal
489 delete(newVars, k)
490 break
491 }
492 }
493 }
494 for k, v := range newVars {
495 env = append(env, k+v)
496 }
497 cmd.Env = env
498 if err := cmd.Start(); err != nil {
499 return fmt.Errorf("could not start app %v: %v", name, err)
500 }
501 a.process = cmd.Process
502 return nil
503 }
504
505
506
// ProgramName returns the name of the app program, as given in
// HandlerConfig.Program.
func (a *Handler) ProgramName() string {
	return a.name
}
510
511
512
513
// AuthMode returns the auth mode that NewHandler built from the randomly
// generated basic-auth credentials passed to the app in CAMLI_AUTH.
func (a *Handler) AuthMode() auth.AuthMode {
	return a.auth
}
517
518
519
// AppConfig returns the app configuration the Handler was created with. This
// is the same object that is served as JSON at the config URL.
func (a *Handler) AppConfig() map[string]interface{} {
	return a.appConfig
}
523
524
// BackendURL returns the URL that the Handler reverse-proxies requests to.
func (a *Handler) BackendURL() string {
	return a.backendURL
}
528
// errProcessTookTooLong is returned by Quit when the app process has not
// exited within the allowed delay after being sent an interrupt.
var errProcessTookTooLong = errors.New("process took too long to quit")
530
531
532
533 func (a *Handler) Quit() error {
534 err := a.process.Signal(os.Interrupt)
535 if err != nil {
536 return err
537 }
538
539 c := make(chan error)
540 go func() {
541 _, err := a.process.Wait()
542 c <- err
543 }()
544 select {
545 case err = <-c:
546 case <-time.After(5 * time.Second):
547
548 err = errProcessTookTooLong
549 }
550 return err
551 }