Add a delta streaming endpoint

This commit is contained in:
Evan Morikawa 2016-06-21 15:57:50 -07:00
parent 29a6448922
commit 2d4b17ee52
3 changed files with 46 additions and 6 deletions

View file

@ -64,6 +64,8 @@ server.register(plugins, (err) => {
server.auth.strategy('api-consumer', 'basic', { validateFunc: validate }); server.auth.strategy('api-consumer', 'basic', { validateFunc: validate });
server.auth.default('api-consumer'); server.auth.default('api-consumer');
DatabaseConnectionFactory.setup()
server.start((startErr) => { server.start((startErr) => {
if (startErr) { throw startErr; } if (startErr) { throw startErr; }
console.log('Server running at:', server.info.uri); console.log('Server running at:', server.info.uri);

37
api/routes/delta.js Normal file
View file

@ -0,0 +1,37 @@
const DeltaStreamQueue = require(`${__base}/core/delta-stream-queue`);
module.exports = (server) => {
server.route({
method: 'GET',
path: '/delta/streaming',
config: {
description: 'Returns deltas since timestamp then streams deltas',
notes: 'Returns deltas since timestamp then streams deltas',
tags: ['threads'],
validate: {
params: {
},
},
response: {
schema: null,
},
},
handler: (request, reply) => {
const outputStream = require('stream').Readable();
outputStream._read = () => { return };
const pushMsg = (msg = "\n") => outputStream.push(msg)
request.getAccountDatabase()
.then((db) => {
return db.Transaction.findAll().then((transactions = []) => {
transactions.map(JSON.stringify).forEach(pushMsg);
DeltaStreamQueue.subscribe(db.accountId, pushMsg)
})
}).then(() => {
const keepAlive = setInterval(pushMsg, 1000);
request.on("disconnect", () => { clearTimeout(keepAlive) })
return reply(outputStream)
})
},
});
};

View file

@ -14,15 +14,16 @@ class DeltaStreamQueue {
return `delta-${accountId}` return `delta-${accountId}`
} }
hasSubscribers(accountId) { notify(accountId, data) {
return this.client.existsAsync(this.key(accountId)) this.client.publish(this.key(accountId), JSON.stringify(data))
} }
notify(accountId, data) { subscribe(accountId, callback) {
return this.hasSubscribers(accountId).then((hasSubscribers) => { this.client.on("message", (channel, message) => {
if (!hasSubscribers) return Promise.resolve() if (channel !== this.key(accountId)) { return }
return this.client.rpushAsync(this.key(accountId), JSON.stringify(data)) callback(message)
}) })
this.client.subscribe(this.key(accountId))
} }
} }