From f995f6fda150a8bba5cf0f23a3db81fe2c042cb1 Mon Sep 17 00:00:00 2001 From: Evan Morikawa Date: Mon, 20 Jun 2016 17:28:26 -0700 Subject: [PATCH] Add delta stream queue on sync side --- core/database-connection-factory.js | 5 +++++ core/delta-stream-queue.js | 29 +++++++++++++++++++++++++++++ core/package.json | 2 ++ core/transaction-log.js | 8 ++++++-- sync/app.js | 24 +++++++++++++++++++----- sync/package.json | 3 +++ 6 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 core/delta-stream-queue.js diff --git a/core/database-connection-factory.js b/core/database-connection-factory.js index 1d9b64781..76cebb316 100644 --- a/core/database-connection-factory.js +++ b/core/database-connection-factory.js @@ -2,6 +2,7 @@ const Sequelize = require('sequelize'); const fs = require('fs'); const path = require('path'); const TransactionLog = require('./transaction-log') +const DeltaStreamQueue = require('./delta-stream-queue.js') const STORAGE_DIR = path.join(__base, 'storage'); if (!fs.existsSync(STORAGE_DIR)) { @@ -13,6 +14,10 @@ class DatabaseConnectionFactory { this._pools = {}; } + setup() { + DeltaStreamQueue.setup() + } + _readModelsInDirectory(sequelize, dirname) { const db = {}; for (const filename of fs.readdirSync(dirname)) { diff --git a/core/delta-stream-queue.js b/core/delta-stream-queue.js new file mode 100644 index 000000000..7da47b556 --- /dev/null +++ b/core/delta-stream-queue.js @@ -0,0 +1,29 @@ +const bluebird = require('bluebird') +const redis = require("redis"); +bluebird.promisifyAll(redis.RedisClient.prototype); +bluebird.promisifyAll(redis.Multi.prototype); + +class DeltaStreamQueue { + setup() { + this.client = redis.createClient(); + this.client.on("error", console.error); + this.client.on("ready", () => console.log("Redis ready")); + } + + key(accountId) { + return `delta-${accountId}` + } + + hasSubscribers(accountId) { + return this.client.existsAsync(this.key(accountId)) + } + + notify(accountId, data) { + return this.hasSubscribers(accountId).then((hasSubscribers) => { + if (!hasSubscribers) return Promise.resolve() + return this.client.rpushAsync(this.key(accountId), JSON.stringify(data)) + }) + } +} + +module.exports = new DeltaStreamQueue() diff --git a/core/package.json b/core/package.json index 184b16d50..cd7564760 100644 --- a/core/package.json +++ b/core/package.json @@ -4,7 +4,9 @@ "description": "", "main": "database-connection-factory.js", "dependencies": { + "bluebird": "3.x.x", "mysql": "^2.10.2", + "redis": "2.x.x", "sequelize": "^3.23.3", "sqlite3": "^3.1.4" }, diff --git a/core/transaction-log.js b/core/transaction-log.js index f636ac6f5..2462a2926 100644 --- a/core/transaction-log.js +++ b/core/transaction-log.js @@ -1,3 +1,5 @@ +const DeltaStreamQueue = require('./delta-stream-queue') + class TransactionLog { constructor(db) { this.db = db; @@ -18,9 +20,11 @@ class TransactionLog { transactionLogger(type) { return (sequelizeHookData) => { if (this.isTransaction(sequelizeHookData)) return; - this.db.Transaction.create(Object.assign({type: type}, + const transactionData = Object.assign({type: type}, this.parseHookData(sequelizeHookData) - )); + ); + this.db.Transaction.create(transactionData); + DeltaStreamQueue.notify(this.db.accountId, transactionData) } } diff --git a/sync/app.js b/sync/app.js index 5481a9411..f3f1e31e9 100644 --- a/sync/app.js +++ b/sync/app.js @@ -8,13 +8,27 @@ const DatabaseConnectionFactory = require(`${__base}/core/database-connection-fa const SyncWorkerPool = require('./sync-worker-pool'); const workerPool = new SyncWorkerPool(); -DatabaseConnectionFactory.forShared().then((db) => { - const {Account} = db - Account.findAll().then((accounts) => { - accounts.forEach((account) => { - workerPool.addWorkerForAccount(account); +const RedisServer = require('redis-server'); +const redisServerInstance = new RedisServer(6379); + +const start = () => { + DatabaseConnectionFactory.setup() + DatabaseConnectionFactory.forShared().then((db) => { + const {Account} = db + Account.findAll().then((accounts) => { + accounts.forEach((account) => { + workerPool.addWorkerForAccount(account); + }); }); }); +} + +redisServerInstance.open((error) => { + if (error) { + console.error(error) + process.exit(1); + } + start() }); global.workerPool = workerPool; diff --git a/sync/package.json b/sync/package.json index 0dcb6228b..2524ddb1c 100644 --- a/sync/package.json +++ b/sync/package.json @@ -6,10 +6,13 @@ "dependencies": { "bluebird": "^3.4.1", "imap": "^0.8.17", + "redis": "2.x.x", + "redis-server": "0.x.x", "underscore": "^1.8.3" }, "devDependencies": {}, "scripts": { + "postinstall": "brew install redis", "start": "node app.js", "test": "echo \"Error: no test specified\" && exit 1" },