Mailspring/packages/nylas-api/routes/delta.js
2016-07-13 12:37:38 -07:00

86 lines
2.8 KiB
JavaScript

const Rx = require('rx')
const _ = require('underscore');
const {PubsubConnector} = require(`nylas-core`);
function keepAlive(request) {
const until = Rx.Observable.fromCallback(request.on)("disconnect")
return Rx.Observable.interval(1000).map(() => "\n").takeUntil(until)
}
function inflateTransactions(db, transactionModels = []) {
const transactions = _.pluck(transactionModels, "dataValues")
transactions.forEach((t) => t.cursor = t.id);
const byModel = _.groupBy(transactions, "object");
const byObjectIds = _.groupBy(transactions, "objectId");
return Promise.all(Object.keys(byModel).map((object) => {
const ids = _.pluck(byModel[object], "objectId");
const modelConstructorName = object.charAt(0).toUpperCase() + object.slice(1);
const ModelKlass = db[modelConstructorName]
let includes = [];
if (ModelKlass.requiredAssociationsForJSON) {
includes = ModelKlass.requiredAssociationsForJSON()
}
return ModelKlass.findAll({where: {id: ids}, include: includes})
.then((models = []) => {
for (const model of models) {
model.dataValues.object = object
const tsForId = byObjectIds[model.id];
if (!tsForId || tsForId.length === 0) { continue; }
for (const t of tsForId) { t.attributes = model.dataValues; }
}
})
})).then(() => transactions.map(JSON.stringify).join("\n"))
}
function createOutputStream() {
const outputStream = require('stream').Readable();
outputStream._read = () => { return };
outputStream.pushJSON = (msg) => {
const jsonMsg = typeof msg === 'string' ? msg : JSON.stringify(msg);
outputStream.push(jsonMsg);
}
return outputStream
}
function lastTransaction(db) {
return db.Transaction.findOne({order: [['id', 'DESC']]})
}
function initialTransactions(db, request) {
let cursor = (request.query || {}).cursor;
const where = cursor ? {id: {$gt: cursor}} : {createdAt: {$gte: new Date()}}
return db.Transaction
.streamAll({where})
.flatMap((objs) => inflateTransactions(db, objs))
}
module.exports = (server) => {
server.route({
method: 'GET',
path: '/delta/streaming',
handler: (request, reply) => {
const outputStream = createOutputStream();
request.getAccountDatabase().then((db) => {
const source = Rx.Observable.merge(
PubsubConnector.observeDeltas(request.auth.credentials.id),
initialTransactions(db, request),
keepAlive(request)
).subscribe(outputStream.pushJSON)
request.on("disconnect", source.dispose.bind(source));
});
reply(outputStream)
},
});
server.route({
method: 'POST',
path: '/delta/latest_cursor',
handler: (request, reply) => request.getAccountDatabase().then((db) =>
lastTransaction(db).then((t) => reply({cursor: t.id}))
),
});
};