2016-06-23 04:35:55 +08:00
|
|
|
const Rx = require('rx')
|
2016-06-21 08:28:26 +08:00
|
|
|
const bluebird = require('bluebird')
|
|
|
|
const redis = require("redis");
|
|
|
|
bluebird.promisifyAll(redis.RedisClient.prototype);
|
|
|
|
bluebird.promisifyAll(redis.Multi.prototype);
|
|
|
|
|
|
|
|
class DeltaStreamQueue {
|
|
|
|
setup() {
|
2016-06-22 08:44:09 +08:00
|
|
|
this.client = redis.createClient(process.env.REDIS_URL || null);
|
2016-06-21 08:28:26 +08:00
|
|
|
this.client.on("error", console.error);
|
|
|
|
}
|
|
|
|
|
|
|
|
key(accountId) {
|
|
|
|
return `delta-${accountId}`
|
|
|
|
}
|
|
|
|
|
2016-06-22 06:57:50 +08:00
|
|
|
notify(accountId, data) {
|
|
|
|
this.client.publish(this.key(accountId), JSON.stringify(data))
|
2016-06-21 08:28:26 +08:00
|
|
|
}
|
|
|
|
|
2016-06-23 04:35:55 +08:00
|
|
|
fromAccountId(accountId) {
|
|
|
|
return Rx.Observable.create((observer) => {
|
|
|
|
this.client.on("message", (channel, message) => {
|
|
|
|
if (channel !== this.key(accountId)) { return }
|
|
|
|
observer.onNext(message)
|
|
|
|
});
|
|
|
|
this.client.subscribe(this.key(accountId));
|
|
|
|
return () => { this.client.unsubscribe() }
|
2016-06-21 08:28:26 +08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = new DeltaStreamQueue()
|