From 3dccb374b3a6c4bc017bb8339ae7e9384bf077e6 Mon Sep 17 00:00:00 2001 From: Juan Tejada Date: Thu, 22 Sep 2016 11:57:00 -0700 Subject: [PATCH] fix(search/long-conn): Process results buffer before ending connection (#750) NylasLongConnection ends the connection when the 'end' event is emitted by the `request` object. When this happens, the global connection buffer is cleared. Also, the global buffer holds the data we've received from the connection, and whenever we receive new data, we accumulate it in the buffer and call a processBuffer function which is throttled to 400ms. Given that the buffer is global state, and processing occurs asynchronously with a delay of up to 400ms, if the 'end' event on the connection is fired before we actually get to process the buffer, we would clear it and show no results. This scenario currently only affected search because if we accidentally threw away some data when streaming deltas, we will get that data again when we reopen the delta streaming connection. --- .../lib/delta-streaming-connection.es6 | 2 +- src/flux/nylas-long-connection.es6 | 21 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/internal_packages/worker-sync/lib/delta-streaming-connection.es6 b/internal_packages/worker-sync/lib/delta-streaming-connection.es6 index d01f4b469..f52c11b94 100644 --- a/internal_packages/worker-sync/lib/delta-streaming-connection.es6 +++ b/internal_packages/worker-sync/lib/delta-streaming-connection.es6 @@ -7,7 +7,7 @@ const {Status} = NylasLongConnection class DeltaStreamingConnection extends NylasLongConnection { constructor(api, accountId, opts = {}) { - opts.debounceInterval = 1000 + opts.debounceResultsInterval = 1000 opts.closeIfDataStopsInterval = 15 * 1000 super(api, accountId, opts) diff --git a/src/flux/nylas-long-connection.es6 b/src/flux/nylas-long-connection.es6 index 3f7615d19..af66fbfa4 100644 --- a/src/flux/nylas-long-connection.es6 +++ b/src/flux/nylas-long-connection.es6 @@ -6,7 +6,7 @@ import {IdentityStore} from 'nylas-exports' const CONNECTION_TIMEOUT = 60 * 60 * 1000 -const PROCESS_RESULTS_THROTTLE = 400 +const PROCESS_BUFFER_THROTTLE = 400 const Status = { None: 'none', Connecting: 'connecting', @@ -24,7 +24,7 @@ class NylasLongConnection { timeout, onError, onStatusChanged, - debounceInterval, + debounceResultsInterval, closeIfDataStopsInterval, } = opts @@ -43,20 +43,20 @@ class NylasLongConnection { this._timeout = timeout || CONNECTION_TIMEOUT this._onError = onError || (() => {}) this._onStatusChanged = onStatusChanged || (() => {}) - this._debounceInterval = debounceInterval || null this._closeIfDataStopsInterval = closeIfDataStopsInterval + this._processBufferThrottled = _.throttle(this._processBuffer, PROCESS_BUFFER_THROTTLE, {leading: false}) this._flushResultsSoon = () => { if (this._results.length === 0) { return } this._emitter.emit('results-stopped-arriving', this._results); this._results = [] } - if (this._debounceInterval !== null) { - this._flushResultsSoon = _.debounce(this._flushResultsSoon, this._debounceInterval) + if (debounceResultsInterval != null) { + this._flushResultsSoon = _.debounce(this._flushResultsSoon, debounceResultsInterval) } } - _processBufffer = _.throttle(() => { + _processBuffer() { const bufferJSONs = this._buffer.split('\n') // We can't parse the last block - we don't know whether we've @@ -77,7 +77,7 @@ class NylasLongConnection { } }) this._flushResultsSoon() - }, PROCESS_RESULTS_THROTTLE, {leading: false}) + } get accountId() { return this._accountId; @@ -139,7 +139,7 @@ class NylasLongConnection { responseStream.setEncoding('utf8') responseStream.on('close', () => this.close()) - responseStream.on('end', () => this.close()) + responseStream.on('end', () => this.end()) responseStream.on('data', (chunk) => { this.closeIfDataStops() // Ignore redundant newlines sent as pings. Want to avoid @@ -148,7 +148,7 @@ class NylasLongConnection { return } this._buffer += chunk - this._processBufffer() + this._processBufferThrottled() }) }) this._req.setTimeout(60 * 60 * 1000) @@ -180,6 +180,9 @@ class NylasLongConnection { } dispose(status) { + // Process the buffer one last time before disposing of the connection + // in case there is any data left that we haven't processed + this._processBuffer() if (this._status !== status) { this.setStatus(status) }