diff --git a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js index a65423474..4660c8215 100644 --- a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js +++ b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js @@ -264,11 +264,12 @@ class FetchMessagesInFolder { // TODO investigate batching processing new messages // We could measure load of actual sync vs load of just message processing // to determine how meaningful it is - processNewMessage(messageValues, imapMessage) + await processNewMessage(messageValues, imapMessage) // this._logger.info({ // message: messageValues, // }, `FetchMessagesInFolder: Queued new message for processing`) } + console.log(`🔃 ✉️ "${messageValues.subject}" - ${messageValues.date}`) } catch (err) { this._logger.error(err, { imapMessage, diff --git a/packages/local-sync/src/new-message-processor/index.js b/packages/local-sync/src/new-message-processor/index.js index fc96d67c5..4ac2f1ed4 100644 --- a/packages/local-sync/src/new-message-processor/index.js +++ b/packages/local-sync/src/new-message-processor/index.js @@ -3,40 +3,34 @@ const extractFiles = require('./extract-files'); const extractContacts = require('./extract-contacts'); const LocalDatabaseConnector = require('../shared/local-database-connector'); -const Queue = require('promise-queue'); -const queue = new Queue(1, Infinity); +async function processNewMessage(message, imapMessage) { + const {accountId} = message; + const logger = global.Logger.forAccount({id: accountId}).child({message}) + const db = await LocalDatabaseConnector.forAccount(accountId); + const {Message} = db -function processNewMessage(message, imapMessage) { - queue.add(async () => { - const {accountId} = message; - const logger = global.Logger.forAccount({id: accountId}).child({message}) - const db = await LocalDatabaseConnector.forAccount(accountId); - const {Message} = db - - try { - const existingMessage = await Message.findById(message.id) - if (existingMessage) { - // This is an extremely rare case when 2 or more /new/ messages with - // the exact same headers were queued for creation (same subject, - // participants, timestamp, and message-id header). In this case, we - // will ignore it and report the error - logger.warn({message}, 'MessageProcessor: Encountered 2 new messages with the same id') - return - } - const thread = await detectThread({db, message}); - message.threadId = thread.id; - await Message.create(message); - await extractFiles({db, message, imapMessage}); - await extractContacts({db, message}); - console.log(`🔃 ✉️ "${message.subject}"`) - // logger.info({ - // message_id: message.id, - // uid: message.folderImapUID, - // }, `MessageProcessor: Created and processed message`); - } catch (err) { - logger.error(err, `MessageProcessor: Failed`); + try { + const existingMessage = await Message.findById(message.id) + if (existingMessage) { + // This is an extremely rare case when 2 or more /new/ messages with + // the exact same headers were queued for creation (same subject, + // participants, timestamp, and message-id header). In this case, we + // will ignore it and report the error + logger.warn({message}, 'MessageProcessor: Encountered 2 new messages with the same id') + return } - }); + const thread = await detectThread({db, message}); + message.threadId = thread.id; + await Message.create(message); + await extractFiles({db, message, imapMessage}); + await extractContacts({db, message}); + // logger.info({ + // message_id: message.id, + // uid: message.folderImapUID, + // }, `MessageProcessor: Created and processed message`); + } catch (err) { + logger.error(err, `MessageProcessor: Failed`); + } } module.exports = {processNewMessage}