[client-app] Prevent delta streaming connection from retrying too much

Summary:
Before this commit, we would call .close() onError and when the
connection closed. We also closed the connection onError, even though
NylasLongConnection had also closed it, so we ended up calling close a
bunch of times. This would cause us to set a bunch of timeouts to retry
unecessarilly.

This commit makes it so we ensure there's only one retry timeout and
consolidates the logic that calls .close() so we don't call it so many
times unnecessarily

Test Plan: manual

Reviewers: spang, mark, evan

Reviewed By: mark, evan

Differential Revision: https://phab.nylas.com/D4166
This commit is contained in:
Juan Tejada 2017-03-09 13:41:02 -08:00
parent da463c250f
commit d2cd0db335
2 changed files with 88 additions and 74 deletions

View file

@ -98,6 +98,7 @@ class NylasLongConnection {
onError(error) {
this._onError(error)
this.close()
}
canStart() {
@ -108,82 +109,85 @@ class NylasLongConnection {
if (!this.canStart()) { return this }
if (this._req != null) { return this }
const accountToken = this._api.accessTokenForAccountId(this._accountId)
const identityToken = (IdentityStore.identity() || {}).token || ''
if (!accountToken) {
throw new APIError({
statusCode: 401,
message: `Can't establish NylasLongConnection: No account token available for account ${this._accountId}`,
})
}
const options = url.parse(`${this._api.APIRoot}${this._path}`)
options.auth = `${accountToken}:${identityToken}`
let lib;
if (this._api.APIRoot.indexOf('https') === -1) {
lib = require('http')
} else {
lib = require('https')
}
this._req = lib.request(options, (responseStream) => {
this._req.responseStream = responseStream
this._httpStatusCode = responseStream.statusCode
if (responseStream.statusCode !== 200) {
responseStream.on('data', (chunk) => {
const error = new APIError({
response: responseStream,
message: chunk.toString('utf8'),
statusCode: responseStream.statusCode,
})
this.onError(error)
this.close()
try {
const accountToken = this._api.accessTokenForAccountId(this._accountId)
const identityToken = (IdentityStore.identity() || {}).token || ''
if (!accountToken) {
throw new APIError({
statusCode: 401,
message: `Can't establish NylasLongConnection: No account token available for account ${this._accountId}`,
})
return
}
responseStream.setEncoding('utf8')
responseStream.on('error', (error) => {
this.onError(new APIError({error}))
this.close()
})
responseStream.on('close', () => this.close())
responseStream.on('end', () => this.close())
responseStream.on('data', (chunk) => {
this.closeIfDataStops()
// Ignore redundant newlines sent as pings. Want to avoid
// calls to this.onProcessBuffer that contain no actual updates
if (chunk === '\n' && (this._buffer.length === 0 || _.last(this._buffer) === '\n')) {
const options = url.parse(`${this._api.APIRoot}${this._path}`)
options.auth = `${accountToken}:${identityToken}`
let lib;
if (this._api.APIRoot.indexOf('https') === -1) {
lib = require('http')
} else {
lib = require('https')
}
this._req = lib.request(options, (responseStream) => {
this._req.responseStream = responseStream
this._httpStatusCode = responseStream.statusCode
if (responseStream.statusCode !== 200) {
responseStream.on('data', (chunk) => {
const error = new APIError({
response: responseStream,
message: chunk.toString('utf8'),
statusCode: responseStream.statusCode,
})
this.onError(error)
})
return
}
this._buffer += chunk
this._processBufferThrottled()
responseStream.setEncoding('utf8')
responseStream.on('error', (error) => {
this.onError(new APIError({error}))
})
responseStream.on('close', () => this.close())
responseStream.on('end', () => this.close())
responseStream.on('data', (chunk) => {
this.closeIfDataStops()
// Ignore redundant newlines sent as pings. Want to avoid
// calls to this.onProcessBuffer that contain no actual updates
if (chunk === '\n' && (this._buffer.length === 0 || _.last(this._buffer) === '\n')) {
return
}
this._buffer += chunk
this._processBufferThrottled()
})
})
})
this._req.setTimeout(60 * 60 * 1000)
this._req.setSocketKeepAlive(true)
this._req.on('error', (error) => {
this.onError(new APIError({error}))
this.close()
})
this._req.on('socket', (socket) => {
this.setStatus(Status.Connecting)
socket.on('connect', () => {
this.setStatus(Status.Connected)
this.closeIfDataStops()
})
socket.on('error', (error) => {
this._req.setTimeout(60 * 60 * 1000)
this._req.setSocketKeepAlive(true)
this._req.on('error', (error) => {
this.onError(new APIError({error}))
this.close()
})
socket.on('close', () => this.close())
socket.on('end', () => this.close())
})
// We `end` the request to start it.
// See https://github.com/nylas/nylas-mail/pull/2004
this._req.end()
return this
this._req.on('socket', (socket) => {
this.setStatus(Status.Connecting)
socket.on('connect', () => {
this.setStatus(Status.Connected)
this.closeIfDataStops()
})
socket.on('error', (error) => {
this.onError(new APIError({error}))
})
socket.on('close', () => this.close())
socket.on('end', () => this.close())
})
// We `end` the request to start it.
// See https://github.com/nylas/nylas-mail/pull/2004
this._req.end()
return this
} catch (err) {
// start() should not throw any errors synchronously. Any errors should be
// asynchronously transmitted to the caller via `onError`
setTimeout(() => this.onError(err), 0)
return this
}
}
closeIfDataStops() {

View file

@ -19,8 +19,9 @@ class DeltaStreamingConnection {
this._account = account
this._state = null
this._longConnection = null
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._retryTimeout = null
this._unsubscribers = []
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._backoffScheduler = new ExponentialBackoffScheduler({
baseDelay: BASE_RETRY_DELAY,
maxDelay: MAX_RETRY_DELAY,
@ -67,11 +68,13 @@ class DeltaStreamingConnection {
}
close() {
this._clearRetryTimeout()
this._disposeListeners()
this._longConnection.close()
}
end() {
this._clearRetryTimeout()
this._disposeListeners()
this._longConnection.end()
}
@ -88,6 +91,11 @@ class DeltaStreamingConnection {
this._unsubscribers = []
}
_clearRetryTimeout() {
clearTimeout(this._retryTimeout)
this._retryTimeout = null
}
_onOnlineStatusChanged = () => {
if (OnlineStatusStore.isOnline()) {
this.restart()
@ -103,7 +111,9 @@ class DeltaStreamingConnection {
this._backoffScheduler.reset()
}
if (status === Closed) {
setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
if (this._retryTimeout) { return }
this._clearRetryTimeout()
this._retryTimeout = setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
}
}
@ -138,9 +148,6 @@ class DeltaStreamingConnection {
if (!NylasAPIRequest.NonReportableStatusCodes.includes(err.statusCode)) {
NylasEnv.reportError(err)
}
this.close()
setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
}
_setCursor = (cursor) => {
@ -160,7 +167,10 @@ class DeltaStreamingConnection {
// Migrate from old storage key
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
if (!oldState) {
return {cursor: null, status: null};
return {
cursor: null,
status: null,
};
}
const {deltaCursors = {}, deltaStatus = {}} = oldState