From 3d79f9b8be739dcf354c959f58e9819b93ba2633 Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Wed, 30 Nov 2016 17:26:23 -0800 Subject: [PATCH] [local-sync] Run message processor for one message at a time This avoids issues that arise when we process two messages on the same thread concurrently! --- .../src/new-message-processor/index.js | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/packages/local-sync/src/new-message-processor/index.js b/packages/local-sync/src/new-message-processor/index.js index 184b5ff7b..5aed1b513 100644 --- a/packages/local-sync/src/new-message-processor/index.js +++ b/packages/local-sync/src/new-message-processor/index.js @@ -3,27 +3,29 @@ const extractFiles = require('./extract-files') const extractContacts = require('./extract-contacts') const LocalDatabaseConnector = require('../shared/local-database-connector') +const Queue = require('promise-queue'); +const queue = new Queue(1, Infinity); + function processNewMessage(message, imapMessage) { - process.nextTick(() => { - const {accountId} = message + queue.add(async () => { + const {accountId} = message; const logger = global.Logger.forAccount({id: accountId}).child({message}) - LocalDatabaseConnector.forAccount(accountId).then((db) => { - detectThread({db, message}) - .then((thread) => { - message.threadId = thread.id - return db.Message.create(message) - }) - .then(() => extractFiles({db, message, imapMessage})) - .then(() => extractContacts({db, message})) - .then(() => { - logger.info({ - message_id: message.id, - uid: message.folderImapUID, - }, `MessageProcessor: Created and processed message`) - }) - .catch((err) => logger.error(err, `MessageProcessor: Failed`)) - }) - }) + const db = await LocalDatabaseConnector.forAccount(accountId); + + try { + const thread = await detectThread({db, message}); + message.threadId = thread.id; + await db.Message.create(message); + await extractFiles({db, message, imapMessage}); + await extractContacts({db, message}); + logger.info({ + message_id: message.id, + uid: message.folderImapUID, + }, `MessageProcessor: Created and processed message`); + } catch (err) { + logger.error(err, `MessageProcessor: Failed`); + } + }); } module.exports = {processNewMessage}