[client-sync] Convert syncback tasks to interruptible generators

Summary:
Make `run()` functions generators and change most awaits to yields

Part of T7978

Test Plan: specs from D4269, but really needs some heavy QA

Reviewers: evan, juan

Reviewed By: juan

Differential Revision: https://phab.nylas.com/D4270
This commit is contained in:
Halla Moore 2017-03-29 16:02:36 -07:00
parent c11a7ff830
commit 4c1af8f184
16 changed files with 98 additions and 99 deletions

View file

@ -9,12 +9,12 @@ class CreateCategoryIMAP extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {accountId} = db
const {objectClass, displayName} = this.syncbackRequestObject().props
await imap.addBox(displayName)
yield imap.addBox(displayName)
const id = db[objectClass].hash({boxName: displayName, accountId})
const category = await db[objectClass].create({
const category = yield db[objectClass].create({
id,
accountId,
name: displayName,

View file

@ -9,17 +9,17 @@ class DeleteFolderIMAP extends SyncbackIMAPTask {
return true
}
async run(db, imap) {
async * _run(db, imap) {
const {folderId} = this.syncbackRequestObject().props
const folder = await db.Folder.findById(folderId)
const folder = yield db.Folder.findById(folderId)
if (!folder) {
// Nothing to delete!
return
}
await imap.delBox(folder.name);
yield imap.delBox(folder.name);
// If IMAP succeeds, save updates to the db
await folder.destroy()
yield folder.destroy()
}
}
module.exports = DeleteFolderIMAP

View file

@ -9,17 +9,17 @@ class DeleteLabelIMAP extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {labelId} = this.syncbackRequestObject().props.labelId
const label = await db.Label.findById(labelId)
const label = yield db.Label.findById(labelId)
if (!label) {
// Nothing to delete!
return
}
await imap.delBox(label.name);
yield imap.delBox(label.name);
// If IMAP succeeds, save updates to the db
await label.destroy()
yield label.destroy()
}
}
module.exports = DeleteLabelIMAP

View file

@ -4,13 +4,13 @@ const {SyncbackIMAPTask} = require('./syncback-task')
const SyncTaskFactory = require('../sync-task-factory');
async function deleteGmailSentMessages({db, imap, provider, headerMessageId}) {
async function* deleteGmailSentMessages({db, imap, provider, headerMessageId}) {
if (provider !== 'gmail') { return }
const trash = await db.Folder.find({where: {role: 'trash'}});
const trash = yield db.Folder.find({where: {role: 'trash'}});
if (!trash) { throw new APIError(`Could not find folder with role 'trash'.`) }
const allMail = await db.Folder.find({where: {role: 'all'}});
const allMail = yield db.Folder.find({where: {role: 'all'}});
if (!allMail) { throw new APIError(`Could not find folder with role 'all'.`) }
// Move the message from all mail to trash and then delete it from there
@ -20,29 +20,29 @@ async function deleteGmailSentMessages({db, imap, provider, headerMessageId}) {
]
for (const {folder, deleteFn} of steps) {
const box = await imap.openBox(folder.name);
const uids = await box.search([['HEADER', 'Message-ID', headerMessageId]])
const box = yield imap.openBox(folder.name);
const uids = yield box.search([['HEADER', 'Message-ID', headerMessageId]])
for (const uid of uids) {
await deleteFn(box, uid);
yield deleteFn(box, uid);
}
await box.closeBox();
yield box.closeBox();
}
}
async function saveSentMessage({db, account, syncWorker, logger, imap, provider, customSentMessage, baseMessage}) {
async function* saveSentMessage({db, account, syncWorker, logger, imap, provider, customSentMessage, baseMessage}) {
const {Folder, Label} = db
// Case 1. If non gmail, save the message to the `sent` folder using IMAP
// Only gmail creates a sent message for us, so if we are using any other provider
// we need to save it manually ourselves.
if (provider !== 'gmail') {
const sentFolder = await Folder.find({where: {role: 'sent'}});
const sentFolder = yield Folder.find({where: {role: 'sent'}});
if (!sentFolder) { throw new APIError(`Can't find sent folder - could not save message to sent folder.`) }
const sender = new SendmailClient(account, logger);
const rawMime = await sender.buildMime(baseMessage);
const box = await imap.openBox(sentFolder.name);
await box.append(rawMime, {flags: 'SEEN'});
const rawMime = yield sender.buildMime(baseMessage);
const box = yield imap.openBox(sentFolder.name);
yield box.append(rawMime, {flags: 'SEEN'});
// If IMAP succeeds, fetch any new messages in the sent folder which
// should include the messages we just created there
@ -52,15 +52,15 @@ async function saveSentMessage({db, account, syncWorker, logger, imap, provider,
account,
folder: sentFolder,
})
await syncOperation.run(db, imap, syncWorker)
yield syncOperation.run(db, imap, syncWorker)
return
}
// Showing as sent in gmail means adding the message to all mail and
// adding the sent label
const sentLabel = await Label.find({where: {role: 'sent'}});
const allMailFolder = await Folder.find({where: {role: 'all'}});
const sentLabel = yield Label.find({where: {role: 'sent'}});
const allMailFolder = yield Folder.find({where: {role: 'all'}});
if (!sentLabel || !allMailFolder) {
throw new APIError('Could not save message to sent folder.')
}
@ -72,15 +72,15 @@ async function saveSentMessage({db, account, syncWorker, logger, imap, provider,
// tracking, but we actually /just/ want to show the baseMessage as sent
if (customSentMessage) {
const sender = new SendmailClient(account, logger);
const rawMime = await sender.buildMime(baseMessage);
const box = await imap.openBox(allMailFolder.name);
const rawMime = yield sender.buildMime(baseMessage);
const box = yield imap.openBox(allMailFolder.name);
await box.append(rawMime, {flags: 'SEEN'})
yield box.append(rawMime, {flags: 'SEEN'})
const {headerMessageId} = baseMessage
const uids = await box.search([['HEADER', 'Message-ID', headerMessageId]])
const uids = yield box.search([['HEADER', 'Message-ID', headerMessageId]])
// There should only be one uid in the array
await box.setLabels(uids[0], sentLabel.imapLabelIdentifier());
yield box.setLabels(uids[0], sentLabel.imapLabelIdentifier());
}
// If IMAP succeeds, fetch any new messages in the sent folder which
@ -91,10 +91,10 @@ async function saveSentMessage({db, account, syncWorker, logger, imap, provider,
account,
folder: allMailFolder,
})
await syncOperation.run(db, imap, syncWorker)
yield syncOperation.run(db, imap, syncWorker)
}
async function setThreadingReferences(db, baseMessage) {
async function* setThreadingReferences(db, baseMessage) {
const {Message, Reference} = db
// TODO When the message was created for sending, we set the
// `inReplyToLocalMessageId` if it exists, and we set the temporary properties
@ -103,7 +103,7 @@ async function setThreadingReferences(db, baseMessage) {
// them again because they are necessary for building the correct raw mime
// message to add to the sent folder
// We should clean this up
const replyToMessage = await Message.findById(
const replyToMessage = yield Message.findById(
baseMessage.inReplyToLocalMessageId,
{ include: [{model: Reference, as: 'references', attributes: ['id', 'rfc2822MessageId']}] }
)
@ -136,11 +136,11 @@ class EnsureMessageInSentFolderIMAP extends SyncbackIMAPTask {
return false
}
async run(db, imap, syncWorker) {
async * _run(db, imap, syncWorker) {
const {Message} = db
const {messageId, customSentMessage} = this.syncbackRequestObject().props
const baseMessage = await Message.findById(messageId, {
const baseMessage = yield Message.findById(messageId, {
include: [{model: db.Folder}, {model: db.Label}, {model: db.File}],
});
@ -148,7 +148,7 @@ class EnsureMessageInSentFolderIMAP extends SyncbackIMAPTask {
throw new APIError(`Couldn't find message ${messageId} to stuff in sent folder`, 500)
}
await setThreadingReferences(db, baseMessage)
yield setThreadingReferences(db, baseMessage)
const {provider} = this._account
const {headerMessageId} = baseMessage
@ -162,7 +162,7 @@ class EnsureMessageInSentFolderIMAP extends SyncbackIMAPTask {
// sent messages and clean them up
if (customSentMessage && provider === Provider.Gmail) {
try {
await deleteGmailSentMessages({db, imap, provider, headerMessageId})
yield deleteGmailSentMessages({db, imap, provider, headerMessageId})
} catch (err) {
// Even if this fails, we need to finish attempting to save the
// baseMessage to the sent folder
@ -170,7 +170,7 @@ class EnsureMessageInSentFolderIMAP extends SyncbackIMAPTask {
}
}
await saveSentMessage({db, account: this._account, syncWorker, logger: this._logger, imap, provider, customSentMessage, baseMessage})
yield saveSentMessage({db, account: this._account, syncWorker, logger: this._logger, imap, provider, customSentMessage, baseMessage})
return baseMessage.toJSON()
}
}

View file

@ -11,19 +11,19 @@ class MarkThreadAsRead extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {sequelize, Thread} = db
const threadId = this.syncbackRequestObject().props.threadId
if (!threadId) {
throw new APIError('threadId is required')
}
const thread = await Thread.findById(threadId)
const thread = yield Thread.findById(threadId)
if (!thread) {
throw new APIError(`Can't find thread`, 404)
}
const threadMessages = await thread.getMessages()
await IMAPHelpers.forEachFolderOfThread({
const threadMessages = yield thread.getMessages()
yield IMAPHelpers.forEachFolderOfThread({
db,
imap,
threadMessages,
@ -32,7 +32,7 @@ class MarkThreadAsRead extends SyncbackIMAPTask {
},
})
// If IMAP succeeds, save the model updates
await sequelize.transaction(async (transaction) => {
yield sequelize.transaction(async (transaction) => {
await Promise.all(threadMessages.map((m) => m.update({unread: false}, {transaction})))
await thread.update({unreadCount: 0}, {transaction})
})

View file

@ -11,19 +11,19 @@ class MarkThreadAsUnread extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {sequelize, Thread} = db
const threadId = this.syncbackRequestObject().props.threadId
if (!threadId) {
throw new APIError('threadId is required')
}
const thread = await Thread.findById(threadId)
const thread = yield Thread.findById(threadId)
if (!thread) {
throw new APIError(`Can't find thread`, 404)
}
const threadMessages = await thread.getMessages()
await IMAPHelpers.forEachFolderOfThread({
const threadMessages = yield thread.getMessages()
yield IMAPHelpers.forEachFolderOfThread({
db,
imap,
threadMessages,
@ -32,7 +32,7 @@ class MarkThreadAsUnread extends SyncbackIMAPTask {
},
})
// If IMAP succeeds, save the model updates
await sequelize.transaction(async (transaction) => {
yield sequelize.transaction(async (transaction) => {
await Promise.all(threadMessages.map((m) => m.update({unread: true}, {transaction})))
await thread.update({unreadCount: threadMessages.length}, {transaction})
})

View file

@ -12,7 +12,7 @@ class MoveThreadToFolderIMAP extends SyncbackIMAPTask {
return true
}
async run(db, imap, syncWorker) {
async * _run(db, imap, syncWorker) {
const {Thread, Folder} = db
const threadId = this.syncbackRequestObject().props.threadId
const targetFolderId = this.syncbackRequestObject().props.folderId
@ -24,18 +24,18 @@ class MoveThreadToFolderIMAP extends SyncbackIMAPTask {
throw new APIError('targetFolderId is required')
}
const targetFolder = await Folder.findById(targetFolderId)
const targetFolder = yield Folder.findById(targetFolderId)
if (!targetFolder) {
throw new APIError('targetFolder not found', 404)
}
const thread = await Thread.findById(threadId)
const thread = yield Thread.findById(threadId)
if (!thread) {
throw new APIError(`Can't find thread`, 404)
}
const threadMessages = await thread.getMessages()
await IMAPHelpers.forEachFolderOfThread({
const threadMessages = yield thread.getMessages()
yield IMAPHelpers.forEachFolderOfThread({
db,
imap,
threadMessages,
@ -55,7 +55,7 @@ class MoveThreadToFolderIMAP extends SyncbackIMAPTask {
account: this._account,
folder: targetFolder,
})
await syncOperation.run(db, imap, syncWorker)
yield syncOperation.run(db, imap, syncWorker)
}
}
module.exports = MoveThreadToFolderIMAP

View file

@ -10,16 +10,16 @@ class RenameFolderIMAP extends SyncbackIMAPTask {
return true
}
async run(db, imap) {
async * _run(db, imap) {
const {sequelize, accountId, Folder} = db
const {folderId, newFolderName} = this.syncbackRequestObject().props.folderId
const oldFolder = await Folder.findById(folderId)
await imap.renameBox(oldFolder.name, newFolderName);
const oldFolder = yield Folder.findById(folderId)
yield imap.renameBox(oldFolder.name, newFolderName);
// After IMAP succeeds, update the db
const newId = Folder.hash({boxName: newFolderName, accountId})
let newFolder;
await sequelize.transaction(async (transaction) => {
yield sequelize.transaction(async (transaction) => {
newFolder = await Folder.create({
id: newId,
accountId,

View file

@ -10,16 +10,16 @@ class RenameLabelIMAP extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {sequelize, accountId, Label} = db
const {labelId, newLabelName} = this.syncbackRequestObject().props
const oldLabel = await Label.findById(labelId)
await imap.renameBox(oldLabel.name, newLabelName);
const oldLabel = yield Label.findById(labelId)
yield imap.renameBox(oldLabel.name, newLabelName);
// After IMAP succeeds, update the db
const newId = Label.hash({boxName: newLabelName, accountId})
let newLabel;
await sequelize.transaction(async (transaction) => {
yield sequelize.transaction(async (transaction) => {
newLabel = await Label.create({
id: newId,
accountId,

View file

@ -23,22 +23,22 @@ class SendMessagePerRecipientSMTP extends SyncbackSMTPTask {
return `SendMessagePerRecipient`;
}
async run(db, smtp) {
async * _run(db, smtp) {
const syncbackRequest = this.syncbackRequestObject()
const {
messagePayload,
usesOpenTracking,
usesLinkTracking,
} = syncbackRequest.props;
const baseMessage = await MessageFactory.buildForSend(db, messagePayload)
const baseMessage = yield MessageFactory.buildForSend(db, messagePayload)
await syncbackRequest.update({
yield syncbackRequest.update({
status: 'INPROGRESS-NOTRETRYABLE',
})
let sendResult;
try {
sendResult = await this._sendPerRecipient({
sendResult = yield this._sendPerRecipient({
db, smtp, baseMessage, logger: this._logger, usesOpenTracking, usesLinkTracking})
} catch (err) {
throw new APIError('SendMessagePerRecipient: Sending failed for all recipients', 500);
@ -60,7 +60,7 @@ class SendMessagePerRecipientSMTP extends SyncbackSMTPTask {
// be updated, and we can guarantee this because we control message
// id generation. The thread will be created or updated when we
// detect this message in the sync loop
await baseMessage.save()
yield baseMessage.save()
return {
message: baseMessage.toJSON(),

View file

@ -15,10 +15,10 @@ class SendMessageSMTP extends SyncbackSMTPTask {
return `SendMessage`;
}
async run(db, smtp) {
async * _run(db, smtp) {
const syncbackRequest = this.syncbackRequestObject()
const {messagePayload} = syncbackRequest.props
const message = await MessageFactory.buildForSend(db, messagePayload);
const message = yield MessageFactory.buildForSend(db, messagePayload);
await syncbackRequest.update({
status: 'INPROGRESS-NOTRETRYABLE',

View file

@ -14,7 +14,7 @@ class SetThreadFolderAndLabelsIMAP extends SyncbackIMAPTask {
}
async run(db, imap, syncWorker) {
async * _run(db, imap, syncWorker) {
const {Thread, Folder} = db
const threadId = this.syncbackRequestObject().props.threadId
const labelIds = this.syncbackRequestObject().props.labelIds
@ -27,18 +27,18 @@ class SetThreadFolderAndLabelsIMAP extends SyncbackIMAPTask {
throw new APIError('targetFolderId is required')
}
const targetFolder = await Folder.findById(targetFolderId)
const targetFolder = yield Folder.findById(targetFolderId)
if (!targetFolder) {
throw new APIError('targetFolder not found', 404)
}
const thread = await Thread.findById(threadId)
const thread = yield Thread.findById(threadId)
if (!thread) {
throw new APIError(`Can't find thread`, 404)
}
const threadMessages = await thread.getMessages()
await IMAPHelpers.forEachFolderOfThread({
const threadMessages = yield thread.getMessages()
yield IMAPHelpers.forEachFolderOfThread({
db,
imap,
threadMessages,
@ -60,8 +60,7 @@ class SetThreadFolderAndLabelsIMAP extends SyncbackIMAPTask {
account: this._account,
folder: targetFolder,
})
await syncOperation.run(db, imap, syncWorker)
yield syncOperation.run(db, imap, syncWorker)
}
}
module.exports = SetThreadFolderAndLabelsIMAP

View file

@ -11,7 +11,7 @@ class SetThreadLabelsIMAP extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {sequelize, Thread} = db
const threadId = this.syncbackRequestObject().props.threadId
const labelIds = this.syncbackRequestObject().props.labelIds
@ -19,13 +19,13 @@ class SetThreadLabelsIMAP extends SyncbackIMAPTask {
throw new APIError('threadId is required')
}
const thread = await Thread.findById(threadId)
const thread = yield Thread.findById(threadId)
if (!thread) {
throw new APIError(`Can't find thread`, 404)
}
const threadMessages = await thread.getMessages()
await IMAPHelpers.forEachFolderOfThread({
const threadMessages = yield thread.getMessages()
yield IMAPHelpers.forEachFolderOfThread({
db,
imap,
threadMessages,
@ -35,7 +35,7 @@ class SetThreadLabelsIMAP extends SyncbackIMAPTask {
})
// If IMAP succeeds, save the model updates
await sequelize.transaction(async (transaction) => {
yield sequelize.transaction(async (transaction) => {
await Promise.all(threadMessages.map(async (m) => m.setLabels(labelIds, {transaction})))
await thread.setLabels(labelIds, {transaction})
})

View file

@ -11,19 +11,19 @@ class StarThread extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {sequelize, Thread} = db
const threadId = this.syncbackRequestObject().props.threadId
if (!threadId) {
throw new APIError('threadId is required')
}
const thread = await Thread.findById(threadId)
const thread = yield Thread.findById(threadId)
if (!thread) {
throw new APIError(`Can't find thread`, 404)
}
const threadMessages = await thread.getMessages()
await IMAPHelpers.forEachFolderOfThread({
const threadMessages = yield thread.getMessages()
yield IMAPHelpers.forEachFolderOfThread({
db,
imap,
threadMessages,
@ -32,7 +32,7 @@ class StarThread extends SyncbackIMAPTask {
},
})
// If IMAP succeeds, save the model updates
await sequelize.transaction(async (transaction) => {
yield sequelize.transaction(async (transaction) => {
await Promise.all(threadMessages.map((m) => m.update({starred: true}, {transaction})))
await thread.update({starredCount: threadMessages.length}, {transaction})
})

View file

@ -14,7 +14,7 @@ class SyncUnknownUIDs extends SyncbackIMAPTask {
return false;
}
async run(db, imap, syncWorker) {
async * _run(db, imap, syncWorker) {
this._db = db;
const {Folder} = db
const {uids, folderId} = this.syncbackRequestObject().props;
@ -26,14 +26,14 @@ class SyncUnknownUIDs extends SyncbackIMAPTask {
throw new APIError('folderId is required');
}
await this.syncbackRequestObject().update({status: "INPROGRESS-NOTRETRYABLE"});
yield this.syncbackRequestObject().update({status: "INPROGRESS-NOTRETRYABLE"});
const folder = await Folder.findById(folderId);
const folder = yield Folder.findById(folderId);
if (!folder) {
throw new APIError('folder not found', 404);
}
if (await this._isCancelled()) {
if (yield this._isCancelled()) {
return;
}
@ -47,10 +47,10 @@ class SyncUnknownUIDs extends SyncbackIMAPTask {
uids: uidsToSync,
});
this._syncOperation.on('message-processed', () => this.onMessageProcessed());
await this._syncOperation.run(db, imap, syncWorker)
yield this._syncOperation.run(db, imap, syncWorker)
this._syncOperation.removeAllListeners('message-processed');
if (await this._isCancelled()) {
if (yield this._isCancelled()) {
return;
}
@ -64,7 +64,7 @@ class SyncUnknownUIDs extends SyncbackIMAPTask {
// We do this style of chained syncback tasks so that we don't block the
// sync loop for too long.
if (remainingUids.length > 0) {
await db.SyncbackRequest.create({
yield db.SyncbackRequest.create({
type: "SyncUnknownUIDs",
props: {folderId, uids: remainingUids},
accountId: this.syncbackRequestObject().accountId,

View file

@ -11,19 +11,19 @@ class UnstarThread extends SyncbackIMAPTask {
return false
}
async run(db, imap) {
async * _run(db, imap) {
const {sequelize, Thread} = db
const threadId = this.syncbackRequestObject().props.threadId
if (!threadId) {
throw new APIError('threadId is required')
}
const thread = await Thread.findById(threadId)
const thread = yield Thread.findById(threadId)
if (!thread) {
throw new APIError(`Can't find thread`, 404)
}
const threadMessages = await thread.getMessages()
await IMAPHelpers.forEachFolderOfThread({
const threadMessages = yield thread.getMessages()
yield IMAPHelpers.forEachFolderOfThread({
db,
imap,
threadMessages,
@ -32,7 +32,7 @@ class UnstarThread extends SyncbackIMAPTask {
},
})
// If IMAP succeeds, save the model updates
await sequelize.transaction(async (transaction) => {
yield sequelize.transaction(async (transaction) => {
await Promise.all(threadMessages.map((m) => m.update({starred: false}, {transaction})))
await thread.update({starredCount: 0}, {transaction})
})