diff --git a/packages/local-sync/src/local-sync-worker/index.js b/packages/local-sync/src/local-sync-worker/index.js index 94d83cf9b..d6a2ef8c5 100644 --- a/packages/local-sync/src/local-sync-worker/index.js +++ b/packages/local-sync/src/local-sync-worker/index.js @@ -7,6 +7,8 @@ const manager = require('./sync-process-manager') // Account objects. We want to sync all K2 Accounts, but when an N1 Account is // deleted, we want to delete the K2 account too. +const deletionsInProgress = new Set(); + async function ensureK2Consistency() { const {Account} = await LocalDatabaseConnector.forShared(); const k2Accounts = await Account.findAll(); @@ -16,11 +18,13 @@ async function ensureK2Consistency() { const deletions = []; for (const k2Account of k2Accounts) { const deleted = !n1Emails.includes(k2Account.emailAddress); - if (deleted) { + if (deleted && !deletionsInProgress.has(k2Account.id)) { console.warn(`Deleting K2 account ID ${k2Account.id} which could not be matched to an N1 account.`) - manager.removeWorkerForAccountId(k2Account.id); + deletionsInProgress.add(k2Account.id) + await manager.removeWorkerForAccountId(k2Account.id); LocalDatabaseConnector.destroyAccountDatabase(k2Account.id); - deletions.push(k2Account.destroy()); + const deletion = k2Account.destroy().then(() => deletionsInProgress.delete(k2Account.id)) + deletions.push(deletion) } } return await Promise.all(deletions) diff --git a/packages/local-sync/src/local-sync-worker/sync-process-manager.js b/packages/local-sync/src/local-sync-worker/sync-process-manager.js index 6ea5bbeff..0e1c6009a 100644 --- a/packages/local-sync/src/local-sync-worker/sync-process-manager.js +++ b/packages/local-sync/src/local-sync-worker/sync-process-manager.js @@ -100,9 +100,9 @@ class SyncProcessManager { } } - removeWorkerForAccountId(accountId) { + async removeWorkerForAccountId(accountId) { if (this._workers[accountId]) { - this._workers[accountId].cleanup(); + await this._workers[accountId].cleanup(); this._workers[accountId] = null; } } diff --git a/packages/local-sync/src/local-sync-worker/sync-worker.es6 b/packages/local-sync/src/local-sync-worker/sync-worker.es6 index 7b817f5b6..5c9ae88da 100644 --- a/packages/local-sync/src/local-sync-worker/sync-worker.es6 +++ b/packages/local-sync/src/local-sync-worker/sync-worker.es6 @@ -31,6 +31,7 @@ class SyncWorker { this._logger = global.Logger.forAccount(account) this._interrupted = false this._syncInProgress = false + this._stopped = false this._destroyed = false this._shouldIgnoreInboxFlagUpdates = false @@ -283,6 +284,7 @@ class SyncWorker { } async _scheduleNextSync() { + if (this._stopped) { return; } const {intervals} = this._account.syncPolicy; const {Folder} = this._db; @@ -372,6 +374,7 @@ class SyncWorker { // Public API: async syncNow({reason, interrupt = false} = {}) { + if (this._stopped) { return } if (this._syncInProgress) { if (interrupt) { this.interrupt({reason}) @@ -407,25 +410,27 @@ class SyncWorker { } } - interrupt({reason = 'No reason'} = {}) { + async interrupt({reason = 'No reason'} = {}) { console.log(`🔃 Interrupting sync! Reason: ${reason}`) - this._interruptible.interrupt() + await this._interruptible.interrupt() if (this._currentTask) { - this._currentTask.interrupt() + await this._currentTask.interrupt() } this._interrupted = true } async stopSync() { + this._stopped = true + clearTimeout(this._syncTimer); + this._syncTimer = null; if (this._syncInProgress) { return this.interrupt({reason: "Sync stopped"}) } return Promise.resolve() } - cleanup() { - clearTimeout(this._syncTimer); - this._syncTimer = null; + async cleanup() { + await this.stopSync() this._destroyed = true; this._closeConnections() } diff --git a/packages/local-sync/src/shared/interruptible.js b/packages/local-sync/src/shared/interruptible.js index ef1e2b781..94f40b917 100644 --- a/packages/local-sync/src/shared/interruptible.js +++ b/packages/local-sync/src/shared/interruptible.js @@ -1,3 +1,4 @@ +const {EventEmitter} = require('events') /** * Interruptible objects allow you to run and interrupt functions by using @@ -31,13 +32,20 @@ * interruptible.interrupt() * ``` */ -class Interruptible { +class Interruptible extends EventEmitter { constructor() { - this._interrupted = false + super() + this._interrupt = false + this._running = false } interrupt() { - this._interrupted = true + if (!this._running) { return Promise.resolve() } + + // Start listening before the interrupt, so we don't miss the 'interrupted' event + const promise = new Promise((resolve) => this.once('interrupted', resolve)) + this._interrupt = true + return promise } // This function executes the generator object through completion or until we @@ -85,7 +93,8 @@ class Interruptible { // Advance until done while (!step.done) { - if (this._interrupted) { + if (this._interrupt) { + this.emit('interrupted') console.log('Operation Interrupted') return resolve() } @@ -104,9 +113,11 @@ class Interruptible { * the generator function throws an error at any point. */ async run(generatorFunc, ctx, ...fnArgs) { + this._running = true const generatorObj = generatorFunc.call(ctx, ...fnArgs) await this._runGenerator(generatorObj) - this._interrupted = false + this._interrupt = false + this._running = false } }