Merge branch 'master' of github.com:nylas/k2

This commit is contained in:
Halla Moore 2016-06-30 15:49:03 -07:00
commit d68151a684
14 changed files with 120 additions and 84 deletions

View file

@ -51,10 +51,10 @@ const validate = (request, username, password, callback) => {
}
token.getAccount().then((account) => {
if (!account) {
callback(new Error("Could not find Account referenced by AccountToken"), false, {});
callback(null, false, {});
return;
}
SchedulerUtils.notifyAccountIsActive(account.id)
SchedulerUtils.markAccountIsActive(account.id)
callback(null, true, account);
});
});

View file

@ -5,8 +5,7 @@ module.exports = {
createSyncbackRequest: function createSyncbackRequest(request, reply, syncRequestArgs) {
request.getAccountDatabase().then((db) => {
db.SyncbackRequest.create(syncRequestArgs).then((syncbackRequest) => {
PubsubConnector.notify({
accountId: db.accountId,
PubsubConnector.notifyAccount(db.accountId, {
type: MessageTypes.SYNCBACK_REQUESTED,
data: syncbackRequest.id,
});

View file

@ -9,6 +9,8 @@ const {
DatabaseConnector,
SyncPolicy,
Provider,
PubsubConnector,
MessageTypes,
} = require('nylas-core');
const {GMAIL_CLIENT_ID, GMAIL_CLIENT_SECRET, GMAIL_REDIRECT_URL} = process.env;

View file

@ -19,7 +19,7 @@ function jsonSchema(modelName) {
organization_unit: Joi.string(),
connection_settings: Joi.object(),
sync_policy: Joi.object(),
sync_error: Joi.object(),
sync_error: Joi.object().allow(null),
})
}
if (modelName === 'Folder') {

View file

@ -3,18 +3,16 @@ const MessageTypes = require('./message-types')
module.exports = (db, sequelize) => {
sequelize.addHook("afterCreate", ({dataValues, $modelOptions}) => {
if ($modelOptions.name.singular === 'Account') {
if ($modelOptions.name.singular === 'account') {
PubsubConnector.broadcastClient().lpushAsync('accounts:unclaimed', dataValues.id);
PubsubConnector.notify({
accountId: dataValues.id,
type: MessageTypes.ACCOUNT_UPDATED,
PubsubConnector.notifyAccount(dataValues.id, {
type: MessageTypes.ACCOUNT_CREATED,
});
}
})
sequelize.addHook("afterUpdate", ({dataValues, $modelOptions}) => {
if ($modelOptions.name.singular === 'Account') {
PubsubConnector.notify({
accountId: dataValues.id,
if ($modelOptions.name.singular === 'account') {
PubsubConnector.notifyAccount(dataValues.id, {
type: MessageTypes.ACCOUNT_UPDATED,
});
}

View file

@ -1,4 +1,5 @@
module.exports = {
ACCOUNT_CREATED: "ACCOUNT_CREATED",
ACCOUNT_UPDATED: "ACCOUNT_UPDATED",
SYNCBACK_REQUESTED: "SYNCBACK_REQUESTED",
}

View file

@ -62,23 +62,36 @@ class PubsubConnector {
});
}
notify({accountId, type, data}) {
this.broadcastClient().publish(`channel-${accountId}`, JSON.stringify({type, data}));
notifyAccount(accountId, {type, data}) {
this.broadcastClient().publish(`account-${accountId}`, JSON.stringify({type, data}));
}
observe(accountId) {
return this._observableForChannelOnSharedListener(`channel-${accountId}`);
observeAccount(accountId) {
return this._observableForChannelOnSharedListener(`account-${accountId}`);
}
notifyDelta(accountId, data) {
this.broadcastClient().publish(`channel-${accountId}-deltas`, JSON.stringify(data))
this.broadcastClient().publish(`deltas-${accountId}`, JSON.stringify(data))
}
observeAllAccounts() {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
sub.on("pmessage", (pattern, channel, message) =>
observer.onNext(channel.replace('account-', ''), message));
sub.psubscribe(`account-*`);
return () => {
sub.unsubscribe();
sub.quit();
}
})
}
observeDeltas(accountId) {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
sub.on("message", (channel, message) => observer.onNext(message));
sub.subscribe(`channel-${accountId}-deltas`);
sub.subscribe(`deltas-${accountId}`);
return () => {
sub.unsubscribe();
sub.quit();

View file

@ -1,8 +1,7 @@
const ACCOUNTS_UNCLAIMED = 'accounts:unclaimed';
const ACCOUNTS_CLAIMED_PREFIX = 'accounts:id-';
const ACCOUNTS_FOR = (id) => `${ACCOUNTS_CLAIMED_PREFIX}${id}`;
const CONNECTION_COUNT_FOR = (id) => `connections:${id}`
const ACTIVE_KEY_FOR = (id) => `active:${id}`
const HEARTBEAT_FOR = (id) => `heartbeat:${id}`;
const HEARTBEAT_EXPIRES = 30; // 2 min in prod?
@ -35,18 +34,26 @@ const assignPolicy = (accountId, policy) => {
const checkIfAccountIsActive = (accountId) => {
const client = PubsubConnector.broadcastClient();
const key = CONNECTION_COUNT_FOR(accountId);
return client.getAsync(key).then((val) => val && val > 0)
const key = ACTIVE_KEY_FOR(accountId);
return client.getAsync(key).then((val) => val !== null)
}
const notifyAccountIsActive = (accountId) => {
const listActiveAccounts = () => {
const client = PubsubConnector.broadcastClient();
const key = CONNECTION_COUNT_FOR(accountId);
const keyBase = ACTIVE_KEY_FOR('');
return client.keysAsync(`${keyBase}*`).then((keys) =>
keys.map(k => k.replace(keyBase, ''))
);
}
const markAccountIsActive = (accountId) => {
const client = PubsubConnector.broadcastClient();
const key = ACTIVE_KEY_FOR(accountId);
client.incrAsync(key).then((val) => {
client.expireAsync(key, 15 * 60 * 1000); // 15 min
client.expireAsync(key, 5 * 60); // 5 min in seconds
if (val === 1) {
PubsubConnector.notify({
accountId: accountId,
PubsubConnector.notifyAccount(accountId, {
type: MessageTypes.ACCOUNT_UPDATED,
});
}
@ -63,6 +70,7 @@ module.exports = {
assignPolicy,
forEachAccountList,
notifyAccountIsActive,
listActiveAccounts,
markAccountIsActive,
checkIfAccountIsActive,
}

View file

@ -2,7 +2,6 @@ const Hapi = require('hapi');
const HapiWebSocket = require('hapi-plugin-websocket');
const Inert = require('inert');
const {DatabaseConnector, PubsubConnector, SchedulerUtils, NylasError} = require(`nylas-core`);
const {forEachAccountList} = SchedulerUtils;
global.Promise = require('bluebird');
global.NylasError = NylasError;
@ -25,16 +24,19 @@ DatabaseConnector.forShared().then(({Account}) => {
ws.send(JSON.stringify({ cmd: "ACCOUNT", payload: acct }));
});
});
this.redis = PubsubConnector.buildClient();
this.redis.on('pmessage', (pattern, channel) => {
Account.find({where: {id: channel.replace('a-', '')}}).then((acct) => {
this.observable = PubsubConnector.observeAllAccounts().subscribe((accountId) => {
Account.find({where: {id: accountId}}).then((acct) => {
ws.send(JSON.stringify({ cmd: "ACCOUNT", payload: acct }));
});
});
this.redis.psubscribe(PubsubConnector.channelForAccount('*'));
this.assignmentsInterval = setInterval(() => {
this.pollInterval = setInterval(() => {
SchedulerUtils.listActiveAccounts().then((accountIds) => {
ws.send(JSON.stringify({ cmd: "ACTIVE", payload: accountIds}))
});
const assignments = {};
forEachAccountList((identity, accountIds) => {
SchedulerUtils.forEachAccountList((identity, accountIds) => {
for (const accountId of accountIds) {
assignments[accountId] = identity;
}
@ -44,8 +46,8 @@ DatabaseConnector.forShared().then(({Account}) => {
}, 1000);
},
disconnect: () => {
clearInterval(this.assignmentsInterval);
this.redis.quit();
clearInterval(this.pollInterval);
this.observable.dispose();
},
},
},

View file

@ -9,6 +9,8 @@ body {
width: 300px;
background-color: white;
padding:15px;
margin:5px;
vertical-align: top;
}
.account h3 {

View file

@ -1,4 +1,6 @@
/* eslint react/react-in-jsx-scope: 0*/
const React = window.React;
const ReactDOM = window.ReactDOM;
class ErrorsRoot extends React.Component {
render() {
@ -7,17 +9,13 @@ class ErrorsRoot extends React.Component {
}
class Account extends React.Component {
propTypes: {
account: React.PropTypes.object,
assignment: React.PropTypes.string,
}
renderError() {
const {account} = this.props
const {account} = this.props;
if (account.sync_error != null) {
const error = {
message: account.sync_error.message,
stack: account.sync_error.stack.split('\n').slice(0, 4),
stack: account.sync_error.stack ? account.sync_error.stack.split('\n').slice(0, 4) : [],
}
return (
<div className="error">
@ -32,11 +30,11 @@ class Account extends React.Component {
}
render() {
const {account, assignment} = this.props;
const {account, assignment, active} = this.props;
const errorClass = account.sync_error ? ' errored' : ''
return (
<div className={`account${errorClass}`}>
<h3>{account.email_address}</h3>
<h3>{account.email_address} {active ? '🌕' : '🌑'}</h3>
<strong>{assignment}</strong>
<pre>{JSON.stringify(account.sync_policy, null, 2)}</pre>
{this.renderError()}
@ -45,6 +43,12 @@ class Account extends React.Component {
}
}
Account.propTypes = {
account: React.PropTypes.object,
active: React.PropTypes.bool,
assignment: React.PropTypes.string,
}
class Root extends React.Component {
constructor() {
@ -52,6 +56,7 @@ class Root extends React.Component {
this.state = {
accounts: {},
assignments: {},
activeAccountIds: [],
};
}
@ -75,6 +80,9 @@ class Root extends React.Component {
if (msg.cmd === 'ASSIGNMENTS') {
this.onReceivedAssignments(msg.payload);
}
if (msg.cmd === 'ACTIVE') {
this.onReceivedActiveAccountIds(msg.payload);
}
} catch (err) {
console.error(err);
}
@ -88,6 +96,10 @@ class Root extends React.Component {
this.setState({assignments})
}
onReceivedActiveAccountIds(accountIds) {
this.setState({activeAccountIds: accountIds})
}
onReceivedAccount(account) {
const accounts = Object.assign({}, this.state.accounts);
accounts[account.id] = account;
@ -98,11 +110,12 @@ class Root extends React.Component {
return (
<div>
{
Object.keys(this.state.accounts).sort((a, b) => a.compare(b)).map((key) =>
Object.keys(this.state.accounts).sort((a, b) => a.localeCompare(b)).map((id) =>
<Account
key={key}
assignment={this.state.assignments[key]}
account={this.state.accounts[key]}
key={id}
active={this.state.activeAccountIds.includes(id)}
assignment={this.state.assignments[id]}
account={this.state.accounts[id]}
/>
)
}

View file

@ -10,6 +10,8 @@ global.NylasError = NylasError;
const MessageAttributes = ['body', 'processed', 'to', 'from', 'cc', 'replyTo', 'bcc', 'snippet', 'threadId']
const MessageProcessorVersion = 1;
const redis = PubsubConnector.buildClient();
function runPipeline({db, accountId, message}) {
console.log(`Processing message ${message.id}`)
return processors.reduce((prevPromise, processor) => (
@ -36,8 +38,7 @@ function saveMessage(message) {
}
function dequeueJob() {
const conn = PubsubConnector.buildClient()
conn.brpopAsync('message-processor-queue', 10).then((item) => {
redis.brpopAsync('message-processor-queue', 10).then((item) => {
if (!item) {
return dequeueJob();
}

View file

@ -238,7 +238,7 @@ class FetchMessagesInFolder {
folderImapUID: attributes.uid,
folderId: this._category.id,
headers: parsedHeaders,
headerMessageId: parsedHeaders['message-id'][0],
headerMessageId: parsedHeaders['message-id'] ? parsedHeaders['message-id'][0] : '',
subject: parsedHeaders.subject[0],
}

View file

@ -25,7 +25,7 @@ class SyncWorker {
this.syncNow();
this._onMessage = this._onMessage.bind(this);
this._listener = PubsubConnector.observe(account.id).subscribe(this._onMessage)
this._listener = PubsubConnector.observeAccount(account.id).subscribe(this._onMessage)
}
cleanup() {
@ -59,39 +59,17 @@ class SyncWorker {
})
}
_onConnectionIdleUpdate() {
this.syncNow();
}
_getAccount() {
return DatabaseConnector.forShared().then(({Account}) =>
Account.find({where: {id: this._account.id}})
);
}
onSyncDidComplete() {
const {afterSync} = this._account.syncPolicy;
if (afterSync === 'idle') {
return this.getIdleFolder()
.then((idleFolder) => this._conn.openBox(idleFolder.name))
.then(() => console.log('SyncWorker: - Idling on inbox category'))
.catch((error) => {
console.error('SyncWorker: - Unhandled error while attempting to idle on Inbox after sync: ', error)
this.closeConnection()
})
}
if (afterSync === 'close') {
console.log('SyncWorker: - Closing connection');
this.closeConnection()
return Promise.resolve()
}
throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`)
}
onConnectionIdleUpdate() {
this.syncNow();
}
getIdleFolder() {
_getIdleFolder() {
return this._db.Folder.find({where: {role: ['all', 'inbox']}})
}
@ -111,10 +89,10 @@ class SyncWorker {
const conn = new IMAPConnection(this._db, Object.assign({}, settings, credentials));
conn.on('mail', () => {
this.onConnectionIdleUpdate();
this._onConnectionIdleUpdate();
})
conn.on('update', () => {
this.onConnectionIdleUpdate();
this._onConnectionIdleUpdate();
})
conn.on('queue-empty', () => {
});
@ -173,6 +151,7 @@ class SyncWorker {
}
this.ensureConnection()
.then(() => this._account.update({syncError: null}))
.then(() => this.performSync())
.then(() => this.onSyncDidComplete())
.catch((error) => this.onSyncError(error))
@ -185,6 +164,7 @@ class SyncWorker {
onSyncError(error) {
console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} `, error)
this.closeConnection()
if (error.source === 'socket') {
// Continue to retry if it was a network error
return Promise.resolve()
@ -193,8 +173,25 @@ class SyncWorker {
return this._account.save()
}
onSyncDidComplete() {
const {afterSync} = this._account.syncPolicy;
if (afterSync === 'idle') {
return this._getIdleFolder()
.then((idleFolder) => this._conn.openBox(idleFolder.name))
.then(() => console.log('SyncWorker: - Idling on inbox category'))
}
if (afterSync === 'close') {
console.log('SyncWorker: - Closing connection');
this.closeConnection()
return Promise.resolve()
}
throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`)
}
scheduleNextSync() {
if (this._account.errored()) { return }
SchedulerUtils.checkIfAccountIsActive(this._account.id).then((active) => {
const {intervals} = this._account.syncPolicy;
const interval = active ? intervals.active : intervals.inactive;