feat(search-streaming): Add support for new search streaming api

Summary:
- This diff includes code to connect to the new search stremaming API. It converts the old NylasLongConnection into ES6 without any of the cursor management and includes it as part of NylasAPI
- Removes current search error handling because of the new error semantics of the api. If no results are returned a message will be displayed to the user indicating that
- WIP TODO:
  - Replace old NylasLongConnection class with new one and add test coverage.
  I did not replace our  current streaming code with the new ES6 code yet because
  the old code doesn't have test coverage and I don't want to introduce any
  (subtle) regressions to a vital piece of the app.
  - Potentially replace with JSONStreaming library or new http library
  (e.g. fetch)

- Note that Streaming API is not in production yet and only works for Gmail and IMAP (no EAS)

Test Plan: -TODO, manual

Reviewers: bengotow, evan

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D2859
This commit is contained in:
Juan Tejada 2016-04-08 11:46:20 -07:00
parent 3d00837c4a
commit 5d96fb227f
3 changed files with 232 additions and 46 deletions

View file

@ -1,6 +1,7 @@
_ = require 'underscore'
{remote} = require 'electron'
request = require 'request'
NylasLongConnection = require './nylas-long-connection'
Utils = require './models/utils'
Account = require './models/account'
Message = require './models/message'
@ -206,6 +207,14 @@ class NylasAPI
req = new NylasAPIRequest(@, options)
req.run().then(success, error)
longConnection: (opts) ->
connection = new NylasLongConnection(@, opts.accountId, opts)
connection.onResults(opts.onResults)
return connection
startLongConnection: (opts) ->
@longConnection(opts).start()
# If we make a request that `returnsModel` and we get a 404, we want to handle
# it intelligently and in a centralized way. This method identifies the object
# that could not be found and purges it from local cache.

View file

@ -0,0 +1,201 @@
import _ from 'underscore'
import url from 'url'
import {Emitter} from 'event-kit'
class NylasLongConnection {
static Status = {
None: 'none',
Connecting: 'connecting',
Connected: 'connected',
Closed: 'closed', // Socket has been closed for any reason
Ended: 'ended', // We have received 'end()' and will never open again.
}
constructor(api, accountId, {path, debounceInterval, throttleInterval, closeIfDataStopsTimeout, onStatusChanged} = {}) {
this._api = api
this._accountId = accountId
this._status = NylasLongConnection.Status.None
this._req = null
this._pingTimeout = null
this._emitter = new Emitter()
this._buffer = ''
this._results = []
// Options
this._path = path
this._debounceInterval = debounceInterval
this._throttleInterval = throttleInterval || 400
this._closeIfDataStopsTimeout = closeIfDataStopsTimeout || (15 * 1000)
this._onStatusChanged = onStatusChanged || () => {}
this._resultsReceived = () => {
if (this._results.length === 0) {
return
}
this._emitter.emit('results-stopped-arriving', this._results);
this._results = []
}
if (this._debounceInterval != null) {
this._resultsReceived = _.debounce(this._resultsReceived, this._debounceInterval)
}
return this
}
get accountId() {
return this._accountId;
}
get status() {
return this._status;
}
setStatus(status) {
if (this._status === status) {
return
}
this._status = status
this._onStatusChanged(this, status)
}
onResults(callback) {
this._emitter.on('results-stopped-arriving', callback)
}
processBuffer = () => {
const bufferJSONs = this._buffer.split('\n')
// We can't parse the last block - we don't know whether we've
// received the entire result or only part of it. Wait
// until we have more.
this._buffer = bufferJSONs.pop()
bufferJSONs.forEach((resultJSON) => {
if (resultJSON.length === 0) {
return
}
let result = null
try {
result = JSON.parse(resultJSON)
if (result) {
this._results.push(result)
}
} catch (e) {
console.error(`${resultJSON} could not be parsed as JSON.`, e)
}
})
this._resultsReceived()
}
start() {
const isValidStatus = (
[NylasLongConnection.Status.None, NylasLongConnection.Status.Closed].includes(this._status)
)
if (!isValidStatus) {
return this;
}
const token = this._api.accessTokenForAccountId(this._accountId)
if (!token || this._req) {
return null;
}
const options = url.parse(`${this._api.APIRoot}${this._path}`)
options.auth = `${token}:`
let lib;
if (this._api.APIRoot.indexOf('https') === -1) {
lib = require('http')
} else {
options.port = 443
lib = require('https')
}
const processBufferThrottled = _.throttle(this.processBuffer, this._throttleInterval, {leading: false})
this._req = lib.request(options, (responseStream) => {
if (responseStream.statusCode !== 200) {
responseStream.on('data', () => {
this.close()
})
return;
}
responseStream.setEncoding('utf8')
responseStream.on('close', () => {
this.close()
})
responseStream.on('data', (chunk) => {
this.closeIfDataStops()
// Ignore redundant newlines sent as pings. Want to avoid
// calls to @onProcessBuffer that contain no actual updates
if (chunk === '\n' && (this._buffer.length === 0 || this._buffer[-1] === '\n')) {
return
}
this._buffer += chunk
processBufferThrottled()
})
})
this._req.setTimeout(60 * 60 * 1000)
this._req.setSocketKeepAlive(true)
this._req.on('error', () => this.close())
this._req.on('socket', (socket) => {
this.setStatus(NylasLongConnection.Status.Connecting)
socket.on('connect', () => {
this.setStatus(NylasLongConnection.Status.Connected)
this.closeIfDataStops()
})
})
this._req.write("1")
return this
}
cleanup() {
if (this._pingTimeout) {
clearTimeout(this._pingTimeout)
}
this._pingTimeout = null
this._buffer = ''
if (this._req) {
this._req.end()
this._req.abort()
this._req = null
}
return this
}
isClosed() {
return [
NylasLongConnection.Status.None,
NylasLongConnection.Status.Closed,
NylasLongConnection.Status.Ended,
].includes(this._status)
}
close() {
if (this._status === NylasLongConnection.Status.Closed) {
return
}
this.setStatus(NylasLongConnection.Status.Closed)
this.cleanup()
}
closeIfDataStops() {
if (this._pingTimeout) {
clearTimeout(this._pingTimeout)
}
this._pingTimeout = setTimeout(() => {
this._pingTimeout = null
this.close()
}, this._closeIfDataStopsTimeout)
}
end() {
this.setStatus(NylasLongConnection.Status.Ended)
this.cleanup()
}
}
export default NylasLongConnection

View file

@ -13,6 +13,7 @@ class SearchQuerySubscription extends MutableQuerySubscription
constructor: (@_searchQuery, @_accountIds) ->
super(null, {asResultSet: true})
@_searchQueryVersion = 0
@_connections = []
_.defer => @performSearch()
searchQuery: =>
@ -42,64 +43,39 @@ class SearchQuerySubscription extends MutableQuerySubscription
performRemoteSearch: (idx) =>
searchQueryVersion = @_searchQueryVersion += 1
resultCount = 0
accountsSearched = new Set()
resultIds = []
resultReturned = =>
resultsReturned = =>
# Don't emit a "result" until we have at least one thread to display.
# Otherwise it will show "No Results Found"
if resultIds.length > 0 or resultCount is @_accountIds.length
if resultIds.length > 0 or accountsSearched.size is @_accountIds.length
if @_set?.ids().length > 0
currentResultIds = @_set.ids()
resultIds = _.uniq(currentResultIds.concat(resultIds))
dbQuery = DatabaseStore.findAll(Thread).where(id: resultIds).order(Thread.attributes.lastMessageReceivedTimestamp.descending())
@replaceQuery(dbQuery)
@_accountsFailed = []
@_updateSearchError()
@_connections = @_accountIds.map (accId) =>
NylasAPI.startLongConnection
accountId: accId
path: "/threads/search/streaming?q=#{encodeURIComponent(@_searchQuery)}"
onResults: (results) =>
threads = results[0]
return unless @_searchQueryVersion is searchQueryVersion
resultIds = resultIds.concat _.pluck(threads, 'id')
resultsReturned()
onStatusChanged: (conn) =>
if conn.isClosed()
accountsSearched.add(accId)
resultsReturned()
@_accountIds.forEach (aid) =>
NylasAPI.makeRequest
method: 'GET'
path: "/threads/search?q=#{encodeURIComponent(@_searchQuery)}"
accountId: aid
json: true
timeout: 45000
returnsModel: true
.then (threads) =>
return unless @_searchQueryVersion is searchQueryVersion
resultCount += 1
resultIds = resultIds.concat _.pluck(threads, 'id')
resultReturned()
closeConnections: =>
@_connections.forEach((conn) => conn.end())
.catch (err) =>
account = AccountStore.accountForId(aid)
if account
@_accountsFailed.push("#{account.emailAddress}: #{err.message}")
@_updateSearchError()
resultCount += 1
resultReturned()
_updateSearchError: =>
# Do not fire an action to display a notification if no-one is subscribed
# to our result set anymore.
return if @callbackCount() is 0
if @_accountsFailed.length is 0
Actions.dismissNotificationsMatching({tag: 'search-error'})
else
Actions.postNotification
type: 'error'
tag: 'search-error'
sticky: true
message: "Search failed for one or more accounts (#{@_accountsFailed.join(', ')}). Please try again.",
icon: 'fa-search-minus'
actions: [{
default: true
dismisses: true
label: 'Dismiss'
id: 'search-error:dismiss'
}]
removeCallback: =>
super()
@closeConnections() if @callbackCount() is 0
module.exports = SearchQuerySubscription