Add initial version of message-processor package

This commit is contained in:
Juan Tejada 2016-06-20 14:57:54 -07:00
parent de8e09d6b5
commit 83dfb664e1
8 changed files with 67 additions and 8 deletions

View file

@ -30,6 +30,9 @@ class DatabaseConnectionFactory {
}
_sequelizeForAccount(accountId) {
if (!accountId) {
throw new Error(`You need to pass an accountId to init the database!`)
}
const sequelize = new Sequelize(accountId, '', '', {
storage: path.join(STORAGE_DIR, `a-${accountId}.sqlite`),
dialect: "sqlite",
@ -41,6 +44,7 @@ class DatabaseConnectionFactory {
db.sequelize = sequelize;
db.Sequelize = Sequelize;
db.accountId = accountId;
return sequelize.authenticate().then(() =>
sequelize.sync()

View file

@ -5,6 +5,7 @@ module.exports = (sequelize, Sequelize) => {
subject: Sequelize.STRING,
snippet: Sequelize.STRING,
body: Sequelize.STRING,
hash: Sequelize.STRING,
headers: Sequelize.STRING,
date: Sequelize.DATE,
unread: Sequelize.BOOLEAN,

View file

@ -0,0 +1,32 @@
const DatabaseConnectionFactory = require(`${__base}/core/database-connection-factory`)
const processors = require('./processors')
function createMessage({headers, body, attributes, hash, db}) {
const {Message} = db
return Message.create({
hash: hash,
unread: attributes.flags.includes('\\Unseen'),
starred: attributes.flags.includes('\\Flagged'),
date: attributes.date,
headers: headers,
body: body,
})
}
function runPipeline(message) {
return processors.reduce((prevPromise, processor) => {
return prevPromise.then((msg) => processor(msg))
}, Promise.resolve(message))
}
function processMessage({headers, body, attributes, hash, accountId}) {
return DatabaseConnectionFactory.forAccount(accountId)
.then((db) => createMessage({headers, body, attributes, hash, db}))
.then((message) => runPipeline(message))
.then((processedMessage) => processedMessage)
.catch((err) => console.log('oh no'))
}
module.exports = {
processMessage,
}

View file

@ -0,0 +1,11 @@
{
"name": "message-processor",
"version": "1.0.0",
"description": "Message processing pipeline",
"main": "y",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "Juan Tejada <juans.tejada@gmail.com>",
"license": "ISC"
}

View file

@ -0,0 +1,10 @@
const fs = require('fs')
const path = require('path')
const processors = fs.readdirSync(__dirname)
.filter((file) => file !== 'index.js')
.map((file) => {
return require(`./${file}`).processMessage
})
module.exports = {processors}

View file

@ -0,0 +1,3 @@
module.exports = {
processMessage: (message) => message,
}

View file

@ -0,0 +1,3 @@
module.exports = {
processMessage: (message) => message,
}

View file

@ -1,4 +1,6 @@
const _ = require('underscore');
const { processMessage } = require(`${__base}/message-processor`)
class SyncMailboxOperation {
constructor(category) {
@ -68,14 +70,7 @@ class SyncMailboxOperation {
flags: attributes.flags,
uid: attributes.uid,
});
return Message.create({
unread: attributes.flags.includes('\\Unseen'),
starred: attributes.flags.includes('\\Flagged'),
date: attributes.date,
headers: headers,
body: body,
});
return processMessage({accountId, attributes, headers, body, hash})
}
_openMailboxAndCheckValidity() {