mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-03-03 19:43:04 +08:00
[local-sync] Stop sync worker before deleting account database
Summary: Various errors are thrown when the sync worker tries accessing a database that we've already deleted, so make sure the sync worker has been stopped before we remove the database. This diff involves modifying `Interruptible` so that `interrupt()` returns a promise that resolves once the interrupt has been completed. Addresses T7472 Test Plan: manual Reviewers: evan, juan Reviewed By: evan, juan Differential Revision: https://phab.nylas.com/D3679
This commit is contained in:
parent
f06ba78d8a
commit
77ad25af24
4 changed files with 36 additions and 16 deletions
|
@ -7,6 +7,8 @@ const manager = require('./sync-process-manager')
|
|||
// Account objects. We want to sync all K2 Accounts, but when an N1 Account is
|
||||
// deleted, we want to delete the K2 account too.
|
||||
|
||||
const deletionsInProgress = new Set();
|
||||
|
||||
async function ensureK2Consistency() {
|
||||
const {Account} = await LocalDatabaseConnector.forShared();
|
||||
const k2Accounts = await Account.findAll();
|
||||
|
@ -16,11 +18,13 @@ async function ensureK2Consistency() {
|
|||
const deletions = [];
|
||||
for (const k2Account of k2Accounts) {
|
||||
const deleted = !n1Emails.includes(k2Account.emailAddress);
|
||||
if (deleted) {
|
||||
if (deleted && !deletionsInProgress.has(k2Account.id)) {
|
||||
console.warn(`Deleting K2 account ID ${k2Account.id} which could not be matched to an N1 account.`)
|
||||
manager.removeWorkerForAccountId(k2Account.id);
|
||||
deletionsInProgress.add(k2Account.id)
|
||||
await manager.removeWorkerForAccountId(k2Account.id);
|
||||
LocalDatabaseConnector.destroyAccountDatabase(k2Account.id);
|
||||
deletions.push(k2Account.destroy());
|
||||
const deletion = k2Account.destroy().then(() => deletionsInProgress.delete(k2Account.id))
|
||||
deletions.push(deletion)
|
||||
}
|
||||
}
|
||||
return await Promise.all(deletions)
|
||||
|
|
|
@ -100,9 +100,9 @@ class SyncProcessManager {
|
|||
}
|
||||
}
|
||||
|
||||
removeWorkerForAccountId(accountId) {
|
||||
async removeWorkerForAccountId(accountId) {
|
||||
if (this._workers[accountId]) {
|
||||
this._workers[accountId].cleanup();
|
||||
await this._workers[accountId].cleanup();
|
||||
this._workers[accountId] = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ class SyncWorker {
|
|||
this._logger = global.Logger.forAccount(account)
|
||||
this._interrupted = false
|
||||
this._syncInProgress = false
|
||||
this._stopped = false
|
||||
this._destroyed = false
|
||||
this._shouldIgnoreInboxFlagUpdates = false
|
||||
|
||||
|
@ -283,6 +284,7 @@ class SyncWorker {
|
|||
}
|
||||
|
||||
async _scheduleNextSync() {
|
||||
if (this._stopped) { return; }
|
||||
const {intervals} = this._account.syncPolicy;
|
||||
const {Folder} = this._db;
|
||||
|
||||
|
@ -372,6 +374,7 @@ class SyncWorker {
|
|||
// Public API:
|
||||
|
||||
async syncNow({reason, interrupt = false} = {}) {
|
||||
if (this._stopped) { return }
|
||||
if (this._syncInProgress) {
|
||||
if (interrupt) {
|
||||
this.interrupt({reason})
|
||||
|
@ -407,25 +410,27 @@ class SyncWorker {
|
|||
}
|
||||
}
|
||||
|
||||
interrupt({reason = 'No reason'} = {}) {
|
||||
async interrupt({reason = 'No reason'} = {}) {
|
||||
console.log(`🔃 Interrupting sync! Reason: ${reason}`)
|
||||
this._interruptible.interrupt()
|
||||
await this._interruptible.interrupt()
|
||||
if (this._currentTask) {
|
||||
this._currentTask.interrupt()
|
||||
await this._currentTask.interrupt()
|
||||
}
|
||||
this._interrupted = true
|
||||
}
|
||||
|
||||
async stopSync() {
|
||||
this._stopped = true
|
||||
clearTimeout(this._syncTimer);
|
||||
this._syncTimer = null;
|
||||
if (this._syncInProgress) {
|
||||
return this.interrupt({reason: "Sync stopped"})
|
||||
}
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
clearTimeout(this._syncTimer);
|
||||
this._syncTimer = null;
|
||||
async cleanup() {
|
||||
await this.stopSync()
|
||||
this._destroyed = true;
|
||||
this._closeConnections()
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
const {EventEmitter} = require('events')
|
||||
|
||||
/**
|
||||
* Interruptible objects allow you to run and interrupt functions by using
|
||||
|
@ -31,13 +32,20 @@
|
|||
* interruptible.interrupt()
|
||||
* ```
|
||||
*/
|
||||
class Interruptible {
|
||||
class Interruptible extends EventEmitter {
|
||||
constructor() {
|
||||
this._interrupted = false
|
||||
super()
|
||||
this._interrupt = false
|
||||
this._running = false
|
||||
}
|
||||
|
||||
interrupt() {
|
||||
this._interrupted = true
|
||||
if (!this._running) { return Promise.resolve() }
|
||||
|
||||
// Start listening before the interrupt, so we don't miss the 'interrupted' event
|
||||
const promise = new Promise((resolve) => this.once('interrupted', resolve))
|
||||
this._interrupt = true
|
||||
return promise
|
||||
}
|
||||
|
||||
// This function executes the generator object through completion or until we
|
||||
|
@ -85,7 +93,8 @@ class Interruptible {
|
|||
|
||||
// Advance until done
|
||||
while (!step.done) {
|
||||
if (this._interrupted) {
|
||||
if (this._interrupt) {
|
||||
this.emit('interrupted')
|
||||
console.log('Operation Interrupted')
|
||||
return resolve()
|
||||
}
|
||||
|
@ -104,9 +113,11 @@ class Interruptible {
|
|||
* the generator function throws an error at any point.
|
||||
*/
|
||||
async run(generatorFunc, ctx, ...fnArgs) {
|
||||
this._running = true
|
||||
const generatorObj = generatorFunc.call(ctx, ...fnArgs)
|
||||
await this._runGenerator(generatorObj)
|
||||
this._interrupted = false
|
||||
this._interrupt = false
|
||||
this._running = false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue