From c634380ab612ebb3d0a9ba63652b0406ba938817 Mon Sep 17 00:00:00 2001 From: Mark Hahnenberg Date: Fri, 17 Feb 2017 12:05:50 -0800 Subject: [PATCH] [client-sync] Add per-Account IMAP connection pooling Summary: Prior to this diff it was easy for us to create too many IMAP connections (e.g. by requesting many attachments at once), causing random failures when the server would reject our connection attempts. This diff adds a per-Account IMAP pooling mechanism so that we avoid these failures. Test Plan: Run locally with sync worker and several other clients using the pool, verify correct behavior. Also added a few unit tests. Reviewers: evan, spang, juan Reviewed By: juan Differential Revision: https://phab.nylas.com/D3965 --- packages/isomorphic-core/index.js | 1 + .../isomorphic-core/spec/imap-pool-spec.es6 | 191 ++++++++++++++++++ packages/isomorphic-core/src/imap-pool.es6 | 121 +++++++++++ 3 files changed, 313 insertions(+) create mode 100644 packages/isomorphic-core/spec/imap-pool-spec.es6 create mode 100644 packages/isomorphic-core/src/imap-pool.es6 diff --git a/packages/isomorphic-core/index.js b/packages/isomorphic-core/index.js index 98be8a57a..f9eedc2e4 100644 --- a/packages/isomorphic-core/index.js +++ b/packages/isomorphic-core/index.js @@ -12,6 +12,7 @@ module.exports = { PromiseUtils: require('./src/promise-utils'), DatabaseTypes: require('./src/database-types'), IMAPConnection: require('./src/imap-connection'), + IMAPConnectionPool: require('./src/imap-pool'), SendmailClient: require('./src/sendmail-client'), DeltaStreamBuilder: require('./src/delta-stream-builder'), HookTransactionLog: require('./src/hook-transaction-log'), diff --git a/packages/isomorphic-core/spec/imap-pool-spec.es6 b/packages/isomorphic-core/spec/imap-pool-spec.es6 new file mode 100644 index 000000000..813169564 --- /dev/null +++ b/packages/isomorphic-core/spec/imap-pool-spec.es6 @@ -0,0 +1,191 @@ +import IMAPConnectionPool from '../src/imap-pool'; +import IMAPConnection from '../src/imap-connection'; +import IMAPErrors from '../src/imap-errors'; + +describe('IMAPConnectionPool', function describeBlock() { + beforeEach(() => { + this.account = { + id: 'test-account', + decryptedCredentials: () => { return {}; }, + connectionSettings: { + imap_host: 'imap.foobar.com', + }, + }; + IMAPConnectionPool._poolMap = {}; + this.logger = {}; + spyOn(IMAPConnection.prototype, 'connect').and.callFake(function connectFake() { + return this; + }); + spyOn(IMAPConnection.prototype, 'end').and.callFake(() => {}); + }); + + it('opens IMAP connection and properly returns to pool at end of scope', async () => { + let invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('opens multiple IMAP connections and properly returns to pool at end of scope', async () => { + let invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 2, + logger: this.logger, + onConnected: ([conn, otherConn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + expect(otherConn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(2); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('opens an IMAP connection properly and only returns to pool on done', async () => { + let invokedCallback = false; + let doneCallback = null; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn], done) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + doneCallback = done; + return true; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + expect(IMAPConnectionPool._poolMap[this.account.id]._availableConns.length === 2); + doneCallback(); + expect(IMAPConnectionPool._poolMap[this.account.id]._availableConns.length === 3); + }); + + it('does not call connect if already connected', async () => { + let invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + + invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('waits for an available IMAP connection', async () => { + let invokedCallback = false; + let doneCallback = null; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 3, + logger: this.logger, + onConnected: ([conn], done) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + doneCallback = done; + return true; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(3); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + + invokedCallback = false; + const promise = IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + + expect(IMAPConnectionPool._poolMap[this.account.id]._queue.length).toBe(1) + doneCallback(); + await promise; + + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(3); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('retries on IMAP connection timeout', async () => { + let invokeCount = 0; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + if (invokeCount === 0) { + invokeCount += 1; + throw new IMAPErrors.IMAPConnectionTimeoutError(); + } + invokeCount += 1; + return false; + }, + }); + + expect(invokeCount).toBe(2); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(2); + expect(IMAPConnection.prototype.end.calls.count()).toBe(1); + }); + + it('does not retry on other IMAP error', async () => { + let invokeCount = 0; + let errorCount = 0; + try { + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + if (invokeCount === 0) { + invokeCount += 1; + throw new IMAPErrors.IMAPSocketError(); + } + invokeCount += 1; + return false; + }, + }); + } catch (err) { + errorCount += 1; + } + + expect(invokeCount).toBe(1); + expect(errorCount).toBe(1); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(1); + }); +}); diff --git a/packages/isomorphic-core/src/imap-pool.es6 b/packages/isomorphic-core/src/imap-pool.es6 new file mode 100644 index 000000000..229b8ca7a --- /dev/null +++ b/packages/isomorphic-core/src/imap-pool.es6 @@ -0,0 +1,121 @@ +const IMAPConnection = require('./imap-connection'); +const IMAPErrors = require('./imap-errors'); +const {ExponentialBackoffScheduler} = require('./backoff-schedulers'); + +const MAX_IMAP_CONNECTIONS_PER_ACCOUNT = 3; +const INITIAL_SOCKET_TIMEOUT_MS = 30 * 1000; // 30 sec +const MAX_SOCKET_TIMEOUT_MS = 10 * 60 * 1000 // 10 min + +class AccountConnectionPool { + constructor(account, maxConnections) { + this._account = account; + this._availableConns = new Array(maxConnections).fill(null); + this._queue = []; + this._backoffScheduler = new ExponentialBackoffScheduler({ + baseDelay: INITIAL_SOCKET_TIMEOUT_MS, + maxDelay: MAX_SOCKET_TIMEOUT_MS, + }); + } + + async _genConnection(socketTimeout, logger) { + const settings = this._account.connectionSettings; + const credentials = this._account.decryptedCredentials(); + + if (!settings || !settings.imap_host) { + throw new Error("_genConnection: There are no IMAP connection settings for this account."); + } + if (!credentials) { + throw new Error("_genConnection: There are no IMAP connection credentials for this account."); + } + + const conn = new IMAPConnection({ + db: null, + settings: Object.assign({}, settings, credentials, {socketTimeout}), + logger, + account: this._account, + }); + + return conn.connect(); + } + + async withConnections({desiredCount, logger, onConnected, onTimeout}) { + // If we wake up from the first await but don't have enough connections in + // the pool then we need to prepend ourselves to the queue until there are + // enough. This guarantees that the queue is fair. + let prependToQueue = false; + while (this._availableConns.length < desiredCount) { + await new Promise((resolve) => { + if (prependToQueue) { + this._queue.unshift(resolve); + } else { + this._queue.push(resolve); + } + }); + prependToQueue = true; + } + + this._backoffScheduler.reset(); + while (true) { + const socketTimeout = this._backoffScheduler.nextDelay(); + let conns = []; + let keepOpen = false; + + const done = () => { + conns.filter(Boolean).forEach((conn) => conn.removeAllListeners()); + this._availableConns = conns.concat(this._availableConns); + if (this._queue.length > 0) { + const resolveWaitForConnection = this._queue.shift(); + resolveWaitForConnection(); + } + }; + + try { + for (let i = 0; i < desiredCount; ++i) { + conns.push(this._availableConns.shift()); + } + conns = await Promise.all(conns.map((c) => (c || this._genConnection(socketTimeout, logger)))); + + // TODO: Indicate which connections had errors so that we can selectively + // refresh them. + keepOpen = await onConnected(conns, done); + break; + } catch (err) { + keepOpen = false; + conns.filter(Boolean).forEach(conn => conn.end()); + conns.fill(null); + + if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) { + if (onTimeout) onTimeout(socketTimeout); + // Put an empty callback at the beginning of the queue so that we + // don't wake another waiting Promise in the finally clause. + this._queue.unshift(() => {}); + continue; + } + + throw err; + } finally { + if (!keepOpen) { + done(); + } + } + } + } +} + +class IMAPConnectionPool { + constructor(maxConnectionsPerAccount) { + this._maxConnectionsPerAccount = maxConnectionsPerAccount; + this._poolMap = {}; + } + + async withConnectionsForAccount(account, {desiredCount, logger, onConnected, onTimeout}) { + if (!this._poolMap[account.id]) { + this._poolMap[account.id] = new AccountConnectionPool(account, this._maxConnectionsPerAccount); + } + + const pool = this._poolMap[account.id]; + await pool.withConnections({desiredCount, logger, onConnected, onTimeout}); + } +} + +module.exports = new IMAPConnectionPool(MAX_IMAP_CONNECTIONS_PER_ACCOUNT);