diff --git a/internal_packages/worker-sync/lib/delta-streaming-connection.es6 b/internal_packages/worker-sync/lib/delta-streaming-connection.es6 index 925d6e0c4..d01f4b469 100644 --- a/internal_packages/worker-sync/lib/delta-streaming-connection.es6 +++ b/internal_packages/worker-sync/lib/delta-streaming-connection.es6 @@ -1,3 +1,4 @@ +import _ from 'underscore' import {NylasLongConnection, DatabaseStore} from 'nylas-exports' const {Status} = NylasLongConnection @@ -10,20 +11,16 @@ class DeltaStreamingConnection extends NylasLongConnection { opts.closeIfDataStopsInterval = 15 * 1000 super(api, accountId, opts) - const {ready, getCursor, setCursor, setStatus} = opts - this._ready = ready + const {isReady, getCursor, setCursor} = opts + this._isReady = isReady this._getCursor = getCursor this._setCursor = setCursor - // Override super class instance vars - this._onStatusChanged = setStatus - this._onError = (err) => { - if (err.message.indexOf('Invalid cursor') > 0) { - const error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.') - NylasEnv.config.unset(`nylas.${this._accountId}.cursor`) - DatabaseStore._handleSetupError(error) - } - } + // Update cursor when deltas received + this.onDeltas((deltas) => { + const last = _.last(deltas) + this._setCursor(last.cursor) + }) } deltaStreamingPath(cursor) { @@ -34,6 +31,14 @@ class DeltaStreamingConnection extends NylasLongConnection { return !!this._getCursor() } + onError(err) { + if (err.message.indexOf('Invalid cursor') > 0) { + const error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.') + NylasEnv.config.unset(`nylas.${this._accountId}.cursor`) + DatabaseStore._handleSetupError(error) + } + } + latestCursor() { const cursor = this._getCursor() if (cursor) { return Promise.resolve(cursor) } @@ -54,7 +59,7 @@ class DeltaStreamingConnection extends NylasLongConnection { } start() { - if (!this._ready()) { return } + if (!this._isReady()) { return } if (!this.canStart()) { return } if (this._req != null) { return } diff --git a/internal_packages/worker-sync/lib/nylas-sync-worker.coffee b/internal_packages/worker-sync/lib/nylas-sync-worker.coffee index 3eb2e47b2..ddcb525e7 100644 --- a/internal_packages/worker-sync/lib/nylas-sync-worker.coffee +++ b/internal_packages/worker-sync/lib/nylas-sync-worker.coffee @@ -49,14 +49,15 @@ class NylasSyncWorker @_terminated = false @_connection = new DeltaStreamingConnection(api, account.id, { - ready: => @_state isnt null + isReady: => @_state isnt null getCursor: => return null if @_state is null @_state.cursor || NylasEnv.config.get("nylas.#{@_account.id}.cursor") setCursor: (val) => @_state.cursor = val @writeState() - setStatus: (status, statusCode) => + onStatusChanged: (status, statusCode) => + console.log('status changing!!!') @_state.longConnectionStatus = status if status is NylasLongConnection.Status.Closed # Make the delay 30 seconds if we get a 403 diff --git a/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee b/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee index f0afb974a..c95a9f1d2 100644 --- a/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee +++ b/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee @@ -176,6 +176,17 @@ describe "NylasSyncWorker", -> expect(connection.hasCursor()).toBe(true) expect(connection._getCursor()).toEqual('new-school') + it "should set the cursor to the last cursor after receiving deltas", -> + spyOn(DeltaStreamingConnection.prototype, 'latestCursor').andReturn Promise.resolve() + worker = new NylasSyncWorker(@api, @account) + advanceClock() + connection = worker.connection() + deltas = [{cursor: '1'}, {cursor: '2'}] + connection._emitter.emit('results-stopped-arriving', deltas) + advanceClock() + expect(connection._getCursor()).toEqual('2') + + describe "when a count request completes", -> beforeEach -> @worker.start() diff --git a/src/flux/nylas-long-connection.es6 b/src/flux/nylas-long-connection.es6 index e225f08b0..534774e00 100644 --- a/src/flux/nylas-long-connection.es6 +++ b/src/flux/nylas-long-connection.es6 @@ -36,23 +36,23 @@ class NylasLongConnection { this._buffer = '' this._results = [] this._pingTimeout = null - this._statusCode = null + this._httpStatusCode = null // Options this._path = path this._timeout = timeout || CONNECTION_TIMEOUT this._onError = onError || (() => {}) this._onStatusChanged = onStatusChanged || (() => {}) - this._debounceInterval = debounceInterval + this._debounceInterval = debounceInterval || null this._closeIfDataStopsInterval = closeIfDataStopsInterval - this._flushResults = () => { + this._flushResultsSoon = () => { if (this._results.length === 0) { return } this._emitter.emit('results-stopped-arriving', this._results); this._results = [] } - if (this._debounceInterval != null) { - this._flushResults = _.debounce(this._flushResults, this._debounceInterval) + if (this._debounceInterval !== null) { + this._flushResultsSoon = _.debounce(this._flushResultsSoon, this._debounceInterval) } } @@ -76,7 +76,7 @@ class NylasLongConnection { this._results.push(result) } }) - this._flushResults() + this._flushResultsSoon() }, PROCESS_RESULTS_THROTTLE, {leading: false}) get accountId() { @@ -90,13 +90,17 @@ class NylasLongConnection { setStatus(status) { if (this._status === status) { return } this._status = status - this._onStatusChanged(status, this._statusCode) + this._onStatusChanged(this._status, this._httpStatusCode) } onResults(callback) { this._emitter.on('results-stopped-arriving', callback) } + onError(error) { + return this._onError(error) + } + canStart() { return [Status.None, Status.Closed].includes(this._status) } @@ -123,12 +127,12 @@ class NylasLongConnection { } this._req = lib.request(options, (responseStream) => { - this._statusCode = responseStream.statusCode + this._httpStatusCode = responseStream.statusCode if (responseStream.statusCode !== 200) { responseStream.on('data', (chunk) => { const error = new Error(chunk.toString('utf8')) console.error(error) - this._onError(error) + this.onError(error) this.close() }) return @@ -152,7 +156,7 @@ class NylasLongConnection { this._req.setSocketKeepAlive(true) this._req.on('error', (err) => { console.error(err) - this._onError(err) + this.onError(err) this.close() }) this._req.on('socket', (socket) => { @@ -182,7 +186,7 @@ class NylasLongConnection { } clearTimeout(this._pingTimeout) this._pingTimeout = null - this._statusCode = null + this._httpStatusCode = null this._buffer = '' if (this._req) { this._req.end()