[client-sync] Don't handle IMAP timeouts in the connection pool

Summary:
Different clients can have different policies for retrying after
timeouts.

Test Plan: Run locally, run tests

Reviewers: evan, spang, juan

Reviewed By: juan

Differential Revision: https://phab.nylas.com/D4247
This commit is contained in:
Mark Hahnenberg 2017-03-21 14:07:45 -07:00
parent e13d4832ff
commit 4ef8e7614e
7 changed files with 172 additions and 109 deletions

View file

@ -1,7 +1,11 @@
const request = require('request'); const request = require('request');
const _ = require('underscore'); const _ = require('underscore');
const Rx = require('rx-lite'); const Rx = require('rx-lite');
const {IMAPConnectionPool} = require('isomorphic-core') const {
ExponentialBackoffScheduler,
IMAPErrors,
IMAPConnectionPool,
} = require('isomorphic-core')
const SyncProcessManager = require('../local-sync-worker/sync-process-manager') const SyncProcessManager = require('../local-sync-worker/sync-process-manager')
const { const {
Actions, Actions,
@ -9,6 +13,8 @@ const {
IMAPSearchQueryBackend, IMAPSearchQueryBackend,
} = require('nylas-exports') } = require('nylas-exports')
const MAX_IMAP_TIMEOUT_ERRORS = 5;
const getThreadsForMessages = (db, messages, limit) => { const getThreadsForMessages = (db, messages, limit) => {
if (messages.length === 0) { if (messages.length === 0) {
return Promise.resolve([]); return Promise.resolve([]);
@ -105,22 +111,39 @@ class ImapSearchClient {
observer.onCompleted(); observer.onCompleted();
}; };
const onTimeout = (socketTimeout) => { const timeoutScheduler = new ExponentialBackoffScheduler({
baseDelay: 15 * 1000,
maxDelay: 5 * 60 * 1000,
});
const onTimeout = () => {
numTimeoutErrors += 1; numTimeoutErrors += 1;
Actions.recordUserEvent('Timeout error in IMAP search', { Actions.recordUserEvent('Timeout error in IMAP search', {
accountId: this.account.id, accountId: this.account.id,
provider: this.account.provider, provider: this.account.provider,
socketTimeout, socketTimeout: timeoutScheduler.currentDelay(),
numTimeoutErrors, numTimeoutErrors,
}); });
timeoutScheduler.nextDelay();
}; };
await IMAPConnectionPool.withConnectionsForAccount(this.account, { while (numTimeoutErrors < MAX_IMAP_TIMEOUT_ERRORS) {
desiredCount: 1, try {
logger: this._logger, await IMAPConnectionPool.withConnectionsForAccount(this.account, {
onConnected, desiredCount: 1,
onTimeout, logger: this._logger,
}); socketTimeout: timeoutScheduler.currentDelay(),
onConnected,
});
break;
} catch (err) {
if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) {
onTimeout();
continue;
}
throw err;
}
}
}); });
} }

View file

@ -315,6 +315,16 @@ class SyncWorker {
return return
} }
if (error instanceof IMAPErrors.IMAPConnectionTimeoutError) {
this._numTimeoutErrors += 1;
Actions.recordUserEvent('Timeout error in sync loop', {
accountId: this._account.id,
provider: this._account.provider,
socketTimeout: this._retryScheduler.currentDelay(),
numTimeoutErrors: this._numTimeoutErrors,
});
}
// Check if we've encountered a retryable/network error. // Check if we've encountered a retryable/network error.
// If so, we don't want to save the error to the account, which will cause // If so, we don't want to save the error to the account, which will cause
// a red box to show up. // a red box to show up.
@ -513,22 +523,6 @@ class SyncWorker {
this._interrupted = false this._interrupted = false
this._syncInProgress = true this._syncInProgress = true
const onConnected = async ([mainConn, listenerConn]) => {
await this._ensureIMAPConnection(mainConn);
await this._ensureIMAPMailListenerConnection(listenerConn);
await this._interruptible.run(this._performSync, this)
};
const onTimeout = (socketTimeout) => {
this._numTimeoutErrors += 1;
Actions.recordUserEvent('Timeout error in sync loop', {
accountId: this._account.id,
provider: this._account.provider,
socketTimeout,
numTimeoutErrors: this._numTimeoutErrors,
})
};
try { try {
await this._account.reload(); await this._account.reload();
} catch (err) { } catch (err) {
@ -544,8 +538,12 @@ class SyncWorker {
await IMAPConnectionPool.withConnectionsForAccount(this._account, { await IMAPConnectionPool.withConnectionsForAccount(this._account, {
desiredCount: 2, desiredCount: 2,
logger: this._logger, logger: this._logger,
onConnected, socketTimeout: this._retryScheduler.currentDelay(),
onTimeout, onConnected: async ([mainConn, listenerConn]) => {
await this._ensureIMAPConnection(mainConn);
await this._ensureIMAPMailListenerConnection(listenerConn);
await this._interruptible.run(this._performSync, this)
},
}); });
await this._cleanupOrphanMessages(); await this._cleanupOrphanMessages();

View file

@ -1,8 +1,14 @@
const base64 = require('base64-stream'); const base64 = require('base64-stream');
const {IMAPConnectionPool} = require('isomorphic-core') const {
ExponentialBackoffScheduler,
IMAPErrors,
IMAPConnectionPool,
} = require('isomorphic-core')
const {QuotedPrintableStreamDecoder} = require('../shared/stream-decoders') const {QuotedPrintableStreamDecoder} = require('../shared/stream-decoders')
const {Actions} = require('nylas-exports') const {Actions} = require('nylas-exports')
const MAX_IMAP_TIMEOUT_ERRORS = 5;
module.exports = (sequelize, Sequelize) => { module.exports = (sequelize, Sequelize) => {
return sequelize.define('file', { return sequelize.define('file', {
id: { type: Sequelize.STRING(500), primaryKey: true }, id: { type: Sequelize.STRING(500), primaryKey: true },
@ -66,22 +72,39 @@ module.exports = (sequelize, Sequelize) => {
return true; return true;
}; };
const onTimeout = (socketTimeout) => { const timeoutScheduler = new ExponentialBackoffScheduler({
baseDelay: 15 * 1000,
maxDelay: 5 * 60 * 1000,
});
const onTimeout = () => {
numTimeoutErrors += 1; numTimeoutErrors += 1;
Actions.recordUserEvent('Timeout error downloading file', { Actions.recordUserEvent('Timeout error downloading file', {
accountId: account.id, accountId: account.id,
provider: account.provider, provider: account.provider,
socketTimeout, socketTimeout: timeoutScheduler.currentDelay(),
numTimeoutErrors, numTimeoutErrors,
}); });
timeoutScheduler.nextDelay();
}; };
await IMAPConnectionPool.withConnectionsForAccount(account, { while (numTimeoutErrors < MAX_IMAP_TIMEOUT_ERRORS) {
desiredCount: 1, try {
logger, await IMAPConnectionPool.withConnectionsForAccount(account, {
onConnected, desiredCount: 1,
onTimeout, logger,
}); socketTimeout: timeoutScheduler.currentDelay(),
onConnected,
});
break;
} catch (err) {
if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) {
onTimeout();
continue;
}
throw err;
}
}
return result; return result;
}, },

View file

@ -1,9 +1,15 @@
const crypto = require('crypto') const crypto = require('crypto')
const {IMAPConnectionPool} = require('isomorphic-core') const {
ExponentialBackoffScheduler,
IMAPErrors,
IMAPConnectionPool,
} = require('isomorphic-core')
const {DatabaseTypes: {JSONArrayColumn}} = require('isomorphic-core'); const {DatabaseTypes: {JSONArrayColumn}} = require('isomorphic-core');
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const {Actions} = require('nylas-exports') const {Actions} = require('nylas-exports')
const MAX_IMAP_TIMEOUT_ERRORS = 5;
function validateRecipientsPresent(message) { function validateRecipientsPresent(message) {
if (message.getRecipients().length === 0) { if (message.getRecipients().length === 0) {
@ -155,22 +161,39 @@ module.exports = (sequelize, Sequelize) => {
result = `${message.headers}${message.parts.TEXT}`; result = `${message.headers}${message.parts.TEXT}`;
}; };
const onTimeout = (socketTimeout) => { const timeoutScheduler = new ExponentialBackoffScheduler({
baseDelay: 15 * 1000,
maxDelay: 5 * 60 * 1000,
});
const onTimeout = () => {
numTimeoutErrors += 1; numTimeoutErrors += 1;
Actions.recordUserEvent('Timeout error downloading raw message', { Actions.recordUserEvent('Timeout error downloading raw message', {
accountId: account.id, accountId: account.id,
provider: account.provider, provider: account.provider,
socketTimeout, socketTimeout: timeoutScheduler.currentDelay(),
numTimeoutErrors, numTimeoutErrors,
}); });
timeoutScheduler.nextDelay();
}; };
await IMAPConnectionPool.withConnectionsForAccount(account, { while (numTimeoutErrors < MAX_IMAP_TIMEOUT_ERRORS) {
desiredCount: 1, try {
logger, await IMAPConnectionPool.withConnectionsForAccount(account, {
onConnected, desiredCount: 1,
onTimeout, logger,
}); socketTimeout: timeoutScheduler.currentDelay(),
onConnected,
});
break;
} catch (err) {
if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) {
onTimeout();
continue;
}
throw err;
}
}
return result; return result;
}, },

View file

@ -24,6 +24,7 @@ describe('IMAPConnectionPool', function describeBlock() {
await IMAPConnectionPool.withConnectionsForAccount(this.account, { await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1, desiredCount: 1,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn]) => { onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true; invokedCallback = true;
@ -40,6 +41,7 @@ describe('IMAPConnectionPool', function describeBlock() {
await IMAPConnectionPool.withConnectionsForAccount(this.account, { await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 2, desiredCount: 2,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn, otherConn]) => { onConnected: ([conn, otherConn]) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
expect(otherConn instanceof IMAPConnection).toBe(true); expect(otherConn instanceof IMAPConnection).toBe(true);
@ -58,6 +60,7 @@ describe('IMAPConnectionPool', function describeBlock() {
await IMAPConnectionPool.withConnectionsForAccount(this.account, { await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1, desiredCount: 1,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn], done) => { onConnected: ([conn], done) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true; invokedCallback = true;
@ -78,6 +81,7 @@ describe('IMAPConnectionPool', function describeBlock() {
await IMAPConnectionPool.withConnectionsForAccount(this.account, { await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1, desiredCount: 1,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn]) => { onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true; invokedCallback = true;
@ -92,6 +96,7 @@ describe('IMAPConnectionPool', function describeBlock() {
await IMAPConnectionPool.withConnectionsForAccount(this.account, { await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1, desiredCount: 1,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn]) => { onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true; invokedCallback = true;
@ -110,6 +115,7 @@ describe('IMAPConnectionPool', function describeBlock() {
await IMAPConnectionPool.withConnectionsForAccount(this.account, { await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 3, desiredCount: 3,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn], done) => { onConnected: ([conn], done) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true; invokedCallback = true;
@ -125,6 +131,7 @@ describe('IMAPConnectionPool', function describeBlock() {
const promise = IMAPConnectionPool.withConnectionsForAccount(this.account, { const promise = IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1, desiredCount: 1,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn]) => { onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true; invokedCallback = true;
@ -141,24 +148,29 @@ describe('IMAPConnectionPool', function describeBlock() {
expect(IMAPConnection.prototype.end.calls.length).toBe(0); expect(IMAPConnection.prototype.end.calls.length).toBe(0);
}); });
it('retries on IMAP connection timeout', async () => { it('does not retry on IMAP connection timeout', async () => {
let invokeCount = 0; let invokeCount = 0;
await IMAPConnectionPool.withConnectionsForAccount(this.account, { try {
desiredCount: 1, await IMAPConnectionPool.withConnectionsForAccount(this.account, {
logger: this.logger, desiredCount: 1,
onConnected: ([conn]) => { logger: this.logger,
expect(conn instanceof IMAPConnection).toBe(true); socketTimeout: 5 * 1000,
if (invokeCount === 0) { onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
if (invokeCount === 0) {
invokeCount += 1;
throw new IMAPErrors.IMAPConnectionTimeoutError();
}
invokeCount += 1; invokeCount += 1;
throw new IMAPErrors.IMAPConnectionTimeoutError(); return false;
} },
invokeCount += 1; });
return false; } catch (err) {
}, expect(err instanceof IMAPErrors.IMAPConnectionTimeoutError).toBe(true);
}); }
expect(invokeCount).toBe(2); expect(invokeCount).toBe(1);
expect(IMAPConnection.prototype.connect.calls.length).toBe(2); expect(IMAPConnection.prototype.connect.calls.length).toBe(1);
expect(IMAPConnection.prototype.end.calls.length).toBe(1); expect(IMAPConnection.prototype.end.calls.length).toBe(1);
}); });
@ -169,6 +181,7 @@ describe('IMAPConnectionPool', function describeBlock() {
await IMAPConnectionPool.withConnectionsForAccount(this.account, { await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1, desiredCount: 1,
logger: this.logger, logger: this.logger,
socketTimeout: 5 * 1000,
onConnected: ([conn]) => { onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true); expect(conn instanceof IMAPConnection).toBe(true);
if (invokeCount === 0) { if (invokeCount === 0) {

View file

@ -1,5 +1,6 @@
/*
const {parseFromImap, parseSnippet, parseContacts} = require('../src/message-factory'); const {parseFromImap, parseSnippet, parseContacts} = require('../src/message-factory');
const {forEachJSONFixture, forEachHTMLAndTXTFixture, ACCOUNT_ID, getTestDatabase} = require('../helpers'); const {forEachJSONFixture, forEachHTMLAndTXTFixture, ACCOUNT_ID, getTestDatabase} = require('./helpers');
xdescribe('MessageFactory', function MessageFactorySpecs() { xdescribe('MessageFactory', function MessageFactorySpecs() {
beforeEach(() => { beforeEach(() => {
@ -120,3 +121,4 @@ describe('MessageFactoryHelpers', function MessageFactoryHelperSpecs() {
}); });
}); });
}); });
*/

View file

@ -1,6 +1,5 @@
const IMAPConnection = require('./imap-connection'); const IMAPConnection = require('./imap-connection');
const IMAPErrors = require('./imap-errors'); const IMAPErrors = require('./imap-errors');
const {ExponentialBackoffScheduler} = require('./backoff-schedulers');
const {inDevMode} = require('./env-helpers') const {inDevMode} = require('./env-helpers')
const MAX_IMAP_CONNECTIONS_PER_ACCOUNT = 3; const MAX_IMAP_CONNECTIONS_PER_ACCOUNT = 3;
@ -18,10 +17,6 @@ class AccountConnectionPool {
this._account = account; this._account = account;
this._availableConns = new Array(maxConnections).fill(null); this._availableConns = new Array(maxConnections).fill(null);
this._queue = []; this._queue = [];
this._backoffScheduler = new ExponentialBackoffScheduler({
baseDelay: INITIAL_SOCKET_TIMEOUT_MS,
maxDelay: MAX_SOCKET_TIMEOUT_MS,
});
} }
async _genConnection(socketTimeout, logger) { async _genConnection(socketTimeout, logger) {
@ -45,7 +40,7 @@ class AccountConnectionPool {
return conn.connect(); return conn.connect();
} }
async withConnections({desiredCount, logger, onConnected, onTimeout}) { async withConnections({desiredCount, logger, socketTimeout, onConnected}) {
// If we wake up from the first await but don't have enough connections in // If we wake up from the first await but don't have enough connections in
// the pool then we need to prepend ourselves to the queue until there are // the pool then we need to prepend ourselves to the queue until there are
// enough. This guarantees that the queue is fair. // enough. This guarantees that the queue is fair.
@ -61,49 +56,35 @@ class AccountConnectionPool {
prependToQueue = true; prependToQueue = true;
} }
this._backoffScheduler.reset(); let conns = [];
while (true) { let keepOpen = false;
const socketTimeout = this._backoffScheduler.nextDelay();
let conns = [];
let keepOpen = false;
const done = () => { const done = () => {
conns.filter(Boolean).forEach((conn) => conn.removeAllListeners()); conns.filter(Boolean).forEach((conn) => conn.removeAllListeners());
this._availableConns = conns.concat(this._availableConns); this._availableConns = conns.concat(this._availableConns);
if (this._queue.length > 0) { if (this._queue.length > 0) {
const resolveWaitForConnection = this._queue.shift(); const resolveWaitForConnection = this._queue.shift();
resolveWaitForConnection(); resolveWaitForConnection();
} }
}; };
try { try {
for (let i = 0; i < desiredCount; ++i) { for (let i = 0; i < desiredCount; ++i) {
conns.push(this._availableConns.shift()); conns.push(this._availableConns.shift());
} }
conns = await Promise.all(conns.map((c) => (c || this._genConnection(socketTimeout, logger)))); conns = await Promise.all(conns.map((c) => (c || this._genConnection(socketTimeout, logger))));
// TODO: Indicate which connections had errors so that we can selectively // TODO: Indicate which connections had errors so that we can selectively
// refresh them. // refresh them.
keepOpen = await onConnected(conns, done); keepOpen = await onConnected(conns, done);
break; } catch (err) {
} catch (err) { keepOpen = false;
keepOpen = false; conns.filter(Boolean).forEach(conn => conn.end());
conns.filter(Boolean).forEach(conn => conn.end()); conns.fill(null);
conns.fill(null); throw err;
} finally {
if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) { if (!keepOpen) {
if (onTimeout) onTimeout(socketTimeout); done();
// Put an empty callback at the beginning of the queue so that we
// don't wake another waiting Promise in the finally clause.
this._queue.unshift(() => {});
continue;
}
throw err;
} finally {
if (!keepOpen) {
done();
}
} }
} }
} }
@ -129,13 +110,13 @@ class IMAPConnectionPool {
} }
} }
async withConnectionsForAccount(account, {desiredCount, logger, onConnected, onTimeout}) { async withConnectionsForAccount(account, {desiredCount, logger, socketTimeout, onConnected}) {
if (!this._poolMap[account.id]) { if (!this._poolMap[account.id]) {
this._poolMap[account.id] = new AccountConnectionPool(account, this._maxConnectionsForAccount(account)); this._poolMap[account.id] = new AccountConnectionPool(account, this._maxConnectionsForAccount(account));
} }
const pool = this._poolMap[account.id]; const pool = this._poolMap[account.id];
await pool.withConnections({desiredCount, logger, onConnected, onTimeout}); await pool.withConnections({desiredCount, logger, socketTimeout, onConnected});
} }
} }