mirror of
https://github.com/Foundry376/Mailspring.git
synced 2024-09-22 00:06:06 +08:00
[client-sync] Use IMAPConnectionPool in client sync worker
Summary: See title Test Plan: Run locally, make sure sync still works Reviewers: evan, spang, juan Reviewed By: spang, juan Differential Revision: https://phab.nylas.com/D3998
This commit is contained in:
parent
e991349708
commit
6bd6242398
|
@ -2,6 +2,7 @@ const _ = require('underscore')
|
||||||
const {
|
const {
|
||||||
IMAPErrors,
|
IMAPErrors,
|
||||||
IMAPConnection,
|
IMAPConnection,
|
||||||
|
IMAPConnectionPool,
|
||||||
SendmailClient,
|
SendmailClient,
|
||||||
MetricsReporter,
|
MetricsReporter,
|
||||||
} = require('isomorphic-core');
|
} = require('isomorphic-core');
|
||||||
|
@ -46,7 +47,6 @@ class SyncWorker {
|
||||||
this._shouldIgnoreInboxFlagUpdates = false
|
this._shouldIgnoreInboxFlagUpdates = false
|
||||||
this._numRetries = 0;
|
this._numRetries = 0;
|
||||||
this._numTimeoutErrors = 0;
|
this._numTimeoutErrors = 0;
|
||||||
this._socketTimeout = IMAPConnection.DefaultSocketTimeout;
|
|
||||||
this._requireTokenRefresh = false
|
this._requireTokenRefresh = false
|
||||||
this._batchProcessedUids = new Map();
|
this._batchProcessedUids = new Map();
|
||||||
|
|
||||||
|
@ -148,7 +148,7 @@ class SyncWorker {
|
||||||
|
|
||||||
async _ensureAccessToken() {
|
async _ensureAccessToken() {
|
||||||
if (this._account.provider !== 'gmail') {
|
if (this._account.provider !== 'gmail') {
|
||||||
return null
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -214,74 +214,31 @@ class SyncWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_getIMAPSocketTimeout() {
|
async _ensureSMTPConnection() {
|
||||||
return Math.min(
|
const newCredentials = await this._ensureAccessToken();
|
||||||
this._socketTimeout + (this._socketTimeout * (2 ** this._numTimeoutErrors)),
|
if (!this._smtp || newCredentials) {
|
||||||
MAX_SOCKET_TIMEOUT_MS
|
this._smtp = new SendmailClient(this._account, this._logger)
|
||||||
)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async _ensureConnection() {
|
async _ensureIMAPConnection(conn) {
|
||||||
const newCredentials = await this._ensureAccessToken()
|
if (this._conn === conn) {
|
||||||
|
return;
|
||||||
if (!newCredentials && this._conn && this._smtp) {
|
|
||||||
// We already have a connection and we don't need to update the
|
|
||||||
// credentials
|
|
||||||
return this._conn.connect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (newCredentials) {
|
|
||||||
this._logger.info("Refreshed and updated access token.");
|
|
||||||
}
|
|
||||||
|
|
||||||
const settings = this._account.connectionSettings;
|
|
||||||
const credentials = newCredentials || this._account.decryptedCredentials();
|
|
||||||
|
|
||||||
if (!settings || !settings.imap_host) {
|
|
||||||
throw new Error("_ensureConnection: There are no IMAP connection settings for this account.");
|
|
||||||
}
|
|
||||||
if (!settings.smtp_host && !settings.smtp_custom_config) {
|
|
||||||
throw new Error("_ensureConnection: There are no SMTP connection settings for this account.");
|
|
||||||
}
|
|
||||||
if (!credentials) {
|
|
||||||
throw new Error("_ensureConnection: There are no IMAP/SMTP connection credentials for this account.");
|
|
||||||
}
|
|
||||||
|
|
||||||
this._smtp = new SendmailClient(this._account, this._logger)
|
|
||||||
|
|
||||||
this._socketTimeout = this._getIMAPSocketTimeout()
|
|
||||||
const conn = new IMAPConnection({
|
|
||||||
db: this._db,
|
|
||||||
settings: Object.assign({}, settings, credentials, {socketTimeout: this._socketTimeout}),
|
|
||||||
logger: this._logger,
|
|
||||||
account: this._account,
|
|
||||||
});
|
|
||||||
|
|
||||||
conn.on('queue-empty', () => {});
|
conn.on('queue-empty', () => {});
|
||||||
|
|
||||||
this._conn = conn;
|
this._conn = conn;
|
||||||
return this._conn.connect();
|
this._conn._db = this._db;
|
||||||
}
|
}
|
||||||
|
|
||||||
async _ensureMailListenerConnection() {
|
async _ensureIMAPMailListenerConnection(conn) {
|
||||||
const newCredentials = await this._ensureAccessToken()
|
if (this._mailListenerConn === conn) {
|
||||||
|
return;
|
||||||
if (!newCredentials && this._mailListenerConn) {
|
|
||||||
// We already have a connection and we don't need to update the
|
|
||||||
// credentials
|
|
||||||
return this._mailListenerConn.connect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const settings = this._account.connectionSettings;
|
this._mailListenerConn = conn;
|
||||||
const credentials = newCredentials || this._account.decryptedCredentials();
|
this._mailListenerConn._db = this._db;
|
||||||
|
|
||||||
this._socketTimeout = this._getIMAPSocketTimeout()
|
|
||||||
const conn = new IMAPConnection({
|
|
||||||
db: this._db,
|
|
||||||
settings: Object.assign({}, settings, credentials, {socketTimeout: this._socketTimeout}),
|
|
||||||
logger: this._logger,
|
|
||||||
account: this._account,
|
|
||||||
});
|
|
||||||
|
|
||||||
conn.on('mail', () => {
|
conn.on('mail', () => {
|
||||||
this._onInboxUpdates(`You've got mail`);
|
this._onInboxUpdates(`You've got mail`);
|
||||||
|
@ -294,9 +251,6 @@ class SyncWorker {
|
||||||
if (this._shouldIgnoreInboxFlagUpdates) { return; }
|
if (this._shouldIgnoreInboxFlagUpdates) { return; }
|
||||||
this._onInboxUpdates(`There are flag updates on the inbox`);
|
this._onInboxUpdates(`There are flag updates on the inbox`);
|
||||||
})
|
})
|
||||||
|
|
||||||
this._mailListenerConn = conn;
|
|
||||||
return this._mailListenerConn.connect();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async _listenForNewMail() {
|
async _listenForNewMail() {
|
||||||
|
@ -312,18 +266,11 @@ class SyncWorker {
|
||||||
this.syncNow({reason, interrupt: true});
|
this.syncNow({reason, interrupt: true});
|
||||||
}, 100)
|
}, 100)
|
||||||
|
|
||||||
_closeConnections() {
|
_clearConnections() {
|
||||||
if (this._conn) {
|
this._conn = null;
|
||||||
this._conn.end();
|
this._mailListenerConn = null;
|
||||||
}
|
|
||||||
if (this._mailListenerConn) {
|
|
||||||
this._mailListenerConn.end()
|
|
||||||
}
|
|
||||||
this._conn = null
|
|
||||||
this._mailListenerConn = null
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async _getFoldersToSync() {
|
async _getFoldersToSync() {
|
||||||
const {Folder} = this._db;
|
const {Folder} = this._db;
|
||||||
|
|
||||||
|
@ -341,10 +288,10 @@ class SyncWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
async _onSyncError(error) {
|
async _onSyncError(error) {
|
||||||
this._closeConnections()
|
this._clearConnections();
|
||||||
this._logger.error(`🔃 SyncWorker: Errored while syncing account`, error)
|
this._logger.error(`🔃 SyncWorker: Errored while syncing account`, error)
|
||||||
|
|
||||||
// Step 1 Check if we encountered an expired token error
|
// Check if we encountered an expired token error.
|
||||||
// We try to refresh Google OAuth2 access tokens in advance, but sometimes
|
// We try to refresh Google OAuth2 access tokens in advance, but sometimes
|
||||||
// it doesn't work (e.g. the token expires during the sync loop). In this
|
// it doesn't work (e.g. the token expires during the sync loop). In this
|
||||||
// case, we need to immediately restart the sync loop & refresh the token.
|
// case, we need to immediately restart the sync loop & refresh the token.
|
||||||
|
@ -362,27 +309,15 @@ class SyncWorker {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 2 Check if is we encountered an IMAP timeout error to be able to set
|
// Check if we've encountered a retryable/network error.
|
||||||
// an appropriate socket timeout
|
// If so, we don't want to save the error to the account, which will cause
|
||||||
if (error instanceof IMAPErrors.IMAPConnectionTimeoutError) {
|
// a red box to show up.
|
||||||
this._numTimeoutErrors += 1;
|
|
||||||
Actions.recordUserEvent('Timeout error in sync loop', {
|
|
||||||
accountId: this._account.id,
|
|
||||||
provider: this._account.provider,
|
|
||||||
socketTimeout: this._socketTimeout,
|
|
||||||
numTimeoutErrors: this._numTimeoutErrors,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Step 3 Check if we've encountered a retryable/network error
|
|
||||||
// If so, we don't want to save the error to the account, which will cause a
|
|
||||||
// red box to show up
|
|
||||||
if (error instanceof IMAPErrors.RetryableError) {
|
if (error instanceof IMAPErrors.RetryableError) {
|
||||||
this._numRetries += 1;
|
this._numRetries += 1;
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 4 Update account error state
|
// Update account error state
|
||||||
const errorJSON = error.toJSON()
|
const errorJSON = error.toJSON()
|
||||||
const fingerprint = ["{{ default }}", "sync loop", error.message];
|
const fingerprint = ["{{ default }}", "sync loop", error.message];
|
||||||
NylasEnv.reportError(error, {fingerprint: fingerprint});
|
NylasEnv.reportError(error, {fingerprint: fingerprint});
|
||||||
|
@ -489,8 +424,6 @@ class SyncWorker {
|
||||||
// This function is interruptible. See Interruptible
|
// This function is interruptible. See Interruptible
|
||||||
async * _performSync() {
|
async * _performSync() {
|
||||||
yield this._account.update({syncError: null});
|
yield this._account.update({syncError: null});
|
||||||
yield this._ensureConnection();
|
|
||||||
yield this._ensureMailListenerConnection();
|
|
||||||
|
|
||||||
const syncbackTaskRunner = new SyncbackTaskRunner({
|
const syncbackTaskRunner = new SyncbackTaskRunner({
|
||||||
db: this._db,
|
db: this._db,
|
||||||
|
@ -575,7 +508,25 @@ class SyncWorker {
|
||||||
this._logger.log(`🔃 🆕 Reason: ${reason}`)
|
this._logger.log(`🔃 🆕 Reason: ${reason}`)
|
||||||
let error;
|
let error;
|
||||||
try {
|
try {
|
||||||
await this._interruptible.run(this._performSync, this)
|
await this._ensureSMTPConnection();
|
||||||
|
await IMAPConnectionPool.withConnectionsForAccount(this._account, {
|
||||||
|
desiredCount: 2,
|
||||||
|
logger: this._logger,
|
||||||
|
onConnected: async ([mainConn, listenerConn]) => {
|
||||||
|
await this._ensureIMAPConnection(mainConn);
|
||||||
|
await this._ensureIMAPMailListenerConnection(listenerConn);
|
||||||
|
await this._interruptible.run(this._performSync, this)
|
||||||
|
},
|
||||||
|
onTimeout: (socketTimeout) => {
|
||||||
|
this._numTimeoutErrors += 1;
|
||||||
|
Actions.recordUserEvent('Timeout error in sync loop', {
|
||||||
|
accountId: this._account.id,
|
||||||
|
provider: this._account.provider,
|
||||||
|
socketTimeout,
|
||||||
|
numTimeoutErrors: this._numTimeoutErrors,
|
||||||
|
})
|
||||||
|
},
|
||||||
|
});
|
||||||
await this._cleanupOrphanMessages();
|
await this._cleanupOrphanMessages();
|
||||||
await this._onSyncDidComplete();
|
await this._onSyncDidComplete();
|
||||||
this._numRetries = 0;
|
this._numRetries = 0;
|
||||||
|
@ -586,6 +537,7 @@ class SyncWorker {
|
||||||
} finally {
|
} finally {
|
||||||
this._lastSyncTime = Date.now()
|
this._lastSyncTime = Date.now()
|
||||||
this._syncInProgress = false
|
this._syncInProgress = false
|
||||||
|
this._clearConnections();
|
||||||
await this._scheduleNextSync(error)
|
await this._scheduleNextSync(error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -613,7 +565,7 @@ class SyncWorker {
|
||||||
async cleanup() {
|
async cleanup() {
|
||||||
await this.stopSync()
|
await this.stopSync()
|
||||||
this._destroyed = true;
|
this._destroyed = true;
|
||||||
this._closeConnections()
|
this._clearConnections()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue