From 1e74be3b94e50253e5bb9892cea6b39bad0396d6 Mon Sep 17 00:00:00 2001 From: Juan Tejada Date: Sat, 11 Feb 2017 00:48:40 -0800 Subject: [PATCH] [local-sync] syncback(Part 1): Refactor syncback-task-helpers Summary: Instead of exposing helper functions, make this a class to hold the shared state of the db, account, and logger required to run any syncback inside an account sync loop. Test Plan: manual Reviewers: mark, spang, halla, evan Reviewed By: spang, halla, evan Differential Revision: https://phab.nylas.com/D3893 --- .../local-sync/src/local-api/routes/send.js | 18 +- .../src/local-sync-worker/sync-worker.es6 | 18 +- .../syncback-task-helpers.es6 | 161 ---------------- .../syncback-task-runner.es6 | 181 ++++++++++++++++++ 4 files changed, 205 insertions(+), 173 deletions(-) delete mode 100644 packages/local-sync/src/local-sync-worker/syncback-task-helpers.es6 create mode 100644 packages/local-sync/src/local-sync-worker/syncback-task-runner.es6 diff --git a/packages/local-sync/src/local-api/routes/send.js b/packages/local-sync/src/local-api/routes/send.js index 000b44920..68cca8753 100644 --- a/packages/local-sync/src/local-api/routes/send.js +++ b/packages/local-sync/src/local-api/routes/send.js @@ -1,7 +1,7 @@ const Joi = require('joi'); const Utils = require('../../shared/utils'); const SyncbackTaskFactory = require('../../local-sync-worker/syncback-task-factory'); -const {runSyncbackTask} = require('../../local-sync-worker/syncback-task-helpers'); +const SyncbackTaskRunner = require('../../local-sync-worker/syncback-task-runner').default; const {createAndReplyWithSyncbackRequest} = require('../route-helpers'); @@ -39,10 +39,14 @@ module.exports = (server) => { const sendTask = SyncbackTaskFactory.create(account, syncbackRequest) const db = await request.getAccountDatabase() - await runSyncbackTask({ + const runner = new SyncbackTaskRunner({ + db, + account, + logger: request.logger.child(), + }) + await runner.runSyncbackTask({ task: sendTask, runTask: (t) => t.run(db), - logger: request.logger.child(), }) }, }); @@ -69,10 +73,14 @@ module.exports = (server) => { const sendTask = SyncbackTaskFactory.create(account, syncbackRequest) const db = await request.getAccountDatabase() - await runSyncbackTask({ + const runner = new SyncbackTaskRunner({ + db, + account, + logger: request.logger.child(), + }) + await runner.runSyncbackTask({ task: sendTask, runTask: (t) => t.run(db), - logger: request.logger.child(), }) }, }); diff --git a/packages/local-sync/src/local-sync-worker/sync-worker.es6 b/packages/local-sync/src/local-sync-worker/sync-worker.es6 index 86928ca76..14a0b5036 100644 --- a/packages/local-sync/src/local-sync-worker/sync-worker.es6 +++ b/packages/local-sync/src/local-sync-worker/sync-worker.es6 @@ -14,7 +14,7 @@ const { const Interruptible = require('../shared/interruptible') const SyncMetricsReporter = require('./sync-metrics-reporter'); const SyncTaskFactory = require('./sync-task-factory'); -const {getNewSyncbackTasks, markInProgressTasksAsFailed, runSyncbackTask} = require('./syncback-task-helpers'); +const SyncbackTaskRunner = require('./syncback-task-runner').default; const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default @@ -28,13 +28,18 @@ class SyncWorker { this._conn = null; this._account = account; this._currentTask = null + this._mailListenerConn = null this._interruptible = new Interruptible() this._localDeltas = new LocalSyncDeltaEmitter(db, account.id) - this._mailListenerConn = null + this._logger = global.Logger.forAccount(account) + this._syncbackTaskRunner = new SyncbackTaskRunner({ + db, + account, + logger: this._logger, + }) this._startTime = Date.now() this._lastSyncTime = null - this._logger = global.Logger.forAccount(account) this._interrupted = false this._syncInProgress = false this._stopped = false @@ -416,7 +421,7 @@ class SyncWorker { yield this._ensureMailListenerConnection(); // Step 1: Mark all "INPROGRESS" tasks as failed. - await markInProgressTasksAsFailed({db: this._db}) + await this._syncbackTaskRunner.markInProgressTasksAsFailed() yield // Yield to allow interruption // Step 2: Run any available syncback tasks @@ -426,12 +431,11 @@ class SyncWorker { // (e.g. marking as unread or starred). We need to listen to that event for // when updates are performed from another mail client, but ignore // them when they are caused from within N1 to prevent unecessary interrupts - const tasks = yield getNewSyncbackTasks({db: this._db, account: this._account}) + const tasks = yield this._syncbackTaskRunner.getNewSyncbackTasks() this._shouldIgnoreInboxFlagUpdates = true for (const task of tasks) { - const {shouldRetry} = await runSyncbackTask({ + const {shouldRetry} = await this._syncbackTaskRunner.runSyncbackTask({ task, - logger: this._logger, runTask: (t) => this._conn.runOperation(t), }) if (shouldRetry) { diff --git a/packages/local-sync/src/local-sync-worker/syncback-task-helpers.es6 b/packages/local-sync/src/local-sync-worker/syncback-task-helpers.es6 deleted file mode 100644 index 7fbee7818..000000000 --- a/packages/local-sync/src/local-sync-worker/syncback-task-helpers.es6 +++ /dev/null @@ -1,161 +0,0 @@ -const {Actions} = require('nylas-exports') -const {IMAPErrors} = require('isomorphic-core') -const SyncbackTaskFactory = require('./syncback-task-factory'); - -const MAX_TASK_RETRIES = 2 - -// TODO NOTE! These are the tasks we exclude from the sync loop. This should be -// refactored later. -export const SendTaskTypes = ['SendMessage', 'SendMessagePerRecipient'] - -/** - * Returns a list of at most 100 Syncback requests, sorted by creation date - * (older first) and by how they affect message IMAP uids. - * - * We want to make sure that we run the tasks that affect IMAP uids last, and - * that we don't run 2 tasks that will affect the same set of UIDS together, - * i.e. without running a sync loop in between them. - * - * For example, if there's a task to change the labels of a message, and also - * a task to move that message to another folder, we need to run the label - * change /first/, otherwise the message would be moved and it would receive a - * new IMAP uid, and then attempting to change labels with an old uid would - * fail. - * - * TODO NOTE: This function excludes Send tasks because these are run outside fo the - * sync loop for performance reasons. - */ -export async function getNewSyncbackTasks({db, account} = {}) { - const {SyncbackRequest, Message} = db; - - const ensureSentFolderTasks = await SyncbackRequest.findAll({ - limit: 100, - where: {type: ['EnsureMessageInSentFolder'], status: 'NEW'}, - order: [['createdAt', 'ASC']], - }) - .map((req) => SyncbackTaskFactory.create(account, req)) - const tasks = await SyncbackRequest.findAll({ - limit: 100, - where: {type: {$notIn: [...SendTaskTypes, 'EnsureMessageInSentFolder']}, status: 'NEW'}, - order: [['createdAt', 'ASC']], - }) - .map((req) => SyncbackTaskFactory.create(account, req)) - - if (ensureSentFolderTasks.length === 0 && tasks.length === 0) { return [] } - - const tasksToProcess = [ - ...ensureSentFolderTasks, - ...tasks.filter(t => !t.affectsImapMessageUIDs()), - ] - const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs()) - - const changeFolderTasks = tasksAffectingUIDs.filter(t => - t.description() === 'RenameFolder' || t.description() === 'DeleteFolder' - ) - if (changeFolderTasks.length > 0) { - // If we are renaming or deleting folders, those are the only tasks we - // want to process before executing any other tasks that may change uids. - // These operations may not change the uids of their messages, but we - // can't guarantee it, so to make sure, we will just run these. - const affectedFolderIds = new Set() - changeFolderTasks.forEach((task) => { - const {props: {folderId}} = task.syncbackRequestObject() - if (folderId && !affectedFolderIds.has(folderId)) { - tasksToProcess.push(task) - affectedFolderIds.add(folderId) - } - }) - return tasksToProcess - } - - // Otherwise, make sure that we don't process more than 1 task that will affect - // the UID of the same message - const affectedMessageIds = new Set() - for (const task of tasksAffectingUIDs) { - const {props: {messageId, threadId}} = task.syncbackRequestObject() - if (messageId) { - if (!affectedMessageIds.has(messageId)) { - tasksToProcess.push(task) - affectedMessageIds.add(messageId) - } - } else if (threadId) { - const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id) - const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id)) - if (shouldIncludeTask) { - tasksToProcess.push(task) - messageIds.forEach(id => affectedMessageIds.add(id)) - } - } - } - return tasksToProcess -} - -export async function markInProgressTasksAsFailed({db} = {}) { - // We use a very limited type of two-phase commit: before we start - // running a syncback task, we mark it as "in progress". If something - // happens during the syncback (the worker window crashes, or the power - // goes down), the task won't succeed or fail. - // We absolutely never want to retry such a task, so we mark it as failed - // at the next sync iteration. We use this function for that. - const {SyncbackRequest} = db; - const inProgressTasks = await SyncbackRequest.findAll({ - // TODO this is a hack - // NOTE: We exclude SendTaskTypes because they are run outside of the sync loop - where: {type: {$notIn: SendTaskTypes}, status: 'INPROGRESS'}, - }); - - for (const inProgress of inProgressTasks) { - inProgress.status = 'FAILED'; - inProgress.error = new Error('Lingering task in progress was marked as failed') - await inProgress.save(); - } -} - -// TODO JUAN! remove this uglyness that is runTask -export async function runSyncbackTask({task, runTask, logger = console} = {}) { - const before = new Date(); - const syncbackRequest = task.syncbackRequestObject(); - let shouldRetry = false - - logger.log(`🔃 📤 ${task.description()}`, syncbackRequest.props) - try { - // Before anything, mark the task as in progress. This allows - // us to not run the same task twice. - syncbackRequest.status = "INPROGRESS"; - await syncbackRequest.save(); - - // TODO `runTask` is a hack to allow tasks to be executed outside the - // context of an imap connection, specifically to allow running send tasks - // outside of the sync loop. This should be solved in a better way or - // probably refactored when we implement the sync scheduler - const responseJSON = await runTask(task) - syncbackRequest.status = "SUCCEEDED"; - syncbackRequest.responseJSON = responseJSON || {}; - - const after = new Date(); - logger.log(`🔃 📤 ${task.description()} Succeeded (${after.getTime() - before.getTime()}ms)`) - } catch (error) { - const after = new Date(); - const {numRetries = 0} = syncbackRequest.props - - if (error instanceof IMAPErrors.RetryableError && numRetries < MAX_TASK_RETRIES) { - Actions.recordUserEvent('Retrying syncback task', {numRetries}) - shouldRetry = true - // We save this in `props` to avoid a db migration - syncbackRequest.props = Object.assign({}, syncbackRequest.props, { - numRetries: numRetries + 1, - }) - syncbackRequest.status = "NEW"; - logger.warn(`🔃 📤 ${task.description()} Failed with retryable error, retrying in next loop (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error}) - } else { - error.message = `Syncback Task Failed: ${error.message}` - syncbackRequest.error = error; - syncbackRequest.status = "FAILED"; - NylasEnv.reportError(error); - logger.error(`🔃 📤 ${task.description()} Failed (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error}) - } - } finally { - await syncbackRequest.save(); - } - return {shouldRetry} -} diff --git a/packages/local-sync/src/local-sync-worker/syncback-task-runner.es6 b/packages/local-sync/src/local-sync-worker/syncback-task-runner.es6 new file mode 100644 index 000000000..03bbda74f --- /dev/null +++ b/packages/local-sync/src/local-sync-worker/syncback-task-runner.es6 @@ -0,0 +1,181 @@ +const {Actions} = require('nylas-exports') +const {IMAPErrors} = require('isomorphic-core') +const SyncbackTaskFactory = require('./syncback-task-factory'); + +const MAX_TASK_RETRIES = 2 + +// TODO NOTE! These are the tasks we exclude from the sync loop. This should be +// refactored later. +export const SendTaskTypes = ['SendMessage', 'SendMessagePerRecipient'] + +class SyncbackTaskRunner { + + constructor({db, account, logger} = {}) { + if (!db) { + throw new Error('SyncbackTaskRunner: need to pass db') + } + if (!account) { + throw new Error('SyncbackTaskRunner: need to pass account') + } + if (!logger) { + throw new Error('SyncbackTaskRunner: need to pass logger') + } + this._db = db + this._account = account + this._logger = logger + } + + /** + * Returns a list of at most 100 Syncback requests, sorted by creation date + * (older first) and by how they affect message IMAP uids. + * + * We want to make sure that we run the tasks that affect IMAP uids last, and + * that we don't run 2 tasks that will affect the same set of UIDS together, + * i.e. without running a sync loop in between them. + * + * For example, if there's a task to change the labels of a message, and also + * a task to move that message to another folder, we need to run the label + * change /first/, otherwise the message would be moved and it would receive a + * new IMAP uid, and then attempting to change labels with an old uid would + * fail. + * + * TODO NOTE: This function excludes Send tasks because these are run outside fo the + * sync loop for performance reasons. + */ + async getNewSyncbackTasks() { + const {SyncbackRequest, Message} = this._db; + + const ensureSentFolderTasks = await SyncbackRequest.findAll({ + limit: 100, + where: {type: ['EnsureMessageInSentFolder'], status: 'NEW'}, + order: [['createdAt', 'ASC']], + }) + .map((req) => SyncbackTaskFactory.create(this._account, req)) + const tasks = await SyncbackRequest.findAll({ + limit: 100, + where: {type: {$notIn: [...SendTaskTypes, 'EnsureMessageInSentFolder']}, status: 'NEW'}, + order: [['createdAt', 'ASC']], + }) + .map((req) => SyncbackTaskFactory.create(this._account, req)) + + if (ensureSentFolderTasks.length === 0 && tasks.length === 0) { return [] } + + const tasksToProcess = [ + ...ensureSentFolderTasks, + ...tasks.filter(t => !t.affectsImapMessageUIDs()), + ] + const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs()) + + const changeFolderTasks = tasksAffectingUIDs.filter(t => + t.description() === 'RenameFolder' || t.description() === 'DeleteFolder' + ) + if (changeFolderTasks.length > 0) { + // If we are renaming or deleting folders, those are the only tasks we + // want to process before executing any other tasks that may change uids. + // These operations may not change the uids of their messages, but we + // can't guarantee it, so to make sure, we will just run these. + const affectedFolderIds = new Set() + changeFolderTasks.forEach((task) => { + const {props: {folderId}} = task.syncbackRequestObject() + if (folderId && !affectedFolderIds.has(folderId)) { + tasksToProcess.push(task) + affectedFolderIds.add(folderId) + } + }) + return tasksToProcess + } + + // Otherwise, make sure that we don't process more than 1 task that will affect + // the UID of the same message + const affectedMessageIds = new Set() + for (const task of tasksAffectingUIDs) { + const {props: {messageId, threadId}} = task.syncbackRequestObject() + if (messageId) { + if (!affectedMessageIds.has(messageId)) { + tasksToProcess.push(task) + affectedMessageIds.add(messageId) + } + } else if (threadId) { + const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id) + const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id)) + if (shouldIncludeTask) { + tasksToProcess.push(task) + messageIds.forEach(id => affectedMessageIds.add(id)) + } + } + } + return tasksToProcess + } + + async markInProgressTasksAsFailed() { + // We use a very limited type of two-phase commit: before we start + // running a syncback task, we mark it as "in progress". If something + // happens during the syncback (the worker window crashes, or the power + // goes down), the task won't succeed or fail. + // We absolutely never want to retry such a task, so we mark it as failed + // at the next sync iteration. We use this function for that. + const {SyncbackRequest} = this._db; + const inProgressTasks = await SyncbackRequest.findAll({ + // TODO this is a hack + // NOTE: We exclude SendTaskTypes because they are run outside of the sync loop + where: {type: {$notIn: SendTaskTypes}, status: 'INPROGRESS'}, + }); + + for (const inProgress of inProgressTasks) { + inProgress.status = 'FAILED'; + inProgress.error = new Error('Lingering task in progress was marked as failed') + await inProgress.save(); + } + } + + // TODO JUAN! remove this uglyness that is runTask + async runSyncbackTask({task, runTask} = {}) { + const before = new Date(); + const syncbackRequest = task.syncbackRequestObject(); + let shouldRetry = false + + this._logger.log(`🔃 📤 ${task.description()}`, syncbackRequest.props) + try { + // Before anything, mark the task as in progress. This allows + // us to not run the same task twice. + syncbackRequest.status = "INPROGRESS"; + await syncbackRequest.save(); + + // TODO `runTask` is a hack to allow tasks to be executed outside the + // context of an imap connection, specifically to allow running send tasks + // outside of the sync loop. This should be solved in a better way or + // probably refactored when we implement the sync scheduler + const responseJSON = await runTask(task) + syncbackRequest.status = "SUCCEEDED"; + syncbackRequest.responseJSON = responseJSON || {}; + + const after = new Date(); + this._logger.log(`🔃 📤 ${task.description()} Succeeded (${after.getTime() - before.getTime()}ms)`) + } catch (error) { + const after = new Date(); + const {numRetries = 0} = syncbackRequest.props + + if (error instanceof IMAPErrors.RetryableError && numRetries < MAX_TASK_RETRIES) { + Actions.recordUserEvent('Retrying syncback task', {numRetries}) + shouldRetry = true + // We save this in `props` to avoid a db migration + syncbackRequest.props = Object.assign({}, syncbackRequest.props, { + numRetries: numRetries + 1, + }) + syncbackRequest.status = "NEW"; + this._logger.warn(`🔃 📤 ${task.description()} Failed with retryable error, retrying in next loop (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error}) + } else { + error.message = `Syncback Task Failed: ${error.message}` + syncbackRequest.error = error; + syncbackRequest.status = "FAILED"; + NylasEnv.reportError(error); + this._logger.error(`🔃 📤 ${task.description()} Failed (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error}) + } + } finally { + await syncbackRequest.save(); + } + return {shouldRetry} + } +} + +export default SyncbackTaskRunner