From f0caf042bd543014b7a6ea74d1f58c58f774fa20 Mon Sep 17 00:00:00 2001 From: Juan Tejada Date: Wed, 23 Nov 2016 14:24:39 -0800 Subject: [PATCH] [local-sync]: Restore message-processor Don't use pubsub for scheduling a message for processing, just use nextTick --- .../imap/fetch-messages-in-folder.js | 4 +- .../message-processor/{app.js => index.js} | 38 +++++-------------- 2 files changed, 11 insertions(+), 31 deletions(-) rename packages/local-sync/src/message-processor/{app.js => index.js} (66%) diff --git a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js index 05bce329b..ab938a6f3 100644 --- a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js +++ b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js @@ -2,6 +2,7 @@ const _ = require('underscore'); const {Imap, PromiseUtils, IMAPConnection} = require('isomorphic-core'); const {Capabilities} = IMAPConnection; +const {queueMessageForProcessing} = require('../../message-processor') const MessageFlagAttributes = ['id', 'threadId', 'folderImapUID', 'unread', 'starred', 'folderImapXGMLabels'] @@ -278,8 +279,7 @@ class FetchMessagesInFolder { ) } - // FIXME: uncomment when we bring back message processing. - // LocalPubsubConnector.queueProcessMessage({accountId, messageId: message.id}); + queueMessageForProcessing({accountId, messageId: message.id}); } else { message.getThread() .then((thread) => { diff --git a/packages/local-sync/src/message-processor/app.js b/packages/local-sync/src/message-processor/index.js similarity index 66% rename from packages/local-sync/src/message-processor/app.js rename to packages/local-sync/src/message-processor/index.js index cf57f8e24..831571edc 100644 --- a/packages/local-sync/src/message-processor/app.js +++ b/packages/local-sync/src/message-processor/index.js @@ -1,6 +1,5 @@ const {processors} = require('./processors') const LocalDatabaseConnector = require('../shared/local-database-connector') -const LocalPubsubConnector = require('../shared/local-pubsub-connector') // List of the attributes of Message that the processor should be allowed to change. // The message may move between folders, get starred, etc. while it's being @@ -8,8 +7,6 @@ const LocalPubsubConnector = require('../shared/local-pubsub-connector') const MessageAttributes = ['body', 'processed', 'to', 'from', 'cc', 'replyTo', 'bcc', 'snippet', 'threadId'] const MessageProcessorVersion = 1; -const redis = LocalPubsubConnector.buildClient(); - function runPipeline({db, accountId, message, logger}) { logger.info(`MessageProcessor: Processing message`) return processors.reduce((prevPromise, processor) => ( @@ -33,43 +30,26 @@ function saveMessage(message) { }); } -function dequeueJob() { - redis.brpopAsync('message-processor-queue', 10).then((item) => { - if (!item) { - return dequeueJob(); - } - - let json = null; - try { - json = JSON.parse(item[1]); - } catch (error) { - global.Logger.error({item}, `MessageProcessor: Found invalid JSON item in queue`) - return dequeueJob(); - } - const {messageId, accountId} = json; +function queueMessageForProcessing({accountId, messageId}) { + process.nextTick(() => { const logger = global.Logger.forAccount({id: accountId}).child({message_id: messageId}) LocalDatabaseConnector.forAccount(accountId).then((db) => { return db.Message.find({ where: {id: messageId}, include: [{model: db.Folder}, {model: db.Label}], - }).then((message) => { + }) + .then((message) => { if (!message) { return Promise.reject(new Error(`Message not found (${messageId}). Maybe account was deleted?`)) } - return runPipeline({db, accountId, message, logger}).then((processedMessage) => - saveMessage(processedMessage) - ).catch((err) => - logger.error(err, `MessageProcessor: Failed`) - ) + return runPipeline({db, accountId, message, logger}) + .then((processedMessage) => saveMessage(processedMessage)) + .catch((err) => logger.error(err, `MessageProcessor: Failed`) + ) }) }) - .finally(() => { - dequeueJob() - }); - - return null; }) } -dequeueJob(); +module.exports = {queueMessageForProcessing}