From 6bd62423983b4577cd51c022883d921827c31d92 Mon Sep 17 00:00:00 2001 From: Mark Hahnenberg Date: Fri, 17 Feb 2017 13:56:25 -0800 Subject: [PATCH] [client-sync] Use IMAPConnectionPool in client sync worker Summary: See title Test Plan: Run locally, make sure sync still works Reviewers: evan, spang, juan Reviewed By: spang, juan Differential Revision: https://phab.nylas.com/D3998 --- .../src/local-sync-worker/sync-worker.es6 | 140 ++++++------------ 1 file changed, 46 insertions(+), 94 deletions(-) diff --git a/packages/client-sync/src/local-sync-worker/sync-worker.es6 b/packages/client-sync/src/local-sync-worker/sync-worker.es6 index 84a2e4a16..553dc23ff 100644 --- a/packages/client-sync/src/local-sync-worker/sync-worker.es6 +++ b/packages/client-sync/src/local-sync-worker/sync-worker.es6 @@ -2,6 +2,7 @@ const _ = require('underscore') const { IMAPErrors, IMAPConnection, + IMAPConnectionPool, SendmailClient, MetricsReporter, } = require('isomorphic-core'); @@ -46,7 +47,6 @@ class SyncWorker { this._shouldIgnoreInboxFlagUpdates = false this._numRetries = 0; this._numTimeoutErrors = 0; - this._socketTimeout = IMAPConnection.DefaultSocketTimeout; this._requireTokenRefresh = false this._batchProcessedUids = new Map(); @@ -148,7 +148,7 @@ class SyncWorker { async _ensureAccessToken() { if (this._account.provider !== 'gmail') { - return null + return null; } try { @@ -214,74 +214,31 @@ class SyncWorker { } } - _getIMAPSocketTimeout() { - return Math.min( - this._socketTimeout + (this._socketTimeout * (2 ** this._numTimeoutErrors)), - MAX_SOCKET_TIMEOUT_MS - ) + async _ensureSMTPConnection() { + const newCredentials = await this._ensureAccessToken(); + if (!this._smtp || newCredentials) { + this._smtp = new SendmailClient(this._account, this._logger) + } } - async _ensureConnection() { - const newCredentials = await this._ensureAccessToken() - - if (!newCredentials && this._conn && this._smtp) { - // We already have a connection and we don't need to update the - // credentials - return this._conn.connect(); + async _ensureIMAPConnection(conn) { + if (this._conn === conn) { + return; } - if (newCredentials) { - this._logger.info("Refreshed and updated access token."); - } - - const settings = this._account.connectionSettings; - const credentials = newCredentials || this._account.decryptedCredentials(); - - if (!settings || !settings.imap_host) { - throw new Error("_ensureConnection: There are no IMAP connection settings for this account."); - } - if (!settings.smtp_host && !settings.smtp_custom_config) { - throw new Error("_ensureConnection: There are no SMTP connection settings for this account."); - } - if (!credentials) { - throw new Error("_ensureConnection: There are no IMAP/SMTP connection credentials for this account."); - } - - this._smtp = new SendmailClient(this._account, this._logger) - - this._socketTimeout = this._getIMAPSocketTimeout() - const conn = new IMAPConnection({ - db: this._db, - settings: Object.assign({}, settings, credentials, {socketTimeout: this._socketTimeout}), - logger: this._logger, - account: this._account, - }); - conn.on('queue-empty', () => {}); this._conn = conn; - return this._conn.connect(); + this._conn._db = this._db; } - async _ensureMailListenerConnection() { - const newCredentials = await this._ensureAccessToken() - - if (!newCredentials && this._mailListenerConn) { - // We already have a connection and we don't need to update the - // credentials - return this._mailListenerConn.connect(); + async _ensureIMAPMailListenerConnection(conn) { + if (this._mailListenerConn === conn) { + return; } - const settings = this._account.connectionSettings; - const credentials = newCredentials || this._account.decryptedCredentials(); - - this._socketTimeout = this._getIMAPSocketTimeout() - const conn = new IMAPConnection({ - db: this._db, - settings: Object.assign({}, settings, credentials, {socketTimeout: this._socketTimeout}), - logger: this._logger, - account: this._account, - }); + this._mailListenerConn = conn; + this._mailListenerConn._db = this._db; conn.on('mail', () => { this._onInboxUpdates(`You've got mail`); @@ -294,9 +251,6 @@ class SyncWorker { if (this._shouldIgnoreInboxFlagUpdates) { return; } this._onInboxUpdates(`There are flag updates on the inbox`); }) - - this._mailListenerConn = conn; - return this._mailListenerConn.connect(); } async _listenForNewMail() { @@ -312,18 +266,11 @@ class SyncWorker { this.syncNow({reason, interrupt: true}); }, 100) - _closeConnections() { - if (this._conn) { - this._conn.end(); - } - if (this._mailListenerConn) { - this._mailListenerConn.end() - } - this._conn = null - this._mailListenerConn = null + _clearConnections() { + this._conn = null; + this._mailListenerConn = null; } - async _getFoldersToSync() { const {Folder} = this._db; @@ -341,10 +288,10 @@ class SyncWorker { } async _onSyncError(error) { - this._closeConnections() + this._clearConnections(); this._logger.error(`🔃 SyncWorker: Errored while syncing account`, error) - // Step 1 Check if we encountered an expired token error + // Check if we encountered an expired token error. // We try to refresh Google OAuth2 access tokens in advance, but sometimes // it doesn't work (e.g. the token expires during the sync loop). In this // case, we need to immediately restart the sync loop & refresh the token. @@ -362,27 +309,15 @@ class SyncWorker { return } - // Step 2 Check if is we encountered an IMAP timeout error to be able to set - // an appropriate socket timeout - if (error instanceof IMAPErrors.IMAPConnectionTimeoutError) { - this._numTimeoutErrors += 1; - Actions.recordUserEvent('Timeout error in sync loop', { - accountId: this._account.id, - provider: this._account.provider, - socketTimeout: this._socketTimeout, - numTimeoutErrors: this._numTimeoutErrors, - }) - } - - // Step 3 Check if we've encountered a retryable/network error - // If so, we don't want to save the error to the account, which will cause a - // red box to show up + // Check if we've encountered a retryable/network error. + // If so, we don't want to save the error to the account, which will cause + // a red box to show up. if (error instanceof IMAPErrors.RetryableError) { this._numRetries += 1; return } - // Step 4 Update account error state + // Update account error state const errorJSON = error.toJSON() const fingerprint = ["{{ default }}", "sync loop", error.message]; NylasEnv.reportError(error, {fingerprint: fingerprint}); @@ -489,8 +424,6 @@ class SyncWorker { // This function is interruptible. See Interruptible async * _performSync() { yield this._account.update({syncError: null}); - yield this._ensureConnection(); - yield this._ensureMailListenerConnection(); const syncbackTaskRunner = new SyncbackTaskRunner({ db: this._db, @@ -575,7 +508,25 @@ class SyncWorker { this._logger.log(`🔃 🆕 Reason: ${reason}`) let error; try { - await this._interruptible.run(this._performSync, this) + await this._ensureSMTPConnection(); + await IMAPConnectionPool.withConnectionsForAccount(this._account, { + desiredCount: 2, + logger: this._logger, + onConnected: async ([mainConn, listenerConn]) => { + await this._ensureIMAPConnection(mainConn); + await this._ensureIMAPMailListenerConnection(listenerConn); + await this._interruptible.run(this._performSync, this) + }, + onTimeout: (socketTimeout) => { + this._numTimeoutErrors += 1; + Actions.recordUserEvent('Timeout error in sync loop', { + accountId: this._account.id, + provider: this._account.provider, + socketTimeout, + numTimeoutErrors: this._numTimeoutErrors, + }) + }, + }); await this._cleanupOrphanMessages(); await this._onSyncDidComplete(); this._numRetries = 0; @@ -586,6 +537,7 @@ class SyncWorker { } finally { this._lastSyncTime = Date.now() this._syncInProgress = false + this._clearConnections(); await this._scheduleNextSync(error) } } @@ -613,7 +565,7 @@ class SyncWorker { async cleanup() { await this.stopSync() this._destroyed = true; - this._closeConnections() + this._clearConnections() } }