2016-06-23 15:49:22 +08:00
|
|
|
const Rx = require('rx')
|
|
|
|
const redis = require("redis");
|
2016-07-14 07:23:25 +08:00
|
|
|
const PromiseUtils = require('./promise-utils')
|
2016-07-09 08:13:30 +08:00
|
|
|
const log = global.Logger || console
|
2016-06-24 02:45:24 +08:00
|
|
|
|
2016-07-14 07:23:25 +08:00
|
|
|
PromiseUtils.promisifyAll(redis.RedisClient.prototype);
|
|
|
|
PromiseUtils.promisifyAll(redis.Multi.prototype);
|
2016-06-23 15:49:22 +08:00
|
|
|
|
2016-07-09 08:13:30 +08:00
|
|
|
|
2016-06-23 15:49:22 +08:00
|
|
|
class PubsubConnector {
|
|
|
|
constructor() {
|
|
|
|
this._broadcastClient = null;
|
|
|
|
this._listenClient = null;
|
|
|
|
this._listenClientSubs = {};
|
|
|
|
}
|
|
|
|
|
|
|
|
buildClient() {
|
|
|
|
const client = redis.createClient(process.env.REDIS_URL || null);
|
2016-07-09 08:13:30 +08:00
|
|
|
client.on("error", log.error);
|
2016-06-23 15:49:22 +08:00
|
|
|
return client;
|
|
|
|
}
|
|
|
|
|
|
|
|
broadcastClient() {
|
|
|
|
if (!this._broadcastClient) {
|
|
|
|
this._broadcastClient = this.buildClient();
|
|
|
|
}
|
|
|
|
return this._broadcastClient;
|
|
|
|
}
|
|
|
|
|
2016-06-29 09:01:43 +08:00
|
|
|
queueProcessMessage({messageId, accountId}) {
|
|
|
|
if (!messageId) {
|
|
|
|
throw new Error("queueProcessMessage: The message body processor expects a messageId")
|
|
|
|
}
|
|
|
|
if (!accountId) {
|
|
|
|
throw new Error("queueProcessMessage: The message body processor expects a accountId")
|
|
|
|
}
|
|
|
|
this.broadcastClient().lpush(`message-processor-queue`, JSON.stringify({messageId, accountId}));
|
|
|
|
}
|
|
|
|
|
2016-06-23 15:49:22 +08:00
|
|
|
// Shared channel
|
2016-06-28 07:05:31 +08:00
|
|
|
_observableForChannelOnSharedListener(channel) {
|
2016-06-23 15:49:22 +08:00
|
|
|
if (!this._listenClient) {
|
|
|
|
this._listenClient = this.buildClient();
|
|
|
|
this._listenClientSubs = {};
|
|
|
|
}
|
|
|
|
|
|
|
|
return Rx.Observable.create((observer) => {
|
|
|
|
this._listenClient.on("message", (msgChannel, message) => {
|
|
|
|
if (msgChannel !== channel) { return }
|
|
|
|
observer.onNext(message)
|
|
|
|
});
|
|
|
|
|
|
|
|
if (!this._listenClientSubs[channel]) {
|
|
|
|
this._listenClientSubs[channel] = 1;
|
|
|
|
this._listenClient.subscribe(channel);
|
|
|
|
} else {
|
|
|
|
this._listenClientSubs[channel] += 1;
|
|
|
|
}
|
|
|
|
return () => {
|
|
|
|
this._listenClientSubs[channel] -= 1;
|
|
|
|
if (this._listenClientSubs[channel] === 0) {
|
|
|
|
this._listenClient.unsubscribe(channel);
|
|
|
|
}
|
|
|
|
}
|
2016-06-28 07:05:31 +08:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2016-07-01 04:25:13 +08:00
|
|
|
notifyAccount(accountId, {type, data}) {
|
2016-07-01 03:33:08 +08:00
|
|
|
this.broadcastClient().publish(`account-${accountId}`, JSON.stringify({type, data}));
|
2016-06-28 07:05:31 +08:00
|
|
|
}
|
|
|
|
|
2016-07-01 04:25:13 +08:00
|
|
|
observeAccount(accountId) {
|
2016-07-01 03:33:08 +08:00
|
|
|
return this._observableForChannelOnSharedListener(`account-${accountId}`);
|
2016-06-28 07:05:31 +08:00
|
|
|
}
|
2016-06-23 15:49:22 +08:00
|
|
|
|
2016-06-29 06:35:35 +08:00
|
|
|
notifyDelta(accountId, data) {
|
2016-07-01 03:33:08 +08:00
|
|
|
this.broadcastClient().publish(`deltas-${accountId}`, JSON.stringify(data))
|
|
|
|
}
|
|
|
|
|
|
|
|
observeAllAccounts() {
|
|
|
|
return Rx.Observable.create((observer) => {
|
|
|
|
const sub = this.buildClient();
|
|
|
|
sub.on("pmessage", (pattern, channel, message) =>
|
|
|
|
observer.onNext(channel.replace('account-', ''), message));
|
|
|
|
sub.psubscribe(`account-*`);
|
|
|
|
return () => {
|
|
|
|
sub.unsubscribe();
|
|
|
|
sub.quit();
|
|
|
|
}
|
|
|
|
})
|
2016-06-23 15:49:22 +08:00
|
|
|
}
|
|
|
|
|
2016-06-29 06:35:35 +08:00
|
|
|
observeDeltas(accountId) {
|
2016-06-23 15:49:22 +08:00
|
|
|
return Rx.Observable.create((observer) => {
|
|
|
|
const sub = this.buildClient();
|
|
|
|
sub.on("message", (channel, message) => observer.onNext(message));
|
2016-07-01 03:33:08 +08:00
|
|
|
sub.subscribe(`deltas-${accountId}`);
|
2016-06-23 15:49:22 +08:00
|
|
|
return () => {
|
|
|
|
sub.unsubscribe();
|
|
|
|
sub.quit();
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = new PubsubConnector()
|