const Rx = require('rx') const redis = require("redis"); const PromiseUtils = require('./promise-utils') const log = global.Logger || console PromiseUtils.promisifyAll(redis.RedisClient.prototype); PromiseUtils.promisifyAll(redis.Multi.prototype); class PubsubConnector { constructor() { this._broadcastClient = null; this._listenClient = null; this._listenClientSubs = {}; } buildClient() { const client = redis.createClient(process.env.REDIS_URL || null); client.on("error", log.error); return client; } broadcastClient() { if (!this._broadcastClient) { this._broadcastClient = this.buildClient(); } return this._broadcastClient; } queueProcessMessage({messageId, accountId}) { if (!messageId) { throw new Error("queueProcessMessage: The message body processor expects a messageId") } if (!accountId) { throw new Error("queueProcessMessage: The message body processor expects a accountId") } this.broadcastClient().lpush(`message-processor-queue`, JSON.stringify({messageId, accountId})); } // Shared channel _observableForChannelOnSharedListener(channel) { if (!this._listenClient) { this._listenClient = this.buildClient(); this._listenClientSubs = {}; } return Rx.Observable.create((observer) => { this._listenClient.on("message", (msgChannel, message) => { if (msgChannel !== channel) { return } observer.onNext(message) }); if (!this._listenClientSubs[channel]) { this._listenClientSubs[channel] = 1; this._listenClient.subscribe(channel); } else { this._listenClientSubs[channel] += 1; } return () => { this._listenClientSubs[channel] -= 1; if (this._listenClientSubs[channel] === 0) { this._listenClient.unsubscribe(channel); } } }); } notifyAccount(accountId, {type, data}) { this.broadcastClient().publish(`account-${accountId}`, JSON.stringify({type, data})); } observeAccount(accountId) { return this._observableForChannelOnSharedListener(`account-${accountId}`); } notifyDelta(accountId, data) { this.broadcastClient().publish(`deltas-${accountId}`, JSON.stringify(data)) } observeAllAccounts() { return Rx.Observable.create((observer) => { const sub = this.buildClient(); sub.on("pmessage", (pattern, channel, message) => observer.onNext(channel.replace('account-', ''), message)); sub.psubscribe(`account-*`); return () => { sub.unsubscribe(); sub.quit(); } }) } observeDeltas(accountId) { return Rx.Observable.create((observer) => { const sub = this.buildClient(); sub.on("message", (channel, message) => observer.onNext(message)); sub.subscribe(`deltas-${accountId}`); return () => { sub.unsubscribe(); sub.quit(); } }) } } module.exports = new PubsubConnector()