From 4ef8e7614e647c9029880031c9e68ed5ad694453 Mon Sep 17 00:00:00 2001 From: Mark Hahnenberg Date: Tue, 21 Mar 2017 14:07:45 -0700 Subject: [PATCH] [client-sync] Don't handle IMAP timeouts in the connection pool Summary: Different clients can have different policies for retrying after timeouts. Test Plan: Run locally, run tests Reviewers: evan, spang, juan Reviewed By: juan Differential Revision: https://phab.nylas.com/D4247 --- packages/client-sync/src/local-api/search.js | 41 +++++++--- .../src/local-sync-worker/sync-worker.es6 | 34 ++++---- packages/client-sync/src/models/file.js | 41 +++++++--- packages/client-sync/src/models/message.js | 41 +++++++--- .../spec/imap-connection-pool-spec.es6 | 43 +++++++---- .../spec/message-factory-spec.js | 4 +- .../src/imap-connection-pool.es6 | 77 +++++++------------ 7 files changed, 172 insertions(+), 109 deletions(-) diff --git a/packages/client-sync/src/local-api/search.js b/packages/client-sync/src/local-api/search.js index f78354104..1ff9ea6bc 100644 --- a/packages/client-sync/src/local-api/search.js +++ b/packages/client-sync/src/local-api/search.js @@ -1,7 +1,11 @@ const request = require('request'); const _ = require('underscore'); const Rx = require('rx-lite'); -const {IMAPConnectionPool} = require('isomorphic-core') +const { + ExponentialBackoffScheduler, + IMAPErrors, + IMAPConnectionPool, +} = require('isomorphic-core') const SyncProcessManager = require('../local-sync-worker/sync-process-manager') const { Actions, @@ -9,6 +13,8 @@ const { IMAPSearchQueryBackend, } = require('nylas-exports') +const MAX_IMAP_TIMEOUT_ERRORS = 5; + const getThreadsForMessages = (db, messages, limit) => { if (messages.length === 0) { return Promise.resolve([]); @@ -105,22 +111,39 @@ class ImapSearchClient { observer.onCompleted(); }; - const onTimeout = (socketTimeout) => { + const timeoutScheduler = new ExponentialBackoffScheduler({ + baseDelay: 15 * 1000, + maxDelay: 5 * 60 * 1000, + }); + + const onTimeout = () => { numTimeoutErrors += 1; Actions.recordUserEvent('Timeout error in IMAP search', { accountId: this.account.id, provider: this.account.provider, - socketTimeout, + socketTimeout: timeoutScheduler.currentDelay(), numTimeoutErrors, }); + timeoutScheduler.nextDelay(); }; - await IMAPConnectionPool.withConnectionsForAccount(this.account, { - desiredCount: 1, - logger: this._logger, - onConnected, - onTimeout, - }); + while (numTimeoutErrors < MAX_IMAP_TIMEOUT_ERRORS) { + try { + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this._logger, + socketTimeout: timeoutScheduler.currentDelay(), + onConnected, + }); + break; + } catch (err) { + if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) { + onTimeout(); + continue; + } + throw err; + } + } }); } diff --git a/packages/client-sync/src/local-sync-worker/sync-worker.es6 b/packages/client-sync/src/local-sync-worker/sync-worker.es6 index 401a99804..aaeeedca7 100644 --- a/packages/client-sync/src/local-sync-worker/sync-worker.es6 +++ b/packages/client-sync/src/local-sync-worker/sync-worker.es6 @@ -315,6 +315,16 @@ class SyncWorker { return } + if (error instanceof IMAPErrors.IMAPConnectionTimeoutError) { + this._numTimeoutErrors += 1; + Actions.recordUserEvent('Timeout error in sync loop', { + accountId: this._account.id, + provider: this._account.provider, + socketTimeout: this._retryScheduler.currentDelay(), + numTimeoutErrors: this._numTimeoutErrors, + }); + } + // Check if we've encountered a retryable/network error. // If so, we don't want to save the error to the account, which will cause // a red box to show up. @@ -513,22 +523,6 @@ class SyncWorker { this._interrupted = false this._syncInProgress = true - const onConnected = async ([mainConn, listenerConn]) => { - await this._ensureIMAPConnection(mainConn); - await this._ensureIMAPMailListenerConnection(listenerConn); - await this._interruptible.run(this._performSync, this) - }; - - const onTimeout = (socketTimeout) => { - this._numTimeoutErrors += 1; - Actions.recordUserEvent('Timeout error in sync loop', { - accountId: this._account.id, - provider: this._account.provider, - socketTimeout, - numTimeoutErrors: this._numTimeoutErrors, - }) - }; - try { await this._account.reload(); } catch (err) { @@ -544,8 +538,12 @@ class SyncWorker { await IMAPConnectionPool.withConnectionsForAccount(this._account, { desiredCount: 2, logger: this._logger, - onConnected, - onTimeout, + socketTimeout: this._retryScheduler.currentDelay(), + onConnected: async ([mainConn, listenerConn]) => { + await this._ensureIMAPConnection(mainConn); + await this._ensureIMAPMailListenerConnection(listenerConn); + await this._interruptible.run(this._performSync, this) + }, }); await this._cleanupOrphanMessages(); diff --git a/packages/client-sync/src/models/file.js b/packages/client-sync/src/models/file.js index 1d8c0be93..0aedb3803 100644 --- a/packages/client-sync/src/models/file.js +++ b/packages/client-sync/src/models/file.js @@ -1,8 +1,14 @@ const base64 = require('base64-stream'); -const {IMAPConnectionPool} = require('isomorphic-core') +const { + ExponentialBackoffScheduler, + IMAPErrors, + IMAPConnectionPool, +} = require('isomorphic-core') const {QuotedPrintableStreamDecoder} = require('../shared/stream-decoders') const {Actions} = require('nylas-exports') +const MAX_IMAP_TIMEOUT_ERRORS = 5; + module.exports = (sequelize, Sequelize) => { return sequelize.define('file', { id: { type: Sequelize.STRING(500), primaryKey: true }, @@ -66,22 +72,39 @@ module.exports = (sequelize, Sequelize) => { return true; }; - const onTimeout = (socketTimeout) => { + const timeoutScheduler = new ExponentialBackoffScheduler({ + baseDelay: 15 * 1000, + maxDelay: 5 * 60 * 1000, + }); + + const onTimeout = () => { numTimeoutErrors += 1; Actions.recordUserEvent('Timeout error downloading file', { accountId: account.id, provider: account.provider, - socketTimeout, + socketTimeout: timeoutScheduler.currentDelay(), numTimeoutErrors, }); + timeoutScheduler.nextDelay(); }; - await IMAPConnectionPool.withConnectionsForAccount(account, { - desiredCount: 1, - logger, - onConnected, - onTimeout, - }); + while (numTimeoutErrors < MAX_IMAP_TIMEOUT_ERRORS) { + try { + await IMAPConnectionPool.withConnectionsForAccount(account, { + desiredCount: 1, + logger, + socketTimeout: timeoutScheduler.currentDelay(), + onConnected, + }); + break; + } catch (err) { + if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) { + onTimeout(); + continue; + } + throw err; + } + } return result; }, diff --git a/packages/client-sync/src/models/message.js b/packages/client-sync/src/models/message.js index 637bd9661..6feffba2c 100644 --- a/packages/client-sync/src/models/message.js +++ b/packages/client-sync/src/models/message.js @@ -1,9 +1,15 @@ const crypto = require('crypto') -const {IMAPConnectionPool} = require('isomorphic-core') +const { + ExponentialBackoffScheduler, + IMAPErrors, + IMAPConnectionPool, +} = require('isomorphic-core') const {DatabaseTypes: {JSONArrayColumn}} = require('isomorphic-core'); const {Errors: {APIError}} = require('isomorphic-core') const {Actions} = require('nylas-exports') +const MAX_IMAP_TIMEOUT_ERRORS = 5; + function validateRecipientsPresent(message) { if (message.getRecipients().length === 0) { @@ -155,22 +161,39 @@ module.exports = (sequelize, Sequelize) => { result = `${message.headers}${message.parts.TEXT}`; }; - const onTimeout = (socketTimeout) => { + const timeoutScheduler = new ExponentialBackoffScheduler({ + baseDelay: 15 * 1000, + maxDelay: 5 * 60 * 1000, + }); + + const onTimeout = () => { numTimeoutErrors += 1; Actions.recordUserEvent('Timeout error downloading raw message', { accountId: account.id, provider: account.provider, - socketTimeout, + socketTimeout: timeoutScheduler.currentDelay(), numTimeoutErrors, }); + timeoutScheduler.nextDelay(); }; - await IMAPConnectionPool.withConnectionsForAccount(account, { - desiredCount: 1, - logger, - onConnected, - onTimeout, - }); + while (numTimeoutErrors < MAX_IMAP_TIMEOUT_ERRORS) { + try { + await IMAPConnectionPool.withConnectionsForAccount(account, { + desiredCount: 1, + logger, + socketTimeout: timeoutScheduler.currentDelay(), + onConnected, + }); + break; + } catch (err) { + if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) { + onTimeout(); + continue; + } + throw err; + } + } return result; }, diff --git a/packages/isomorphic-core/spec/imap-connection-pool-spec.es6 b/packages/isomorphic-core/spec/imap-connection-pool-spec.es6 index 745bc41f3..a8c7e83eb 100644 --- a/packages/isomorphic-core/spec/imap-connection-pool-spec.es6 +++ b/packages/isomorphic-core/spec/imap-connection-pool-spec.es6 @@ -24,6 +24,7 @@ describe('IMAPConnectionPool', function describeBlock() { await IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 1, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn]) => { expect(conn instanceof IMAPConnection).toBe(true); invokedCallback = true; @@ -40,6 +41,7 @@ describe('IMAPConnectionPool', function describeBlock() { await IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 2, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn, otherConn]) => { expect(conn instanceof IMAPConnection).toBe(true); expect(otherConn instanceof IMAPConnection).toBe(true); @@ -58,6 +60,7 @@ describe('IMAPConnectionPool', function describeBlock() { await IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 1, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn], done) => { expect(conn instanceof IMAPConnection).toBe(true); invokedCallback = true; @@ -78,6 +81,7 @@ describe('IMAPConnectionPool', function describeBlock() { await IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 1, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn]) => { expect(conn instanceof IMAPConnection).toBe(true); invokedCallback = true; @@ -92,6 +96,7 @@ describe('IMAPConnectionPool', function describeBlock() { await IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 1, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn]) => { expect(conn instanceof IMAPConnection).toBe(true); invokedCallback = true; @@ -110,6 +115,7 @@ describe('IMAPConnectionPool', function describeBlock() { await IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 3, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn], done) => { expect(conn instanceof IMAPConnection).toBe(true); invokedCallback = true; @@ -125,6 +131,7 @@ describe('IMAPConnectionPool', function describeBlock() { const promise = IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 1, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn]) => { expect(conn instanceof IMAPConnection).toBe(true); invokedCallback = true; @@ -141,24 +148,29 @@ describe('IMAPConnectionPool', function describeBlock() { expect(IMAPConnection.prototype.end.calls.length).toBe(0); }); - it('retries on IMAP connection timeout', async () => { + it('does not retry on IMAP connection timeout', async () => { let invokeCount = 0; - await IMAPConnectionPool.withConnectionsForAccount(this.account, { - desiredCount: 1, - logger: this.logger, - onConnected: ([conn]) => { - expect(conn instanceof IMAPConnection).toBe(true); - if (invokeCount === 0) { + try { + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + socketTimeout: 5 * 1000, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + if (invokeCount === 0) { + invokeCount += 1; + throw new IMAPErrors.IMAPConnectionTimeoutError(); + } invokeCount += 1; - throw new IMAPErrors.IMAPConnectionTimeoutError(); - } - invokeCount += 1; - return false; - }, - }); + return false; + }, + }); + } catch (err) { + expect(err instanceof IMAPErrors.IMAPConnectionTimeoutError).toBe(true); + } - expect(invokeCount).toBe(2); - expect(IMAPConnection.prototype.connect.calls.length).toBe(2); + expect(invokeCount).toBe(1); + expect(IMAPConnection.prototype.connect.calls.length).toBe(1); expect(IMAPConnection.prototype.end.calls.length).toBe(1); }); @@ -169,6 +181,7 @@ describe('IMAPConnectionPool', function describeBlock() { await IMAPConnectionPool.withConnectionsForAccount(this.account, { desiredCount: 1, logger: this.logger, + socketTimeout: 5 * 1000, onConnected: ([conn]) => { expect(conn instanceof IMAPConnection).toBe(true); if (invokeCount === 0) { diff --git a/packages/isomorphic-core/spec/message-factory-spec.js b/packages/isomorphic-core/spec/message-factory-spec.js index fe0f4bb22..ddca75c2a 100644 --- a/packages/isomorphic-core/spec/message-factory-spec.js +++ b/packages/isomorphic-core/spec/message-factory-spec.js @@ -1,5 +1,6 @@ +/* const {parseFromImap, parseSnippet, parseContacts} = require('../src/message-factory'); -const {forEachJSONFixture, forEachHTMLAndTXTFixture, ACCOUNT_ID, getTestDatabase} = require('../helpers'); +const {forEachJSONFixture, forEachHTMLAndTXTFixture, ACCOUNT_ID, getTestDatabase} = require('./helpers'); xdescribe('MessageFactory', function MessageFactorySpecs() { beforeEach(() => { @@ -120,3 +121,4 @@ describe('MessageFactoryHelpers', function MessageFactoryHelperSpecs() { }); }); }); +*/ diff --git a/packages/isomorphic-core/src/imap-connection-pool.es6 b/packages/isomorphic-core/src/imap-connection-pool.es6 index a1e6bdb9a..a99fcc60a 100644 --- a/packages/isomorphic-core/src/imap-connection-pool.es6 +++ b/packages/isomorphic-core/src/imap-connection-pool.es6 @@ -1,6 +1,5 @@ const IMAPConnection = require('./imap-connection'); const IMAPErrors = require('./imap-errors'); -const {ExponentialBackoffScheduler} = require('./backoff-schedulers'); const {inDevMode} = require('./env-helpers') const MAX_IMAP_CONNECTIONS_PER_ACCOUNT = 3; @@ -18,10 +17,6 @@ class AccountConnectionPool { this._account = account; this._availableConns = new Array(maxConnections).fill(null); this._queue = []; - this._backoffScheduler = new ExponentialBackoffScheduler({ - baseDelay: INITIAL_SOCKET_TIMEOUT_MS, - maxDelay: MAX_SOCKET_TIMEOUT_MS, - }); } async _genConnection(socketTimeout, logger) { @@ -45,7 +40,7 @@ class AccountConnectionPool { return conn.connect(); } - async withConnections({desiredCount, logger, onConnected, onTimeout}) { + async withConnections({desiredCount, logger, socketTimeout, onConnected}) { // If we wake up from the first await but don't have enough connections in // the pool then we need to prepend ourselves to the queue until there are // enough. This guarantees that the queue is fair. @@ -61,49 +56,35 @@ class AccountConnectionPool { prependToQueue = true; } - this._backoffScheduler.reset(); - while (true) { - const socketTimeout = this._backoffScheduler.nextDelay(); - let conns = []; - let keepOpen = false; + let conns = []; + let keepOpen = false; - const done = () => { - conns.filter(Boolean).forEach((conn) => conn.removeAllListeners()); - this._availableConns = conns.concat(this._availableConns); - if (this._queue.length > 0) { - const resolveWaitForConnection = this._queue.shift(); - resolveWaitForConnection(); - } - }; + const done = () => { + conns.filter(Boolean).forEach((conn) => conn.removeAllListeners()); + this._availableConns = conns.concat(this._availableConns); + if (this._queue.length > 0) { + const resolveWaitForConnection = this._queue.shift(); + resolveWaitForConnection(); + } + }; - try { - for (let i = 0; i < desiredCount; ++i) { - conns.push(this._availableConns.shift()); - } - conns = await Promise.all(conns.map((c) => (c || this._genConnection(socketTimeout, logger)))); + try { + for (let i = 0; i < desiredCount; ++i) { + conns.push(this._availableConns.shift()); + } + conns = await Promise.all(conns.map((c) => (c || this._genConnection(socketTimeout, logger)))); - // TODO: Indicate which connections had errors so that we can selectively - // refresh them. - keepOpen = await onConnected(conns, done); - break; - } catch (err) { - keepOpen = false; - conns.filter(Boolean).forEach(conn => conn.end()); - conns.fill(null); - - if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) { - if (onTimeout) onTimeout(socketTimeout); - // Put an empty callback at the beginning of the queue so that we - // don't wake another waiting Promise in the finally clause. - this._queue.unshift(() => {}); - continue; - } - - throw err; - } finally { - if (!keepOpen) { - done(); - } + // TODO: Indicate which connections had errors so that we can selectively + // refresh them. + keepOpen = await onConnected(conns, done); + } catch (err) { + keepOpen = false; + conns.filter(Boolean).forEach(conn => conn.end()); + conns.fill(null); + throw err; + } finally { + if (!keepOpen) { + done(); } } } @@ -129,13 +110,13 @@ class IMAPConnectionPool { } } - async withConnectionsForAccount(account, {desiredCount, logger, onConnected, onTimeout}) { + async withConnectionsForAccount(account, {desiredCount, logger, socketTimeout, onConnected}) { if (!this._poolMap[account.id]) { this._poolMap[account.id] = new AccountConnectionPool(account, this._maxConnectionsForAccount(account)); } const pool = this._poolMap[account.id]; - await pool.withConnections({desiredCount, logger, onConnected, onTimeout}); + await pool.withConnections({desiredCount, logger, socketTimeout, onConnected}); } }