Refactor DeltaStreamQueue (connection per subscription)

This commit is contained in:
Ben Gotow 2016-06-22 18:34:00 -07:00
parent cb95574378
commit 2e9bfa68b5
9 changed files with 57 additions and 51 deletions

View file

@ -63,8 +63,6 @@ server.register(plugins, (err) => {
server.auth.strategy('api-consumer', 'basic', { validateFunc: validate });
server.auth.default('api-consumer');
DatabaseConnectionFactory.setup()
server.start((startErr) => {
if (startErr) { throw startErr; }
console.log('Server running at:', server.info.uri);

View file

@ -89,10 +89,10 @@ module.exports = (server) => {
account.save().then((saved) =>
AccountToken.create({
AccountId: saved.id,
}).then((token) => {
const response = Serialization.jsonStringify(saved);
response.token = token;
reply(response);
}).then((accountToken) => {
const response = saved.toJSON();
response.token = accountToken.value;
reply(Serialization.jsonStringify(response));
})
);
})

View file

@ -1,6 +1,6 @@
const Rx = require('rx')
const _ = require('underscore');
const {DeltaStreamQueue} = require(`nylas-core`);
const {AccountPubsub} = require(`nylas-core`);
function keepAlive(request) {
const until = Rx.Observable.fromCallback(request.on)("disconnect")
@ -52,7 +52,7 @@ module.exports = (server) => {
request.getAccountDatabase().then((db) => {
const source = Rx.Observable.merge(
DeltaStreamQueue.fromAccountId(db.accountId),
AccountPubsub.observableForAccountId(db.accountId),
initialTransactions(db, request),
keepAlive(request)
).subscribe(outputStream.pushJSON)

View file

@ -0,0 +1,47 @@
const Rx = require('rx')
const bluebird = require('bluebird')
const redis = require("redis");
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);
class AccountPubsub {
constructor() {
this._broadcastClient = null;
}
buildClient() {
const client = redis.createClient(process.env.REDIS_URL || null);
client.on("error", console.error);
return client;
}
keyForAccountId(accountId) {
return `delta-${accountId}`;
}
notify(accountId, data) {
if (!this._broadcastClient) {
this._broadcastClient = this.buildClient();
}
const key = this.keyForAccountId(accountId);
this._broadcastClient.publish(key, JSON.stringify(data))
}
observableForAccountId(accountId) {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
const key = this.keyForAccountId(accountId);
sub.on("message", (channel, message) => {
if (channel !== key) { return }
observer.onNext(message)
});
sub.subscribe(key);
return () => {
sub.unsubscribe()
sub.quit()
}
})
}
}
module.exports = new AccountPubsub()

View file

@ -2,7 +2,6 @@ const Sequelize = require('sequelize');
const fs = require('fs');
const path = require('path');
const TransactionLog = require('./transaction-log')
const DeltaStreamQueue = require('./delta-stream-queue.js')
require('./database-extensions'); // Extends Sequelize on require
@ -16,10 +15,6 @@ class DatabaseConnectionFactory {
this._pools = {};
}
setup() {
DeltaStreamQueue.setup()
}
_readModelsInDirectory(sequelize, dirname) {
const db = {};
for (const filename of fs.readdirSync(dirname)) {

View file

@ -1,33 +0,0 @@
const Rx = require('rx')
const bluebird = require('bluebird')
const redis = require("redis");
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);
class DeltaStreamQueue {
setup() {
this.client = redis.createClient(process.env.REDIS_URL || null);
this.client.on("error", console.error);
}
key(accountId) {
return `delta-${accountId}`
}
notify(accountId, data) {
this.client.publish(this.key(accountId), JSON.stringify(data))
}
fromAccountId(accountId) {
return Rx.Observable.create((observer) => {
this.client.on("message", (channel, message) => {
if (channel !== this.key(accountId)) { return }
observer.onNext(message)
});
this.client.subscribe(this.key(accountId));
return () => { this.client.unsubscribe() }
})
}
}
module.exports = new DeltaStreamQueue()

View file

@ -1,6 +1,6 @@
module.exports = {
DatabaseConnectionFactory: require('./database-connection-factory'),
DeltaStreamQueue: require('./delta-stream-queue'),
AccountPubsub: require('./account-pubsub'),
IMAPConnection: require('./imap-connection'),
Config: require(`./config/${process.env.ENV || 'development'}`),
}

View file

@ -1,4 +1,4 @@
const DeltaStreamQueue = require('./delta-stream-queue')
const AccountPubsub = require('./account-pubsub')
class TransactionLog {
constructor(db) {
@ -24,8 +24,8 @@ class TransactionLog {
this.parseHookData(sequelizeHookData)
);
this.db.Transaction.create(transactionData);
transactionData.object = sequelizeHookData.dataValues
DeltaStreamQueue.notify(this.db.accountId, transactionData)
transactionData.object = sequelizeHookData.dataValues;
AccountPubsub.notify(this.db.accountId, transactionData);
}
}

View file

@ -19,7 +19,6 @@ const start = () => {
});
}
DatabaseConnectionFactory.setup()
start();
global.workerPool = workerPool;