diff --git a/internal_packages/worker-sync/lib/nylas-long-connection.coffee b/internal_packages/worker-sync/lib/nylas-long-connection.coffee index e54b2745b..3ead7cb4d 100644 --- a/internal_packages/worker-sync/lib/nylas-long-connection.coffee +++ b/internal_packages/worker-sync/lib/nylas-long-connection.coffee @@ -11,10 +11,10 @@ class NylasLongConnection Connected: 'connected' Retrying: 'retrying' - constructor: (api, accountId) -> + constructor: (api, accountId, config) -> @_api = api @_accountId = accountId - @_cursorKey = "nylas.#{@_accountId}.cursor" + @_config = config @_emitter = new Emitter @_state = 'idle' @_req = null @@ -28,10 +28,9 @@ class NylasLongConnection last = @_deltas[@_deltas.length - 1] @_emitter.emit('deltas-stopped-arriving', @_deltas) + @_config.setCursor(last.cursor) @_deltas = [] - # Note: setCursor is slow and saves to disk, so we do it once at the end - @setCursor(last.cursor) , 1000 @ @@ -40,10 +39,10 @@ class NylasLongConnection @_accountId hasCursor: -> - !!NylasEnv.config.get(@_cursorKey) + !!@_config.getCursor() withCursor: (callback) -> - cursor = NylasEnv.config.get(@_cursorKey) + cursor = @_config.getCursor() return callback(cursor) if cursor @_api.makeRequest @@ -52,12 +51,9 @@ class NylasLongConnection method: 'POST' success: ({cursor}) => console.log("Obtained stream cursor #{cursor}.") - @setCursor(cursor) + @_config.setCursor(cursor) callback(cursor) - setCursor: (cursor) -> - NylasEnv.config.set(@_cursorKey, cursor) - state: -> @state @@ -92,6 +88,8 @@ class NylasLongConnection @_flushDeltasDebounced() start: -> + return unless @_config.ready() + token = @_api.accessTokenForAccountId(@_accountId) return if not token? return if @_state is NylasLongConnection.State.Ended diff --git a/internal_packages/worker-sync/lib/nylas-sync-worker.coffee b/internal_packages/worker-sync/lib/nylas-sync-worker.coffee index 326ecd0e9..c9332af33 100644 --- a/internal_packages/worker-sync/lib/nylas-sync-worker.coffee +++ b/internal_packages/worker-sync/lib/nylas-sync-worker.coffee @@ -42,7 +42,16 @@ class NylasSyncWorker @_account = account @_terminated = false - @_connection = new NylasLongConnection(api, account.id) + @_connection = new NylasLongConnection(api, account.id, { + ready: => @_state isnt null + getCursor: => + return null if @_state is null + @_state['cursor'] || NylasEnv.config.get("nylas.#{@_account.id}.cursor") + setCursor: (val) => + @_state['cursor'] = val + @writeState() + }) + @_refreshingCaches = [new ContactRankingsCache(account.id)] @_resumeTimer = new BackoffTimer => # indirection needed so resumeFetches can be spied on @@ -53,9 +62,10 @@ class NylasSyncWorker @_state = null DatabaseStore.findJSONBlob("NylasSyncWorker:#{@_account.id}").then (json) => @_state = json ? {} - for model, modelState of @_state - modelState.busy = false + for key in ['threads', 'labels', 'folders', 'drafts', 'contacts', 'calendars', 'events'] + @_state[key].busy = false if @_state[key] @resumeFetches() + @_connection.start() @ diff --git a/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee b/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee index f972bf907..bff7365bf 100644 --- a/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee +++ b/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee @@ -16,10 +16,12 @@ describe "NylasSyncWorker", -> getThreads: (account, params, requestOptions) => @apiRequests.push({account, model:'threads', params, requestOptions}) + @apiCursorStub = undefined spyOn(DatabaseTransaction.prototype, 'persistJSONBlob').andReturn(Promise.resolve()) spyOn(DatabaseStore, 'findJSONBlob').andCallFake (key) => if key is "NylasSyncWorker:#{TEST_ACCOUNT_ID}" return Promise.resolve _.extend {}, { + "cursor": @apiCursorStub "contacts": busy: true complete: false @@ -36,6 +38,7 @@ describe "NylasSyncWorker", -> @account = new Account(clientId: TEST_ACCOUNT_CLIENT_ID, serverId: TEST_ACCOUNT_ID, organizationUnit: 'label') @worker = new NylasSyncWorker(@api, @account) @connection = @worker.connection() + spyOn(@connection, 'start') advanceClock() it "should reset `busy` to false when reading state from disk", -> @@ -46,13 +49,11 @@ describe "NylasSyncWorker", -> describe "start", -> it "should open the delta connection", -> - spyOn(@connection, 'start') @worker.start() advanceClock() expect(@connection.start).toHaveBeenCalled() it "should start querying for model collections and counts that haven't been fully cached", -> - spyOn(@connection, 'start') @worker.start() advanceClock() expect(@apiRequests.length).toBe(10) @@ -129,6 +130,40 @@ describe "NylasSyncWorker", -> advanceClock(30000) expect(@worker.resumeFetches.callCount).toBe(1) + describe "delta streaming cursor", -> + it "should read the cursor from the database, and the old config format", -> + spyOn(NylasLongConnection.prototype, 'withCursor').andCallFake => + + @apiCursorStub = undefined + + # no cursor present + worker = new NylasSyncWorker(@api, @account) + connection = worker.connection() + expect(connection.hasCursor()).toBe(false) + advanceClock() + expect(connection.hasCursor()).toBe(false) + + # cursor present in config + spyOn(NylasEnv.config, 'get').andCallFake (key) => + return 'old-school' if key is "nylas.#{@account.id}.cursor" + return undefined + + worker = new NylasSyncWorker(@api, @account) + connection = worker.connection() + advanceClock() + expect(connection.hasCursor()).toBe(true) + expect(connection._config.getCursor()).toEqual('old-school') + + # cursor present in database, overrides cursor in config + @apiCursorStub = "new-school" + + worker = new NylasSyncWorker(@api, @account) + connection = worker.connection() + expect(connection.hasCursor()).toBe(false) + advanceClock() + expect(connection.hasCursor()).toBe(true) + expect(connection._config.getCursor()).toEqual('new-school') + describe "when a count request completes", -> beforeEach -> @worker.start()