[local-sync] Make syncback task execution interruptible

Summary:
See title. Got rid of that syncback-worker class which was kind of useless and
made things harder. My b.

Test Plan: locally

Reviewers: evan

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D3624
This commit is contained in:
Juan Tejada 2017-01-10 10:44:37 -08:00
parent a234570118
commit 80708dacbc
2 changed files with 116 additions and 145 deletions

View file

@ -12,7 +12,7 @@ const Interruptible = require('../shared/interruptible')
const FetchFolderList = require('./imap/fetch-folder-list')
const FetchMessagesInFolder = require('./imap/fetch-messages-in-folder')
const SyncMetricsReporter = require('./sync-metrics-reporter');
const SyncbackTaskWorker = require('./syncback-task-worker');
const SyncbackTaskFactory = require('./syncback-task-factory');
const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default
@ -22,9 +22,8 @@ class SyncWorker {
this._manager = parentManager;
this._conn = null;
this._account = account;
this._currentSyncOperation = null
this._currentOperation = null
this._interruptible = new Interruptible()
this._syncbackTaskWorker = new SyncbackTaskWorker(account, db)
this._localDeltas = new LocalSyncDeltaEmitter(db, account.id)
this._startTime = Date.now();
@ -178,6 +177,96 @@ class SyncWorker {
this._conn = null
}
/**
* Returns a list of at most 100 Syncback requests, sorted by creation date
* (older first) and by how they affect message IMAP uids.
*
* We want to make sure that we run the tasks that affect IMAP uids last, and
* that we don't run 2 tasks that will affect the same set of UIDS together,
* i.e. without running a sync loop in between them.
*
* For example, if there's a task to change the labels of a message, and also
* a task to move that message to another folder, we need to run the label
* change /first/, otherwise the message would be moved and it would receive a
* new IMAP uid, and then attempting to change labels with an old uid would
* fail.
*/
async _getNewSyncbackTasks() {
const {SyncbackRequest, Message} = this._db;
const where = {
limit: 100,
where: {status: "NEW"},
order: [['createdAt', 'ASC']],
};
const tasks = await SyncbackRequest.findAll(where)
.map((req) => SyncbackTaskFactory.create(this._account, req))
if (tasks.length === 0) { return [] }
// TODO prioritize Send!
const tasksToProcess = tasks.filter(t => !t.affectsImapMessageUIDs())
const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs())
const changeFolderTasks = tasksAffectingUIDs.filter(t =>
t.description() === 'RenameFolder' || t.description() === 'DeleteFolder'
)
if (changeFolderTasks.length > 0) {
// If we are renaming or deleting folders, those are the only tasks we
// want to process before executing any other tasks that may change uids.
// These operations may not change the uids of their messages, but we
// can't guarantee it, so to make sure, we will just run these.
const affectedFolderIds = new Set()
changeFolderTasks.forEach((task) => {
const {props: {folderId}} = task.syncbackRequestObject()
if (folderId && !affectedFolderIds.has(folderId)) {
tasksToProcess.push(task)
affectedFolderIds.add(folderId)
}
})
return tasksToProcess
}
// Otherwise, make sure that we don't process more than 1 task that will affect
// the UID of the same message
const affectedMessageIds = new Set()
for (const task of tasksAffectingUIDs) {
const {props: {messageId, threadId}} = task.syncbackRequestObject()
if (messageId) {
if (!affectedMessageIds.has(messageId)) {
tasksToProcess.push(task)
affectedMessageIds.add(messageId)
}
} else if (threadId) {
const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id)
const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id))
if (shouldIncludeTask) {
tasksToProcess.push(task)
messageIds.forEach(id => affectedMessageIds.add(id))
}
}
}
return tasksToProcess
}
async _runSyncbackTask(task) {
const syncbackRequest = task.syncbackRequestObject();
console.log(`🔃 📤 ${task.description()}`, syncbackRequest.props)
try {
const responseJSON = await this._conn.runOperation(task);
syncbackRequest.status = "SUCCEEDED";
syncbackRequest.responseJSON = responseJSON || {};
console.log(`🔃 📤 ${task.description()} Succeeded`)
} catch (error) {
syncbackRequest.error = error;
syncbackRequest.status = "FAILED";
console.error(`🔃 📤 ${task.description()} Failed`, {syncbackRequest: syncbackRequest.toJSON()})
} finally {
await syncbackRequest.save();
}
}
async _getFoldersToSync() {
const {Folder} = this._db;
@ -237,7 +326,9 @@ class SyncWorker {
// Start idling on the inbox
const inbox = await this._getInboxFolder();
await this._conn.openBox(inbox.name);
if (inbox) {
await this._conn.openBox(inbox.name);
}
// this._logger.info('SyncWorker: Idling on inbox folder');
}
@ -277,19 +368,30 @@ class SyncWorker {
}, nextSyncIn);
}
async _runOperation(operation) {
this._currentOperation = operation
await this._conn.runOperation(this._currentOperation)
this._currentOperation = null
}
// This function is interruptible. See Interruptible
async * _performSync() {
yield this._account.update({syncError: null});
yield this._ensureConnection();
yield this._syncbackTaskWorker.runNewSyncbackTasks(this._conn);
this._currentSyncOperation = new FetchFolderList(this._account, this._logger)
await this._conn.runOperation(this._currentSyncOperation)
this._currentSyncOperation = null
// Step 1: Run any available syncback tasks
const tasks = yield this._getNewSyncbackTasks()
for (const task of tasks) {
await this._runSyncbackTask(task)
yield // Yield to allow interruption
}
// Yield to allow interruption
yield
// Step 2: Fetch the folder list. We need to run this before syncing folders
// because we need folders to sync!
await this._runOperation(new FetchFolderList(this._account, this._logger))
yield // Yield to allow interruption
// Step 3: Sync each folder, sorted by inbox first
// TODO prioritize syncing all of inbox first if there's a ton of folders (e.g. imap
// accounts). If there are many folders, we would only sync the first n
// messages in the inbox and not go back to it until we've done the same for
@ -300,12 +402,8 @@ class SyncWorker {
const sortedFolders = yield this._getFoldersToSync()
const {folderSyncOptions} = this._account.syncPolicy;
for (const folder of sortedFolders) {
this._currentSyncOperation = new FetchMessagesInFolder(folder, folderSyncOptions, this._logger)
await this._conn.runOperation(this._currentSyncOperation)
this._currentSyncOperation = null
// Yield to allow interruption
yield
this._runOperation(new FetchMessagesInFolder(folder, folderSyncOptions, this._logger))
yield // Yield to allow interruption
}
}
@ -351,8 +449,8 @@ class SyncWorker {
interrupt() {
this._interruptible.interrupt()
if (this._currentSyncOperation) {
this._currentSyncOperation.interrupt()
if (this._currentOperation) {
this._currentOperation.interrupt()
}
this._interrupted = true
}

View file

@ -1,127 +0,0 @@
const {
IMAPConnection,
} = require('isomorphic-core');
const SyncbackTaskFactory = require('./syncback-task-factory')
/**
* SyncbackTaskWorker runs newly available syncback requests
*/
class SyncbackTaskWorker {
constructor(account, db) {
if (!account) {
throw new Error('SyncbackTaskWorker requires an account')
}
if (!db) {
throw new Error('SyncbackTaskWorker requires a db instance')
}
this._account = account
this._db = db
}
/**
* Returns a list of at most 100 Syncback requests, sorted by creation date
* (older first) and by how they affect message IMAP uids.
*
* We want to make sure that we run the tasks that affect IMAP uids last, and
* that we don't run 2 tasks that will affect the same set of UIDS together,
* i.e. without running a sync loop in between them.
*
* For example, if there's a task to change the labels of a message, and also
* a task to move that message to another folder, we need to run the label
* change /first/, otherwise the message would be moved and it would receive a
* new IMAP uid, and then attempting to change labels with an old uid would
* fail.
*/
async _getNewSyncbackTasks() {
const {SyncbackRequest, Message} = this._db;
const where = {
limit: 100,
where: {status: "NEW"},
order: [['createdAt', 'ASC']],
};
const tasks = await SyncbackRequest.findAll(where)
.map((req) => SyncbackTaskFactory.create(this._account, req))
if (tasks.length === 0) { return [] }
// TODO prioritize Send!
const tasksToProcess = tasks.filter(t => !t.affectsImapMessageUIDs())
const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs())
const changeFolderTasks = tasksAffectingUIDs.filter(t =>
t.description() === 'RenameFolder' || t.description() === 'DeleteFolder'
)
if (changeFolderTasks.length > 0) {
// If we are renaming or deleting folders, those are the only tasks we
// want to process before executing any other tasks that may change uids.
// These operations may not change the uids of their messages, but we
// can't guarantee it, so to make sure, we will just run these.
const affectedFolderIds = new Set()
changeFolderTasks.forEach((task) => {
const {props: {folderId}} = task.syncbackRequestObject()
if (folderId && !affectedFolderIds.has(folderId)) {
tasksToProcess.push(task)
affectedFolderIds.add(folderId)
}
})
return tasksToProcess
}
// Otherwise, make sure that we don't process more than 1 task that will affect
// the UID of the same message
const affectedMessageIds = new Set()
for (const task of tasksAffectingUIDs) {
const {props: {messageId, threadId}} = task.syncbackRequestObject()
if (messageId) {
if (!affectedMessageIds.has(messageId)) {
tasksToProcess.push(task)
affectedMessageIds.add(messageId)
}
} else if (threadId) {
const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id)
const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id))
if (shouldIncludeTask) {
tasksToProcess.push(task)
messageIds.forEach(id => affectedMessageIds.add(id))
}
}
}
return tasksToProcess
}
async runSyncbackTask(conn, task) {
const syncbackRequest = task.syncbackRequestObject();
console.log(`🔃 📤 ${task.description()}`, syncbackRequest.props)
try {
const responseJSON = await conn.runOperation(task);
syncbackRequest.status = "SUCCEEDED";
syncbackRequest.responseJSON = responseJSON || {};
console.log(`🔃 📤 ${task.description()} Succeeded`)
} catch (error) {
syncbackRequest.error = error;
syncbackRequest.status = "FAILED";
console.error(`🔃 📤 ${task.description()} Failed`, {syncbackRequest: syncbackRequest.toJSON()})
} finally {
await syncbackRequest.save();
}
}
async runNewSyncbackTasks(conn) {
// TODO Make this interruptible too!
if (!(conn instanceof IMAPConnection)) {
throw new Error('SyncbackTaskWorker requires an IMAPConnection')
}
const tasks = await this._getNewSyncbackTasks()
if (tasks.length === 0) { return; }
for (const task of tasks) {
await this.runSyncbackTask(conn, task)
}
}
}
module.exports = SyncbackTaskWorker