diff --git a/packages/isomorphic-core/index.js b/packages/isomorphic-core/index.js index 98be8a57a..f9eedc2e4 100644 --- a/packages/isomorphic-core/index.js +++ b/packages/isomorphic-core/index.js @@ -12,6 +12,7 @@ module.exports = { PromiseUtils: require('./src/promise-utils'), DatabaseTypes: require('./src/database-types'), IMAPConnection: require('./src/imap-connection'), + IMAPConnectionPool: require('./src/imap-pool'), SendmailClient: require('./src/sendmail-client'), DeltaStreamBuilder: require('./src/delta-stream-builder'), HookTransactionLog: require('./src/hook-transaction-log'), diff --git a/packages/isomorphic-core/spec/imap-pool-spec.es6 b/packages/isomorphic-core/spec/imap-pool-spec.es6 new file mode 100644 index 000000000..813169564 --- /dev/null +++ b/packages/isomorphic-core/spec/imap-pool-spec.es6 @@ -0,0 +1,191 @@ +import IMAPConnectionPool from '../src/imap-pool'; +import IMAPConnection from '../src/imap-connection'; +import IMAPErrors from '../src/imap-errors'; + +describe('IMAPConnectionPool', function describeBlock() { + beforeEach(() => { + this.account = { + id: 'test-account', + decryptedCredentials: () => { return {}; }, + connectionSettings: { + imap_host: 'imap.foobar.com', + }, + }; + IMAPConnectionPool._poolMap = {}; + this.logger = {}; + spyOn(IMAPConnection.prototype, 'connect').and.callFake(function connectFake() { + return this; + }); + spyOn(IMAPConnection.prototype, 'end').and.callFake(() => {}); + }); + + it('opens IMAP connection and properly returns to pool at end of scope', async () => { + let invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('opens multiple IMAP connections and properly returns to pool at end of scope', async () => { + let invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 2, + logger: this.logger, + onConnected: ([conn, otherConn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + expect(otherConn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(2); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('opens an IMAP connection properly and only returns to pool on done', async () => { + let invokedCallback = false; + let doneCallback = null; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn], done) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + doneCallback = done; + return true; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + expect(IMAPConnectionPool._poolMap[this.account.id]._availableConns.length === 2); + doneCallback(); + expect(IMAPConnectionPool._poolMap[this.account.id]._availableConns.length === 3); + }); + + it('does not call connect if already connected', async () => { + let invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + + invokedCallback = false; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('waits for an available IMAP connection', async () => { + let invokedCallback = false; + let doneCallback = null; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 3, + logger: this.logger, + onConnected: ([conn], done) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + doneCallback = done; + return true; + }, + }); + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(3); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + + invokedCallback = false; + const promise = IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + invokedCallback = true; + return false; + }, + }); + + expect(IMAPConnectionPool._poolMap[this.account.id]._queue.length).toBe(1) + doneCallback(); + await promise; + + expect(invokedCallback).toBe(true); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(3); + expect(IMAPConnection.prototype.end.calls.count()).toBe(0); + }); + + it('retries on IMAP connection timeout', async () => { + let invokeCount = 0; + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + if (invokeCount === 0) { + invokeCount += 1; + throw new IMAPErrors.IMAPConnectionTimeoutError(); + } + invokeCount += 1; + return false; + }, + }); + + expect(invokeCount).toBe(2); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(2); + expect(IMAPConnection.prototype.end.calls.count()).toBe(1); + }); + + it('does not retry on other IMAP error', async () => { + let invokeCount = 0; + let errorCount = 0; + try { + await IMAPConnectionPool.withConnectionsForAccount(this.account, { + desiredCount: 1, + logger: this.logger, + onConnected: ([conn]) => { + expect(conn instanceof IMAPConnection).toBe(true); + if (invokeCount === 0) { + invokeCount += 1; + throw new IMAPErrors.IMAPSocketError(); + } + invokeCount += 1; + return false; + }, + }); + } catch (err) { + errorCount += 1; + } + + expect(invokeCount).toBe(1); + expect(errorCount).toBe(1); + expect(IMAPConnection.prototype.connect.calls.count()).toBe(1); + expect(IMAPConnection.prototype.end.calls.count()).toBe(1); + }); +}); diff --git a/packages/isomorphic-core/src/imap-pool.es6 b/packages/isomorphic-core/src/imap-pool.es6 new file mode 100644 index 000000000..229b8ca7a --- /dev/null +++ b/packages/isomorphic-core/src/imap-pool.es6 @@ -0,0 +1,121 @@ +const IMAPConnection = require('./imap-connection'); +const IMAPErrors = require('./imap-errors'); +const {ExponentialBackoffScheduler} = require('./backoff-schedulers'); + +const MAX_IMAP_CONNECTIONS_PER_ACCOUNT = 3; +const INITIAL_SOCKET_TIMEOUT_MS = 30 * 1000; // 30 sec +const MAX_SOCKET_TIMEOUT_MS = 10 * 60 * 1000 // 10 min + +class AccountConnectionPool { + constructor(account, maxConnections) { + this._account = account; + this._availableConns = new Array(maxConnections).fill(null); + this._queue = []; + this._backoffScheduler = new ExponentialBackoffScheduler({ + baseDelay: INITIAL_SOCKET_TIMEOUT_MS, + maxDelay: MAX_SOCKET_TIMEOUT_MS, + }); + } + + async _genConnection(socketTimeout, logger) { + const settings = this._account.connectionSettings; + const credentials = this._account.decryptedCredentials(); + + if (!settings || !settings.imap_host) { + throw new Error("_genConnection: There are no IMAP connection settings for this account."); + } + if (!credentials) { + throw new Error("_genConnection: There are no IMAP connection credentials for this account."); + } + + const conn = new IMAPConnection({ + db: null, + settings: Object.assign({}, settings, credentials, {socketTimeout}), + logger, + account: this._account, + }); + + return conn.connect(); + } + + async withConnections({desiredCount, logger, onConnected, onTimeout}) { + // If we wake up from the first await but don't have enough connections in + // the pool then we need to prepend ourselves to the queue until there are + // enough. This guarantees that the queue is fair. + let prependToQueue = false; + while (this._availableConns.length < desiredCount) { + await new Promise((resolve) => { + if (prependToQueue) { + this._queue.unshift(resolve); + } else { + this._queue.push(resolve); + } + }); + prependToQueue = true; + } + + this._backoffScheduler.reset(); + while (true) { + const socketTimeout = this._backoffScheduler.nextDelay(); + let conns = []; + let keepOpen = false; + + const done = () => { + conns.filter(Boolean).forEach((conn) => conn.removeAllListeners()); + this._availableConns = conns.concat(this._availableConns); + if (this._queue.length > 0) { + const resolveWaitForConnection = this._queue.shift(); + resolveWaitForConnection(); + } + }; + + try { + for (let i = 0; i < desiredCount; ++i) { + conns.push(this._availableConns.shift()); + } + conns = await Promise.all(conns.map((c) => (c || this._genConnection(socketTimeout, logger)))); + + // TODO: Indicate which connections had errors so that we can selectively + // refresh them. + keepOpen = await onConnected(conns, done); + break; + } catch (err) { + keepOpen = false; + conns.filter(Boolean).forEach(conn => conn.end()); + conns.fill(null); + + if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) { + if (onTimeout) onTimeout(socketTimeout); + // Put an empty callback at the beginning of the queue so that we + // don't wake another waiting Promise in the finally clause. + this._queue.unshift(() => {}); + continue; + } + + throw err; + } finally { + if (!keepOpen) { + done(); + } + } + } + } +} + +class IMAPConnectionPool { + constructor(maxConnectionsPerAccount) { + this._maxConnectionsPerAccount = maxConnectionsPerAccount; + this._poolMap = {}; + } + + async withConnectionsForAccount(account, {desiredCount, logger, onConnected, onTimeout}) { + if (!this._poolMap[account.id]) { + this._poolMap[account.id] = new AccountConnectionPool(account, this._maxConnectionsPerAccount); + } + + const pool = this._poolMap[account.id]; + await pool.withConnections({desiredCount, logger, onConnected, onTimeout}); + } +} + +module.exports = new IMAPConnectionPool(MAX_IMAP_CONNECTIONS_PER_ACCOUNT);