diff --git a/src/flux/nylas-api.coffee b/src/flux/nylas-api.coffee index bd7ac7bf5..b5ccdca2f 100644 --- a/src/flux/nylas-api.coffee +++ b/src/flux/nylas-api.coffee @@ -1,6 +1,7 @@ _ = require 'underscore' {remote} = require 'electron' request = require 'request' +NylasLongConnection = require './nylas-long-connection' Utils = require './models/utils' Account = require './models/account' Message = require './models/message' @@ -206,6 +207,14 @@ class NylasAPI req = new NylasAPIRequest(@, options) req.run().then(success, error) + longConnection: (opts) -> + connection = new NylasLongConnection(@, opts.accountId, opts) + connection.onResults(opts.onResults) + return connection + + startLongConnection: (opts) -> + @longConnection(opts).start() + # If we make a request that `returnsModel` and we get a 404, we want to handle # it intelligently and in a centralized way. This method identifies the object # that could not be found and purges it from local cache. diff --git a/src/flux/nylas-long-connection.es6 b/src/flux/nylas-long-connection.es6 new file mode 100644 index 000000000..b09c1eccd --- /dev/null +++ b/src/flux/nylas-long-connection.es6 @@ -0,0 +1,201 @@ +import _ from 'underscore' +import url from 'url' +import {Emitter} from 'event-kit' + + +class NylasLongConnection { + + static Status = { + None: 'none', + Connecting: 'connecting', + Connected: 'connected', + Closed: 'closed', // Socket has been closed for any reason + Ended: 'ended', // We have received 'end()' and will never open again. + } + + constructor(api, accountId, {path, debounceInterval, throttleInterval, closeIfDataStopsTimeout, onStatusChanged} = {}) { + this._api = api + this._accountId = accountId + this._status = NylasLongConnection.Status.None + this._req = null + this._pingTimeout = null + this._emitter = new Emitter() + this._buffer = '' + this._results = [] + + // Options + this._path = path + this._debounceInterval = debounceInterval + this._throttleInterval = throttleInterval || 400 + this._closeIfDataStopsTimeout = closeIfDataStopsTimeout || (15 * 1000) + this._onStatusChanged = onStatusChanged || () => {} + + + this._resultsReceived = () => { + if (this._results.length === 0) { + return + } + this._emitter.emit('results-stopped-arriving', this._results); + this._results = [] + } + if (this._debounceInterval != null) { + this._resultsReceived = _.debounce(this._resultsReceived, this._debounceInterval) + } + return this + } + + get accountId() { + return this._accountId; + } + + get status() { + return this._status; + } + + setStatus(status) { + if (this._status === status) { + return + } + this._status = status + this._onStatusChanged(this, status) + } + + onResults(callback) { + this._emitter.on('results-stopped-arriving', callback) + } + + processBuffer = () => { + const bufferJSONs = this._buffer.split('\n') + + // We can't parse the last block - we don't know whether we've + // received the entire result or only part of it. Wait + // until we have more. + this._buffer = bufferJSONs.pop() + + bufferJSONs.forEach((resultJSON) => { + if (resultJSON.length === 0) { + return + } + let result = null + try { + result = JSON.parse(resultJSON) + if (result) { + this._results.push(result) + } + } catch (e) { + console.error(`${resultJSON} could not be parsed as JSON.`, e) + } + }) + this._resultsReceived() + } + + start() { + const isValidStatus = ( + [NylasLongConnection.Status.None, NylasLongConnection.Status.Closed].includes(this._status) + ) + if (!isValidStatus) { + return this; + } + + const token = this._api.accessTokenForAccountId(this._accountId) + if (!token || this._req) { + return null; + } + + const options = url.parse(`${this._api.APIRoot}${this._path}`) + options.auth = `${token}:` + + let lib; + if (this._api.APIRoot.indexOf('https') === -1) { + lib = require('http') + } else { + options.port = 443 + lib = require('https') + } + + const processBufferThrottled = _.throttle(this.processBuffer, this._throttleInterval, {leading: false}) + this._req = lib.request(options, (responseStream) => { + if (responseStream.statusCode !== 200) { + responseStream.on('data', () => { + this.close() + }) + return; + } + + responseStream.setEncoding('utf8') + responseStream.on('close', () => { + this.close() + }) + responseStream.on('data', (chunk) => { + this.closeIfDataStops() + + // Ignore redundant newlines sent as pings. Want to avoid + // calls to @onProcessBuffer that contain no actual updates + if (chunk === '\n' && (this._buffer.length === 0 || this._buffer[-1] === '\n')) { + return + } + this._buffer += chunk + processBufferThrottled() + }) + }) + this._req.setTimeout(60 * 60 * 1000) + this._req.setSocketKeepAlive(true) + this._req.on('error', () => this.close()) + this._req.on('socket', (socket) => { + this.setStatus(NylasLongConnection.Status.Connecting) + socket.on('connect', () => { + this.setStatus(NylasLongConnection.Status.Connected) + this.closeIfDataStops() + }) + }) + this._req.write("1") + return this + } + + cleanup() { + if (this._pingTimeout) { + clearTimeout(this._pingTimeout) + } + this._pingTimeout = null + this._buffer = '' + if (this._req) { + this._req.end() + this._req.abort() + this._req = null + } + return this + } + + isClosed() { + return [ + NylasLongConnection.Status.None, + NylasLongConnection.Status.Closed, + NylasLongConnection.Status.Ended, + ].includes(this._status) + } + + close() { + if (this._status === NylasLongConnection.Status.Closed) { + return + } + this.setStatus(NylasLongConnection.Status.Closed) + this.cleanup() + } + + closeIfDataStops() { + if (this._pingTimeout) { + clearTimeout(this._pingTimeout) + } + this._pingTimeout = setTimeout(() => { + this._pingTimeout = null + this.close() + }, this._closeIfDataStopsTimeout) + } + + end() { + this.setStatus(NylasLongConnection.Status.Ended) + this.cleanup() + } +} + +export default NylasLongConnection diff --git a/src/search-query-subscription.coffee b/src/search-query-subscription.coffee index 6c8f4abac..9cca5964d 100644 --- a/src/search-query-subscription.coffee +++ b/src/search-query-subscription.coffee @@ -13,6 +13,7 @@ class SearchQuerySubscription extends MutableQuerySubscription constructor: (@_searchQuery, @_accountIds) -> super(null, {asResultSet: true}) @_searchQueryVersion = 0 + @_connections = [] _.defer => @performSearch() searchQuery: => @@ -42,64 +43,39 @@ class SearchQuerySubscription extends MutableQuerySubscription performRemoteSearch: (idx) => searchQueryVersion = @_searchQueryVersion += 1 - resultCount = 0 + accountsSearched = new Set() resultIds = [] - resultReturned = => + resultsReturned = => # Don't emit a "result" until we have at least one thread to display. # Otherwise it will show "No Results Found" - if resultIds.length > 0 or resultCount is @_accountIds.length + if resultIds.length > 0 or accountsSearched.size is @_accountIds.length if @_set?.ids().length > 0 currentResultIds = @_set.ids() resultIds = _.uniq(currentResultIds.concat(resultIds)) dbQuery = DatabaseStore.findAll(Thread).where(id: resultIds).order(Thread.attributes.lastMessageReceivedTimestamp.descending()) @replaceQuery(dbQuery) - @_accountsFailed = [] - @_updateSearchError() + @_connections = @_accountIds.map (accId) => + NylasAPI.startLongConnection + accountId: accId + path: "/threads/search/streaming?q=#{encodeURIComponent(@_searchQuery)}" + onResults: (results) => + threads = results[0] + return unless @_searchQueryVersion is searchQueryVersion + resultIds = resultIds.concat _.pluck(threads, 'id') + resultsReturned() + onStatusChanged: (conn) => + if conn.isClosed() + accountsSearched.add(accId) + resultsReturned() - @_accountIds.forEach (aid) => - NylasAPI.makeRequest - method: 'GET' - path: "/threads/search?q=#{encodeURIComponent(@_searchQuery)}" - accountId: aid - json: true - timeout: 45000 - returnsModel: true - .then (threads) => - return unless @_searchQueryVersion is searchQueryVersion - resultCount += 1 - resultIds = resultIds.concat _.pluck(threads, 'id') - resultReturned() + closeConnections: => + @_connections.forEach((conn) => conn.end()) - .catch (err) => - account = AccountStore.accountForId(aid) - if account - @_accountsFailed.push("#{account.emailAddress}: #{err.message}") - @_updateSearchError() - resultCount += 1 - resultReturned() - - _updateSearchError: => - # Do not fire an action to display a notification if no-one is subscribed - # to our result set anymore. - return if @callbackCount() is 0 - - if @_accountsFailed.length is 0 - Actions.dismissNotificationsMatching({tag: 'search-error'}) - else - Actions.postNotification - type: 'error' - tag: 'search-error' - sticky: true - message: "Search failed for one or more accounts (#{@_accountsFailed.join(', ')}). Please try again.", - icon: 'fa-search-minus' - actions: [{ - default: true - dismisses: true - label: 'Dismiss' - id: 'search-error:dismiss' - }] + removeCallback: => + super() + @closeConnections() if @callbackCount() is 0 module.exports = SearchQuerySubscription