[local-sync] syncback(Part 3): Fixup runSyncbackTasks

Summary:
Now that we don't run Send tasks outside the sync loop, we don't need
that awful hack wich required passing a `runTask` callback to
`runSyncbackTask` in order to customize how to run the task.

Instead, runSyncbackTask now knows 2 ways to run a task, either via imap, or
via smtp, depending on the resource declared by task to run. So now
SyncbackTasks declare a resource type they need to run, and that will be
passed as their second argument when running.

Depends D3894

Test Plan: manual

Reviewers: mark, halla, spang, evan

Reviewed By: halla, spang, evan

Differential Revision: https://phab.nylas.com/D3896
This commit is contained in:
Juan Tejada 2017-02-11 02:46:56 -08:00
parent c1ecd045d7
commit 4e85993957
25 changed files with 147 additions and 123 deletions

View file

@ -1,7 +1,8 @@
const _ = require('underscore') const _ = require('underscore')
const { const {
IMAPConnection,
IMAPErrors, IMAPErrors,
IMAPConnection,
SendmailClient,
} = require('isomorphic-core'); } = require('isomorphic-core');
const { const {
Actions, Actions,
@ -32,11 +33,7 @@ class SyncWorker {
this._interruptible = new Interruptible() this._interruptible = new Interruptible()
this._localDeltas = new LocalSyncDeltaEmitter(db, account.id) this._localDeltas = new LocalSyncDeltaEmitter(db, account.id)
this._logger = global.Logger.forAccount(account) this._logger = global.Logger.forAccount(account)
this._syncbackTaskRunner = new SyncbackTaskRunner({ this._smtp = new SendmailClient(this._account, this._logger)
db,
account,
logger: this._logger,
})
this._startTime = Date.now() this._startTime = Date.now()
this._lastSyncTime = null this._lastSyncTime = null
@ -420,8 +417,16 @@ class SyncWorker {
yield this._ensureConnection(); yield this._ensureConnection();
yield this._ensureMailListenerConnection(); yield this._ensureMailListenerConnection();
const syncbackTaskRunner = new SyncbackTaskRunner({
db: this._db,
imap: this._conn,
smtp: this._smtp,
logger: this._logger,
account: this._account,
})
// Step 1: Mark all "INPROGRESS" tasks as failed. // Step 1: Mark all "INPROGRESS" tasks as failed.
await this._syncbackTaskRunner.markInProgressTasksAsFailed() await syncbackTaskRunner.markInProgressTasksAsFailed()
yield // Yield to allow interruption yield // Yield to allow interruption
// Step 2: Run any available syncback tasks // Step 2: Run any available syncback tasks
@ -431,13 +436,10 @@ class SyncWorker {
// (e.g. marking as unread or starred). We need to listen to that event for // (e.g. marking as unread or starred). We need to listen to that event for
// when updates are performed from another mail client, but ignore // when updates are performed from another mail client, but ignore
// them when they are caused from within N1 to prevent unecessary interrupts // them when they are caused from within N1 to prevent unecessary interrupts
const tasks = yield this._syncbackTaskRunner.getNewSyncbackTasks() const tasks = yield syncbackTaskRunner.getNewSyncbackTasks()
this._shouldIgnoreInboxFlagUpdates = true this._shouldIgnoreInboxFlagUpdates = true
for (const task of tasks) { for (const task of tasks) {
const {shouldRetry} = await this._syncbackTaskRunner.runSyncbackTask({ const {shouldRetry} = await syncbackTaskRunner.runSyncbackTask(task)
task,
runTask: (t) => this._conn.runOperation(t),
})
if (shouldRetry) { if (shouldRetry) {
this.syncNow({reason: 'Retrying syncback task', interrupt: true}); this.syncNow({reason: 'Retrying syncback task', interrupt: true});
} }

View file

@ -1,6 +1,7 @@
const {Actions} = require('nylas-exports') import {Actions} from 'nylas-exports'
const {IMAPErrors} = require('isomorphic-core') import {IMAPErrors} from 'isomorphic-core'
const SyncbackTaskFactory = require('./syncback-task-factory'); import SyncbackTask from './syncback-tasks/syncback-task'
import SyncbackTaskFactory from './syncback-task-factory';
const MAX_TASK_RETRIES = 2 const MAX_TASK_RETRIES = 2
@ -12,7 +13,7 @@ const SendTaskTypes = [
class SyncbackTaskRunner { class SyncbackTaskRunner {
constructor({db, account, logger} = {}) { constructor({db, account, logger, imap, smtp} = {}) {
if (!db) { if (!db) {
throw new Error('SyncbackTaskRunner: need to pass db') throw new Error('SyncbackTaskRunner: need to pass db')
} }
@ -22,9 +23,17 @@ class SyncbackTaskRunner {
if (!logger) { if (!logger) {
throw new Error('SyncbackTaskRunner: need to pass logger') throw new Error('SyncbackTaskRunner: need to pass logger')
} }
if (!imap) {
throw new Error('SyncbackTaskRunner: need to pass imap')
}
if (!smtp) {
throw new Error('SyncbackTaskRunner: need to pass smtp')
}
this._db = db this._db = db
this._account = account this._account = account
this._logger = logger this._logger = logger
this._imap = imap
this._smtp = smtp
} }
/** /**
@ -125,8 +134,10 @@ class SyncbackTaskRunner {
} }
} }
// TODO JUAN! remove this uglyness that is runTask async runSyncbackTask(task) {
async runSyncbackTask({task, runTask} = {}) { if (!task || !(task instanceof SyncbackTask)) {
throw new Error('runSyncbackTask: must pass a SyncbackTask')
}
const before = new Date(); const before = new Date();
const syncbackRequest = task.syncbackRequestObject(); const syncbackRequest = task.syncbackRequestObject();
let shouldRetry = false let shouldRetry = false
@ -138,11 +149,18 @@ class SyncbackTaskRunner {
syncbackRequest.status = "INPROGRESS"; syncbackRequest.status = "INPROGRESS";
await syncbackRequest.save(); await syncbackRequest.save();
// TODO `runTask` is a hack to allow tasks to be executed outside the const resource = task.resource()
// context of an imap connection, specifically to allow running send tasks let responseJSON;
// outside of the sync loop. This should be solved in a better way or switch (resource) {
// probably refactored when we implement the sync scheduler case 'imap':
const responseJSON = await runTask(task) responseJSON = await this._imap.runOperation(task)
break;
case 'smtp':
responseJSON = await task.run(this._db, this._smtp)
break;
default:
throw new Error(`runSyncbackTask: unknown resource. Must be one of ['imap', 'smtp']`)
}
syncbackRequest.status = "SUCCEEDED"; syncbackRequest.status = "SUCCEEDED";
syncbackRequest.responseJSON = responseJSON || {}; syncbackRequest.responseJSON = responseJSON || {};

View file

@ -1,6 +1,6 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
class CreateCategoryIMAP extends SyncbackTask { class CreateCategoryIMAP extends SyncbackIMAPTask {
description() { description() {
return `CreateCategory`; return `CreateCategory`;
} }

View file

@ -1,6 +1,6 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
class DeleteFolderIMAP extends SyncbackTask { class DeleteFolderIMAP extends SyncbackIMAPTask {
description() { description() {
return `DeleteFolder`; return `DeleteFolder`;
} }

View file

@ -1,6 +1,6 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
class DeleteLabelIMAP extends SyncbackTask { class DeleteLabelIMAP extends SyncbackIMAPTask {
description() { description() {
return `DeleteLabel`; return `DeleteLabel`;
} }

View file

@ -1,7 +1,7 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class DeleteMessageIMAP extends SyncbackTask { class DeleteMessageIMAP extends SyncbackIMAPTask {
description() { description() {
return `DeleteMessage`; return `DeleteMessage`;
} }

View file

@ -1,5 +1,5 @@
const {SendmailClient, Provider, Errors: {APIError}} = require('isomorphic-core') const {SendmailClient, Provider, Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const SyncTaskFactory = require('../sync-task-factory'); const SyncTaskFactory = require('../sync-task-factory');
const {getReplyHeaders} = require('../../shared/message-factory') const {getReplyHeaders} = require('../../shared/message-factory')
@ -126,7 +126,7 @@ async function setThreadingReferences(db, baseMessage) {
* automatically created (keyed by the same Meassage-Id header we set), * automatically created (keyed by the same Meassage-Id header we set),
* then stuff a copy of the original message in the sent folder. * then stuff a copy of the original message in the sent folder.
*/ */
class EnsureMessageInSentFolderIMAP extends SyncbackTask { class EnsureMessageInSentFolderIMAP extends SyncbackIMAPTask {
description() { description() {
return `EnsureMessageInSentFolder`; return `EnsureMessageInSentFolder`;
} }

View file

@ -1,7 +1,7 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class MarkMessageAsReadIMAP extends SyncbackTask { class MarkMessageAsReadIMAP extends SyncbackIMAPTask {
description() { description() {
return `MarkMessageAsRead`; return `MarkMessageAsRead`;
} }

View file

@ -1,7 +1,7 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class MarkMessageAsUnreadIMAP extends SyncbackTask { class MarkMessageAsUnreadIMAP extends SyncbackIMAPTask {
description() { description() {
return `MarkMessageAsUnread`; return `MarkMessageAsUnread`;
} }

View file

@ -1,8 +1,8 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class MarkThreadAsRead extends SyncbackTask { class MarkThreadAsRead extends SyncbackIMAPTask {
description() { description() {
return `MarkThreadAsRead`; return `MarkThreadAsRead`;
} }

View file

@ -1,8 +1,8 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class MarkThreadAsUnread extends SyncbackTask { class MarkThreadAsUnread extends SyncbackIMAPTask {
description() { description() {
return `MarkThreadAsUnread`; return `MarkThreadAsUnread`;
} }

View file

@ -1,8 +1,8 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class MoveMessageToFolderIMAP extends SyncbackTask { class MoveMessageToFolderIMAP extends SyncbackIMAPTask {
description() { description() {
return `MoveMessageToFolder`; return `MoveMessageToFolder`;
} }

View file

@ -1,9 +1,9 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
const SyncTaskFactory = require('../sync-task-factory'); const SyncTaskFactory = require('../sync-task-factory');
class MoveThreadToFolderIMAP extends SyncbackTask { class MoveThreadToFolderIMAP extends SyncbackIMAPTask {
description() { description() {
return `MoveThreadToFolder`; return `MoveThreadToFolder`;
} }

View file

@ -1,7 +1,7 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
class RenameFolderIMAP extends SyncbackTask { class RenameFolderIMAP extends SyncbackIMAPTask {
description() { description() {
return `RenameFolder`; return `RenameFolder`;
} }

View file

@ -1,7 +1,7 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
class RenameLabelIMAP extends SyncbackTask { class RenameLabelIMAP extends SyncbackIMAPTask {
description() { description() {
return `RenameLabel`; return `RenameLabel`;
} }

View file

@ -1,46 +1,9 @@
const {SendmailClient, Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const Utils = require('../../shared/utils') const Utils = require('../../shared/utils')
const SyncbackTask = require('./syncback-task') const {SyncbackSMTPTask} = require('./syncback-task')
const MessageFactory = require('../../shared/message-factory') const MessageFactory = require('../../shared/message-factory')
async function sendPerRecipient({db, account, baseMessage, usesOpenTracking, usesLinkTracking, logger = console} = {}) {
const {Message} = db
const recipients = baseMessage.getRecipients()
const failedRecipients = []
for (const recipient of recipients) {
const customBody = MessageFactory.buildTrackingBodyForRecipient({
recipient,
baseMessage,
usesOpenTracking,
usesLinkTracking,
})
const individualizedMessage = Utils.copyModel(Message, baseMessage, {
body: customBody,
})
// TODO we set these temporary properties which aren't stored in the
// database model because SendmailClient requires them to send the message
// with the correct headers.
// This should be cleaned up
individualizedMessage.references = baseMessage.references;
individualizedMessage.inReplyTo = baseMessage.inReplyTo;
try {
const sender = new SendmailClient(account, logger);
await sender.sendCustom(individualizedMessage, {to: [recipient]})
} catch (error) {
logger.error(error, {recipient: recipient.email}, 'SendMessagePerRecipient: Failed to send to recipient');
failedRecipients.push(recipient.email)
}
}
if (failedRecipients.length === recipients.length) {
throw new APIError('SendMessagePerRecipient: Sending failed for all recipients', 500);
}
return {failedRecipients}
}
/** /**
* This enables customized link and open tracking on a per-recipient basis * This enables customized link and open tracking on a per-recipient basis
* by delivering several messages to each recipient. * by delivering several messages to each recipient.
@ -56,26 +19,21 @@ async function sendPerRecipient({db, account, baseMessage, usesOpenTracking, use
* up in the sent folder and only a single message shows up in the sent * up in the sent folder and only a single message shows up in the sent
* folder. * folder.
*/ */
class SendMessagePerRecipientSMTP extends SyncbackTask { class SendMessagePerRecipientSMTP extends SyncbackSMTPTask {
description() { description() {
return `SendMessagePerRecipient`; return `SendMessagePerRecipient`;
} }
affectsImapMessageUIDs() { async run(db, smtp) {
return false
}
async run(db) {
const { const {
messagePayload, messagePayload,
usesOpenTracking, usesOpenTracking,
usesLinkTracking, usesLinkTracking,
} = this.syncbackRequestObject().props } = this.syncbackRequestObject().props
const account = this._account
const baseMessage = await MessageFactory.buildForSend(db, messagePayload) const baseMessage = await MessageFactory.buildForSend(db, messagePayload)
const sendResult = await sendPerRecipient({db, account, baseMessage, usesOpenTracking, usesLinkTracking}) const sendResult = await this._sendPerRecipient({db, smtp, baseMessage, usesOpenTracking, usesLinkTracking})
/** /**
* Once messages have actually been delivered, we need to be very * Once messages have actually been delivered, we need to be very
@ -106,6 +64,42 @@ class SendMessagePerRecipientSMTP extends SyncbackTask {
return {message: {}, failedRecipients: []} return {message: {}, failedRecipients: []}
} }
} }
async _sendPerRecipient({db, smtp, baseMessage, usesOpenTracking, usesLinkTracking} = {}) {
const {Message} = db
const recipients = baseMessage.getRecipients()
const failedRecipients = []
for (const recipient of recipients) {
const customBody = MessageFactory.buildTrackingBodyForRecipient({
recipient,
baseMessage,
usesOpenTracking,
usesLinkTracking,
})
const individualizedMessage = Utils.copyModel(Message, baseMessage, {
body: customBody,
})
// TODO we set these temporary properties which aren't stored in the
// database model because SendmailClient requires them to send the message
// with the correct headers.
// This should be cleaned up
individualizedMessage.references = baseMessage.references;
individualizedMessage.inReplyTo = baseMessage.inReplyTo;
try {
await smtp.sendCustom(individualizedMessage, {to: [recipient]})
} catch (error) {
this._logger.error(error, {recipient: recipient.email}, 'SendMessagePerRecipient: Failed to send to recipient');
failedRecipients.push(recipient.email)
}
}
if (failedRecipients.length === recipients.length) {
throw new APIError('SendMessagePerRecipient: Sending failed for all recipients', 500);
}
return {failedRecipients}
}
} }
module.exports = SendMessagePerRecipientSMTP; module.exports = SendMessagePerRecipientSMTP;

View file

@ -1,6 +1,5 @@
const {SendmailClient} = require('isomorphic-core')
const SyncbackTask = require('../syncback-tasks/syncback-task')
const MessageFactory = require('../../shared/message-factory') const MessageFactory = require('../../shared/message-factory')
const {SyncbackSMTPTask} = require('../syncback-tasks/syncback-task')
/** /**
* This sets up the actual delivery of a message. * This sets up the actual delivery of a message.
@ -11,29 +10,23 @@ const MessageFactory = require('../../shared/message-factory')
* We later get EnsureMessageInSentFolder queued to ensure the newly * We later get EnsureMessageInSentFolder queued to ensure the newly
* delivered message shows up in the sent folder. * delivered message shows up in the sent folder.
*/ */
class SendMessageSMTP extends SyncbackTask { class SendMessageSMTP extends SyncbackSMTPTask {
description() { description() {
return `SendMessage`; return `SendMessage`;
} }
affectsImapMessageUIDs() { async run(db, smtp) {
return false
}
async run(db) {
const {messagePayload} = this.syncbackRequestObject().props const {messagePayload} = this.syncbackRequestObject().props
const message = await MessageFactory.buildForSend(db, messagePayload); const message = await MessageFactory.buildForSend(db, messagePayload);
const logger = global.Logger.forAccount(this._account); await smtp.send(message);
const sender = new SendmailClient(this._account, logger);
await sender.send(message);
try { try {
message.setIsSent(true) message.setIsSent(true)
await message.save(); await message.save();
return {message: message.toJSON()} return {message: message.toJSON()}
} catch (err) { } catch (err) {
logger.error(err, "SendMessage: Failed to save the message to the local sync database after it was successfully delivered") this._logger.error(err, "SendMessage: Failed to save the message to the local sync database after it was successfully delivered")
return {message: {}} return {message: {}}
} }
} }

View file

@ -1,7 +1,7 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class SetMessageLabelsIMAP extends SyncbackTask { class SetMessageLabelsIMAP extends SyncbackIMAPTask {
description() { description() {
return `SetMessageLabels`; return `SetMessageLabels`;
} }

View file

@ -1,10 +1,10 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
const SyncTaskFactory = require('../sync-task-factory'); const SyncTaskFactory = require('../sync-task-factory');
class SetThreadFolderAndLabelsIMAP extends SyncbackTask { class SetThreadFolderAndLabelsIMAP extends SyncbackIMAPTask {
description() { description() {
return `SetThreadFolderAndLabels`; return `SetThreadFolderAndLabels`;
} }

View file

@ -1,8 +1,8 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class SetThreadLabelsIMAP extends SyncbackTask { class SetThreadLabelsIMAP extends SyncbackIMAPTask {
description() { description() {
return `SetThreadLabels`; return `SetThreadLabels`;
} }

View file

@ -1,7 +1,7 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class StarMessageIMAP extends SyncbackTask { class StarMessageIMAP extends SyncbackIMAPTask {
description() { description() {
return `StarMessage`; return `StarMessage`;
} }

View file

@ -1,8 +1,8 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class StarThread extends SyncbackTask { class StarThread extends SyncbackIMAPTask {
description() { description() {
return `StarThread`; return `StarThread`;
} }

View file

@ -19,6 +19,10 @@ class SyncbackTask {
throw new Error("Must return a description") throw new Error("Must return a description")
} }
resource() {
throw new Error("Must return a resource. Must be one of ['imap', 'smtp']")
}
affectsImapMessageUIDs() { affectsImapMessageUIDs() {
throw new Error("Must implement `affectsImapMessageUIDs`") throw new Error("Must implement `affectsImapMessageUIDs`")
} }
@ -27,4 +31,17 @@ class SyncbackTask {
throw new Error("Must implement a run method") throw new Error("Must implement a run method")
} }
} }
module.exports = SyncbackTask
export class SyncbackIMAPTask extends SyncbackTask {
resource() {
return 'imap'
}
}
export class SyncbackSMTPTask extends SyncbackTask {
resource() {
return 'smtp'
}
}
export default SyncbackTask

View file

@ -1,7 +1,7 @@
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class UnstarMessageIMAP extends SyncbackTask { class UnstarMessageIMAP extends SyncbackIMAPTask {
description() { description() {
return `UnstarMessage`; return `UnstarMessage`;
} }

View file

@ -1,8 +1,8 @@
const {Errors: {APIError}} = require('isomorphic-core') const {Errors: {APIError}} = require('isomorphic-core')
const SyncbackTask = require('./syncback-task') const {SyncbackIMAPTask} = require('./syncback-task')
const IMAPHelpers = require('../imap-helpers') const IMAPHelpers = require('../imap-helpers')
class UnstarThread extends SyncbackTask { class UnstarThread extends SyncbackIMAPTask {
description() { description() {
return `UnstarThread`; return `UnstarThread`;
} }