diff --git a/packages/local-sync/src/local-sync-worker/sync-worker.es6 b/packages/local-sync/src/local-sync-worker/sync-worker.es6 index 7b409a0be..c6f8a8d81 100644 --- a/packages/local-sync/src/local-sync-worker/sync-worker.es6 +++ b/packages/local-sync/src/local-sync-worker/sync-worker.es6 @@ -12,7 +12,7 @@ const Interruptible = require('../shared/interruptible') const FetchFolderList = require('./imap/fetch-folder-list') const FetchMessagesInFolder = require('./imap/fetch-messages-in-folder') const SyncMetricsReporter = require('./sync-metrics-reporter'); -const SyncbackTaskWorker = require('./syncback-task-worker'); +const SyncbackTaskFactory = require('./syncback-task-factory'); const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default @@ -22,9 +22,8 @@ class SyncWorker { this._manager = parentManager; this._conn = null; this._account = account; - this._currentSyncOperation = null + this._currentOperation = null this._interruptible = new Interruptible() - this._syncbackTaskWorker = new SyncbackTaskWorker(account, db) this._localDeltas = new LocalSyncDeltaEmitter(db, account.id) this._startTime = Date.now(); @@ -178,6 +177,96 @@ class SyncWorker { this._conn = null } + /** + * Returns a list of at most 100 Syncback requests, sorted by creation date + * (older first) and by how they affect message IMAP uids. + * + * We want to make sure that we run the tasks that affect IMAP uids last, and + * that we don't run 2 tasks that will affect the same set of UIDS together, + * i.e. without running a sync loop in between them. + * + * For example, if there's a task to change the labels of a message, and also + * a task to move that message to another folder, we need to run the label + * change /first/, otherwise the message would be moved and it would receive a + * new IMAP uid, and then attempting to change labels with an old uid would + * fail. + */ + async _getNewSyncbackTasks() { + const {SyncbackRequest, Message} = this._db; + const where = { + limit: 100, + where: {status: "NEW"}, + order: [['createdAt', 'ASC']], + }; + + const tasks = await SyncbackRequest.findAll(where) + .map((req) => SyncbackTaskFactory.create(this._account, req)) + + if (tasks.length === 0) { return [] } + + // TODO prioritize Send! + + const tasksToProcess = tasks.filter(t => !t.affectsImapMessageUIDs()) + const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs()) + + const changeFolderTasks = tasksAffectingUIDs.filter(t => + t.description() === 'RenameFolder' || t.description() === 'DeleteFolder' + ) + if (changeFolderTasks.length > 0) { + // If we are renaming or deleting folders, those are the only tasks we + // want to process before executing any other tasks that may change uids. + // These operations may not change the uids of their messages, but we + // can't guarantee it, so to make sure, we will just run these. + const affectedFolderIds = new Set() + changeFolderTasks.forEach((task) => { + const {props: {folderId}} = task.syncbackRequestObject() + if (folderId && !affectedFolderIds.has(folderId)) { + tasksToProcess.push(task) + affectedFolderIds.add(folderId) + } + }) + return tasksToProcess + } + + // Otherwise, make sure that we don't process more than 1 task that will affect + // the UID of the same message + const affectedMessageIds = new Set() + for (const task of tasksAffectingUIDs) { + const {props: {messageId, threadId}} = task.syncbackRequestObject() + if (messageId) { + if (!affectedMessageIds.has(messageId)) { + tasksToProcess.push(task) + affectedMessageIds.add(messageId) + } + } else if (threadId) { + const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id) + const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id)) + if (shouldIncludeTask) { + tasksToProcess.push(task) + messageIds.forEach(id => affectedMessageIds.add(id)) + } + } + } + return tasksToProcess + } + + async _runSyncbackTask(task) { + const syncbackRequest = task.syncbackRequestObject(); + console.log(`🔃 📤 ${task.description()}`, syncbackRequest.props) + try { + const responseJSON = await this._conn.runOperation(task); + syncbackRequest.status = "SUCCEEDED"; + syncbackRequest.responseJSON = responseJSON || {}; + console.log(`🔃 📤 ${task.description()} Succeeded`) + } catch (error) { + syncbackRequest.error = error; + syncbackRequest.status = "FAILED"; + console.error(`🔃 📤 ${task.description()} Failed`, {syncbackRequest: syncbackRequest.toJSON()}) + } finally { + await syncbackRequest.save(); + } + } + async _getFoldersToSync() { const {Folder} = this._db; @@ -237,7 +326,9 @@ class SyncWorker { // Start idling on the inbox const inbox = await this._getInboxFolder(); - await this._conn.openBox(inbox.name); + if (inbox) { + await this._conn.openBox(inbox.name); + } // this._logger.info('SyncWorker: Idling on inbox folder'); } @@ -277,19 +368,30 @@ class SyncWorker { }, nextSyncIn); } + async _runOperation(operation) { + this._currentOperation = operation + await this._conn.runOperation(this._currentOperation) + this._currentOperation = null + } + // This function is interruptible. See Interruptible async * _performSync() { yield this._account.update({syncError: null}); yield this._ensureConnection(); - yield this._syncbackTaskWorker.runNewSyncbackTasks(this._conn); - this._currentSyncOperation = new FetchFolderList(this._account, this._logger) - await this._conn.runOperation(this._currentSyncOperation) - this._currentSyncOperation = null + // Step 1: Run any available syncback tasks + const tasks = yield this._getNewSyncbackTasks() + for (const task of tasks) { + await this._runSyncbackTask(task) + yield // Yield to allow interruption + } - // Yield to allow interruption - yield + // Step 2: Fetch the folder list. We need to run this before syncing folders + // because we need folders to sync! + await this._runOperation(new FetchFolderList(this._account, this._logger)) + yield // Yield to allow interruption + // Step 3: Sync each folder, sorted by inbox first // TODO prioritize syncing all of inbox first if there's a ton of folders (e.g. imap // accounts). If there are many folders, we would only sync the first n // messages in the inbox and not go back to it until we've done the same for @@ -300,12 +402,8 @@ class SyncWorker { const sortedFolders = yield this._getFoldersToSync() const {folderSyncOptions} = this._account.syncPolicy; for (const folder of sortedFolders) { - this._currentSyncOperation = new FetchMessagesInFolder(folder, folderSyncOptions, this._logger) - await this._conn.runOperation(this._currentSyncOperation) - this._currentSyncOperation = null - - // Yield to allow interruption - yield + this._runOperation(new FetchMessagesInFolder(folder, folderSyncOptions, this._logger)) + yield // Yield to allow interruption } } @@ -351,8 +449,8 @@ class SyncWorker { interrupt() { this._interruptible.interrupt() - if (this._currentSyncOperation) { - this._currentSyncOperation.interrupt() + if (this._currentOperation) { + this._currentOperation.interrupt() } this._interrupted = true } diff --git a/packages/local-sync/src/local-sync-worker/syncback-task-worker.js b/packages/local-sync/src/local-sync-worker/syncback-task-worker.js deleted file mode 100644 index c3fa55ca8..000000000 --- a/packages/local-sync/src/local-sync-worker/syncback-task-worker.js +++ /dev/null @@ -1,127 +0,0 @@ -const { - IMAPConnection, -} = require('isomorphic-core'); -const SyncbackTaskFactory = require('./syncback-task-factory') - - -/** - * SyncbackTaskWorker runs newly available syncback requests - */ -class SyncbackTaskWorker { - - constructor(account, db) { - if (!account) { - throw new Error('SyncbackTaskWorker requires an account') - } - if (!db) { - throw new Error('SyncbackTaskWorker requires a db instance') - } - this._account = account - this._db = db - } - - /** - * Returns a list of at most 100 Syncback requests, sorted by creation date - * (older first) and by how they affect message IMAP uids. - * - * We want to make sure that we run the tasks that affect IMAP uids last, and - * that we don't run 2 tasks that will affect the same set of UIDS together, - * i.e. without running a sync loop in between them. - * - * For example, if there's a task to change the labels of a message, and also - * a task to move that message to another folder, we need to run the label - * change /first/, otherwise the message would be moved and it would receive a - * new IMAP uid, and then attempting to change labels with an old uid would - * fail. - */ - async _getNewSyncbackTasks() { - const {SyncbackRequest, Message} = this._db; - const where = { - limit: 100, - where: {status: "NEW"}, - order: [['createdAt', 'ASC']], - }; - - const tasks = await SyncbackRequest.findAll(where) - .map((req) => SyncbackTaskFactory.create(this._account, req)) - - if (tasks.length === 0) { return [] } - - // TODO prioritize Send! - - const tasksToProcess = tasks.filter(t => !t.affectsImapMessageUIDs()) - const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs()) - - const changeFolderTasks = tasksAffectingUIDs.filter(t => - t.description() === 'RenameFolder' || t.description() === 'DeleteFolder' - ) - if (changeFolderTasks.length > 0) { - // If we are renaming or deleting folders, those are the only tasks we - // want to process before executing any other tasks that may change uids. - // These operations may not change the uids of their messages, but we - // can't guarantee it, so to make sure, we will just run these. - const affectedFolderIds = new Set() - changeFolderTasks.forEach((task) => { - const {props: {folderId}} = task.syncbackRequestObject() - if (folderId && !affectedFolderIds.has(folderId)) { - tasksToProcess.push(task) - affectedFolderIds.add(folderId) - } - }) - return tasksToProcess - } - - // Otherwise, make sure that we don't process more than 1 task that will affect - // the UID of the same message - const affectedMessageIds = new Set() - for (const task of tasksAffectingUIDs) { - const {props: {messageId, threadId}} = task.syncbackRequestObject() - if (messageId) { - if (!affectedMessageIds.has(messageId)) { - tasksToProcess.push(task) - affectedMessageIds.add(messageId) - } - } else if (threadId) { - const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id) - const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id)) - if (shouldIncludeTask) { - tasksToProcess.push(task) - messageIds.forEach(id => affectedMessageIds.add(id)) - } - } - } - return tasksToProcess - } - - async runSyncbackTask(conn, task) { - const syncbackRequest = task.syncbackRequestObject(); - console.log(`🔃 📤 ${task.description()}`, syncbackRequest.props) - try { - const responseJSON = await conn.runOperation(task); - syncbackRequest.status = "SUCCEEDED"; - syncbackRequest.responseJSON = responseJSON || {}; - console.log(`🔃 📤 ${task.description()} Succeeded`) - } catch (error) { - syncbackRequest.error = error; - syncbackRequest.status = "FAILED"; - console.error(`🔃 📤 ${task.description()} Failed`, {syncbackRequest: syncbackRequest.toJSON()}) - } finally { - await syncbackRequest.save(); - } - } - - async runNewSyncbackTasks(conn) { - // TODO Make this interruptible too! - if (!(conn instanceof IMAPConnection)) { - throw new Error('SyncbackTaskWorker requires an IMAPConnection') - } - - const tasks = await this._getNewSyncbackTasks() - if (tasks.length === 0) { return; } - for (const task of tasks) { - await this.runSyncbackTask(conn, task) - } - } -} - -module.exports = SyncbackTaskWorker