[client-sync] Add per-Account IMAP connection pooling

Summary:
Prior to this diff it was easy for us to create too many IMAP connections (e.g.
by requesting many attachments at once), causing random failures when the
server would reject our connection attempts. This diff adds a per-Account IMAP
pooling mechanism so that we avoid these failures.

Test Plan:
Run locally with sync worker and several other clients using the
pool, verify correct behavior. Also added a few unit tests.

Reviewers: evan, spang, juan

Reviewed By: juan

Differential Revision: https://phab.nylas.com/D3965
This commit is contained in:
Mark Hahnenberg 2017-02-17 12:05:50 -08:00
parent 66873e3f9e
commit c634380ab6
3 changed files with 313 additions and 0 deletions

View file

@ -12,6 +12,7 @@ module.exports = {
PromiseUtils: require('./src/promise-utils'),
DatabaseTypes: require('./src/database-types'),
IMAPConnection: require('./src/imap-connection'),
IMAPConnectionPool: require('./src/imap-pool'),
SendmailClient: require('./src/sendmail-client'),
DeltaStreamBuilder: require('./src/delta-stream-builder'),
HookTransactionLog: require('./src/hook-transaction-log'),

View file

@ -0,0 +1,191 @@
import IMAPConnectionPool from '../src/imap-pool';
import IMAPConnection from '../src/imap-connection';
import IMAPErrors from '../src/imap-errors';
describe('IMAPConnectionPool', function describeBlock() {
beforeEach(() => {
this.account = {
id: 'test-account',
decryptedCredentials: () => { return {}; },
connectionSettings: {
imap_host: 'imap.foobar.com',
},
};
IMAPConnectionPool._poolMap = {};
this.logger = {};
spyOn(IMAPConnection.prototype, 'connect').and.callFake(function connectFake() {
return this;
});
spyOn(IMAPConnection.prototype, 'end').and.callFake(() => {});
});
it('opens IMAP connection and properly returns to pool at end of scope', async () => {
let invokedCallback = false;
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1,
logger: this.logger,
onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true;
return false;
},
});
expect(invokedCallback).toBe(true);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(1);
expect(IMAPConnection.prototype.end.calls.count()).toBe(0);
});
it('opens multiple IMAP connections and properly returns to pool at end of scope', async () => {
let invokedCallback = false;
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 2,
logger: this.logger,
onConnected: ([conn, otherConn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
expect(otherConn instanceof IMAPConnection).toBe(true);
invokedCallback = true;
return false;
},
});
expect(invokedCallback).toBe(true);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(2);
expect(IMAPConnection.prototype.end.calls.count()).toBe(0);
});
it('opens an IMAP connection properly and only returns to pool on done', async () => {
let invokedCallback = false;
let doneCallback = null;
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1,
logger: this.logger,
onConnected: ([conn], done) => {
expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true;
doneCallback = done;
return true;
},
});
expect(invokedCallback).toBe(true);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(1);
expect(IMAPConnection.prototype.end.calls.count()).toBe(0);
expect(IMAPConnectionPool._poolMap[this.account.id]._availableConns.length === 2);
doneCallback();
expect(IMAPConnectionPool._poolMap[this.account.id]._availableConns.length === 3);
});
it('does not call connect if already connected', async () => {
let invokedCallback = false;
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1,
logger: this.logger,
onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true;
return false;
},
});
expect(invokedCallback).toBe(true);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(1);
expect(IMAPConnection.prototype.end.calls.count()).toBe(0);
invokedCallback = false;
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1,
logger: this.logger,
onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true;
return false;
},
});
expect(invokedCallback).toBe(true);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(1);
expect(IMAPConnection.prototype.end.calls.count()).toBe(0);
});
it('waits for an available IMAP connection', async () => {
let invokedCallback = false;
let doneCallback = null;
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 3,
logger: this.logger,
onConnected: ([conn], done) => {
expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true;
doneCallback = done;
return true;
},
});
expect(invokedCallback).toBe(true);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(3);
expect(IMAPConnection.prototype.end.calls.count()).toBe(0);
invokedCallback = false;
const promise = IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1,
logger: this.logger,
onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
invokedCallback = true;
return false;
},
});
expect(IMAPConnectionPool._poolMap[this.account.id]._queue.length).toBe(1)
doneCallback();
await promise;
expect(invokedCallback).toBe(true);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(3);
expect(IMAPConnection.prototype.end.calls.count()).toBe(0);
});
it('retries on IMAP connection timeout', async () => {
let invokeCount = 0;
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1,
logger: this.logger,
onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
if (invokeCount === 0) {
invokeCount += 1;
throw new IMAPErrors.IMAPConnectionTimeoutError();
}
invokeCount += 1;
return false;
},
});
expect(invokeCount).toBe(2);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(2);
expect(IMAPConnection.prototype.end.calls.count()).toBe(1);
});
it('does not retry on other IMAP error', async () => {
let invokeCount = 0;
let errorCount = 0;
try {
await IMAPConnectionPool.withConnectionsForAccount(this.account, {
desiredCount: 1,
logger: this.logger,
onConnected: ([conn]) => {
expect(conn instanceof IMAPConnection).toBe(true);
if (invokeCount === 0) {
invokeCount += 1;
throw new IMAPErrors.IMAPSocketError();
}
invokeCount += 1;
return false;
},
});
} catch (err) {
errorCount += 1;
}
expect(invokeCount).toBe(1);
expect(errorCount).toBe(1);
expect(IMAPConnection.prototype.connect.calls.count()).toBe(1);
expect(IMAPConnection.prototype.end.calls.count()).toBe(1);
});
});

View file

@ -0,0 +1,121 @@
const IMAPConnection = require('./imap-connection');
const IMAPErrors = require('./imap-errors');
const {ExponentialBackoffScheduler} = require('./backoff-schedulers');
const MAX_IMAP_CONNECTIONS_PER_ACCOUNT = 3;
const INITIAL_SOCKET_TIMEOUT_MS = 30 * 1000; // 30 sec
const MAX_SOCKET_TIMEOUT_MS = 10 * 60 * 1000 // 10 min
class AccountConnectionPool {
constructor(account, maxConnections) {
this._account = account;
this._availableConns = new Array(maxConnections).fill(null);
this._queue = [];
this._backoffScheduler = new ExponentialBackoffScheduler({
baseDelay: INITIAL_SOCKET_TIMEOUT_MS,
maxDelay: MAX_SOCKET_TIMEOUT_MS,
});
}
async _genConnection(socketTimeout, logger) {
const settings = this._account.connectionSettings;
const credentials = this._account.decryptedCredentials();
if (!settings || !settings.imap_host) {
throw new Error("_genConnection: There are no IMAP connection settings for this account.");
}
if (!credentials) {
throw new Error("_genConnection: There are no IMAP connection credentials for this account.");
}
const conn = new IMAPConnection({
db: null,
settings: Object.assign({}, settings, credentials, {socketTimeout}),
logger,
account: this._account,
});
return conn.connect();
}
async withConnections({desiredCount, logger, onConnected, onTimeout}) {
// If we wake up from the first await but don't have enough connections in
// the pool then we need to prepend ourselves to the queue until there are
// enough. This guarantees that the queue is fair.
let prependToQueue = false;
while (this._availableConns.length < desiredCount) {
await new Promise((resolve) => {
if (prependToQueue) {
this._queue.unshift(resolve);
} else {
this._queue.push(resolve);
}
});
prependToQueue = true;
}
this._backoffScheduler.reset();
while (true) {
const socketTimeout = this._backoffScheduler.nextDelay();
let conns = [];
let keepOpen = false;
const done = () => {
conns.filter(Boolean).forEach((conn) => conn.removeAllListeners());
this._availableConns = conns.concat(this._availableConns);
if (this._queue.length > 0) {
const resolveWaitForConnection = this._queue.shift();
resolveWaitForConnection();
}
};
try {
for (let i = 0; i < desiredCount; ++i) {
conns.push(this._availableConns.shift());
}
conns = await Promise.all(conns.map((c) => (c || this._genConnection(socketTimeout, logger))));
// TODO: Indicate which connections had errors so that we can selectively
// refresh them.
keepOpen = await onConnected(conns, done);
break;
} catch (err) {
keepOpen = false;
conns.filter(Boolean).forEach(conn => conn.end());
conns.fill(null);
if (err instanceof IMAPErrors.IMAPConnectionTimeoutError) {
if (onTimeout) onTimeout(socketTimeout);
// Put an empty callback at the beginning of the queue so that we
// don't wake another waiting Promise in the finally clause.
this._queue.unshift(() => {});
continue;
}
throw err;
} finally {
if (!keepOpen) {
done();
}
}
}
}
}
class IMAPConnectionPool {
constructor(maxConnectionsPerAccount) {
this._maxConnectionsPerAccount = maxConnectionsPerAccount;
this._poolMap = {};
}
async withConnectionsForAccount(account, {desiredCount, logger, onConnected, onTimeout}) {
if (!this._poolMap[account.id]) {
this._poolMap[account.id] = new AccountConnectionPool(account, this._maxConnectionsPerAccount);
}
const pool = this._poolMap[account.id];
await pool.withConnections({desiredCount, logger, onConnected, onTimeout});
}
}
module.exports = new IMAPConnectionPool(MAX_IMAP_CONNECTIONS_PER_ACCOUNT);