From 14cffcf8a5e3422ae18ffda10c7adee550a83602 Mon Sep 17 00:00:00 2001 From: Evan Morikawa Date: Tue, 28 Jun 2016 15:35:35 -0700 Subject: [PATCH] Change to notify / observe for syncback requests --- packages/nylas-api/route-helpers.js | 16 ++++++++ packages/nylas-api/routes/delta.js | 2 +- packages/nylas-api/routes/threads.js | 17 ++++---- packages/nylas-core/hook-account-crud.js | 11 +++++- packages/nylas-core/hook-transaction-log.js | 2 +- packages/nylas-core/index.js | 1 + packages/nylas-core/message-types.js | 4 ++ packages/nylas-core/pubsub-connector.js | 43 ++++----------------- packages/nylas-core/scheduler-utils.js | 6 ++- packages/nylas-sync/sync-worker.js | 18 +++++++-- 10 files changed, 67 insertions(+), 53 deletions(-) create mode 100644 packages/nylas-api/route-helpers.js create mode 100644 packages/nylas-core/message-types.js diff --git a/packages/nylas-api/route-helpers.js b/packages/nylas-api/route-helpers.js new file mode 100644 index 000000000..ceb443d9e --- /dev/null +++ b/packages/nylas-api/route-helpers.js @@ -0,0 +1,16 @@ +const {PubsubConnector, MessageTypes} = require('nylas-core') + +module.exports = { + createSyncbackRequest: function createSyncbackRequest(request, reply, syncRequestArgs) { + request.getAccountDatabase().then((db) => { + db.SyncbackRequest.create(syncRequestArgs).then((syncbackRequest) => { + PubsubConnector.notify({ + accountId: db.accountId, + type: MessageTypes.SYNCBACK_REQUESTED, + data: syncbackRequest.id, + }); + reply(Serialization.jsonStringify(syncbackRequest)) + }) + }) + } +} diff --git a/packages/nylas-api/routes/delta.js b/packages/nylas-api/routes/delta.js index 4c2b28843..d3648359f 100644 --- a/packages/nylas-api/routes/delta.js +++ b/packages/nylas-api/routes/delta.js @@ -53,7 +53,7 @@ module.exports = (server) => { request.getAccountDatabase().then((db) => { const source = Rx.Observable.merge( - PubsubConnector.observableForAccountDeltas(account.id), + PubsubConnector.observeDeltas(account.id), initialTransactions(db, request), keepAlive(request) ).subscribe(outputStream.pushJSON) diff --git a/packages/nylas-api/routes/threads.js b/packages/nylas-api/routes/threads.js index 8cd77e62a..27577ff3c 100644 --- a/packages/nylas-api/routes/threads.js +++ b/packages/nylas-api/routes/threads.js @@ -1,5 +1,6 @@ const Joi = require('joi'); const Serialization = require('../serialization'); +const {createSyncbackRequest} = require('../route-helpers') module.exports = (server) => { server.route({ @@ -50,16 +51,12 @@ module.exports = (server) => { }, }, handler: (request, reply) => { - request.getAccountDatabase().then((db) => { - db.SyncbackRequest.create({ - type: "MoveToFolder", - props: { - folderId: request.params.folder_id, - threadId: request.params.id, - }, - }).then((syncbackRequest) => { - reply(Serialization.jsonStringify(syncbackRequest)) - }) + createSyncbackRequest(request, reply, { + type: "MoveToFolder", + props: { + folderId: request.params.folder_id, + threadId: requres.params.id, + } }) }, }); diff --git a/packages/nylas-core/hook-account-crud.js b/packages/nylas-core/hook-account-crud.js index 4e4737b28..0ded2094e 100644 --- a/packages/nylas-core/hook-account-crud.js +++ b/packages/nylas-core/hook-account-crud.js @@ -1,15 +1,22 @@ const PubsubConnector = require('./pubsub-connector') +const MessageTypes = require('./message-types') module.exports = (db, sequelize) => { sequelize.addHook("afterCreate", ({dataValues, $modelOptions}) => { if ($modelOptions.name.singular === 'Account') { PubsubConnector.broadcastClient().lpushAsync('accounts:unclaimed', dataValues.id); - PubsubConnector.notifyAccountChange(dataValues.id); + PubsubConnector.notify({ + accountId: dataValues.id, + type: MessageTypes.ACCOUNT_UPDATED + }); } }) sequelize.addHook("afterUpdate", ({dataValues, $modelOptions}) => { if ($modelOptions.name.singular === 'Account') { - PubsubConnector.notifyAccountChange(dataValues.id); + PubsubConnector.notify({ + accountId: dataValues.id, + type: MessageTypes.ACCOUNT_UPDATED + }); } }) // TODO delete account from redis diff --git a/packages/nylas-core/hook-transaction-log.js b/packages/nylas-core/hook-transaction-log.js index c2d77818d..65b3e64d2 100644 --- a/packages/nylas-core/hook-transaction-log.js +++ b/packages/nylas-core/hook-transaction-log.js @@ -22,7 +22,7 @@ module.exports = (db, sequelize) => { db.Transaction.create(transactionData); transactionData.object = sequelizeHookData.dataValues; - PubsubConnector.notifyAccountDeltas(db.accountId, transactionData); + PubsubConnector.notifyDelta(db.accountId, transactionData); } } diff --git a/packages/nylas-core/index.js b/packages/nylas-core/index.js index e984fcbc1..b8bb708f2 100644 --- a/packages/nylas-core/index.js +++ b/packages/nylas-core/index.js @@ -11,5 +11,6 @@ module.exports = { IMAPConnection: require('./imap-connection'), SyncPolicy: require('./sync-policy'), SchedulerUtils: require('./scheduler-utils'), + MessageTypes: require('./message-types'), NylasError, } diff --git a/packages/nylas-core/message-types.js b/packages/nylas-core/message-types.js new file mode 100644 index 000000000..995445648 --- /dev/null +++ b/packages/nylas-core/message-types.js @@ -0,0 +1,4 @@ +module.exports = { + ACCOUNT_UPDATED: "ACCOUNT_UPDATED", + SYNCBACK_REQUESTED: "SYNCBACK_REQUESTED", +} diff --git a/packages/nylas-core/pubsub-connector.js b/packages/nylas-core/pubsub-connector.js index b279f7f35..bf577b64a 100644 --- a/packages/nylas-core/pubsub-connector.js +++ b/packages/nylas-core/pubsub-connector.js @@ -26,16 +26,7 @@ class PubsubConnector { return this._broadcastClient; } - channelForAccount(accountId) { - return `a-${accountId}`; - } - - channelForAccountDeltas(accountId) { - return `deltas-${accountId}`; - } - // Shared channel - _observableForChannelOnSharedListener(channel) { if (!this._listenClient) { this._listenClient = this.buildClient(); @@ -63,47 +54,29 @@ class PubsubConnector { }); } - notifyAccountChange(accountId) { - const channel = this.channelForAccount(accountId); - this.broadcastClient().publish(channel, 'modified'); + notify({accountId, type, data}) { + this.broadcastClient().publish(`channel-${accountId}`, {type, data}); } - observableForAccountChanges(accountId) { - const channel = this.channelForAccount(accountId); - return this._observableForChannelOnSharedListener(channel); + observe(accountId) { + return this._observableForChannelOnSharedListener(`channel-${accountId}`); } - notifyMessageCreated(payload) { - this.broadcastClient().publish('message-created', payload); + notifyDelta(accountId, data) { + this.broadcastClient().publish(`channel-${accountId}-deltas`, JSON.stringify(data)) } - observableForMessageCreation(accountId) { - return this._observableForChannelOnSharedListener('message-created'); - } - - // Account (delta streaming) channels - - notifyAccountDeltas(accountId, data) { - const channel = this.channelForAccountDeltas(accountId); - this.broadcastClient().publish(channel, JSON.stringify(data)) - } - - observableForAccountDeltas(accountId) { + observeDeltas(accountId) { return Rx.Observable.create((observer) => { const sub = this.buildClient(); sub.on("message", (channel, message) => observer.onNext(message)); - sub.subscribe(this.channelForAccountDeltas(accountId)); + sub.subscribe(`channel-${accountId}-deltas`); return () => { sub.unsubscribe(); sub.quit(); } }) } - - queueSyncbackTask({taskName, props}) { - const channel = this.channelForSyncbackTaskQueue(accountId); - this.broadcastClient().publish(channel, JSON.stringify(data)) - } } module.exports = new PubsubConnector() diff --git a/packages/nylas-core/scheduler-utils.js b/packages/nylas-core/scheduler-utils.js index 94dc807cf..568240870 100644 --- a/packages/nylas-core/scheduler-utils.js +++ b/packages/nylas-core/scheduler-utils.js @@ -10,6 +10,7 @@ const HEARTBEAT_EXPIRES = 30; // 2 min in prod? const CLAIM_DURATION = 10 * 60 * 1000; // 2 hours on prod? const PubsubConnector = require('./pubsub-connector'); +const MessageTypes = require('./message-types') const forEachAccountList = (forEachCallback) => { const client = PubsubConnector.broadcastClient(); @@ -44,7 +45,10 @@ const notifyAccountIsActive = (accountId) => { client.incrAsync(key).then((val) => { client.expireAsync(key, 15 * 60 * 1000); // 15 min if (val === 1) { - PubsubConnector.notifyAccountChange(accountId); + PubsubConnector.notify({ + accountId: accountId, + type: MessageTypes.ACCOUNT_UPDATED + }); } }); } diff --git a/packages/nylas-sync/sync-worker.js b/packages/nylas-sync/sync-worker.js index 6b35c2875..f2f7d291e 100644 --- a/packages/nylas-sync/sync-worker.js +++ b/packages/nylas-sync/sync-worker.js @@ -4,6 +4,7 @@ const { IMAPConnection, PubsubConnector, DatabaseConnector, + MessageTypes, } = require('nylas-core'); const FetchCategoryList = require('./imap/fetch-category-list') @@ -24,8 +25,8 @@ class SyncWorker { this.syncNow(); - this._listener = PubsubConnector.observableForAccountChanges(account.id) - .subscribe(() => this.onAccountChanged()) + this._onMessage = this._onMessage.bind(this) + this._listener = PubsubConnector.observe(account.id).subscribe(this._onMessage) } cleanup() { @@ -39,7 +40,18 @@ class SyncWorker { this._conn = null } - onAccountChanged() { + _onMessage(msg = {}) { + switch(msg.type) { + case MessageTypes.ACCOUNT_UPDATED: + this._onAccountUpdated(); break; + case MessageTypes.SYNCBACK_REQUESTED: + this.syncNow(); break; + default: + throw new Error(`Invalid message: ${JSON.stringify(msg)}`) + } + } + + _onAccountUpdated() { console.log("SyncWorker: Detected change to account. Reloading and syncing now.") DatabaseConnector.forShared().then(({Account}) => { Account.find({where: {id: this._account.id}}).then((account) => {