[*] fix(deltas): Cloud-API not filtering deltas at all, refactor a few things

- Don’t need functions in delta.js which must be called to return promsies. Fun of promsies is that you don’t need to care when they’re built to attach a .then.

- Make boundary between route handler and delta stream builder more explicit, don’t do query parsing in helpers, always reply from handler.

- Remove pushJSON extension to outputStream which never actually received JSON.

- Remove `takeUntil` - disposing of the downstream observable should dispose of all the merged/upstream observables

- Rename inflate => stringify since the returned value is a string not an object.

- Remove support for delta streams with no cursors. Don’t think this was supposed to be a feature.

- Add accountId to Transaction models

- Make database hooks shared in isomorphic core
This commit is contained in:
Ben Gotow 2016-12-01 18:41:40 -08:00
parent 15cfe2cec0
commit 7712269402
7 changed files with 97 additions and 89 deletions

View file

@ -10,5 +10,7 @@ module.exports = {
PromiseUtils: require('./src/promise-utils'),
DatabaseTypes: require('./src/database-types'),
loadModels: require('./src/load-models'),
deltaStreamBuilder: require('./src/delta-stream-builder'),
DeltaStreamBuilder: require('./src/delta-stream-builder'),
HookTransactionLog: require('./src/hook-transaction-log'),
HookIncrementVersionOnSave: require('./src/hook-increment-version-on-save'),
}

View file

@ -2,83 +2,63 @@ const _ = require('underscore');
const Rx = require('rx')
const stream = require('stream');
function keepAlive(request) {
const until = Rx.Observable.fromCallback(request.on)("disconnect")
return Rx.Observable.interval(1000).map(() => "\n").takeUntil(until)
}
function stringifyTransactions(db, transactions = []) {
const transactionJSONs = transactions.map((t) => t.toJSON())
transactionJSONs.forEach((t) => { t.cursor = t.id });
function inflateTransactions(db, transactionModels = []) {
let models = transactionModels;
if (!(_.isArray(models))) { models = [transactionModels] }
const transactions = models.map((mod) => mod.toJSON())
transactions.forEach((t) => { t.cursor = t.id });
const byModel = _.groupBy(transactions, "object");
const byObjectIds = _.groupBy(transactions, "objectId");
const byModel = _.groupBy(transactionJSONs, "object");
const byObjectIds = _.groupBy(transactionJSONs, "objectId");
return Promise.all(Object.keys(byModel).map((object) => {
const ids = _.pluck(byModel[object], "objectId");
const modelConstructorName = object.charAt(0).toUpperCase() + object.slice(1);
return Promise.all(Object.keys(byModel).map((modelName) => {
const modelIds = byModel[modelName].map(t => t.objectId);
const modelConstructorName = modelName.charAt(0).toUpperCase() + modelName.slice(1);
const ModelKlass = db[modelConstructorName]
let includes = [];
if (ModelKlass.requiredAssociationsForJSON) {
includes = ModelKlass.requiredAssociationsForJSON(db)
}
return ModelKlass.findAll({where: {id: ids}, include: includes})
.then((objs = []) => {
for (const model of objs) {
const tsForId = byObjectIds[model.id];
if (!tsForId || tsForId.length === 0) { continue; }
for (const t of tsForId) { t.attributes = model.toJSON(); }
return ModelKlass.findAll({
where: {id: modelIds},
include: includes,
}).then((models) => {
for (const model of models) {
const transactionsForModel = byObjectIds[model.id];
for (const t of transactionsForModel) {
t.attributes = model.toJSON();
}
}
})
})).then(() => `${transactions.map(JSON.stringify).join("\n")}\n`)
}
function createOutputStream() {
const outputStream = stream.Readable();
outputStream._read = () => { return };
outputStream.pushJSON = (msg) => {
const jsonMsg = typeof msg === 'string' ? msg : JSON.stringify(msg);
outputStream.push(jsonMsg);
}
return outputStream
}
function initialTransactions(db, request) {
const cursor = (request.query || {}).cursor;
const where = cursor ? {id: {$gt: cursor}} : {createdAt: {$gte: new Date()}}
return db.Transaction
.streamAll({where})
.flatMap((objs) => inflateTransactions(db, objs))
}
function inflatedIncomingTransaction(db, request, transactionSource) {
return transactionSource.flatMap((t) => inflateTransactions(db, [t]))
});
})).then(() => {
return `${transactionJSONs.map(JSON.stringify).join("\n")}\n`;
});
}
module.exports = {
buildStream(request, dbSource, transactionSource) {
const outputStream = createOutputStream();
buildStream(request, {databasePromise, cursor, accountId, deltasSource}) {
return databasePromise.then((db) => {
const initialSource = db.Transaction.streamAll({where: { id: {$gt: cursor}, accountId }});
dbSource().then((db) => {
const source = Rx.Observable.merge(
inflatedIncomingTransaction(db, request, transactionSource(db, request)),
initialTransactions(db, request),
keepAlive(request)
).subscribe(outputStream.pushJSON)
initialSource.flatMap((t) => stringifyTransactions(db, t)),
deltasSource.flatMap((t) => stringifyTransactions(db, [t])),
Rx.Observable.interval(1000).map(() => "\n")
)
request.on("disconnect", source.dispose.bind(source));
const outputStream = stream.Readable();
outputStream._read = () => { return };
source.subscribe((str) => outputStream.push(str))
request.on("disconnect", () => source.dispose());
return outputStream;
});
return outputStream
},
lastTransactionReply(dbSource, reply) {
dbSource().then((db) => {
db.Transaction.findOne({order: [['id', 'DESC']]})
.then((t) => {
reply({cursor: (t || {}).id})
})
})
buildCursor({databasePromise}) {
return databasePromise.then(({Transaction}) => {
return Transaction.findOne({order: [['id', 'DESC']]}).then((t) => {
return (t || {}).id;
});
});
},
}

View file

@ -1,7 +1,6 @@
const _ = require('underscore')
const TransactionConnector = require('./transaction-connector')
module.exports = (db, sequelize) => {
module.exports = (db, sequelize, {only, onCreatedTransaction} = {}) => {
if (!db.Transaction) {
throw new Error("Cannot enable transaction logging, there is no Transaction model class in this database.")
}
@ -15,20 +14,33 @@ module.exports = (db, sequelize) => {
const transactionLogger = (event) => {
return ({dataValues, _changed, $modelOptions}) => {
let name = $modelOptions.name.singular;
if (name === 'metadatum') {
name = 'metadata';
}
if (only && !only.includes(name)) {
return;
}
const changedFields = Object.keys(_changed)
if ((isTransaction($modelOptions) || allIgnoredFields(changedFields))) {
return;
}
const accountId = db.accountId ? db.accountId : dataValues.accountId;
if (!accountId) {
throw new Error("Assertion failure: Cannot create a transaction - could not resolve accountId.")
}
const transactionData = Object.assign({event}, {
object: $modelOptions.name.singular,
object: name,
objectId: dataValues.id,
accountId: accountId,
changedFields: changedFields,
});
db.Transaction.create(transactionData).then((transaction) => {
TransactionConnector.notifyDelta(db.accountId, transaction);
})
db.Transaction.create(transactionData).then(onCreatedTransaction)
}
}

View file

@ -5,6 +5,7 @@ module.exports = (sequelize, Sequelize) => {
event: Sequelize.STRING,
object: Sequelize.STRING,
objectId: Sequelize.STRING,
accountId: Sequelize.STRING,
changedFields: JSONARRAYType('changedFields'),
}, {
instanceMethods: {
@ -14,7 +15,6 @@ module.exports = (sequelize, Sequelize) => {
event: this.event,
object: this.object,
objectId: `${this.objectId}`,
changedFields: this.changedFields,
}
},
},

View file

@ -1,31 +1,41 @@
const Joi = require('joi');
const TransactionConnector = require('../../shared/transaction-connector')
const {deltaStreamBuilder} = require('isomorphic-core')
function transactionSource(db, request) {
const accountId = request.auth.credentials.id;
return TransactionConnector.getObservableForAccountId(accountId)
}
function dbSource(request) {
return request.getAccountDatabase.bind(request)
}
const {DeltaStreamBuilder} = require('isomorphic-core')
module.exports = (server) => {
server.route({
method: 'GET',
path: '/delta/streaming',
config: {
validate: {
query: {
cursor: Joi.string().required(),
},
},
},
handler: (request, reply) => {
const outputStream = deltaStreamBuilder.buildStream(request,
dbSource(request), transactionSource)
reply(outputStream)
const account = request.auth.credentials;
DeltaStreamBuilder.buildStream(request, {
cursor: request.query.cursor,
accountId: account.id,
databasePromise: request.getAccountDatabase(),
deltasSource: TransactionConnector.getObservableForAccountId(account.id),
}).then((stream) => {
reply(stream)
});
},
});
server.route({
method: 'POST',
path: '/delta/latest_cursor',
handler: (request, reply) =>
deltaStreamBuilder.lastTransactionReply(dbSource(request), reply)
,
handler: (request, reply) => {
DeltaStreamBuilder.buildCursor({
databasePromise: request.getAccountDatabase(),
}).then((cursor) => {
reply({cursor})
});
},
});
};

View file

@ -1,9 +1,8 @@
const Sequelize = require('sequelize');
const fs = require('fs');
const path = require('path');
const {loadModels, PromiseUtils} = require('isomorphic-core');
const HookTransactionLog = require('./hook-transaction-log');
const HookIncrementVersionOnSave = require('./hook-increment-version-on-save');
const {loadModels, PromiseUtils, HookIncrementVersionOnSave, HookTransactionLog} = require('isomorphic-core');
const TransactionConnector = require('./transaction-connector')
require('./database-extensions'); // Extends Sequelize on require
@ -34,7 +33,12 @@ class LocalDatabaseConnector {
modelDirs: [path.resolve(__dirname, '..', 'models')],
})
HookTransactionLog(db, newSequelize);
HookTransactionLog(db, newSequelize, {
onCreatedTransaction: (transaction) => {
TransactionConnector.notifyDelta(db.accountId, transaction);
},
});
HookIncrementVersionOnSave(db, newSequelize);
db.sequelize = newSequelize;