mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-09-07 21:24:24 +08:00
[local-sync] Fix syncback tasks execution
Summary: We were doing all of this fancy filtering and sorting to determine which tasks to run, but in the end we were just running the whole unsorted list (using the wrong variable). Also extracted getting the list of tasks into its own function, for easier unit tests (it should really have some). And I also wanted to make sure other people looked at this code, since I believe no one has before. Test Plan: todo Reviewers: mark, khamidou, spang, halla Reviewed By: halla Differential Revision: https://phab.nylas.com/D3516
This commit is contained in:
parent
2d145800ae
commit
5189407e38
1 changed files with 74 additions and 55 deletions
|
@ -84,6 +84,77 @@ class SyncWorker {
|
|||
return this._db.Folder.find({where: {role: ['all', 'inbox']}})
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of at most 100 Syncback requests, sorted by creation date
|
||||
* (older first) and by how they affect message IMAP uids.
|
||||
*
|
||||
* We want to make sure that we run the tasks that affect IMAP uids last, and
|
||||
* that we don't run 2 tasks that will affect the same set of UIDS together,
|
||||
* i.e. without running a sync loop in between.
|
||||
*
|
||||
* For example, if there's a task to change the labels of a message, and also
|
||||
* a task to move that message to another folder, we need to run the label
|
||||
* change /first/, otherwise the message would be moved and it would receive a
|
||||
* new IMAP uid, and then attempting to change labels with an old uid would
|
||||
* fail.
|
||||
*/
|
||||
async _getNewSyncbackTasks() {
|
||||
const {SyncbackRequest, Message} = this._db;
|
||||
const where = {
|
||||
limit: 100,
|
||||
where: {status: "NEW"},
|
||||
order: [['createdAt', 'ASC']],
|
||||
};
|
||||
|
||||
const tasks = await SyncbackRequest.findAll(where)
|
||||
.map((req) => SyncbackTaskFactory.create(this._account, req))
|
||||
|
||||
if (tasks.length === 0) { return [] }
|
||||
|
||||
const tasksToProcess = tasks.filter(t => !t.affectsImapMessageUIDs())
|
||||
const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs())
|
||||
|
||||
const changeFolderTasks = tasksAffectingUIDs.filter(t =>
|
||||
t.description() === 'RenameFolder' || t.description() === 'DeleteFolder'
|
||||
)
|
||||
if (changeFolderTasks.length > 0) {
|
||||
// If we are renaming or deleting folders, those are the only tasks we
|
||||
// want to process before executing any other tasks that may change uids.
|
||||
// These operations may not change the uids of their messages, but we
|
||||
// can't guarantee it, so to make sure, we will just run these.
|
||||
const affectedFolderIds = new Set()
|
||||
changeFolderTasks.forEach((task) => {
|
||||
const {props: {folderId}} = task.syncbackRequestObject()
|
||||
if (folderId && !affectedFolderIds.has(folderId)) {
|
||||
tasksToProcess.push(task)
|
||||
affectedFolderIds.add(folderId)
|
||||
}
|
||||
})
|
||||
return tasksToProcess
|
||||
}
|
||||
|
||||
// Otherwise, make sure that we don't process more than 1 task that will affect
|
||||
// the UID of the same message
|
||||
const affectedMessageIds = new Set()
|
||||
for (const task of tasksAffectingUIDs) {
|
||||
const {props: {messageId, threadId}} = task.syncbackRequestObject()
|
||||
if (messageId) {
|
||||
if (!affectedMessageIds.has(messageId)) {
|
||||
tasksToProcess.push(task)
|
||||
affectedMessageIds.add(messageId)
|
||||
}
|
||||
} else if (threadId) {
|
||||
const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id)
|
||||
const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id))
|
||||
if (shouldIncludeTask) {
|
||||
tasksToProcess.push(task)
|
||||
messageIds.forEach(id => affectedMessageIds.add(id))
|
||||
}
|
||||
}
|
||||
}
|
||||
return tasksToProcess
|
||||
}
|
||||
|
||||
async ensureConnection() {
|
||||
if (this._conn) {
|
||||
return await this._conn.connect();
|
||||
|
@ -117,61 +188,9 @@ class SyncWorker {
|
|||
return await this._conn.connect();
|
||||
}
|
||||
|
||||
async runNewSyncbackRequests() {
|
||||
const {SyncbackRequest, Message} = this._db;
|
||||
const where = {
|
||||
limit: 100,
|
||||
where: {status: "NEW"},
|
||||
order: [['createdAt', 'ASC']],
|
||||
};
|
||||
|
||||
// Make sure we run the tasks that affect IMAP uids last, and that we don't
|
||||
// run 2 tasks that will affect the same set of UIDS together (i.e. without
|
||||
// running a sync loop in between)
|
||||
const tasks = await SyncbackRequest.findAll(where)
|
||||
.map((req) => SyncbackTaskFactory.create(this._account, req))
|
||||
|
||||
async runNewSyncbackTasks() {
|
||||
const tasks = await this._getNewSyncbackTasks()
|
||||
if (tasks.length === 0) { return Promise.resolve() }
|
||||
|
||||
const tasksToProcess = tasks.filter(t => !t.affectsImapMessageUIDs())
|
||||
const tasksAffectingUIDs = tasks.filter(t => t.affectsImapMessageUIDs())
|
||||
|
||||
const changeFolderTasks = tasksAffectingUIDs.filter(t =>
|
||||
t.description() === 'RenameFolder' || t.description() === 'DeleteFolder'
|
||||
)
|
||||
if (changeFolderTasks.length > 0) {
|
||||
// If we are renaming or deleting folders, those are the only tasks we
|
||||
// want to process before executing any other tasks that may change UIDs
|
||||
const affectedFolderIds = new Set()
|
||||
changeFolderTasks.forEach((task) => {
|
||||
const {props: {folderId}} = task.syncbackRequestObject()
|
||||
if (folderId && !affectedFolderIds.has(folderId)) {
|
||||
tasksToProcess.push(task)
|
||||
affectedFolderIds.add(folderId)
|
||||
}
|
||||
})
|
||||
return PromiseUtils.each(tasks, (task) => this.runSyncbackTask(task));
|
||||
}
|
||||
|
||||
// Otherwise, make sure that we don't process more than 1 task that will affect
|
||||
// the UID of the same message
|
||||
const affectedMessageIds = new Set()
|
||||
for (const task of tasksAffectingUIDs) {
|
||||
const {props: {messageId, threadId}} = task.syncbackRequestObject()
|
||||
if (messageId) {
|
||||
if (!affectedMessageIds.has(messageId)) {
|
||||
tasksToProcess.push(task)
|
||||
affectedMessageIds.add(messageId)
|
||||
}
|
||||
} else if (threadId) {
|
||||
const messageIds = await Message.findAll({where: {threadId}}).map(m => m.id)
|
||||
const shouldIncludeTask = messageIds.every(id => !affectedMessageIds.has(id))
|
||||
if (shouldIncludeTask) {
|
||||
tasksToProcess.push(task)
|
||||
messageIds.forEach(id => affectedMessageIds.add(id))
|
||||
}
|
||||
}
|
||||
}
|
||||
return PromiseUtils.each(tasks, (task) => this.runSyncbackTask(task));
|
||||
}
|
||||
|
||||
|
@ -235,7 +254,7 @@ class SyncWorker {
|
|||
try {
|
||||
await this._account.update({syncError: null});
|
||||
await this.ensureConnection();
|
||||
await this.runNewSyncbackRequests();
|
||||
await this.runNewSyncbackTasks();
|
||||
await this._conn.runOperation(new FetchFolderList(this._account, this._logger));
|
||||
await this.syncMessagesInAllFolders();
|
||||
await this.cleanupOrphanMessages();
|
||||
|
|
Loading…
Add table
Reference in a new issue