From 5d96fb227f7a00e612b09be96ed045b74a6854f4 Mon Sep 17 00:00:00 2001 From: Juan Tejada Date: Fri, 8 Apr 2016 11:46:20 -0700 Subject: [PATCH] feat(search-streaming): Add support for new search streaming api Summary: - This diff includes code to connect to the new search stremaming API. It converts the old NylasLongConnection into ES6 without any of the cursor management and includes it as part of NylasAPI - Removes current search error handling because of the new error semantics of the api. If no results are returned a message will be displayed to the user indicating that - WIP TODO: - Replace old NylasLongConnection class with new one and add test coverage. I did not replace our current streaming code with the new ES6 code yet because the old code doesn't have test coverage and I don't want to introduce any (subtle) regressions to a vital piece of the app. - Potentially replace with JSONStreaming library or new http library (e.g. fetch) - Note that Streaming API is not in production yet and only works for Gmail and IMAP (no EAS) Test Plan: -TODO, manual Reviewers: bengotow, evan Reviewed By: evan Differential Revision: https://phab.nylas.com/D2859 --- src/flux/nylas-api.coffee | 9 ++ src/flux/nylas-long-connection.es6 | 201 +++++++++++++++++++++++++++ src/search-query-subscription.coffee | 68 +++------ 3 files changed, 232 insertions(+), 46 deletions(-) create mode 100644 src/flux/nylas-long-connection.es6 diff --git a/src/flux/nylas-api.coffee b/src/flux/nylas-api.coffee index bd7ac7bf5..b5ccdca2f 100644 --- a/src/flux/nylas-api.coffee +++ b/src/flux/nylas-api.coffee @@ -1,6 +1,7 @@ _ = require 'underscore' {remote} = require 'electron' request = require 'request' +NylasLongConnection = require './nylas-long-connection' Utils = require './models/utils' Account = require './models/account' Message = require './models/message' @@ -206,6 +207,14 @@ class NylasAPI req = new NylasAPIRequest(@, options) req.run().then(success, error) + longConnection: (opts) -> + connection = new NylasLongConnection(@, opts.accountId, opts) + connection.onResults(opts.onResults) + return connection + + startLongConnection: (opts) -> + @longConnection(opts).start() + # If we make a request that `returnsModel` and we get a 404, we want to handle # it intelligently and in a centralized way. This method identifies the object # that could not be found and purges it from local cache. diff --git a/src/flux/nylas-long-connection.es6 b/src/flux/nylas-long-connection.es6 new file mode 100644 index 000000000..b09c1eccd --- /dev/null +++ b/src/flux/nylas-long-connection.es6 @@ -0,0 +1,201 @@ +import _ from 'underscore' +import url from 'url' +import {Emitter} from 'event-kit' + + +class NylasLongConnection { + + static Status = { + None: 'none', + Connecting: 'connecting', + Connected: 'connected', + Closed: 'closed', // Socket has been closed for any reason + Ended: 'ended', // We have received 'end()' and will never open again. + } + + constructor(api, accountId, {path, debounceInterval, throttleInterval, closeIfDataStopsTimeout, onStatusChanged} = {}) { + this._api = api + this._accountId = accountId + this._status = NylasLongConnection.Status.None + this._req = null + this._pingTimeout = null + this._emitter = new Emitter() + this._buffer = '' + this._results = [] + + // Options + this._path = path + this._debounceInterval = debounceInterval + this._throttleInterval = throttleInterval || 400 + this._closeIfDataStopsTimeout = closeIfDataStopsTimeout || (15 * 1000) + this._onStatusChanged = onStatusChanged || () => {} + + + this._resultsReceived = () => { + if (this._results.length === 0) { + return + } + this._emitter.emit('results-stopped-arriving', this._results); + this._results = [] + } + if (this._debounceInterval != null) { + this._resultsReceived = _.debounce(this._resultsReceived, this._debounceInterval) + } + return this + } + + get accountId() { + return this._accountId; + } + + get status() { + return this._status; + } + + setStatus(status) { + if (this._status === status) { + return + } + this._status = status + this._onStatusChanged(this, status) + } + + onResults(callback) { + this._emitter.on('results-stopped-arriving', callback) + } + + processBuffer = () => { + const bufferJSONs = this._buffer.split('\n') + + // We can't parse the last block - we don't know whether we've + // received the entire result or only part of it. Wait + // until we have more. + this._buffer = bufferJSONs.pop() + + bufferJSONs.forEach((resultJSON) => { + if (resultJSON.length === 0) { + return + } + let result = null + try { + result = JSON.parse(resultJSON) + if (result) { + this._results.push(result) + } + } catch (e) { + console.error(`${resultJSON} could not be parsed as JSON.`, e) + } + }) + this._resultsReceived() + } + + start() { + const isValidStatus = ( + [NylasLongConnection.Status.None, NylasLongConnection.Status.Closed].includes(this._status) + ) + if (!isValidStatus) { + return this; + } + + const token = this._api.accessTokenForAccountId(this._accountId) + if (!token || this._req) { + return null; + } + + const options = url.parse(`${this._api.APIRoot}${this._path}`) + options.auth = `${token}:` + + let lib; + if (this._api.APIRoot.indexOf('https') === -1) { + lib = require('http') + } else { + options.port = 443 + lib = require('https') + } + + const processBufferThrottled = _.throttle(this.processBuffer, this._throttleInterval, {leading: false}) + this._req = lib.request(options, (responseStream) => { + if (responseStream.statusCode !== 200) { + responseStream.on('data', () => { + this.close() + }) + return; + } + + responseStream.setEncoding('utf8') + responseStream.on('close', () => { + this.close() + }) + responseStream.on('data', (chunk) => { + this.closeIfDataStops() + + // Ignore redundant newlines sent as pings. Want to avoid + // calls to @onProcessBuffer that contain no actual updates + if (chunk === '\n' && (this._buffer.length === 0 || this._buffer[-1] === '\n')) { + return + } + this._buffer += chunk + processBufferThrottled() + }) + }) + this._req.setTimeout(60 * 60 * 1000) + this._req.setSocketKeepAlive(true) + this._req.on('error', () => this.close()) + this._req.on('socket', (socket) => { + this.setStatus(NylasLongConnection.Status.Connecting) + socket.on('connect', () => { + this.setStatus(NylasLongConnection.Status.Connected) + this.closeIfDataStops() + }) + }) + this._req.write("1") + return this + } + + cleanup() { + if (this._pingTimeout) { + clearTimeout(this._pingTimeout) + } + this._pingTimeout = null + this._buffer = '' + if (this._req) { + this._req.end() + this._req.abort() + this._req = null + } + return this + } + + isClosed() { + return [ + NylasLongConnection.Status.None, + NylasLongConnection.Status.Closed, + NylasLongConnection.Status.Ended, + ].includes(this._status) + } + + close() { + if (this._status === NylasLongConnection.Status.Closed) { + return + } + this.setStatus(NylasLongConnection.Status.Closed) + this.cleanup() + } + + closeIfDataStops() { + if (this._pingTimeout) { + clearTimeout(this._pingTimeout) + } + this._pingTimeout = setTimeout(() => { + this._pingTimeout = null + this.close() + }, this._closeIfDataStopsTimeout) + } + + end() { + this.setStatus(NylasLongConnection.Status.Ended) + this.cleanup() + } +} + +export default NylasLongConnection diff --git a/src/search-query-subscription.coffee b/src/search-query-subscription.coffee index 6c8f4abac..9cca5964d 100644 --- a/src/search-query-subscription.coffee +++ b/src/search-query-subscription.coffee @@ -13,6 +13,7 @@ class SearchQuerySubscription extends MutableQuerySubscription constructor: (@_searchQuery, @_accountIds) -> super(null, {asResultSet: true}) @_searchQueryVersion = 0 + @_connections = [] _.defer => @performSearch() searchQuery: => @@ -42,64 +43,39 @@ class SearchQuerySubscription extends MutableQuerySubscription performRemoteSearch: (idx) => searchQueryVersion = @_searchQueryVersion += 1 - resultCount = 0 + accountsSearched = new Set() resultIds = [] - resultReturned = => + resultsReturned = => # Don't emit a "result" until we have at least one thread to display. # Otherwise it will show "No Results Found" - if resultIds.length > 0 or resultCount is @_accountIds.length + if resultIds.length > 0 or accountsSearched.size is @_accountIds.length if @_set?.ids().length > 0 currentResultIds = @_set.ids() resultIds = _.uniq(currentResultIds.concat(resultIds)) dbQuery = DatabaseStore.findAll(Thread).where(id: resultIds).order(Thread.attributes.lastMessageReceivedTimestamp.descending()) @replaceQuery(dbQuery) - @_accountsFailed = [] - @_updateSearchError() + @_connections = @_accountIds.map (accId) => + NylasAPI.startLongConnection + accountId: accId + path: "/threads/search/streaming?q=#{encodeURIComponent(@_searchQuery)}" + onResults: (results) => + threads = results[0] + return unless @_searchQueryVersion is searchQueryVersion + resultIds = resultIds.concat _.pluck(threads, 'id') + resultsReturned() + onStatusChanged: (conn) => + if conn.isClosed() + accountsSearched.add(accId) + resultsReturned() - @_accountIds.forEach (aid) => - NylasAPI.makeRequest - method: 'GET' - path: "/threads/search?q=#{encodeURIComponent(@_searchQuery)}" - accountId: aid - json: true - timeout: 45000 - returnsModel: true - .then (threads) => - return unless @_searchQueryVersion is searchQueryVersion - resultCount += 1 - resultIds = resultIds.concat _.pluck(threads, 'id') - resultReturned() + closeConnections: => + @_connections.forEach((conn) => conn.end()) - .catch (err) => - account = AccountStore.accountForId(aid) - if account - @_accountsFailed.push("#{account.emailAddress}: #{err.message}") - @_updateSearchError() - resultCount += 1 - resultReturned() - - _updateSearchError: => - # Do not fire an action to display a notification if no-one is subscribed - # to our result set anymore. - return if @callbackCount() is 0 - - if @_accountsFailed.length is 0 - Actions.dismissNotificationsMatching({tag: 'search-error'}) - else - Actions.postNotification - type: 'error' - tag: 'search-error' - sticky: true - message: "Search failed for one or more accounts (#{@_accountsFailed.join(', ')}). Please try again.", - icon: 'fa-search-minus' - actions: [{ - default: true - dismisses: true - label: 'Dismiss' - id: 'search-error:dismiss' - }] + removeCallback: => + super() + @closeConnections() if @callbackCount() is 0 module.exports = SearchQuerySubscription