[local-sync]: Restore message-processor

Don't use pubsub for scheduling a message for processing, just use
nextTick
This commit is contained in:
Juan Tejada 2016-11-23 14:24:39 -08:00
parent 78b96d24ca
commit f0caf042bd
2 changed files with 11 additions and 31 deletions

View file

@ -2,6 +2,7 @@ const _ = require('underscore');
const {Imap, PromiseUtils, IMAPConnection} = require('isomorphic-core');
const {Capabilities} = IMAPConnection;
const {queueMessageForProcessing} = require('../../message-processor')
const MessageFlagAttributes = ['id', 'threadId', 'folderImapUID', 'unread', 'starred', 'folderImapXGMLabels']
@ -278,8 +279,7 @@ class FetchMessagesInFolder {
)
}
// FIXME: uncomment when we bring back message processing.
// LocalPubsubConnector.queueProcessMessage({accountId, messageId: message.id});
queueMessageForProcessing({accountId, messageId: message.id});
} else {
message.getThread()
.then((thread) => {

View file

@ -1,6 +1,5 @@
const {processors} = require('./processors')
const LocalDatabaseConnector = require('../shared/local-database-connector')
const LocalPubsubConnector = require('../shared/local-pubsub-connector')
// List of the attributes of Message that the processor should be allowed to change.
// The message may move between folders, get starred, etc. while it's being
@ -8,8 +7,6 @@ const LocalPubsubConnector = require('../shared/local-pubsub-connector')
const MessageAttributes = ['body', 'processed', 'to', 'from', 'cc', 'replyTo', 'bcc', 'snippet', 'threadId']
const MessageProcessorVersion = 1;
const redis = LocalPubsubConnector.buildClient();
function runPipeline({db, accountId, message, logger}) {
logger.info(`MessageProcessor: Processing message`)
return processors.reduce((prevPromise, processor) => (
@ -33,43 +30,26 @@ function saveMessage(message) {
});
}
function dequeueJob() {
redis.brpopAsync('message-processor-queue', 10).then((item) => {
if (!item) {
return dequeueJob();
}
let json = null;
try {
json = JSON.parse(item[1]);
} catch (error) {
global.Logger.error({item}, `MessageProcessor: Found invalid JSON item in queue`)
return dequeueJob();
}
const {messageId, accountId} = json;
function queueMessageForProcessing({accountId, messageId}) {
process.nextTick(() => {
const logger = global.Logger.forAccount({id: accountId}).child({message_id: messageId})
LocalDatabaseConnector.forAccount(accountId).then((db) => {
return db.Message.find({
where: {id: messageId},
include: [{model: db.Folder}, {model: db.Label}],
}).then((message) => {
})
.then((message) => {
if (!message) {
return Promise.reject(new Error(`Message not found (${messageId}). Maybe account was deleted?`))
}
return runPipeline({db, accountId, message, logger}).then((processedMessage) =>
saveMessage(processedMessage)
).catch((err) =>
logger.error(err, `MessageProcessor: Failed`)
)
return runPipeline({db, accountId, message, logger})
.then((processedMessage) => saveMessage(processedMessage))
.catch((err) => logger.error(err, `MessageProcessor: Failed`)
)
})
})
.finally(() => {
dequeueJob()
});
return null;
})
}
dequeueJob();
module.exports = {queueMessageForProcessing}