From 2d4b17ee52e397b90ff48dfa9511b76b37da7092 Mon Sep 17 00:00:00 2001 From: Evan Morikawa Date: Tue, 21 Jun 2016 15:57:50 -0700 Subject: [PATCH] Add a delta streaming endpoint --- api/app.js | 2 ++ api/routes/delta.js | 37 +++++++++++++++++++++++++++++++++++++ core/delta-stream-queue.js | 13 +++++++------ 3 files changed, 46 insertions(+), 6 deletions(-) create mode 100644 api/routes/delta.js diff --git a/api/app.js b/api/app.js index a763f6a9d..6359393d2 100644 --- a/api/app.js +++ b/api/app.js @@ -64,6 +64,8 @@ server.register(plugins, (err) => { server.auth.strategy('api-consumer', 'basic', { validateFunc: validate }); server.auth.default('api-consumer'); + DatabaseConnectionFactory.setup() + server.start((startErr) => { if (startErr) { throw startErr; } console.log('Server running at:', server.info.uri); diff --git a/api/routes/delta.js b/api/routes/delta.js new file mode 100644 index 000000000..f06f400bf --- /dev/null +++ b/api/routes/delta.js @@ -0,0 +1,37 @@ +const DeltaStreamQueue = require(`${__base}/core/delta-stream-queue`); + +module.exports = (server) => { + server.route({ + method: 'GET', + path: '/delta/streaming', + config: { + description: 'Returns deltas since timestamp then streams deltas', + notes: 'Returns deltas since timestamp then streams deltas', + tags: ['threads'], + validate: { + params: { + }, + }, + response: { + schema: null, + }, + }, + handler: (request, reply) => { + const outputStream = require('stream').Readable(); + outputStream._read = () => { return }; + const pushMsg = (msg = "\n") => outputStream.push(msg) + + request.getAccountDatabase() + .then((db) => { + return db.Transaction.findAll().then((transactions = []) => { + transactions.map(JSON.stringify).forEach(pushMsg); + DeltaStreamQueue.subscribe(db.accountId, pushMsg) + }) + }).then(() => { + const keepAlive = setInterval(pushMsg, 1000); + request.on("disconnect", () => { clearTimeout(keepAlive) }) + return reply(outputStream) + }) + }, + }); +}; diff --git a/core/delta-stream-queue.js b/core/delta-stream-queue.js index 7da47b556..c1a0f87d9 100644 --- a/core/delta-stream-queue.js +++ b/core/delta-stream-queue.js @@ -14,15 +14,16 @@ class DeltaStreamQueue { return `delta-${accountId}` } - hasSubscribers(accountId) { - return this.client.existsAsync(this.key(accountId)) + notify(accountId, data) { + this.client.publish(this.key(accountId), JSON.stringify(data)) } - notify(accountId, data) { - return this.hasSubscribers(accountId).then((hasSubscribers) => { - if (!hasSubscribers) return Promise.resolve() - return this.client.rpushAsync(this.key(accountId), JSON.stringify(data)) + subscribe(accountId, callback) { + this.client.on("message", (channel, message) => { + if (channel !== this.key(accountId)) { return } + callback(message) }) + this.client.subscribe(this.key(accountId)) } }