diff --git a/packages/nylas-api/app.js b/packages/nylas-api/app.js index a07e10bf9..d752b9bb9 100644 --- a/packages/nylas-api/app.js +++ b/packages/nylas-api/app.js @@ -63,8 +63,6 @@ server.register(plugins, (err) => { server.auth.strategy('api-consumer', 'basic', { validateFunc: validate }); server.auth.default('api-consumer'); - DatabaseConnectionFactory.setup() - server.start((startErr) => { if (startErr) { throw startErr; } console.log('Server running at:', server.info.uri); diff --git a/packages/nylas-api/routes/auth.js b/packages/nylas-api/routes/auth.js index 7f9adaa57..7e76ff6de 100644 --- a/packages/nylas-api/routes/auth.js +++ b/packages/nylas-api/routes/auth.js @@ -89,10 +89,10 @@ module.exports = (server) => { account.save().then((saved) => AccountToken.create({ AccountId: saved.id, - }).then((token) => { - const response = Serialization.jsonStringify(saved); - response.token = token; - reply(response); + }).then((accountToken) => { + const response = saved.toJSON(); + response.token = accountToken.value; + reply(Serialization.jsonStringify(response)); }) ); }) diff --git a/packages/nylas-api/routes/delta.js b/packages/nylas-api/routes/delta.js index c98471e19..93497ba6e 100644 --- a/packages/nylas-api/routes/delta.js +++ b/packages/nylas-api/routes/delta.js @@ -1,6 +1,6 @@ const Rx = require('rx') const _ = require('underscore'); -const {DeltaStreamQueue} = require(`nylas-core`); +const {AccountPubsub} = require(`nylas-core`); function keepAlive(request) { const until = Rx.Observable.fromCallback(request.on)("disconnect") @@ -52,7 +52,7 @@ module.exports = (server) => { request.getAccountDatabase().then((db) => { const source = Rx.Observable.merge( - DeltaStreamQueue.fromAccountId(db.accountId), + AccountPubsub.observableForAccountId(db.accountId), initialTransactions(db, request), keepAlive(request) ).subscribe(outputStream.pushJSON) diff --git a/packages/nylas-core/account-pubsub.js b/packages/nylas-core/account-pubsub.js new file mode 100644 index 000000000..57642565b --- /dev/null +++ b/packages/nylas-core/account-pubsub.js @@ -0,0 +1,47 @@ +const Rx = require('rx') +const bluebird = require('bluebird') +const redis = require("redis"); +bluebird.promisifyAll(redis.RedisClient.prototype); +bluebird.promisifyAll(redis.Multi.prototype); + +class AccountPubsub { + constructor() { + this._broadcastClient = null; + } + + buildClient() { + const client = redis.createClient(process.env.REDIS_URL || null); + client.on("error", console.error); + return client; + } + + keyForAccountId(accountId) { + return `delta-${accountId}`; + } + + notify(accountId, data) { + if (!this._broadcastClient) { + this._broadcastClient = this.buildClient(); + } + const key = this.keyForAccountId(accountId); + this._broadcastClient.publish(key, JSON.stringify(data)) + } + + observableForAccountId(accountId) { + return Rx.Observable.create((observer) => { + const sub = this.buildClient(); + const key = this.keyForAccountId(accountId); + sub.on("message", (channel, message) => { + if (channel !== key) { return } + observer.onNext(message) + }); + sub.subscribe(key); + return () => { + sub.unsubscribe() + sub.quit() + } + }) + } +} + +module.exports = new AccountPubsub() diff --git a/packages/nylas-core/database-connection-factory.js b/packages/nylas-core/database-connection-factory.js index 18ed00156..6e0e9f3bd 100644 --- a/packages/nylas-core/database-connection-factory.js +++ b/packages/nylas-core/database-connection-factory.js @@ -2,7 +2,6 @@ const Sequelize = require('sequelize'); const fs = require('fs'); const path = require('path'); const TransactionLog = require('./transaction-log') -const DeltaStreamQueue = require('./delta-stream-queue.js') require('./database-extensions'); // Extends Sequelize on require @@ -16,10 +15,6 @@ class DatabaseConnectionFactory { this._pools = {}; } - setup() { - DeltaStreamQueue.setup() - } - _readModelsInDirectory(sequelize, dirname) { const db = {}; for (const filename of fs.readdirSync(dirname)) { diff --git a/packages/nylas-core/delta-stream-queue.js b/packages/nylas-core/delta-stream-queue.js deleted file mode 100644 index b85102a2f..000000000 --- a/packages/nylas-core/delta-stream-queue.js +++ /dev/null @@ -1,33 +0,0 @@ -const Rx = require('rx') -const bluebird = require('bluebird') -const redis = require("redis"); -bluebird.promisifyAll(redis.RedisClient.prototype); -bluebird.promisifyAll(redis.Multi.prototype); - -class DeltaStreamQueue { - setup() { - this.client = redis.createClient(process.env.REDIS_URL || null); - this.client.on("error", console.error); - } - - key(accountId) { - return `delta-${accountId}` - } - - notify(accountId, data) { - this.client.publish(this.key(accountId), JSON.stringify(data)) - } - - fromAccountId(accountId) { - return Rx.Observable.create((observer) => { - this.client.on("message", (channel, message) => { - if (channel !== this.key(accountId)) { return } - observer.onNext(message) - }); - this.client.subscribe(this.key(accountId)); - return () => { this.client.unsubscribe() } - }) - } -} - -module.exports = new DeltaStreamQueue() diff --git a/packages/nylas-core/index.js b/packages/nylas-core/index.js index 26077677d..3accf1084 100644 --- a/packages/nylas-core/index.js +++ b/packages/nylas-core/index.js @@ -1,6 +1,6 @@ module.exports = { DatabaseConnectionFactory: require('./database-connection-factory'), - DeltaStreamQueue: require('./delta-stream-queue'), + AccountPubsub: require('./account-pubsub'), IMAPConnection: require('./imap-connection'), Config: require(`./config/${process.env.ENV || 'development'}`), } diff --git a/packages/nylas-core/transaction-log.js b/packages/nylas-core/transaction-log.js index a9a9bd401..77a3a9474 100644 --- a/packages/nylas-core/transaction-log.js +++ b/packages/nylas-core/transaction-log.js @@ -1,4 +1,4 @@ -const DeltaStreamQueue = require('./delta-stream-queue') +const AccountPubsub = require('./account-pubsub') class TransactionLog { constructor(db) { @@ -24,8 +24,8 @@ class TransactionLog { this.parseHookData(sequelizeHookData) ); this.db.Transaction.create(transactionData); - transactionData.object = sequelizeHookData.dataValues - DeltaStreamQueue.notify(this.db.accountId, transactionData) + transactionData.object = sequelizeHookData.dataValues; + AccountPubsub.notify(this.db.accountId, transactionData); } } diff --git a/packages/nylas-sync/app.js b/packages/nylas-sync/app.js index 522fe13d6..65adcd614 100644 --- a/packages/nylas-sync/app.js +++ b/packages/nylas-sync/app.js @@ -19,7 +19,6 @@ const start = () => { }); } -DatabaseConnectionFactory.setup() start(); global.workerPool = workerPool;