Fix redis keys, dashboard app, show account active state on dashboard

This commit is contained in:
Ben Gotow 2016-06-30 12:33:08 -07:00
parent f9952f9fa7
commit ef5c4a29fe
7 changed files with 98 additions and 63 deletions

View file

@ -19,7 +19,7 @@ function jsonSchema(modelName) {
organization_unit: Joi.string(),
connection_settings: Joi.object(),
sync_policy: Joi.object(),
sync_error: Joi.object(),
sync_error: Joi.object().allow(null),
})
}
if (modelName === 'Folder') {

View file

@ -63,22 +63,35 @@ class PubsubConnector {
}
notify({accountId, type, data}) {
this.broadcastClient().publish(`channel-${accountId}`, JSON.stringify({type, data}));
this.broadcastClient().publish(`account-${accountId}`, JSON.stringify({type, data}));
}
observe(accountId) {
return this._observableForChannelOnSharedListener(`channel-${accountId}`);
return this._observableForChannelOnSharedListener(`account-${accountId}`);
}
notifyDelta(accountId, data) {
this.broadcastClient().publish(`channel-${accountId}-deltas`, JSON.stringify(data))
this.broadcastClient().publish(`deltas-${accountId}`, JSON.stringify(data))
}
observeAllAccounts() {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
sub.on("pmessage", (pattern, channel, message) =>
observer.onNext(channel.replace('account-', ''), message));
sub.psubscribe(`account-*`);
return () => {
sub.unsubscribe();
sub.quit();
}
})
}
observeDeltas(accountId) {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
sub.on("message", (channel, message) => observer.onNext(message));
sub.subscribe(`channel-${accountId}-deltas`);
sub.subscribe(`deltas-${accountId}`);
return () => {
sub.unsubscribe();
sub.quit();

View file

@ -1,8 +1,7 @@
const ACCOUNTS_UNCLAIMED = 'accounts:unclaimed';
const ACCOUNTS_CLAIMED_PREFIX = 'accounts:id-';
const ACCOUNTS_FOR = (id) => `${ACCOUNTS_CLAIMED_PREFIX}${id}`;
const CONNECTION_COUNT_FOR = (id) => `connections:${id}`
const ACTIVE_KEY_FOR = (id) => `active:${id}`
const HEARTBEAT_FOR = (id) => `heartbeat:${id}`;
const HEARTBEAT_EXPIRES = 30; // 2 min in prod?
@ -35,15 +34,24 @@ const assignPolicy = (accountId, policy) => {
const checkIfAccountIsActive = (accountId) => {
const client = PubsubConnector.broadcastClient();
const key = CONNECTION_COUNT_FOR(accountId);
return client.getAsync(key).then((val) => val && val > 0)
const key = ACTIVE_KEY_FOR(accountId);
return client.getAsync(key).then((val) => val !== null)
}
const listActiveAccounts = () => {
const client = PubsubConnector.broadcastClient();
const keyBase = ACTIVE_KEY_FOR('');
return client.keysAsync(`${keyBase}*`).then((keys) =>
keys.map(k => k.replace(keyBase, ''))
);
}
const notifyAccountIsActive = (accountId) => {
const client = PubsubConnector.broadcastClient();
const key = CONNECTION_COUNT_FOR(accountId);
const key = ACTIVE_KEY_FOR(accountId);
client.incrAsync(key).then((val) => {
client.expireAsync(key, 15 * 60 * 1000); // 15 min
client.expireAsync(key, 5 * 60 * 1000); // 5 min
if (val === 1) {
PubsubConnector.notify({
accountId: accountId,
@ -63,6 +71,7 @@ module.exports = {
assignPolicy,
forEachAccountList,
listActiveAccounts,
notifyAccountIsActive,
checkIfAccountIsActive,
}

View file

@ -2,7 +2,6 @@ const Hapi = require('hapi');
const HapiWebSocket = require('hapi-plugin-websocket');
const Inert = require('inert');
const {DatabaseConnector, PubsubConnector, SchedulerUtils, NylasError} = require(`nylas-core`);
const {forEachAccountList} = SchedulerUtils;
global.Promise = require('bluebird');
global.NylasError = NylasError;
@ -25,16 +24,19 @@ DatabaseConnector.forShared().then(({Account}) => {
ws.send(JSON.stringify({ cmd: "ACCOUNT", payload: acct }));
});
});
this.redis = PubsubConnector.buildClient();
this.redis.on('pmessage', (pattern, channel) => {
Account.find({where: {id: channel.replace('a-', '')}}).then((acct) => {
this.observable = PubsubConnector.observeAllAccounts().subscribe((accountId) => {
Account.find({where: {id: accountId}}).then((acct) => {
ws.send(JSON.stringify({ cmd: "ACCOUNT", payload: acct }));
});
});
this.redis.psubscribe(PubsubConnector.channelForAccount('*'));
this.assignmentsInterval = setInterval(() => {
this.pollInterval = setInterval(() => {
SchedulerUtils.listActiveAccounts().then((accountIds) => {
ws.send(JSON.stringify({ cmd: "ACTIVE", payload: accountIds}))
});
const assignments = {};
forEachAccountList((identity, accountIds) => {
SchedulerUtils.forEachAccountList((identity, accountIds) => {
for (const accountId of accountIds) {
assignments[accountId] = identity;
}
@ -44,8 +46,8 @@ DatabaseConnector.forShared().then(({Account}) => {
}, 1000);
},
disconnect: () => {
clearInterval(this.assignmentsInterval);
this.redis.quit();
clearInterval(this.pollInterval);
this.observable.dispose();
},
},
},

View file

@ -9,6 +9,8 @@ body {
width: 300px;
background-color: white;
padding:15px;
margin:5px;
vertical-align: top;
}
.account h3 {

View file

@ -1,4 +1,6 @@
/* eslint react/react-in-jsx-scope: 0*/
const React = window.React;
const ReactDOM = window.ReactDOM;
class ErrorsRoot extends React.Component {
render() {
@ -7,17 +9,13 @@ class ErrorsRoot extends React.Component {
}
class Account extends React.Component {
propTypes: {
account: React.PropTypes.object,
assignment: React.PropTypes.string,
}
renderError() {
const {account} = this.props
const {account} = this.props;
if (account.sync_error != null) {
const error = {
message: account.sync_error.message,
stack: account.sync_error.stack.split('\n').slice(0, 4),
stack: account.sync_error.stack ? account.sync_error.stack.split('\n').slice(0, 4) : [],
}
return (
<div className="error">
@ -32,11 +30,11 @@ class Account extends React.Component {
}
render() {
const {account, assignment} = this.props;
const {account, assignment, active} = this.props;
const errorClass = account.sync_error ? ' errored' : ''
return (
<div className={`account${errorClass}`}>
<h3>{account.email_address}</h3>
<h3>{account.email_address} {active ? '🌕' : '🌑'}</h3>
<strong>{assignment}</strong>
<pre>{JSON.stringify(account.sync_policy, null, 2)}</pre>
{this.renderError()}
@ -45,6 +43,12 @@ class Account extends React.Component {
}
}
Account.propTypes = {
account: React.PropTypes.object,
active: React.PropTypes.bool,
assignment: React.PropTypes.string,
}
class Root extends React.Component {
constructor() {
@ -52,6 +56,7 @@ class Root extends React.Component {
this.state = {
accounts: {},
assignments: {},
activeAccountIds: [],
};
}
@ -75,6 +80,9 @@ class Root extends React.Component {
if (msg.cmd === 'ASSIGNMENTS') {
this.onReceivedAssignments(msg.payload);
}
if (msg.cmd === 'ACTIVE') {
this.onReceivedActiveAccountIds(msg.payload);
}
} catch (err) {
console.error(err);
}
@ -88,6 +96,9 @@ class Root extends React.Component {
this.setState({assignments})
}
onReceivedActiveAccountIds(accountIds) {
this.setState({activeAccountIds: accountIds})
}
onReceivedAccount(account) {
const accounts = Object.assign({}, this.state.accounts);
accounts[account.id] = account;
@ -98,11 +109,12 @@ class Root extends React.Component {
return (
<div>
{
Object.keys(this.state.accounts).sort((a, b) => a.compare(b)).map((key) =>
Object.keys(this.state.accounts).sort((a, b) => a.localeCompare(b)).map((id) =>
<Account
key={key}
assignment={this.state.assignments[key]}
account={this.state.accounts[key]}
key={id}
active={this.state.activeAccountIds.includes(id)}
assignment={this.state.assignments[id]}
account={this.state.accounts[id]}
/>
)
}

View file

@ -59,39 +59,17 @@ class SyncWorker {
})
}
_onConnectionIdleUpdate() {
this.syncNow();
}
_getAccount() {
return DatabaseConnector.forShared().then(({Account}) =>
Account.find({where: {id: this._account.id}})
);
}
onSyncDidComplete() {
const {afterSync} = this._account.syncPolicy;
if (afterSync === 'idle') {
return this.getIdleFolder()
.then((idleFolder) => this._conn.openBox(idleFolder.name))
.then(() => console.log('SyncWorker: - Idling on inbox category'))
.catch((error) => {
console.error('SyncWorker: - Unhandled error while attempting to idle on Inbox after sync: ', error)
this.closeConnection()
})
}
if (afterSync === 'close') {
console.log('SyncWorker: - Closing connection');
this.closeConnection()
return Promise.resolve()
}
throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`)
}
onConnectionIdleUpdate() {
this.syncNow();
}
getIdleFolder() {
_getIdleFolder() {
return this._db.Folder.find({where: {role: ['all', 'inbox']}})
}
@ -111,10 +89,10 @@ class SyncWorker {
const conn = new IMAPConnection(this._db, Object.assign({}, settings, credentials));
conn.on('mail', () => {
this.onConnectionIdleUpdate();
this._onConnectionIdleUpdate();
})
conn.on('update', () => {
this.onConnectionIdleUpdate();
this._onConnectionIdleUpdate();
})
conn.on('queue-empty', () => {
});
@ -173,6 +151,7 @@ class SyncWorker {
}
this.ensureConnection()
.then(() => this._account.update({syncError: null}))
.then(() => this.performSync())
.then(() => this.onSyncDidComplete())
.catch((error) => this.onSyncError(error))
@ -185,6 +164,7 @@ class SyncWorker {
onSyncError(error) {
console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} `, error)
this.closeConnection()
if (error.source === 'socket') {
// Continue to retry if it was a network error
return Promise.resolve()
@ -193,8 +173,25 @@ class SyncWorker {
return this._account.save()
}
onSyncDidComplete() {
const {afterSync} = this._account.syncPolicy;
if (afterSync === 'idle') {
return this._getIdleFolder()
.then((idleFolder) => this._conn.openBox(idleFolder.name))
.then(() => console.log('SyncWorker: - Idling on inbox category'))
}
if (afterSync === 'close') {
console.log('SyncWorker: - Closing connection');
this.closeConnection()
return Promise.resolve()
}
throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`)
}
scheduleNextSync() {
if (this._account.errored()) { return }
SchedulerUtils.checkIfAccountIsActive(this._account.id).then((active) => {
const {intervals} = this._account.syncPolicy;
const interval = active ? intervals.active : intervals.inactive;