From 83dfb664e1d7ffadf376abc59661f4a67251b547 Mon Sep 17 00:00:00 2001 From: Juan Tejada Date: Mon, 20 Jun 2016 14:57:54 -0700 Subject: [PATCH] Add initial version of message-processor package --- core/database-connection-factory.js | 4 +++ core/models/account/message.js | 1 + message-processor/index.js | 32 +++++++++++++++++++++ message-processor/package.json | 11 +++++++ message-processor/processors/index.js | 10 +++++++ message-processor/processors/quoted-text.js | 3 ++ message-processor/processors/threading.js | 3 ++ sync/imap/sync-mailbox-operation.js | 11 ++----- 8 files changed, 67 insertions(+), 8 deletions(-) create mode 100644 message-processor/index.js create mode 100644 message-processor/package.json create mode 100644 message-processor/processors/index.js create mode 100644 message-processor/processors/quoted-text.js create mode 100644 message-processor/processors/threading.js diff --git a/core/database-connection-factory.js b/core/database-connection-factory.js index 59d021487..47d293c91 100644 --- a/core/database-connection-factory.js +++ b/core/database-connection-factory.js @@ -30,6 +30,9 @@ class DatabaseConnectionFactory { } _sequelizeForAccount(accountId) { + if (!accountId) { + throw new Error(`You need to pass an accountId to init the database!`) + } const sequelize = new Sequelize(accountId, '', '', { storage: path.join(STORAGE_DIR, `a-${accountId}.sqlite`), dialect: "sqlite", @@ -41,6 +44,7 @@ class DatabaseConnectionFactory { db.sequelize = sequelize; db.Sequelize = Sequelize; + db.accountId = accountId; return sequelize.authenticate().then(() => sequelize.sync() diff --git a/core/models/account/message.js b/core/models/account/message.js index 9418aaba4..ec78ec946 100644 --- a/core/models/account/message.js +++ b/core/models/account/message.js @@ -5,6 +5,7 @@ module.exports = (sequelize, Sequelize) => { subject: Sequelize.STRING, snippet: Sequelize.STRING, body: Sequelize.STRING, + hash: Sequelize.STRING, headers: Sequelize.STRING, date: Sequelize.DATE, unread: Sequelize.BOOLEAN, diff --git a/message-processor/index.js b/message-processor/index.js new file mode 100644 index 000000000..719f877f7 --- /dev/null +++ b/message-processor/index.js @@ -0,0 +1,32 @@ +const DatabaseConnectionFactory = require(`${__base}/core/database-connection-factory`) +const processors = require('./processors') + +function createMessage({headers, body, attributes, hash, db}) { + const {Message} = db + return Message.create({ + hash: hash, + unread: attributes.flags.includes('\\Unseen'), + starred: attributes.flags.includes('\\Flagged'), + date: attributes.date, + headers: headers, + body: body, + }) +} + +function runPipeline(message) { + return processors.reduce((prevPromise, processor) => { + return prevPromise.then((msg) => processor(msg)) + }, Promise.resolve(message)) +} + +function processMessage({headers, body, attributes, hash, accountId}) { + return DatabaseConnectionFactory.forAccount(accountId) + .then((db) => createMessage({headers, body, attributes, hash, db})) + .then((message) => runPipeline(message)) + .then((processedMessage) => processedMessage) + .catch((err) => console.log('oh no')) +} + +module.exports = { + processMessage, +} diff --git a/message-processor/package.json b/message-processor/package.json new file mode 100644 index 000000000..900e7509a --- /dev/null +++ b/message-processor/package.json @@ -0,0 +1,11 @@ +{ + "name": "message-processor", + "version": "1.0.0", + "description": "Message processing pipeline", + "main": "y", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "Juan Tejada ", + "license": "ISC" +} diff --git a/message-processor/processors/index.js b/message-processor/processors/index.js new file mode 100644 index 000000000..a13b95158 --- /dev/null +++ b/message-processor/processors/index.js @@ -0,0 +1,10 @@ +const fs = require('fs') +const path = require('path') + +const processors = fs.readdirSync(__dirname) +.filter((file) => file !== 'index.js') +.map((file) => { + return require(`./${file}`).processMessage +}) + +module.exports = {processors} diff --git a/message-processor/processors/quoted-text.js b/message-processor/processors/quoted-text.js new file mode 100644 index 000000000..39ffd71c6 --- /dev/null +++ b/message-processor/processors/quoted-text.js @@ -0,0 +1,3 @@ +module.exports = { + processMessage: (message) => message, +} diff --git a/message-processor/processors/threading.js b/message-processor/processors/threading.js new file mode 100644 index 000000000..39ffd71c6 --- /dev/null +++ b/message-processor/processors/threading.js @@ -0,0 +1,3 @@ +module.exports = { + processMessage: (message) => message, +} diff --git a/sync/imap/sync-mailbox-operation.js b/sync/imap/sync-mailbox-operation.js index 3ab16b52d..cb69df2f8 100644 --- a/sync/imap/sync-mailbox-operation.js +++ b/sync/imap/sync-mailbox-operation.js @@ -1,4 +1,6 @@ const _ = require('underscore'); +const { processMessage } = require(`${__base}/message-processor`) + class SyncMailboxOperation { constructor(category) { @@ -68,14 +70,7 @@ class SyncMailboxOperation { flags: attributes.flags, uid: attributes.uid, }); - - return Message.create({ - unread: attributes.flags.includes('\\Unseen'), - starred: attributes.flags.includes('\\Flagged'), - date: attributes.date, - headers: headers, - body: body, - }); + return processMessage({accountId, attributes, headers, body, hash}) } _openMailboxAndCheckValidity() {