diff --git a/internal_packages/worker-sync/lib/delta-streaming-connection.es6 b/internal_packages/worker-sync/lib/delta-streaming-connection.es6 index d01f4b469..f52c11b94 100644 --- a/internal_packages/worker-sync/lib/delta-streaming-connection.es6 +++ b/internal_packages/worker-sync/lib/delta-streaming-connection.es6 @@ -7,7 +7,7 @@ const {Status} = NylasLongConnection class DeltaStreamingConnection extends NylasLongConnection { constructor(api, accountId, opts = {}) { - opts.debounceInterval = 1000 + opts.debounceResultsInterval = 1000 opts.closeIfDataStopsInterval = 15 * 1000 super(api, accountId, opts) diff --git a/src/flux/nylas-long-connection.es6 b/src/flux/nylas-long-connection.es6 index 3f7615d19..af66fbfa4 100644 --- a/src/flux/nylas-long-connection.es6 +++ b/src/flux/nylas-long-connection.es6 @@ -6,7 +6,7 @@ import {IdentityStore} from 'nylas-exports' const CONNECTION_TIMEOUT = 60 * 60 * 1000 -const PROCESS_RESULTS_THROTTLE = 400 +const PROCESS_BUFFER_THROTTLE = 400 const Status = { None: 'none', Connecting: 'connecting', @@ -24,7 +24,7 @@ class NylasLongConnection { timeout, onError, onStatusChanged, - debounceInterval, + debounceResultsInterval, closeIfDataStopsInterval, } = opts @@ -43,20 +43,20 @@ class NylasLongConnection { this._timeout = timeout || CONNECTION_TIMEOUT this._onError = onError || (() => {}) this._onStatusChanged = onStatusChanged || (() => {}) - this._debounceInterval = debounceInterval || null this._closeIfDataStopsInterval = closeIfDataStopsInterval + this._processBufferThrottled = _.throttle(this._processBuffer, PROCESS_BUFFER_THROTTLE, {leading: false}) this._flushResultsSoon = () => { if (this._results.length === 0) { return } this._emitter.emit('results-stopped-arriving', this._results); this._results = [] } - if (this._debounceInterval !== null) { - this._flushResultsSoon = _.debounce(this._flushResultsSoon, this._debounceInterval) + if (debounceResultsInterval != null) { + this._flushResultsSoon = _.debounce(this._flushResultsSoon, debounceResultsInterval) } } - _processBufffer = _.throttle(() => { + _processBuffer() { const bufferJSONs = this._buffer.split('\n') // We can't parse the last block - we don't know whether we've @@ -77,7 +77,7 @@ class NylasLongConnection { } }) this._flushResultsSoon() - }, PROCESS_RESULTS_THROTTLE, {leading: false}) + } get accountId() { return this._accountId; @@ -139,7 +139,7 @@ class NylasLongConnection { responseStream.setEncoding('utf8') responseStream.on('close', () => this.close()) - responseStream.on('end', () => this.close()) + responseStream.on('end', () => this.end()) responseStream.on('data', (chunk) => { this.closeIfDataStops() // Ignore redundant newlines sent as pings. Want to avoid @@ -148,7 +148,7 @@ class NylasLongConnection { return } this._buffer += chunk - this._processBufffer() + this._processBufferThrottled() }) }) this._req.setTimeout(60 * 60 * 1000) @@ -180,6 +180,9 @@ class NylasLongConnection { } dispose(status) { + // Process the buffer one last time before disposing of the connection + // in case there is any data left that we haven't processed + this._processBuffer() if (this._status !== status) { this.setStatus(status) }