From 77122694021c38230fc9c8a62727079422a9f28d Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Thu, 1 Dec 2016 18:41:40 -0800 Subject: [PATCH] [*] fix(deltas): Cloud-API not filtering deltas at all, refactor a few things MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Don’t need functions in delta.js which must be called to return promsies. Fun of promsies is that you don’t need to care when they’re built to attach a .then. - Make boundary between route handler and delta stream builder more explicit, don’t do query parsing in helpers, always reply from handler. - Remove pushJSON extension to outputStream which never actually received JSON. - Remove `takeUntil` - disposing of the downstream observable should dispose of all the merged/upstream observables - Rename inflate => stringify since the returned value is a string not an object. - Remove support for delta streams with no cursors. Don’t think this was supposed to be a feature. - Add accountId to Transaction models - Make database hooks shared in isomorphic core --- packages/isomorphic-core/index.js | 4 +- .../src/delta-stream-builder.js | 102 +++++++----------- .../src}/hook-increment-version-on-save.js | 0 .../src}/hook-transaction-log.js | 24 +++-- .../isomorphic-core/src/models/transaction.js | 2 +- .../local-sync/src/local-api/routes/delta.js | 42 +++++--- .../src/shared/local-database-connector.js | 12 ++- 7 files changed, 97 insertions(+), 89 deletions(-) rename packages/{local-sync/src/shared => isomorphic-core/src}/hook-increment-version-on-save.js (100%) rename packages/{local-sync/src/shared => isomorphic-core/src}/hook-transaction-log.js (62%) diff --git a/packages/isomorphic-core/index.js b/packages/isomorphic-core/index.js index 2a7e6407b..67721a8b6 100644 --- a/packages/isomorphic-core/index.js +++ b/packages/isomorphic-core/index.js @@ -10,5 +10,7 @@ module.exports = { PromiseUtils: require('./src/promise-utils'), DatabaseTypes: require('./src/database-types'), loadModels: require('./src/load-models'), - deltaStreamBuilder: require('./src/delta-stream-builder'), + DeltaStreamBuilder: require('./src/delta-stream-builder'), + HookTransactionLog: require('./src/hook-transaction-log'), + HookIncrementVersionOnSave: require('./src/hook-increment-version-on-save'), } diff --git a/packages/isomorphic-core/src/delta-stream-builder.js b/packages/isomorphic-core/src/delta-stream-builder.js index 068397e23..edbd5dce0 100644 --- a/packages/isomorphic-core/src/delta-stream-builder.js +++ b/packages/isomorphic-core/src/delta-stream-builder.js @@ -2,83 +2,63 @@ const _ = require('underscore'); const Rx = require('rx') const stream = require('stream'); -function keepAlive(request) { - const until = Rx.Observable.fromCallback(request.on)("disconnect") - return Rx.Observable.interval(1000).map(() => "\n").takeUntil(until) -} +function stringifyTransactions(db, transactions = []) { + const transactionJSONs = transactions.map((t) => t.toJSON()) + transactionJSONs.forEach((t) => { t.cursor = t.id }); -function inflateTransactions(db, transactionModels = []) { - let models = transactionModels; - if (!(_.isArray(models))) { models = [transactionModels] } - const transactions = models.map((mod) => mod.toJSON()) - transactions.forEach((t) => { t.cursor = t.id }); - const byModel = _.groupBy(transactions, "object"); - const byObjectIds = _.groupBy(transactions, "objectId"); + const byModel = _.groupBy(transactionJSONs, "object"); + const byObjectIds = _.groupBy(transactionJSONs, "objectId"); - return Promise.all(Object.keys(byModel).map((object) => { - const ids = _.pluck(byModel[object], "objectId"); - const modelConstructorName = object.charAt(0).toUpperCase() + object.slice(1); + return Promise.all(Object.keys(byModel).map((modelName) => { + const modelIds = byModel[modelName].map(t => t.objectId); + const modelConstructorName = modelName.charAt(0).toUpperCase() + modelName.slice(1); const ModelKlass = db[modelConstructorName] + let includes = []; if (ModelKlass.requiredAssociationsForJSON) { includes = ModelKlass.requiredAssociationsForJSON(db) } - return ModelKlass.findAll({where: {id: ids}, include: includes}) - .then((objs = []) => { - for (const model of objs) { - const tsForId = byObjectIds[model.id]; - if (!tsForId || tsForId.length === 0) { continue; } - for (const t of tsForId) { t.attributes = model.toJSON(); } + return ModelKlass.findAll({ + where: {id: modelIds}, + include: includes, + }).then((models) => { + for (const model of models) { + const transactionsForModel = byObjectIds[model.id]; + for (const t of transactionsForModel) { + t.attributes = model.toJSON(); + } } - }) - })).then(() => `${transactions.map(JSON.stringify).join("\n")}\n`) -} - -function createOutputStream() { - const outputStream = stream.Readable(); - outputStream._read = () => { return }; - outputStream.pushJSON = (msg) => { - const jsonMsg = typeof msg === 'string' ? msg : JSON.stringify(msg); - outputStream.push(jsonMsg); - } - return outputStream -} - -function initialTransactions(db, request) { - const cursor = (request.query || {}).cursor; - const where = cursor ? {id: {$gt: cursor}} : {createdAt: {$gte: new Date()}} - return db.Transaction - .streamAll({where}) - .flatMap((objs) => inflateTransactions(db, objs)) -} - -function inflatedIncomingTransaction(db, request, transactionSource) { - return transactionSource.flatMap((t) => inflateTransactions(db, [t])) + }); + })).then(() => { + return `${transactionJSONs.map(JSON.stringify).join("\n")}\n`; + }); } module.exports = { - buildStream(request, dbSource, transactionSource) { - const outputStream = createOutputStream(); + buildStream(request, {databasePromise, cursor, accountId, deltasSource}) { + return databasePromise.then((db) => { + const initialSource = db.Transaction.streamAll({where: { id: {$gt: cursor}, accountId }}); - dbSource().then((db) => { const source = Rx.Observable.merge( - inflatedIncomingTransaction(db, request, transactionSource(db, request)), - initialTransactions(db, request), - keepAlive(request) - ).subscribe(outputStream.pushJSON) + initialSource.flatMap((t) => stringifyTransactions(db, t)), + deltasSource.flatMap((t) => stringifyTransactions(db, [t])), + Rx.Observable.interval(1000).map(() => "\n") + ) - request.on("disconnect", source.dispose.bind(source)); + const outputStream = stream.Readable(); + outputStream._read = () => { return }; + source.subscribe((str) => outputStream.push(str)) + request.on("disconnect", () => source.dispose()); + + return outputStream; }); - - return outputStream }, - lastTransactionReply(dbSource, reply) { - dbSource().then((db) => { - db.Transaction.findOne({order: [['id', 'DESC']]}) - .then((t) => { - reply({cursor: (t || {}).id}) - }) - }) + buildCursor({databasePromise}) { + return databasePromise.then(({Transaction}) => { + return Transaction.findOne({order: [['id', 'DESC']]}).then((t) => { + return (t || {}).id; + }); + }); }, } diff --git a/packages/local-sync/src/shared/hook-increment-version-on-save.js b/packages/isomorphic-core/src/hook-increment-version-on-save.js similarity index 100% rename from packages/local-sync/src/shared/hook-increment-version-on-save.js rename to packages/isomorphic-core/src/hook-increment-version-on-save.js diff --git a/packages/local-sync/src/shared/hook-transaction-log.js b/packages/isomorphic-core/src/hook-transaction-log.js similarity index 62% rename from packages/local-sync/src/shared/hook-transaction-log.js rename to packages/isomorphic-core/src/hook-transaction-log.js index 903837a7c..d3b1feac6 100644 --- a/packages/local-sync/src/shared/hook-transaction-log.js +++ b/packages/isomorphic-core/src/hook-transaction-log.js @@ -1,7 +1,6 @@ const _ = require('underscore') -const TransactionConnector = require('./transaction-connector') -module.exports = (db, sequelize) => { +module.exports = (db, sequelize, {only, onCreatedTransaction} = {}) => { if (!db.Transaction) { throw new Error("Cannot enable transaction logging, there is no Transaction model class in this database.") } @@ -15,20 +14,33 @@ module.exports = (db, sequelize) => { const transactionLogger = (event) => { return ({dataValues, _changed, $modelOptions}) => { + let name = $modelOptions.name.singular; + if (name === 'metadatum') { + name = 'metadata'; + } + + if (only && !only.includes(name)) { + return; + } + const changedFields = Object.keys(_changed) if ((isTransaction($modelOptions) || allIgnoredFields(changedFields))) { return; } + const accountId = db.accountId ? db.accountId : dataValues.accountId; + if (!accountId) { + throw new Error("Assertion failure: Cannot create a transaction - could not resolve accountId.") + } + const transactionData = Object.assign({event}, { - object: $modelOptions.name.singular, + object: name, objectId: dataValues.id, + accountId: accountId, changedFields: changedFields, }); - db.Transaction.create(transactionData).then((transaction) => { - TransactionConnector.notifyDelta(db.accountId, transaction); - }) + db.Transaction.create(transactionData).then(onCreatedTransaction) } } diff --git a/packages/isomorphic-core/src/models/transaction.js b/packages/isomorphic-core/src/models/transaction.js index 4a0ca1592..25b4a830f 100644 --- a/packages/isomorphic-core/src/models/transaction.js +++ b/packages/isomorphic-core/src/models/transaction.js @@ -5,6 +5,7 @@ module.exports = (sequelize, Sequelize) => { event: Sequelize.STRING, object: Sequelize.STRING, objectId: Sequelize.STRING, + accountId: Sequelize.STRING, changedFields: JSONARRAYType('changedFields'), }, { instanceMethods: { @@ -14,7 +15,6 @@ module.exports = (sequelize, Sequelize) => { event: this.event, object: this.object, objectId: `${this.objectId}`, - changedFields: this.changedFields, } }, }, diff --git a/packages/local-sync/src/local-api/routes/delta.js b/packages/local-sync/src/local-api/routes/delta.js index 9c291bbc5..2fdbfd1a4 100644 --- a/packages/local-sync/src/local-api/routes/delta.js +++ b/packages/local-sync/src/local-api/routes/delta.js @@ -1,31 +1,41 @@ +const Joi = require('joi'); const TransactionConnector = require('../../shared/transaction-connector') -const {deltaStreamBuilder} = require('isomorphic-core') - -function transactionSource(db, request) { - const accountId = request.auth.credentials.id; - return TransactionConnector.getObservableForAccountId(accountId) -} - -function dbSource(request) { - return request.getAccountDatabase.bind(request) -} +const {DeltaStreamBuilder} = require('isomorphic-core') module.exports = (server) => { server.route({ method: 'GET', path: '/delta/streaming', + config: { + validate: { + query: { + cursor: Joi.string().required(), + }, + }, + }, handler: (request, reply) => { - const outputStream = deltaStreamBuilder.buildStream(request, - dbSource(request), transactionSource) - reply(outputStream) + const account = request.auth.credentials; + + DeltaStreamBuilder.buildStream(request, { + cursor: request.query.cursor, + accountId: account.id, + databasePromise: request.getAccountDatabase(), + deltasSource: TransactionConnector.getObservableForAccountId(account.id), + }).then((stream) => { + reply(stream) + }); }, }); server.route({ method: 'POST', path: '/delta/latest_cursor', - handler: (request, reply) => - deltaStreamBuilder.lastTransactionReply(dbSource(request), reply) - , + handler: (request, reply) => { + DeltaStreamBuilder.buildCursor({ + databasePromise: request.getAccountDatabase(), + }).then((cursor) => { + reply({cursor}) + }); + }, }); }; diff --git a/packages/local-sync/src/shared/local-database-connector.js b/packages/local-sync/src/shared/local-database-connector.js index b9988f9d9..e6a8494f1 100644 --- a/packages/local-sync/src/shared/local-database-connector.js +++ b/packages/local-sync/src/shared/local-database-connector.js @@ -1,9 +1,8 @@ const Sequelize = require('sequelize'); const fs = require('fs'); const path = require('path'); -const {loadModels, PromiseUtils} = require('isomorphic-core'); -const HookTransactionLog = require('./hook-transaction-log'); -const HookIncrementVersionOnSave = require('./hook-increment-version-on-save'); +const {loadModels, PromiseUtils, HookIncrementVersionOnSave, HookTransactionLog} = require('isomorphic-core'); +const TransactionConnector = require('./transaction-connector') require('./database-extensions'); // Extends Sequelize on require @@ -34,7 +33,12 @@ class LocalDatabaseConnector { modelDirs: [path.resolve(__dirname, '..', 'models')], }) - HookTransactionLog(db, newSequelize); + HookTransactionLog(db, newSequelize, { + onCreatedTransaction: (transaction) => { + TransactionConnector.notifyDelta(db.accountId, transaction); + }, + }); + HookIncrementVersionOnSave(db, newSequelize); db.sequelize = newSequelize;