2016-07-09 08:13:30 +08:00
|
|
|
const {PubsubConnector, DatabaseConnector, Logger} = require(`nylas-core`)
|
2016-06-22 05:58:20 +08:00
|
|
|
const {processors} = require('./processors')
|
2016-06-21 05:57:54 +08:00
|
|
|
|
2016-06-24 06:52:45 +08:00
|
|
|
global.Promise = require('bluebird');
|
2016-07-09 08:13:30 +08:00
|
|
|
// Process-wide logger. The job loop derives a per-account, per-message child
// logger from it for every job it handles.
global.Logger = Logger.createLogger('nylas-k2-message-processor');

// Attributes of Message that the processor is allowed to write back.
// A message can be moved between folders, starred, etc. while it is still
// being processed; anything not listed here must not be overwritten by the
// processor's save.
const MessageAttributes = [
  'body',
  'processed',
  'to',
  'from',
  'cc',
  'replyTo',
  'bcc',
  'snippet',
  'threadId',
];

// Value written to `message.processed` when a processed message is saved.
const MessageProcessorVersion = 1;

// Client used to pop jobs off the 'message-processor-queue' list.
const redis = PubsubConnector.buildClient();
|
|
|
|
|
2016-07-09 08:13:30 +08:00
|
|
|
/**
 * Runs `message` through every entry of `processors`, one after another.
 *
 * Each processor is called with `{message, accountId, db, logger}` and may
 * return either a message or a promise for one. Whatever it yields becomes the
 * `message` handed to the next processor.
 *
 * @param {Object} options
 * @param {Object} options.db - forwarded unchanged to every processor.
 * @param {*} options.accountId - forwarded unchanged to every processor.
 * @param {Object} options.message - input for the first processor.
 * @param {Object} options.logger - must expose `info`; also forwarded to
 *   every processor.
 * @returns {Promise<Object>} resolves with the output of the last processor
 *   (or with `message` itself when there are no processors). Rejects when a
 *   processor throws or rejects, or when it yields nothing / a value without a
 *   truthy `body`.
 */
function runPipeline({db, accountId, message, logger}) {
  logger.info(`MessageProcessor: Processing message`);

  return processors.reduce((prevPromise, processor) => (
    prevPromise
      // `.then` already adopts a returned promise, so sync and async
      // processors are handled the same way here.
      .then((prevMessage) => processor({message: prevMessage, accountId, db, logger}))
      .then((nextMessage) => {
        // Guard against null/undefined first: otherwise reading `.body` throws
        // a bare TypeError and the descriptive error below is never raised.
        if (!nextMessage || !nextMessage.body) {
          throw new Error("processor did not resolve with a valid message object.");
        }
        return nextMessage;
      })
  ), Promise.resolve(message));
}
|
|
|
|
|
|
|
|
/**
 * Marks `message` as processed by the current processor version and saves it.
 *
 * `MessageAttributes` is passed as the `fields` option of `save`, which is
 * how the write is limited to the attributes the processor may change.
 *
 * @param {Object} message - model instance exposing `save(options)`.
 * @returns {*} whatever `message.save` returns.
 */
function saveMessage(message) {
  message.processed = MessageProcessorVersion;
  const saveOptions = {fields: MessageAttributes};
  return message.save(saveOptions);
}
|
|
|
|
|
2016-06-29 09:01:43 +08:00
|
|
|
// How long to wait before polling again after the queue read (or job setup)
// fails unexpectedly, so an outage does not turn the worker into a busy loop.
const DEQUEUE_RETRY_DELAY_MS = 1000;

/**
 * Decodes one raw queue entry into a job description.
 *
 * @param {Array} item - value resolved by `brpopAsync`; `item[1]` carries the
 *   JSON payload.
 * @returns {?Object} the parsed job, or `null` when the payload is not a JSON
 *   object (the offending item is logged).
 */
function parseQueueItem(item) {
  let json = null;
  try {
    json = JSON.parse(item[1]);
  } catch (error) {
    json = null;
  }
  // Valid JSON such as `null` or `5` is still not a job: destructuring it
  // would either throw or silently yield undefined ids.
  if (!json || typeof json !== 'object') {
    global.Logger.error({item}, `MessageProcessor: Found invalid JSON item in queue`);
    return null;
  }
  return json;
}

/**
 * Loads the message named by a job, runs it through the pipeline and saves it.
 *
 * Every failure after the job logger exists (database lookup, missing message,
 * pipeline, save) is logged with that logger and swallowed, so one bad job
 * cannot stop the worker.
 *
 * @param {Object} job
 * @param {*} job.messageId - id of the Message row to process.
 * @param {*} job.accountId - account whose database holds the message.
 * @returns {Promise} settles once the job has been handled; rejects only if
 *   the logger itself cannot be created or fails while logging.
 */
function processJob({messageId, accountId}) {
  // Start inside a promise so a synchronous throw below becomes a rejection.
  return Promise.resolve().then(() => {
    const logger = global.Logger.forAccount({id: accountId}).child({message_id: messageId});

    return Promise.resolve()
      .then(() => DatabaseConnector.forAccount(accountId))
      .then((db) => (
        db.Message.find({
          where: {id: messageId},
          include: [{model: db.Folder}, {model: db.Label}],
        }).then((message) => {
          if (!message) {
            throw new Error(`Message not found (${messageId}). Maybe account was deleted?`);
          }
          return runPipeline({db, accountId, message, logger});
        })
      ))
      .then((processedMessage) => saveMessage(processedMessage))
      .catch((err) => {
        logger.error(err, `MessageProcessor: Failed`);
      });
  });
}

/**
 * Worker loop: pops one job from 'message-processor-queue', processes it, then
 * schedules itself again. Never throws for a failed job; returns nothing.
 */
function dequeueJob() {
  redis.brpopAsync('message-processor-queue', 10)
    .then((item) => {
      // No item means the blocking pop returned empty-handed (e.g. it timed
      // out); just poll again.
      const job = item ? parseQueueItem(item) : null;
      return job ? processJob(job) : null;
    })
    .then(
      () => {
        dequeueJob();
        // Return null instead of the next iteration's promise: the loop must
        // not grow one endless promise chain, and bluebird (installed as
        // global.Promise in this file) warns about handlers that create a
        // promise without returning anything.
        return null;
      },
      (err) => {
        // Schedule the retry before logging so the loop survives even if
        // logging throws.
        setTimeout(dequeueJob, DEQUEUE_RETRY_DELAY_MS);
        global.Logger.error(err, `MessageProcessor: Queue loop error, retrying after delay`);
        return null;
      }
    );
}
|
|
|
|
|
2016-06-29 09:01:43 +08:00
|
|
|
// Start the worker loop; dequeueJob calls itself again after each poll.
dequeueJob();
|