mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-02-25 08:35:16 +08:00
[client-app] Ensure sync worker can never continue running after being destroyed
Summary: Previously, it was possible for the sync worker to continue running after being interrupted, e.g. it would break out of `performSync` and then try to run `onSyncCompleted`. This is fine if we were just interrupting to restart the loop, but when we stop it we don't want it to continue running anything at all. This also refactors the syncworker to have a single `destroy` method, which sets a `destroyed` flag and uses that one exclusively instead of the `stopped` flag. Test Plan: manually check it works Reviewers: spang, halla, mark, evan Reviewed By: mark, evan Differential Revision: https://phab.nylas.com/D4307
This commit is contained in:
parent
58cf02a824
commit
9181914453
2 changed files with 43 additions and 24 deletions
|
@ -70,7 +70,7 @@ class SyncProcessManager {
|
|||
try {
|
||||
try {
|
||||
await Promise.all(
|
||||
this.workers().map(w => w.stopSync())
|
||||
this.workers().map(w => w.destroy())
|
||||
)
|
||||
.timeout(500, 'Timed out while trying to stop sync')
|
||||
} catch (err) {
|
||||
|
@ -178,11 +178,10 @@ class SyncProcessManager {
|
|||
async removeWorkerForAccountId(accountId) {
|
||||
if (this._workersByAccountId[accountId]) {
|
||||
try {
|
||||
await this._workersByAccountId[accountId].cleanup().timeout(500)
|
||||
await this._workersByAccountId[accountId].destroy({timeout: 500})
|
||||
} catch (err) {
|
||||
err.message = `Error while cleaning up sync worker: ${err.message}`
|
||||
NylasEnv.reportError(err)
|
||||
// Continue with local cleanup
|
||||
}
|
||||
this._workersByAccountId[accountId] = null;
|
||||
}
|
||||
|
|
|
@ -45,7 +45,6 @@ class SyncWorker {
|
|||
this._interrupted = false
|
||||
this._syncInProgress = false
|
||||
this._throttlingEnabled = false
|
||||
this._stopped = false
|
||||
this._destroyed = false
|
||||
this._shouldIgnoreInboxFlagUpdates = false
|
||||
this._numTimeoutErrors = 0;
|
||||
|
@ -112,6 +111,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _cleanupOrphanMessages() {
|
||||
if (this._destroyed) { return null }
|
||||
const {Message, Thread, Folder, Label} = this._db;
|
||||
|
||||
const messagesWithoutFolder = await Message.findAll({
|
||||
|
@ -161,6 +161,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _ensureAccessToken() {
|
||||
if (this._destroyed) { return null }
|
||||
if (this._account.provider !== 'gmail') {
|
||||
return null;
|
||||
}
|
||||
|
@ -229,6 +230,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _ensureSMTPConnection() {
|
||||
if (this._destroyed) { return }
|
||||
const newCredentials = await this._ensureAccessToken();
|
||||
if (!this._smtp || newCredentials) {
|
||||
this._smtp = new SendmailClient(this._account, this._logger)
|
||||
|
@ -236,6 +238,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _ensureIMAPConnection(conn) {
|
||||
if (this._destroyed) { return }
|
||||
if (this._conn === conn) {
|
||||
return;
|
||||
}
|
||||
|
@ -247,6 +250,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _ensureIMAPMailListenerConnection() {
|
||||
if (this._destroyed) { return }
|
||||
if (this._mailListenerConn) {
|
||||
return;
|
||||
}
|
||||
|
@ -283,6 +287,7 @@ class SyncWorker {
|
|||
}, 100)
|
||||
|
||||
async _listenForNewMail() {
|
||||
if (this._destroyed) { return }
|
||||
this._logger.log('🔃 Listening for new mail...')
|
||||
// Open the inbox folder on our dedicated mail listener connection to listen
|
||||
// to new mail events
|
||||
|
@ -318,6 +323,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _onSyncError(error) {
|
||||
if (this._destroyed) { return }
|
||||
try {
|
||||
this._disposeConnections();
|
||||
this._logger.error(`🔃 SyncWorker: Errored while syncing account`, error)
|
||||
|
@ -387,6 +393,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _onSyncDidComplete() {
|
||||
if (this._destroyed) { return; }
|
||||
const now = Date.now();
|
||||
|
||||
// Save metrics to the account object
|
||||
|
@ -412,10 +419,10 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _scheduleNextSync(error) {
|
||||
if (this._destroyed) { return; }
|
||||
let reason;
|
||||
let interval;
|
||||
try {
|
||||
if (this._stopped) { return; }
|
||||
const {Folder} = this._db;
|
||||
|
||||
const folders = await Folder.findAll();
|
||||
|
@ -467,6 +474,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _runTask(task) {
|
||||
if (this._destroyed) { return }
|
||||
this._currentTask = task
|
||||
await this._conn.runOperation(this._currentTask, {syncWorker: this})
|
||||
this._currentTask = null
|
||||
|
@ -474,6 +482,7 @@ class SyncWorker {
|
|||
|
||||
// This function is interruptible. See Interruptible
|
||||
async * _performSync() {
|
||||
if (this._destroyed) { return }
|
||||
const accountId = this._account.id
|
||||
SyncActivity.reportSyncActivity(accountId, "Starting worker sync")
|
||||
yield this._account.update({syncError: null});
|
||||
|
@ -544,7 +553,7 @@ class SyncWorker {
|
|||
// Public API:
|
||||
|
||||
async syncNow({reason, interrupt = false} = {}) {
|
||||
if (this._stopped) { return }
|
||||
if (this._destroyed) { return }
|
||||
if (this._syncInProgress) {
|
||||
if (interrupt) {
|
||||
this.interrupt({reason})
|
||||
|
@ -596,29 +605,40 @@ class SyncWorker {
|
|||
}
|
||||
}
|
||||
|
||||
async interrupt({reason = 'No reason'} = {}) {
|
||||
this._logger.log(`🔃 ✋ Interrupting sync! Reason: ${reason}`)
|
||||
const interruptPromises = [this._interruptible.interrupt()]
|
||||
if (this._currentTask) {
|
||||
interruptPromises.push(this._currentTask.interrupt())
|
||||
}
|
||||
await Promise.all(interruptPromises)
|
||||
this._interrupted = true
|
||||
interrupt({reason = 'No reason'} = {}) {
|
||||
// We wrap this in a promise and don't use `async` keyword to make sure this
|
||||
// returns a Bluebird promise that can be timed out
|
||||
return new Promise(async (resolve) => {
|
||||
try {
|
||||
this._logger.log(`🔃 ✋ Interrupting sync! Reason: ${reason}`)
|
||||
const interruptPromises = [this._interruptible.interrupt()]
|
||||
if (this._currentTask) {
|
||||
interruptPromises.push(this._currentTask.interrupt())
|
||||
}
|
||||
await Promise.all(interruptPromises)
|
||||
resolve()
|
||||
} finally {
|
||||
this._interrupted = true
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async stopSync() {
|
||||
this._stopped = true
|
||||
async destroy({timeout} = {}) {
|
||||
this._destroyed = true;
|
||||
clearTimeout(this._syncTimer);
|
||||
this._syncTimer = null;
|
||||
if (this._syncInProgress) {
|
||||
return this.interrupt({reason: "Sync stopped"})
|
||||
try {
|
||||
if (this._syncInProgress) {
|
||||
let promise = this.interrupt({reason: "Sync stopped"})
|
||||
if (timeout) {
|
||||
promise = promise.timeout(timeout, 'Interrupt timed out while destroying worker')
|
||||
}
|
||||
await promise
|
||||
}
|
||||
} catch (err) {
|
||||
err.message = `Error destroying sync worker: ${err.message}`
|
||||
NylasEnv.reportError(err)
|
||||
}
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
async cleanup() {
|
||||
await this.stopSync()
|
||||
this._destroyed = true;
|
||||
this._disposeConnections()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue