2016-06-28 07:01:21 +08:00
|
|
|
const {DatabaseConnector, NylasError} = require(`nylas-core`)
|
2016-06-22 05:58:20 +08:00
|
|
|
const {processors} = require('./processors')
|
2016-06-21 05:57:54 +08:00
|
|
|
|
2016-06-24 06:52:45 +08:00
|
|
|
global.Promise = require('bluebird');
|
2016-06-28 07:01:21 +08:00
|
|
|
global.NylasError = NylasError;
|
2016-06-24 06:52:45 +08:00
|
|
|
|
2016-06-23 01:59:22 +08:00
|
|
|
// List of the attributes of Message that the processor should be allowed to change.
|
2016-06-22 05:58:20 +08:00
|
|
|
// The message may move between folders, get starred, etc. while it's being
|
|
|
|
// processed, and it shouldn't overwrite changes to those fields.
|
2016-06-28 07:38:40 +08:00
|
|
|
const MessageAttributes = ['body', 'processed', 'to', 'from', 'cc', 'bcc', 'snippet']
|
2016-06-22 05:58:20 +08:00
|
|
|
const MessageProcessorVersion = 1;
|
2016-06-21 05:57:54 +08:00
|
|
|
|
2016-06-28 05:52:05 +08:00
|
|
|
function runPipeline({db, accountId, message}) {
|
2016-06-23 01:59:22 +08:00
|
|
|
return processors.reduce((prevPromise, processor) => (
|
2016-06-28 07:05:31 +08:00
|
|
|
prevPromise.then((prevMessage) => {
|
|
|
|
const processed = processor({message: prevMessage, accountId, db});
|
|
|
|
if (!(processed instanceof Promise)) {
|
|
|
|
throw new Error(`processor ${processor} did not return a promise.`)
|
|
|
|
}
|
|
|
|
return processed.then((nextMessage) => {
|
|
|
|
if (!nextMessage.body) {
|
|
|
|
throw new Error("processor did not resolve with a valid message object.")
|
|
|
|
}
|
|
|
|
return Promise.resolve(nextMessage);
|
|
|
|
})
|
|
|
|
})
|
2016-06-23 01:59:22 +08:00
|
|
|
), Promise.resolve(message))
|
|
|
|
}
|
|
|
|
|
|
|
|
function saveMessage(message) {
|
|
|
|
message.processed = MessageProcessorVersion;
|
|
|
|
return message.save({
|
|
|
|
fields: MessageAttributes,
|
|
|
|
});
|
2016-06-21 05:57:54 +08:00
|
|
|
}
|
|
|
|
|
2016-06-22 05:58:20 +08:00
|
|
|
function processMessage({messageId, accountId}) {
|
2016-06-23 15:49:22 +08:00
|
|
|
DatabaseConnector.forAccount(accountId)
|
2016-06-28 05:52:05 +08:00
|
|
|
.then((db) => {
|
|
|
|
const {Message} = db
|
2016-06-23 01:59:22 +08:00
|
|
|
Message.find({where: {id: messageId}}).then((message) =>
|
2016-06-28 05:52:05 +08:00
|
|
|
runPipeline({db, accountId, message})
|
2016-06-23 01:59:22 +08:00
|
|
|
.then((processedMessage) => saveMessage(processedMessage))
|
2016-06-22 05:58:20 +08:00
|
|
|
.catch((err) =>
|
2016-06-28 07:05:31 +08:00
|
|
|
console.error(`MessageProcessor Failed: ${err} ${err.stack}`)
|
2016-06-22 05:58:20 +08:00
|
|
|
)
|
|
|
|
)
|
|
|
|
.catch((err) =>
|
|
|
|
console.error(`MessageProcessor: Couldn't find message id ${messageId} in accountId: ${accountId}: ${err}`)
|
|
|
|
)
|
2016-06-28 05:52:05 +08:00
|
|
|
})
|
2016-06-21 05:57:54 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = {
|
|
|
|
processMessage,
|
|
|
|
}
|