fix(delta): add backoff to delta error handling

Summary: Report delta errors and exponential backoff when errors arise

Test Plan: Manually throw Errors in various parts of the delta stream

Reviewers: spang, juan

Reviewed By: juan

Differential Revision: https://phab.nylas.com/D3917
This commit is contained in:
Evan Morikawa 2017-02-14 16:09:27 -08:00
parent ddbf40c5ab
commit b5ea67198e
3 changed files with 26 additions and 23 deletions

View file

@ -12,6 +12,9 @@ import DeltaStreamingInMemoryConnection from './delta-streaming-in-memory-connec
import DeltaProcessor from './delta-processor'
import ContactRankingsCache from './contact-rankings-cache';
const MAX_RETRY_DELAY = 10 * 60 * 1000; // 15 minutes
const BASE_RETRY_DELAY = 1000;
/** This manages the syncing of N1 assets. We create one
* AccountDeltaConnection per email account. We save the state of the
* AccountDeltaConnection in the database.
@ -34,6 +37,7 @@ export default class AccountDeltaConnection {
constructor(account) {
this._state = { deltaCursors: {}, deltaStatus: {} }
this.retryDelay = BASE_RETRY_DELAY;
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._account = account;
this._unlisten = Actions.retryDeltaConnection.listen(() => this.refresh());
@ -101,6 +105,7 @@ export default class AccountDeltaConnection {
setCursor: (val) => {
this._state.deltaCursors[streamName] = val;
this._writeStateDebounced();
this.retryDelay = BASE_RETRY_DELAY;
},
onDeltas: DeltaProcessor.process.bind(DeltaProcessor),
onStatusChanged: (status) => {
@ -111,20 +116,18 @@ export default class AccountDeltaConnection {
}
}
_onError = (err) => {
if (err instanceof APIError) {
if (err.statusCode === 401) {
Actions.updateAccount(this._account.id, {
syncState: Account.SYNC_STATE_AUTH_FAILED,
syncError: err.toJSON(),
})
this.cleanup()
return
}
this.refresh()
return
_onError = (err = {}) => {
if (err instanceof APIError && err.statusCode === 401) {
Actions.updateAccount(this._account.id, {
syncState: Account.SYNC_STATE_AUTH_FAILED,
syncError: err.toJSON(),
})
}
throw err
NylasEnv.reportError(`Error connecting to delta stream: ${err.message}`)
this.cleanup()
setTimeout(() => this.refresh(), this.retryDelay);
this.retryDelay = Math.min(MAX_RETRY_DELAY, this.retryDelay * 1.2)
return
}
_writeState() {

View file

@ -7,15 +7,19 @@ class DeltaStreamingConnection extends NylasLongConnection {
opts.closeIfDataStopsInterval = 15 * 1000
super(api, accountId, opts)
this._onError = opts.onError || (() => {})
const {getCursor, setCursor} = opts
this._getCursor = getCursor
this._setCursor = setCursor
// Update cursor when deltas received
this.onDeltas((deltas) => {
this.onResults((deltas = []) => {
if (opts.onDeltas) opts.onDeltas(deltas);
const last = _.last(deltas)
this._setCursor(last.cursor)
const last = _.last(deltas);
if (last && last.cursor) {
this._setCursor(last.cursor)
}
})
}
@ -23,8 +27,8 @@ class DeltaStreamingConnection extends NylasLongConnection {
return `/delta/streaming?cursor=${cursor}`
}
onError(err) {
if (err.message.includes('Invalid cursor')) {
onError(err = {}) {
if (err.message && err.message.includes('Invalid cursor')) {
const error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.');
NylasEnv.reportError(error)
this._setCursor(0)
@ -33,10 +37,6 @@ class DeltaStreamingConnection extends NylasLongConnection {
this._onError(err)
}
onDeltas(callback) {
return this.onResults(callback)
}
start() {
this._path = this._deltaStreamingPath(this._getCursor() || 0)
super.start()

2
src/K2

@ -1 +1 @@
Subproject commit 15800d991389b4373b5b051fa9cb9b4511338674
Subproject commit 225d747166baccd26eca886b15e2e48b91908ebf