[local-sync] syncback(Part 1): Refactor syncback-task-helpers

Summary:
Instead of exposing helper functions, make this a class that holds the
shared state (db, account, and logger) required to run any syncback task
inside an account's sync loop.
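
As a rough before/after sketch of the call shape this changes (require
paths abbreviated; names taken from the diff below):

    // Before: stateless helpers; every call site had to pass the logger.
    const {runSyncbackTask} = require('./syncback-task-helpers')
    await runSyncbackTask({task, runTask: (t) => t.run(db), logger})

    // After: a runner instance captures db, account, and logger once.
    const SyncbackTaskRunner = require('./syncback-task-runner').default
    const runner = new SyncbackTaskRunner({db, account, logger})
    await runner.runSyncbackTask({task, runTask: (t) => t.run(db)})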

Test Plan: manual

Reviewers: mark, spang, halla, evan

Reviewed By: spang, halla, evan

Differential Revision: https://phab.nylas.com/D3893
Juan Tejada 2017-02-11 00:48:40 -08:00
parent dbb404ccba
commit 1e74be3b94
4 changed files with 205 additions and 173 deletions

File 1 of 4: local-sync API routes

@@ -1,7 +1,7 @@
 const Joi = require('joi');
 const Utils = require('../../shared/utils');
 const SyncbackTaskFactory = require('../../local-sync-worker/syncback-task-factory');
-const {runSyncbackTask} = require('../../local-sync-worker/syncback-task-helpers');
+const SyncbackTaskRunner = require('../../local-sync-worker/syncback-task-runner').default;
 const {createAndReplyWithSyncbackRequest} = require('../route-helpers');
@@ -39,10 +39,14 @@ module.exports = (server) => {
       const sendTask = SyncbackTaskFactory.create(account, syncbackRequest)
       const db = await request.getAccountDatabase()
-      await runSyncbackTask({
+      const runner = new SyncbackTaskRunner({
+        db,
+        account,
+        logger: request.logger.child(),
+      })
+      await runner.runSyncbackTask({
         task: sendTask,
         runTask: (t) => t.run(db),
-        logger: request.logger.child(),
       })
     },
   });
@@ -69,10 +73,14 @@ module.exports = (server) => {
       const sendTask = SyncbackTaskFactory.create(account, syncbackRequest)
       const db = await request.getAccountDatabase()
-      await runSyncbackTask({
+      const runner = new SyncbackTaskRunner({
+        db,
+        account,
+        logger: request.logger.child(),
+      })
+      await runner.runSyncbackTask({
         task: sendTask,
         runTask: (t) => t.run(db),
-        logger: request.logger.child(),
       })
     },
   });

File 2 of 4: SyncWorker (local-sync-worker)

@@ -14,7 +14,7 @@ const {
 const Interruptible = require('../shared/interruptible')
 const SyncMetricsReporter = require('./sync-metrics-reporter');
 const SyncTaskFactory = require('./sync-task-factory');
-const {getNewSyncbackTasks, markInProgressTasksAsFailed, runSyncbackTask} = require('./syncback-task-helpers');
+const SyncbackTaskRunner = require('./syncback-task-runner').default;
 const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default
@@ -28,13 +28,18 @@ class SyncWorker {
     this._conn = null;
     this._account = account;
     this._currentTask = null
+    this._mailListenerConn = null
     this._interruptible = new Interruptible()
     this._localDeltas = new LocalSyncDeltaEmitter(db, account.id)
-    this._mailListenerConn = null
+    this._logger = global.Logger.forAccount(account)
+    this._syncbackTaskRunner = new SyncbackTaskRunner({
+      db,
+      account,
+      logger: this._logger,
+    })
     this._startTime = Date.now()
     this._lastSyncTime = null
-    this._logger = global.Logger.forAccount(account)
     this._interrupted = false
     this._syncInProgress = false
     this._stopped = false
@@ -416,7 +421,7 @@ class SyncWorker {
     yield this._ensureMailListenerConnection();

     // Step 1: Mark all "INPROGRESS" tasks as failed.
-    await markInProgressTasksAsFailed({db: this._db})
+    await this._syncbackTaskRunner.markInProgressTasksAsFailed()
     yield // Yield to allow interruption

     // Step 2: Run any available syncback tasks
@@ -426,12 +431,11 @@ class SyncWorker {
     // (e.g. marking as unread or starred). We need to listen to that event for
     // when updates are performed from another mail client, but ignore
     // them when they are caused from within N1 to prevent unnecessary interrupts
-    const tasks = yield getNewSyncbackTasks({db: this._db, account: this._account})
+    const tasks = yield this._syncbackTaskRunner.getNewSyncbackTasks()
     this._shouldIgnoreInboxFlagUpdates = true
     for (const task of tasks) {
-      const {shouldRetry} = await runSyncbackTask({
+      const {shouldRetry} = await this._syncbackTaskRunner.runSyncbackTask({
         task,
-        logger: this._logger,
         runTask: (t) => this._conn.runOperation(t),
       })
       if (shouldRetry) {

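Reassembling the worker hunks above (the rest of the enclosing generator is
untouched by this commit), the syncback step of the sync loop now reads
roughly as:

    // Step 1: fail any tasks left INPROGRESS by a crash or interruption;
    // the runner never retries those.
    await this._syncbackTaskRunner.markInProgressTasksAsFailed()
    yield // Yield to allow interruption

    // Step 2: fetch NEW tasks (send tasks excluded) and run each one on the
    // shared IMAP connection, ignoring the inbox flag updates they trigger.
    const tasks = yield this._syncbackTaskRunner.getNewSyncbackTasks()
    this._shouldIgnoreInboxFlagUpdates = true
    for (const task of tasks) {
      const {shouldRetry} = await this._syncbackTaskRunner.runSyncbackTask({
        task,
        runTask: (t) => this._conn.runOperation(t),
      })
      if (shouldRetry) {
        // the diff is truncated here; the runner has already reset the
        // request to NEW, so it will be picked up on a later iteration
      }
    }
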
File 3 of 4: syncback-task-helpers.js (deleted)

@@ -1,161 +0,0 @@
const {Actions} = require('nylas-exports')
const {IMAPErrors} = require('isomorphic-core')
const SyncbackTaskFactory = require('./syncback-task-factory');

const MAX_TASK_RETRIES = 2

// TODO NOTE! These are the tasks we exclude from the sync loop. This should be
// refactored later.
export const SendTaskTypes = ['SendMessage', 'SendMessagePerRecipient']

/**
 * Returns a list of at most 100 Syncback requests, sorted by creation date
 * (older first) and by how they affect message IMAP uids.
 *
 * We want to make sure that we run the tasks that affect IMAP uids last, and
 * that we don't run 2 tasks that will affect the same set of UIDS together,
 * i.e. without running a sync loop in between them.
 *
 * For example, if there's a task to change the labels of a message, and also
 * a task to move that message to another folder, we need to run the label
 * change /first/, otherwise the message would be moved and it would receive a
 * new IMAP uid, and then attempting to change labels with an old uid would
 * fail.
 *
 * TODO NOTE: This function excludes Send tasks because these are run outside of the
 * sync loop for performance reasons.
 */
export async function getNewSyncbackTasks({db, account} = {}) {
  const {SyncbackRequest, Message} = db;

  const ensureSentFolderTasks = await SyncbackRequest.findAll({
    limit: 100,
    where: {type: ['EnsureMessageInSentFolder'], status: 'NEW'},
    order: [['createdAt', 'ASC']],
  })
  .map((req) => SyncbackTaskFactory.create(account, req))
  const tasks = await SyncbackRequest.findAll({
    limit: 100,
    where: {type: {$notIn: [...SendTaskTypes, 'EnsureMessageInSentFolder']}, status: 'NEW'},
    order: [['createdAt', 'ASC']],
  })
  .map((req) => SyncbackTaskFactory.create(account, req))

  if (ensureSentFolderTasks.length === 0 && tasks.length === 0) { return [] }

  const tasksToProcess = [
    ...ensureSentFolderTasks,
    ...tasks.filter(t => !t.affectsImapMessageUIDs()),
  ]
  const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs())

  const changeFolderTasks = tasksAffectingUIDs.filter(t =>
    t.description() === 'RenameFolder' || t.description() === 'DeleteFolder'
  )
  if (changeFolderTasks.length > 0) {
    // If we are renaming or deleting folders, those are the only tasks we
    // want to process before executing any other tasks that may change uids.
    // These operations may not change the uids of their messages, but we
    // can't guarantee it, so to make sure, we will just run these.
    const affectedFolderIds = new Set()
    changeFolderTasks.forEach((task) => {
      const {props: {folderId}} = task.syncbackRequestObject()
      if (folderId && !affectedFolderIds.has(folderId)) {
        tasksToProcess.push(task)
        affectedFolderIds.add(folderId)
      }
    })
    return tasksToProcess
  }

  // Otherwise, make sure that we don't process more than 1 task that will affect
  // the UID of the same message
  const affectedMessageIds = new Set()
  for (const task of tasksAffectingUIDs) {
    const {props: {messageId, threadId}} = task.syncbackRequestObject()
    if (messageId) {
      if (!affectedMessageIds.has(messageId)) {
        tasksToProcess.push(task)
        affectedMessageIds.add(messageId)
      }
    } else if (threadId) {
      const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id)
      const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id))
      if (shouldIncludeTask) {
        tasksToProcess.push(task)
        messageIds.forEach(id => affectedMessageIds.add(id))
      }
    }
  }

  return tasksToProcess
}

export async function markInProgressTasksAsFailed({db} = {}) {
  // We use a very limited type of two-phase commit: before we start
  // running a syncback task, we mark it as "in progress". If something
  // happens during the syncback (the worker window crashes, or the power
  // goes down), the task won't succeed or fail.
  // We absolutely never want to retry such a task, so we mark it as failed
  // at the next sync iteration. We use this function for that.
  const {SyncbackRequest} = db;
  const inProgressTasks = await SyncbackRequest.findAll({
    // TODO this is a hack
    // NOTE: We exclude SendTaskTypes because they are run outside of the sync loop
    where: {type: {$notIn: SendTaskTypes}, status: 'INPROGRESS'},
  });

  for (const inProgress of inProgressTasks) {
    inProgress.status = 'FAILED';
    inProgress.error = new Error('Lingering task in progress was marked as failed')
    await inProgress.save();
  }
}

// TODO JUAN! remove this ugliness that is runTask
export async function runSyncbackTask({task, runTask, logger = console} = {}) {
  const before = new Date();
  const syncbackRequest = task.syncbackRequestObject();
  let shouldRetry = false

  logger.log(`🔃 📤 ${task.description()}`, syncbackRequest.props)
  try {
    // Before anything, mark the task as in progress. This allows
    // us to not run the same task twice.
    syncbackRequest.status = "INPROGRESS";
    await syncbackRequest.save();

    // TODO `runTask` is a hack to allow tasks to be executed outside the
    // context of an imap connection, specifically to allow running send tasks
    // outside of the sync loop. This should be solved in a better way or
    // probably refactored when we implement the sync scheduler
    const responseJSON = await runTask(task)
    syncbackRequest.status = "SUCCEEDED";
    syncbackRequest.responseJSON = responseJSON || {};

    const after = new Date();
    logger.log(`🔃 📤 ${task.description()} Succeeded (${after.getTime() - before.getTime()}ms)`)
  } catch (error) {
    const after = new Date();
    const {numRetries = 0} = syncbackRequest.props

    if (error instanceof IMAPErrors.RetryableError && numRetries < MAX_TASK_RETRIES) {
      Actions.recordUserEvent('Retrying syncback task', {numRetries})
      shouldRetry = true
      // We save this in `props` to avoid a db migration
      syncbackRequest.props = Object.assign({}, syncbackRequest.props, {
        numRetries: numRetries + 1,
      })
      syncbackRequest.status = "NEW";
      logger.warn(`🔃 📤 ${task.description()} Failed with retryable error, retrying in next loop (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error})
    } else {
      error.message = `Syncback Task Failed: ${error.message}`
      syncbackRequest.error = error;
      syncbackRequest.status = "FAILED";
      NylasEnv.reportError(error);
      logger.error(`🔃 📤 ${task.description()} Failed (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error})
    }
  } finally {
    await syncbackRequest.save();
  }

  return {shouldRetry}
}

File 4 of 4: syncback-task-runner.js (new file)

@@ -0,0 +1,181 @@
const {Actions} = require('nylas-exports')
const {IMAPErrors} = require('isomorphic-core')
const SyncbackTaskFactory = require('./syncback-task-factory');

const MAX_TASK_RETRIES = 2

// TODO NOTE! These are the tasks we exclude from the sync loop. This should be
// refactored later.
export const SendTaskTypes = ['SendMessage', 'SendMessagePerRecipient']

class SyncbackTaskRunner {

  constructor({db, account, logger} = {}) {
    if (!db) {
      throw new Error('SyncbackTaskRunner: need to pass db')
    }
    if (!account) {
      throw new Error('SyncbackTaskRunner: need to pass account')
    }
    if (!logger) {
      throw new Error('SyncbackTaskRunner: need to pass logger')
    }
    this._db = db
    this._account = account
    this._logger = logger
  }

  /**
   * Returns a list of at most 100 Syncback requests, sorted by creation date
   * (older first) and by how they affect message IMAP uids.
   *
   * We want to make sure that we run the tasks that affect IMAP uids last, and
   * that we don't run 2 tasks that will affect the same set of UIDS together,
   * i.e. without running a sync loop in between them.
   *
   * For example, if there's a task to change the labels of a message, and also
   * a task to move that message to another folder, we need to run the label
   * change /first/, otherwise the message would be moved and it would receive a
   * new IMAP uid, and then attempting to change labels with an old uid would
   * fail.
   *
   * TODO NOTE: This function excludes Send tasks because these are run outside of the
   * sync loop for performance reasons.
   */
  async getNewSyncbackTasks() {
    const {SyncbackRequest, Message} = this._db;

    const ensureSentFolderTasks = await SyncbackRequest.findAll({
      limit: 100,
      where: {type: ['EnsureMessageInSentFolder'], status: 'NEW'},
      order: [['createdAt', 'ASC']],
    })
    .map((req) => SyncbackTaskFactory.create(this._account, req))
    const tasks = await SyncbackRequest.findAll({
      limit: 100,
      where: {type: {$notIn: [...SendTaskTypes, 'EnsureMessageInSentFolder']}, status: 'NEW'},
      order: [['createdAt', 'ASC']],
    })
    .map((req) => SyncbackTaskFactory.create(this._account, req))

    if (ensureSentFolderTasks.length === 0 && tasks.length === 0) { return [] }

    const tasksToProcess = [
      ...ensureSentFolderTasks,
      ...tasks.filter(t => !t.affectsImapMessageUIDs()),
    ]
    const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs())

    const changeFolderTasks = tasksAffectingUIDs.filter(t =>
      t.description() === 'RenameFolder' || t.description() === 'DeleteFolder'
    )
    if (changeFolderTasks.length > 0) {
      // If we are renaming or deleting folders, those are the only tasks we
      // want to process before executing any other tasks that may change uids.
      // These operations may not change the uids of their messages, but we
      // can't guarantee it, so to make sure, we will just run these.
      const affectedFolderIds = new Set()
      changeFolderTasks.forEach((task) => {
        const {props: {folderId}} = task.syncbackRequestObject()
        if (folderId && !affectedFolderIds.has(folderId)) {
          tasksToProcess.push(task)
          affectedFolderIds.add(folderId)
        }
      })
      return tasksToProcess
    }

    // Otherwise, make sure that we don't process more than 1 task that will affect
    // the UID of the same message
    const affectedMessageIds = new Set()
    for (const task of tasksAffectingUIDs) {
      const {props: {messageId, threadId}} = task.syncbackRequestObject()
      if (messageId) {
        if (!affectedMessageIds.has(messageId)) {
          tasksToProcess.push(task)
          affectedMessageIds.add(messageId)
        }
      } else if (threadId) {
        const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id)
        const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id))
        if (shouldIncludeTask) {
          tasksToProcess.push(task)
          messageIds.forEach(id => affectedMessageIds.add(id))
        }
      }
    }

    return tasksToProcess
  }

  async markInProgressTasksAsFailed() {
    // We use a very limited type of two-phase commit: before we start
    // running a syncback task, we mark it as "in progress". If something
    // happens during the syncback (the worker window crashes, or the power
    // goes down), the task won't succeed or fail.
    // We absolutely never want to retry such a task, so we mark it as failed
    // at the next sync iteration. We use this function for that.
    const {SyncbackRequest} = this._db;
    const inProgressTasks = await SyncbackRequest.findAll({
      // TODO this is a hack
      // NOTE: We exclude SendTaskTypes because they are run outside of the sync loop
      where: {type: {$notIn: SendTaskTypes}, status: 'INPROGRESS'},
    });

    for (const inProgress of inProgressTasks) {
      inProgress.status = 'FAILED';
      inProgress.error = new Error('Lingering task in progress was marked as failed')
      await inProgress.save();
    }
  }

  // TODO JUAN! remove this ugliness that is runTask
  async runSyncbackTask({task, runTask} = {}) {
    const before = new Date();
    const syncbackRequest = task.syncbackRequestObject();
    let shouldRetry = false

    this._logger.log(`🔃 📤 ${task.description()}`, syncbackRequest.props)
    try {
      // Before anything, mark the task as in progress. This allows
      // us to not run the same task twice.
      syncbackRequest.status = "INPROGRESS";
      await syncbackRequest.save();

      // TODO `runTask` is a hack to allow tasks to be executed outside the
      // context of an imap connection, specifically to allow running send tasks
      // outside of the sync loop. This should be solved in a better way or
      // probably refactored when we implement the sync scheduler
      const responseJSON = await runTask(task)
      syncbackRequest.status = "SUCCEEDED";
      syncbackRequest.responseJSON = responseJSON || {};

      const after = new Date();
      this._logger.log(`🔃 📤 ${task.description()} Succeeded (${after.getTime() - before.getTime()}ms)`)
    } catch (error) {
      const after = new Date();
      const {numRetries = 0} = syncbackRequest.props

      if (error instanceof IMAPErrors.RetryableError && numRetries < MAX_TASK_RETRIES) {
        Actions.recordUserEvent('Retrying syncback task', {numRetries})
        shouldRetry = true
        // We save this in `props` to avoid a db migration
        syncbackRequest.props = Object.assign({}, syncbackRequest.props, {
          numRetries: numRetries + 1,
        })
        syncbackRequest.status = "NEW";
        this._logger.warn(`🔃 📤 ${task.description()} Failed with retryable error, retrying in next loop (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error})
      } else {
        error.message = `Syncback Task Failed: ${error.message}`
        syncbackRequest.error = error;
        syncbackRequest.status = "FAILED";
        NylasEnv.reportError(error);
        this._logger.error(`🔃 📤 ${task.description()} Failed (${after.getTime() - before.getTime()}ms)`, {syncbackRequest: syncbackRequest.toJSON(), error})
      }
    } finally {
      await syncbackRequest.save();
    }

    return {shouldRetry}
  }
}

export default SyncbackTaskRunner
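
The retry bookkeeping above stores numRetries in syncbackRequest.props to
avoid a db migration. A minimal sketch of the resulting decision (not the
actual helper; the real logic lives inline in runSyncbackTask, with
MAX_TASK_RETRIES set to 2):

    // Given the request's props after a RetryableError, decide whether to
    // re-queue (status NEW) or give up (status FAILED).
    function nextStateOnRetryableError(props = {}) {
      const {numRetries = 0} = props
      if (numRetries < 2) {
        // re-queue with the attempt counter bumped; runSyncbackTask then
        // reports shouldRetry: true to its caller
        return {status: 'NEW', props: Object.assign({}, props, {numRetries: numRetries + 1})}
      }
      // out of retries: mark FAILED and report the error
      return {status: 'FAILED', props}
    }

    // A task that keeps throwing a retryable error therefore runs at most
    // three times: numRetries 0 -> NEW, 1 -> NEW, 2 -> FAILED.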