Add delta stream queue on sync side

This commit is contained in:
Evan Morikawa 2016-06-20 17:28:26 -07:00
parent cebce1081f
commit f995f6fda1
6 changed files with 64 additions and 7 deletions

View file

@ -2,6 +2,7 @@ const Sequelize = require('sequelize');
const fs = require('fs');
const path = require('path');
const TransactionLog = require('./transaction-log')
const DeltaStreamQueue = require('./delta-stream-queue.js')
const STORAGE_DIR = path.join(__base, 'storage');
if (!fs.existsSync(STORAGE_DIR)) {
@ -13,6 +14,10 @@ class DatabaseConnectionFactory {
this._pools = {};
}
setup() {
DeltaStreamQueue.setup()
}
_readModelsInDirectory(sequelize, dirname) {
const db = {};
for (const filename of fs.readdirSync(dirname)) {

View file

@ -0,0 +1,29 @@
const bluebird = require('bluebird')
const redis = require("redis");
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);
class DeltaStreamQueue {
setup() {
this.client = redis.createClient();
this.client.on("error", console.error);
this.client.on("ready", () => console.log("Redis ready"));
}
key(accountId) {
return `delta-${accountId}`
}
hasSubscribers(accountId) {
return this.client.existsAsync(this.key(accountId))
}
notify(accountId, data) {
return this.hasSubscribers(accountId).then((hasSubscribers) => {
if (!hasSubscribers) return Promise.resolve()
return this.client.rpushAsync(this.key(accountId), JSON.stringify(data))
})
}
}
module.exports = new DeltaStreamQueue()

View file

@ -4,7 +4,9 @@
"description": "",
"main": "database-connection-factory.js",
"dependencies": {
"bluebird": "3.x.x",
"mysql": "^2.10.2",
"redis": "2.x.x",
"sequelize": "^3.23.3",
"sqlite3": "^3.1.4"
},

View file

@ -1,3 +1,5 @@
const DeltaStreamQueue = require('./delta-stream-queue')
class TransactionLog {
constructor(db) {
this.db = db;
@ -18,9 +20,11 @@ class TransactionLog {
transactionLogger(type) {
return (sequelizeHookData) => {
if (this.isTransaction(sequelizeHookData)) return;
this.db.Transaction.create(Object.assign({type: type},
const transactionData = Object.assign({type: type},
this.parseHookData(sequelizeHookData)
));
);
this.db.Transaction.create(transactionData);
DeltaStreamQueue.notify(this.db.accountId, transactionData)
}
}

View file

@ -8,13 +8,27 @@ const DatabaseConnectionFactory = require(`${__base}/core/database-connection-fa
const SyncWorkerPool = require('./sync-worker-pool');
const workerPool = new SyncWorkerPool();
DatabaseConnectionFactory.forShared().then((db) => {
const {Account} = db
Account.findAll().then((accounts) => {
accounts.forEach((account) => {
workerPool.addWorkerForAccount(account);
const RedisServer = require('redis-server');
const redisServerInstance = new RedisServer(6379);
const start = () => {
DatabaseConnectionFactory.setup()
DatabaseConnectionFactory.forShared().then((db) => {
const {Account} = db
Account.findAll().then((accounts) => {
accounts.forEach((account) => {
workerPool.addWorkerForAccount(account);
});
});
});
}
redisServerInstance.open((error) => {
if (error) {
console.error(error)
process.exit(1);
}
start()
});
global.workerPool = workerPool;

View file

@ -6,10 +6,13 @@
"dependencies": {
"bluebird": "^3.4.1",
"imap": "^0.8.17",
"redis": "2.x.x",
"redis-server": "0.x.x",
"underscore": "^1.8.3"
},
"devDependencies": {},
"scripts": {
"postinstall": "brew install redis",
"start": "node app.js",
"test": "echo \"Error: no test specified\" && exit 1"
},