Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package search
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"encoding/json"
    23		"log"
    24		"net/http"
    25		"os"
    26		"strconv"
    27		"sync"
    28		"time"
    29	
    30		"github.com/gorilla/websocket"
    31	)
    32	
    33	const (
    34		// Time allowed to write a message to the peer.
    35		writeWait = 10 * time.Second
    36	
    37		// Time allowed to read the next pong message from the peer.
    38		pongWait = 60 * time.Second
    39	
    40		// Send pings to peer with this period. Must be less than pongWait.
    41		pingPeriod = (pongWait * 9) / 10
    42	
    43		// Maximum message size allowed from peer.
    44		maxMessageSize = 10 << 10
    45	)
    46	
    47	var debug, _ = strconv.ParseBool(os.Getenv("CAMLI_DEBUG"))
    48	
// wsHub is the central dispatcher for websocket search clients. All of its
// channels are received from by run, which handles one event at a time:
//
//   - register and unregister add and remove client connections;
//   - watchReq carries subscribe and unsubscribe requests from readPump;
//   - newBlobRecv signals a newly received blob, prompting the standing
//     queries to be re-run when the blob has a non-empty camliType;
//   - updatedResults carries queries whose results changed (see doSearch);
//   - statusUpdate carries a status JSON value that is wrapped in a
//     "_status" message and broadcast to every connection.
type wsHub struct {
	sh             *Handler
	register       chan *wsConn
	unregister     chan *wsConn
	watchReq       chan watchReq
	newBlobRecv    chan string // new blob received. string is camliType.
	updatedResults chan *watchedQuery
	statusUpdate   chan json.RawMessage

	// Owned by func run:
	conns map[*wsConn]bool
}
    61	
    62	func newWebsocketHub(sh *Handler) *wsHub {
    63		return &wsHub{
    64			sh:             sh,
    65			register:       make(chan *wsConn), // unbuffered; issue 563
    66			unregister:     make(chan *wsConn), // unbuffered; issue 563
    67			conns:          make(map[*wsConn]bool),
    68			watchReq:       make(chan watchReq, buffered),
    69			newBlobRecv:    make(chan string, buffered),
    70			updatedResults: make(chan *watchedQuery, buffered),
    71			statusUpdate:   make(chan json.RawMessage, buffered),
    72		}
    73	}
    74	
    75	func (h *wsHub) run() {
    76		var lastStatusMsg []byte
    77		for {
    78			select {
    79			case st := <-h.statusUpdate:
    80				const prefix = `{"tag":"_status","status":`
    81				lastStatusMsg = make([]byte, 0, len(prefix)+len(st)+1)
    82				lastStatusMsg = append(lastStatusMsg, prefix...)
    83				lastStatusMsg = append(lastStatusMsg, st...)
    84				lastStatusMsg = append(lastStatusMsg, '}')
    85				for c := range h.conns {
    86					c.send <- lastStatusMsg
    87				}
    88			case c := <-h.register:
    89				h.conns[c] = true
    90				c.send <- lastStatusMsg
    91			case c := <-h.unregister:
    92				delete(h.conns, c)
    93				close(c.send)
    94			case camliType := <-h.newBlobRecv:
    95				if camliType == "" {
    96					// TODO: something smarter. some
    97					// queries might care about all blobs.
    98					// But for now only re-kick off
    99					// queries if schema blobs arrive.  We
   100					// should track per-WatchdQuery which
   101					// blob types the search cares about.
   102					continue
   103				}
   104				// New blob was received. Kick off standing search queries to see if any changed.
   105				for conn := range h.conns {
   106					for _, wq := range conn.queries {
   107						go h.redoSearch(wq)
   108					}
   109				}
   110			case wr := <-h.watchReq:
   111				// Unsubscribe
   112				if wr.q == nil {
   113					delete(wr.conn.queries, wr.tag)
   114					log.Printf("Removed subscription for %v, %q", wr.conn, wr.tag)
   115					continue
   116				}
   117				// Very similar type, but semantically
   118				// different, so separate for now:
   119				wq := &watchedQuery{
   120					conn: wr.conn,
   121					tag:  wr.tag,
   122					q:    wr.q,
   123				}
   124				wr.conn.queries[wr.tag] = wq
   125				if debug {
   126					log.Printf("websocket: added/updated search subscription for tag %q", wr.tag)
   127				}
   128				go h.doSearch(wq)
   129	
   130			case wq := <-h.updatedResults:
   131				if !h.conns[wq.conn] || wq.conn.queries[wq.tag] == nil {
   132					// Client has since disconnected or unsubscribed.
   133					continue
   134				}
   135				wq.mu.Lock()
   136				lastres := wq.lastres
   137				wq.mu.Unlock()
   138				resb, err := json.Marshal(wsUpdateMessage{
   139					Tag:    wq.tag,
   140					Result: lastres,
   141				})
   142				if err != nil {
   143					panic(err)
   144				}
   145				wq.conn.send <- resb
   146			}
   147		}
   148	}
   149	
   150	// redoSearch is called (in its own goroutine) when a new schema blob
   151	// arrives to note that wq might now have new results and we should
   152	// re-run it.  But because a search can take awhile, don't run more
   153	// than one refresh at a time.
   154	func (h *wsHub) redoSearch(wq *watchedQuery) {
   155		wq.mu.Lock()
   156		defer wq.mu.Unlock()
   157		wq.dirty = true
   158		if wq.refreshing {
   159			// Somebody else is already refreshing.
   160			// One's enough.
   161			return
   162		}
   163		for wq.dirty {
   164			wq.refreshing = true
   165			wq.dirty = false
   166			wq.mu.Unlock() // release lock while running query; might become dirty meanwhile
   167			h.doSearch(wq)
   168			wq.mu.Lock() // before checking wq.dirty
   169		}
   170		wq.refreshing = false
   171	}
   172	
   173	func (h *wsHub) doSearch(wq *watchedQuery) {
   174		// Make our own copy, in case
   175		q := new(SearchQuery)
   176		*q = *wq.q // shallow copy, since Query will mutate its internal state fields
   177		if q.Describe != nil {
   178			q.Describe = new(DescribeRequest)
   179			*q.Describe = *wq.q.Describe
   180		}
   181	
   182		res, err := h.sh.Query(context.TODO(), q)
   183		if err != nil {
   184			log.Printf("Query error: %v", err)
   185			return
   186		}
   187		resj, _ := json.Marshal(res)
   188	
   189		wq.mu.Lock()
   190		eq := bytes.Equal(wq.lastresj, resj)
   191		wq.lastres = res
   192		wq.lastresj = resj
   193		wq.mu.Unlock()
   194		if eq {
   195			// No change in search. Ignore.
   196			return
   197		}
   198		h.updatedResults <- wq
   199	}
   200	
// wsConn is one websocket client connection. readPump and writePump service
// it; the hub queues outbound messages on send and closes send when the
// connection is unregistered.
type wsConn struct {
	ws   *websocket.Conn
	send chan []byte // Buffered channel of outbound messages.
	sh   *Handler

	// queries is owned by the wsHub.run goroutine.
	queries map[string]*watchedQuery // tag -> subscription
}
   209	
// watchedQuery is a standing search subscription: the query q registered by
// conn under tag, together with the state of its most recent run.
//
// conn, tag and q are not guarded by mu.
// NOTE(review): they appear to be set once when wsHub.run creates the value
// and only read afterwards — confirm before mutating them.
type watchedQuery struct {
	conn *wsConn
	tag  string
	q    *SearchQuery

	mu         sync.Mutex // guards following
	refreshing bool       // search is currently running
	dirty      bool       // new schema blob arrived while refreshing; another refresh due
	lastres    *SearchResult
	lastresj   []byte // as JSON
}
   221	
// watchReq is a (un)subscribe request sent from a connection's readPump to
// the hub. A non-nil q subscribes (or replaces the subscription) under tag;
// a nil q removes the subscription with that tag.
type watchReq struct {
	conn *wsConn
	tag  string       // required
	q    *SearchQuery // if nil, unsubscribe
}
   228	
// wsClientMessage is the JSON message a client sends to subscribe to, or
// unsubscribe from, a search query. readPump decodes it and forwards its
// contents to the hub as a watchReq.
type wsClientMessage struct {
	// Tag is required.
	Tag string `json:"tag"`
	// Query is required to subscribe. If absent, it means unsubscribe.
	Query *SearchQuery `json:"query,omitempty"`
}
   236	
// wsUpdateMessage is the JSON message sent to a client when the results of
// one of its subscriptions have changed. Tag is the subscription's tag and
// Result the latest search result; wsHub.run builds and sends it.
type wsUpdateMessage struct {
	Tag    string        `json:"tag"`
	Result *SearchResult `json:"result,omitempty"`
}
   241	
   242	// readPump pumps messages from the websocket connection to the hub.
   243	func (c *wsConn) readPump() {
   244		defer func() {
   245			c.sh.wsHub.unregister <- c
   246			c.ws.Close()
   247		}()
   248		c.ws.SetReadLimit(maxMessageSize)
   249		c.ws.SetReadDeadline(time.Now().Add(pongWait))
   250		c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
   251		for {
   252			_, message, err := c.ws.ReadMessage()
   253			if err != nil {
   254				break
   255			}
   256			if debug {
   257				log.Printf("websocket: got message %#q", message)
   258			}
   259			cm := new(wsClientMessage)
   260			if err := json.Unmarshal(message, cm); err != nil {
   261				log.Printf("Ignoring bogus websocket message. Err: %v", err)
   262				continue
   263			}
   264			c.sh.wsHub.watchReq <- watchReq{
   265				conn: c,
   266				tag:  cm.Tag,
   267				q:    cm.Query,
   268			}
   269		}
   270	}
   271	
   272	// write writes a message with the given message type and payload.
   273	func (c *wsConn) write(mt int, payload []byte) error {
   274		c.ws.SetWriteDeadline(time.Now().Add(writeWait))
   275		return c.ws.WriteMessage(mt, payload)
   276	}
   277	
   278	// writePump pumps messages from the hub to the websocket connection.
   279	func (c *wsConn) writePump() {
   280		ticker := time.NewTicker(pingPeriod)
   281		defer func() {
   282			ticker.Stop()
   283			c.ws.Close()
   284		}()
   285		for {
   286			select {
   287			case message, ok := <-c.send:
   288				if !ok {
   289					c.write(websocket.CloseMessage, []byte{})
   290					return
   291				}
   292				if err := c.write(websocket.TextMessage, message); err != nil {
   293					return
   294				}
   295			case <-ticker.C:
   296				if err := c.write(websocket.PingMessage, []byte{}); err != nil {
   297					return
   298				}
   299			}
   300		}
   301	}
   302	
   303	// upgrader is used in serveWebSocket to construct websocket connections.
   304	var upgrader = websocket.Upgrader{
   305		ReadBufferSize:  1024,
   306		WriteBufferSize: 1024,
   307		// uses a default origin check policy
   308	}
   309	
   310	func (sh *Handler) serveWebSocket(rw http.ResponseWriter, req *http.Request) {
   311		ws, err := upgrader.Upgrade(rw, req, nil)
   312		if _, ok := err.(websocket.HandshakeError); ok {
   313			http.Error(rw, "Not a websocket handshake", 400)
   314			return
   315		} else if err != nil {
   316			log.Println(err)
   317			return
   318		}
   319		c := &wsConn{
   320			ws:      ws,
   321			send:    make(chan []byte, 256),
   322			sh:      sh,
   323			queries: make(map[string]*watchedQuery),
   324		}
   325		sh.wsHub.register <- c
   326		go c.writePump()
   327		c.readPump()
   328	}
Website layout inspired by memcached.
Content by the authors.