fix(search/long-conn): Process results buffer before ending connection (#750)

NylasLongConnection ends the connection when the 'end' event is emitted by the
`request` object, and part of that teardown is clearing the global connection buffer.
That buffer holds the data we've received from the connection: whenever new data
arrives, we append it to the buffer and call a processBuffer function
that is throttled to 400ms.
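
For context, here is a minimal sketch of that buffer-and-throttle pattern. It is an
illustration only (SketchConnection and onData are made-up names, not the actual
NylasLongConnection source) and it assumes underscore/lodash is available for `_.throttle`:

// Sketch: accumulate raw chunks and parse them at most once per 400ms.
const _ = require('underscore')

const PROCESS_BUFFER_THROTTLE = 400

class SketchConnection {
  constructor() {
    this._buffer = ''
    // {leading: false} means the first chunk waits up to 400ms before
    // _processBuffer actually runs.
    this._processBufferThrottled = _.throttle(
      () => this._processBuffer(),
      PROCESS_BUFFER_THROTTLE,
      {leading: false}
    )
  }

  onData(chunk) {
    this._buffer += chunk           // accumulate raw data
    this._processBufferThrottled()  // parse it at most every 400ms
  }

  _processBuffer() {
    // Newline-delimited JSON: parse every complete line, keep the remainder.
    const blocks = this._buffer.split('\n')
    this._buffer = blocks.pop()
    blocks.filter(Boolean).forEach((json) => console.log(JSON.parse(json)))
  }
}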

Given that the buffer is global state, and that processing happens
asynchronously with a delay of up to 400ms, if the connection's 'end' event
fires before we actually get to process the buffer, we clear it and show no results.
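
Applied to the sketch above, the fix is to flush the buffer synchronously before any
teardown, which is what the dispose() change in the diff below does (again a sketch,
reusing the hypothetical SketchConnection):

  // Without this explicit flush, an 'end' that arrives inside the 400ms
  // throttle window clears the buffer before _processBuffer ever runs,
  // and the buffered search results are silently dropped.
  dispose() {
    this._processBuffer()  // flush whatever is still buffered
    this._buffer = ''      // now it is safe to clear connection state
  }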

This scenario currently only affects search, because if we accidentally
throw away some data while streaming deltas, we will get that data again
when we reopen the delta streaming connection; discarded search results
have no such second chance.
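
Delta streaming is resilient because it reconnects from a persisted cursor, so data
that was dropped before it could be processed is simply delivered again. A rough,
hypothetical illustration (savedCursor, fetchDeltasSince, and handleDeltas are
made-up names and the cursor handling is simplified; this is not the N1 delta-stream code):

let savedCursor = '0'

function handleDeltas(deltas) {
  for (const delta of deltas) {
    console.log('processing delta', delta.id)
    savedCursor = delta.cursor  // advance only after processing succeeds
  }
}

function reconnect(fetchDeltasSince) {
  // On reconnect, ask for everything after the last processed delta;
  // data that was dropped before processing simply arrives again.
  handleDeltas(fetchDeltasSince(savedCursor))
}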
Juan Tejada 2016-09-22 11:57:00 -07:00
parent 00787c06c7
commit 3dccb374b3
2 changed files with 13 additions and 10 deletions


@@ -7,7 +7,7 @@ const {Status} = NylasLongConnection
 class DeltaStreamingConnection extends NylasLongConnection {
   constructor(api, accountId, opts = {}) {
-    opts.debounceInterval = 1000
+    opts.debounceResultsInterval = 1000
     opts.closeIfDataStopsInterval = 15 * 1000
     super(api, accountId, opts)


@@ -6,7 +6,7 @@ import {IdentityStore} from 'nylas-exports'
 const CONNECTION_TIMEOUT = 60 * 60 * 1000
-const PROCESS_RESULTS_THROTTLE = 400
+const PROCESS_BUFFER_THROTTLE = 400
 const Status = {
   None: 'none',
   Connecting: 'connecting',
@@ -24,7 +24,7 @@ class NylasLongConnection {
       timeout,
       onError,
       onStatusChanged,
-      debounceInterval,
+      debounceResultsInterval,
       closeIfDataStopsInterval,
     } = opts
@@ -43,20 +43,20 @@ class NylasLongConnection {
     this._timeout = timeout || CONNECTION_TIMEOUT
     this._onError = onError || (() => {})
     this._onStatusChanged = onStatusChanged || (() => {})
-    this._debounceInterval = debounceInterval || null
     this._closeIfDataStopsInterval = closeIfDataStopsInterval
+    this._processBufferThrottled = _.throttle(this._processBuffer, PROCESS_BUFFER_THROTTLE, {leading: false})
     this._flushResultsSoon = () => {
       if (this._results.length === 0) { return }
       this._emitter.emit('results-stopped-arriving', this._results);
       this._results = []
     }
-    if (this._debounceInterval !== null) {
-      this._flushResultsSoon = _.debounce(this._flushResultsSoon, this._debounceInterval)
+    if (debounceResultsInterval != null) {
+      this._flushResultsSoon = _.debounce(this._flushResultsSoon, debounceResultsInterval)
     }
   }
-  _processBufffer = _.throttle(() => {
+  _processBuffer() {
     const bufferJSONs = this._buffer.split('\n')
     // We can't parse the last block - we don't know whether we've
@@ -77,7 +77,7 @@ class NylasLongConnection {
       }
     })
     this._flushResultsSoon()
-  }, PROCESS_RESULTS_THROTTLE, {leading: false})
+  }
   get accountId() {
     return this._accountId;
@@ -139,7 +139,7 @@ class NylasLongConnection {
       responseStream.setEncoding('utf8')
       responseStream.on('close', () => this.close())
-      responseStream.on('end', () => this.close())
+      responseStream.on('end', () => this.end())
       responseStream.on('data', (chunk) => {
         this.closeIfDataStops()
         // Ignore redundant newlines sent as pings. Want to avoid
@@ -148,7 +148,7 @@ class NylasLongConnection {
           return
         }
         this._buffer += chunk
-        this._processBufffer()
+        this._processBufferThrottled()
       })
     })
     this._req.setTimeout(60 * 60 * 1000)
@@ -180,6 +180,9 @@ class NylasLongConnection {
   }
   dispose(status) {
+    // Process the buffer one last time before disposing of the connection
+    // in case there is any data left that we haven't processed
+    this._processBuffer()
     if (this._status !== status) {
       this.setStatus(status)
     }