From ef5c4a29fe216baac3daab0a27e39723b53b7f66 Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Thu, 30 Jun 2016 12:33:08 -0700 Subject: [PATCH] Fix redis keys, dashboard app, show account active state on dashboard --- packages/nylas-api/serialization.js | 2 +- packages/nylas-core/pubsub-connector.js | 21 ++++++-- packages/nylas-core/scheduler-utils.js | 21 +++++--- packages/nylas-dashboard/app.js | 20 ++++---- packages/nylas-dashboard/public/css/app.css | 2 + packages/nylas-dashboard/public/js/app.jsx | 38 +++++++++----- packages/nylas-sync/sync-worker.js | 57 ++++++++++----------- 7 files changed, 98 insertions(+), 63 deletions(-) diff --git a/packages/nylas-api/serialization.js b/packages/nylas-api/serialization.js index 9570d6519..68c77ddc8 100644 --- a/packages/nylas-api/serialization.js +++ b/packages/nylas-api/serialization.js @@ -19,7 +19,7 @@ function jsonSchema(modelName) { organization_unit: Joi.string(), connection_settings: Joi.object(), sync_policy: Joi.object(), - sync_error: Joi.object(), + sync_error: Joi.object().allow(null), }) } if (modelName === 'Folder') { diff --git a/packages/nylas-core/pubsub-connector.js b/packages/nylas-core/pubsub-connector.js index b37b27d14..56ba15e92 100644 --- a/packages/nylas-core/pubsub-connector.js +++ b/packages/nylas-core/pubsub-connector.js @@ -63,22 +63,35 @@ class PubsubConnector { } notify({accountId, type, data}) { - this.broadcastClient().publish(`channel-${accountId}`, JSON.stringify({type, data})); + this.broadcastClient().publish(`account-${accountId}`, JSON.stringify({type, data})); } observe(accountId) { - return this._observableForChannelOnSharedListener(`channel-${accountId}`); + return this._observableForChannelOnSharedListener(`account-${accountId}`); } notifyDelta(accountId, data) { - this.broadcastClient().publish(`channel-${accountId}-deltas`, JSON.stringify(data)) + this.broadcastClient().publish(`deltas-${accountId}`, JSON.stringify(data)) + } + + observeAllAccounts() { + return Rx.Observable.create((observer) => { + const sub = this.buildClient(); + sub.on("pmessage", (pattern, channel, message) => + observer.onNext(channel.replace('account-', ''), message)); + sub.psubscribe(`account-*`); + return () => { + sub.unsubscribe(); + sub.quit(); + } + }) } observeDeltas(accountId) { return Rx.Observable.create((observer) => { const sub = this.buildClient(); sub.on("message", (channel, message) => observer.onNext(message)); - sub.subscribe(`channel-${accountId}-deltas`); + sub.subscribe(`deltas-${accountId}`); return () => { sub.unsubscribe(); sub.quit(); diff --git a/packages/nylas-core/scheduler-utils.js b/packages/nylas-core/scheduler-utils.js index 4df20d560..930fdc7cf 100644 --- a/packages/nylas-core/scheduler-utils.js +++ b/packages/nylas-core/scheduler-utils.js @@ -1,8 +1,7 @@ const ACCOUNTS_UNCLAIMED = 'accounts:unclaimed'; const ACCOUNTS_CLAIMED_PREFIX = 'accounts:id-'; const ACCOUNTS_FOR = (id) => `${ACCOUNTS_CLAIMED_PREFIX}${id}`; - -const CONNECTION_COUNT_FOR = (id) => `connections:${id}` +const ACTIVE_KEY_FOR = (id) => `active:${id}` const HEARTBEAT_FOR = (id) => `heartbeat:${id}`; const HEARTBEAT_EXPIRES = 30; // 2 min in prod? @@ -35,15 +34,24 @@ const assignPolicy = (accountId, policy) => { const checkIfAccountIsActive = (accountId) => { const client = PubsubConnector.broadcastClient(); - const key = CONNECTION_COUNT_FOR(accountId); - return client.getAsync(key).then((val) => val && val > 0) + const key = ACTIVE_KEY_FOR(accountId); + return client.getAsync(key).then((val) => val !== null) +} + +const listActiveAccounts = () => { + const client = PubsubConnector.broadcastClient(); + const keyBase = ACTIVE_KEY_FOR(''); + + return client.keysAsync(`${keyBase}*`).then((keys) => + keys.map(k => k.replace(keyBase, '')) + ); } const notifyAccountIsActive = (accountId) => { const client = PubsubConnector.broadcastClient(); - const key = CONNECTION_COUNT_FOR(accountId); + const key = ACTIVE_KEY_FOR(accountId); client.incrAsync(key).then((val) => { - client.expireAsync(key, 15 * 60 * 1000); // 15 min + client.expireAsync(key, 5 * 60 * 1000); // 5 min if (val === 1) { PubsubConnector.notify({ accountId: accountId, @@ -63,6 +71,7 @@ module.exports = { assignPolicy, forEachAccountList, + listActiveAccounts, notifyAccountIsActive, checkIfAccountIsActive, } diff --git a/packages/nylas-dashboard/app.js b/packages/nylas-dashboard/app.js index 1cd7ed35f..801ecc72e 100644 --- a/packages/nylas-dashboard/app.js +++ b/packages/nylas-dashboard/app.js @@ -2,7 +2,6 @@ const Hapi = require('hapi'); const HapiWebSocket = require('hapi-plugin-websocket'); const Inert = require('inert'); const {DatabaseConnector, PubsubConnector, SchedulerUtils, NylasError} = require(`nylas-core`); -const {forEachAccountList} = SchedulerUtils; global.Promise = require('bluebird'); global.NylasError = NylasError; @@ -25,16 +24,19 @@ DatabaseConnector.forShared().then(({Account}) => { ws.send(JSON.stringify({ cmd: "ACCOUNT", payload: acct })); }); }); - this.redis = PubsubConnector.buildClient(); - this.redis.on('pmessage', (pattern, channel) => { - Account.find({where: {id: channel.replace('a-', '')}}).then((acct) => { + + this.observable = PubsubConnector.observeAllAccounts().subscribe((accountId) => { + Account.find({where: {id: accountId}}).then((acct) => { ws.send(JSON.stringify({ cmd: "ACCOUNT", payload: acct })); }); }); - this.redis.psubscribe(PubsubConnector.channelForAccount('*')); - this.assignmentsInterval = setInterval(() => { + + this.pollInterval = setInterval(() => { + SchedulerUtils.listActiveAccounts().then((accountIds) => { + ws.send(JSON.stringify({ cmd: "ACTIVE", payload: accountIds})) + }); const assignments = {}; - forEachAccountList((identity, accountIds) => { + SchedulerUtils.forEachAccountList((identity, accountIds) => { for (const accountId of accountIds) { assignments[accountId] = identity; } @@ -44,8 +46,8 @@ DatabaseConnector.forShared().then(({Account}) => { }, 1000); }, disconnect: () => { - clearInterval(this.assignmentsInterval); - this.redis.quit(); + clearInterval(this.pollInterval); + this.observable.dispose(); }, }, }, diff --git a/packages/nylas-dashboard/public/css/app.css b/packages/nylas-dashboard/public/css/app.css index 5acd21adf..081bdd647 100644 --- a/packages/nylas-dashboard/public/css/app.css +++ b/packages/nylas-dashboard/public/css/app.css @@ -9,6 +9,8 @@ body { width: 300px; background-color: white; padding:15px; + margin:5px; + vertical-align: top; } .account h3 { diff --git a/packages/nylas-dashboard/public/js/app.jsx b/packages/nylas-dashboard/public/js/app.jsx index 9c2d72f71..c7c59b5dd 100644 --- a/packages/nylas-dashboard/public/js/app.jsx +++ b/packages/nylas-dashboard/public/js/app.jsx @@ -1,4 +1,6 @@ /* eslint react/react-in-jsx-scope: 0*/ +const React = window.React; +const ReactDOM = window.ReactDOM; class ErrorsRoot extends React.Component { render() { @@ -7,17 +9,13 @@ class ErrorsRoot extends React.Component { } class Account extends React.Component { - propTypes: { - account: React.PropTypes.object, - assignment: React.PropTypes.string, - } - renderError() { - const {account} = this.props + const {account} = this.props; + if (account.sync_error != null) { const error = { message: account.sync_error.message, - stack: account.sync_error.stack.split('\n').slice(0, 4), + stack: account.sync_error.stack ? account.sync_error.stack.split('\n').slice(0, 4) : [], } return (
@@ -32,11 +30,11 @@ class Account extends React.Component { } render() { - const {account, assignment} = this.props; + const {account, assignment, active} = this.props; const errorClass = account.sync_error ? ' errored' : '' return (
-

{account.email_address}

+

{account.email_address} {active ? '🌕' : '🌑'}

{assignment}
{JSON.stringify(account.sync_policy, null, 2)}
{this.renderError()} @@ -45,6 +43,12 @@ class Account extends React.Component { } } +Account.propTypes = { + account: React.PropTypes.object, + active: React.PropTypes.bool, + assignment: React.PropTypes.string, +} + class Root extends React.Component { constructor() { @@ -52,6 +56,7 @@ class Root extends React.Component { this.state = { accounts: {}, assignments: {}, + activeAccountIds: [], }; } @@ -75,6 +80,9 @@ class Root extends React.Component { if (msg.cmd === 'ASSIGNMENTS') { this.onReceivedAssignments(msg.payload); } + if (msg.cmd === 'ACTIVE') { + this.onReceivedActiveAccountIds(msg.payload); + } } catch (err) { console.error(err); } @@ -88,6 +96,9 @@ class Root extends React.Component { this.setState({assignments}) } + onReceivedActiveAccountIds(accountIds) { + this.setState({activeAccountIds: accountIds}) + } onReceivedAccount(account) { const accounts = Object.assign({}, this.state.accounts); accounts[account.id] = account; @@ -98,11 +109,12 @@ class Root extends React.Component { return (
{ - Object.keys(this.state.accounts).sort((a, b) => a.compare(b)).map((key) => + Object.keys(this.state.accounts).sort((a, b) => a.localeCompare(b)).map((id) => ) } diff --git a/packages/nylas-sync/sync-worker.js b/packages/nylas-sync/sync-worker.js index d16fa631d..5ae4e808a 100644 --- a/packages/nylas-sync/sync-worker.js +++ b/packages/nylas-sync/sync-worker.js @@ -59,39 +59,17 @@ class SyncWorker { }) } + _onConnectionIdleUpdate() { + this.syncNow(); + } + _getAccount() { return DatabaseConnector.forShared().then(({Account}) => Account.find({where: {id: this._account.id}}) ); } - onSyncDidComplete() { - const {afterSync} = this._account.syncPolicy; - - if (afterSync === 'idle') { - return this.getIdleFolder() - .then((idleFolder) => this._conn.openBox(idleFolder.name)) - .then(() => console.log('SyncWorker: - Idling on inbox category')) - .catch((error) => { - console.error('SyncWorker: - Unhandled error while attempting to idle on Inbox after sync: ', error) - this.closeConnection() - }) - } - - if (afterSync === 'close') { - console.log('SyncWorker: - Closing connection'); - this.closeConnection() - return Promise.resolve() - } - - throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`) - } - - onConnectionIdleUpdate() { - this.syncNow(); - } - - getIdleFolder() { + _getIdleFolder() { return this._db.Folder.find({where: {role: ['all', 'inbox']}}) } @@ -111,10 +89,10 @@ class SyncWorker { const conn = new IMAPConnection(this._db, Object.assign({}, settings, credentials)); conn.on('mail', () => { - this.onConnectionIdleUpdate(); + this._onConnectionIdleUpdate(); }) conn.on('update', () => { - this.onConnectionIdleUpdate(); + this._onConnectionIdleUpdate(); }) conn.on('queue-empty', () => { }); @@ -173,6 +151,7 @@ class SyncWorker { } this.ensureConnection() + .then(() => this._account.update({syncError: null})) .then(() => this.performSync()) .then(() => this.onSyncDidComplete()) .catch((error) => this.onSyncError(error)) @@ -185,6 +164,7 @@ class SyncWorker { onSyncError(error) { console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} `, error) this.closeConnection() + if (error.source === 'socket') { // Continue to retry if it was a network error return Promise.resolve() @@ -193,8 +173,25 @@ class SyncWorker { return this._account.save() } + onSyncDidComplete() { + const {afterSync} = this._account.syncPolicy; + + if (afterSync === 'idle') { + return this._getIdleFolder() + .then((idleFolder) => this._conn.openBox(idleFolder.name)) + .then(() => console.log('SyncWorker: - Idling on inbox category')) + } + + if (afterSync === 'close') { + console.log('SyncWorker: - Closing connection'); + this.closeConnection() + return Promise.resolve() + } + + throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`) + } + scheduleNextSync() { - if (this._account.errored()) { return } SchedulerUtils.checkIfAccountIsActive(this._account.id).then((active) => { const {intervals} = this._account.syncPolicy; const interval = active ? intervals.active : intervals.inactive;