fix(deltas): Update latest cursor, address comments

- Add test to make sure that latest cursor is updated
This commit is contained in:
Juan Tejada 2016-07-27 14:29:35 -07:00
parent b7408805f0
commit ef6cf545d8
4 changed files with 46 additions and 25 deletions

View file

@ -1,3 +1,4 @@
import _ from 'underscore'
import {NylasLongConnection, DatabaseStore} from 'nylas-exports'
const {Status} = NylasLongConnection
@ -10,20 +11,16 @@ class DeltaStreamingConnection extends NylasLongConnection {
opts.closeIfDataStopsInterval = 15 * 1000
super(api, accountId, opts)
const {ready, getCursor, setCursor, setStatus} = opts
this._ready = ready
const {isReady, getCursor, setCursor} = opts
this._isReady = isReady
this._getCursor = getCursor
this._setCursor = setCursor
// Override super class instance vars
this._onStatusChanged = setStatus
this._onError = (err) => {
if (err.message.indexOf('Invalid cursor') > 0) {
const error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.')
NylasEnv.config.unset(`nylas.${this._accountId}.cursor`)
DatabaseStore._handleSetupError(error)
}
}
// Update cursor when deltas received
this.onDeltas((deltas) => {
const last = _.last(deltas)
this._setCursor(last.cursor)
})
}
deltaStreamingPath(cursor) {
@ -34,6 +31,14 @@ class DeltaStreamingConnection extends NylasLongConnection {
return !!this._getCursor()
}
onError(err) {
if (err.message.indexOf('Invalid cursor') > 0) {
const error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.')
NylasEnv.config.unset(`nylas.${this._accountId}.cursor`)
DatabaseStore._handleSetupError(error)
}
}
latestCursor() {
const cursor = this._getCursor()
if (cursor) { return Promise.resolve(cursor) }
@ -54,7 +59,7 @@ class DeltaStreamingConnection extends NylasLongConnection {
}
start() {
if (!this._ready()) { return }
if (!this._isReady()) { return }
if (!this.canStart()) { return }
if (this._req != null) { return }

View file

@ -49,14 +49,15 @@ class NylasSyncWorker
@_terminated = false
@_connection = new DeltaStreamingConnection(api, account.id, {
ready: => @_state isnt null
isReady: => @_state isnt null
getCursor: =>
return null if @_state is null
@_state.cursor || NylasEnv.config.get("nylas.#{@_account.id}.cursor")
setCursor: (val) =>
@_state.cursor = val
@writeState()
setStatus: (status, statusCode) =>
onStatusChanged: (status, statusCode) =>
console.log('status changing!!!')
@_state.longConnectionStatus = status
if status is NylasLongConnection.Status.Closed
# Make the delay 30 seconds if we get a 403

View file

@ -176,6 +176,17 @@ describe "NylasSyncWorker", ->
expect(connection.hasCursor()).toBe(true)
expect(connection._getCursor()).toEqual('new-school')
it "should set the cursor to the last cursor after receiving deltas", ->
spyOn(DeltaStreamingConnection.prototype, 'latestCursor').andReturn Promise.resolve()
worker = new NylasSyncWorker(@api, @account)
advanceClock()
connection = worker.connection()
deltas = [{cursor: '1'}, {cursor: '2'}]
connection._emitter.emit('results-stopped-arriving', deltas)
advanceClock()
expect(connection._getCursor()).toEqual('2')
describe "when a count request completes", ->
beforeEach ->
@worker.start()

View file

@ -36,23 +36,23 @@ class NylasLongConnection {
this._buffer = ''
this._results = []
this._pingTimeout = null
this._statusCode = null
this._httpStatusCode = null
// Options
this._path = path
this._timeout = timeout || CONNECTION_TIMEOUT
this._onError = onError || (() => {})
this._onStatusChanged = onStatusChanged || (() => {})
this._debounceInterval = debounceInterval
this._debounceInterval = debounceInterval || null
this._closeIfDataStopsInterval = closeIfDataStopsInterval
this._flushResults = () => {
this._flushResultsSoon = () => {
if (this._results.length === 0) { return }
this._emitter.emit('results-stopped-arriving', this._results);
this._results = []
}
if (this._debounceInterval != null) {
this._flushResults = _.debounce(this._flushResults, this._debounceInterval)
if (this._debounceInterval !== null) {
this._flushResultsSoon = _.debounce(this._flushResultsSoon, this._debounceInterval)
}
}
@ -76,7 +76,7 @@ class NylasLongConnection {
this._results.push(result)
}
})
this._flushResults()
this._flushResultsSoon()
}, PROCESS_RESULTS_THROTTLE, {leading: false})
get accountId() {
@ -90,13 +90,17 @@ class NylasLongConnection {
setStatus(status) {
if (this._status === status) { return }
this._status = status
this._onStatusChanged(status, this._statusCode)
this._onStatusChanged(this._status, this._httpStatusCode)
}
onResults(callback) {
this._emitter.on('results-stopped-arriving', callback)
}
onError(error) {
return this._onError(error)
}
canStart() {
return [Status.None, Status.Closed].includes(this._status)
}
@ -123,12 +127,12 @@ class NylasLongConnection {
}
this._req = lib.request(options, (responseStream) => {
this._statusCode = responseStream.statusCode
this._httpStatusCode = responseStream.statusCode
if (responseStream.statusCode !== 200) {
responseStream.on('data', (chunk) => {
const error = new Error(chunk.toString('utf8'))
console.error(error)
this._onError(error)
this.onError(error)
this.close()
})
return
@ -152,7 +156,7 @@ class NylasLongConnection {
this._req.setSocketKeepAlive(true)
this._req.on('error', (err) => {
console.error(err)
this._onError(err)
this.onError(err)
this.close()
})
this._req.on('socket', (socket) => {
@ -182,7 +186,7 @@ class NylasLongConnection {
}
clearTimeout(this._pingTimeout)
this._pingTimeout = null
this._statusCode = null
this._httpStatusCode = null
this._buffer = ''
if (this._req) {
this._req.end()