Redis coordination of sync processes / assignment

This commit is contained in:
Ben Gotow 2016-06-23 00:49:22 -07:00
parent 2e9bfa68b5
commit 12d9db8dd9
14 changed files with 329 additions and 104 deletions

View file

@ -21,8 +21,8 @@ const plugins = [Inert, Vision, HapiBasicAuth, {
}];
let sharedDb = null;
const {DatabaseConnectionFactory} = require(`nylas-core`)
DatabaseConnectionFactory.forShared().then((db) => {
const {DatabaseConnector} = require(`nylas-core`)
DatabaseConnector.forShared().then((db) => {
sharedDb = db;
});

View file

@ -1,10 +1,10 @@
/* eslint func-names:0 */
const {DatabaseConnectionFactory} = require(`nylas-core`);
const {DatabaseConnector} = require(`nylas-core`);
module.exports = (server) => {
server.decorate('request', 'getAccountDatabase', function () {
const account = this.auth.credentials;
return DatabaseConnectionFactory.forAccount(account.id);
return DatabaseConnector.forAccount(account.id);
});
}

View file

@ -2,7 +2,7 @@ const Joi = require('Joi');
const _ = require('underscore');
const Serialization = require('../serialization');
const {IMAPConnection, DatabaseConnectionFactory} = require('nylas-core');
const {IMAPConnection, PubsubConnector, DatabaseConnector} = require('nylas-core');
const imapSmtpSettings = Joi.object().keys({
imap_host: [Joi.string().ip().required(), Joi.string().hostname().required()],
@ -69,7 +69,7 @@ module.exports = (server) => {
}
Promise.all(connectionChecks).then(() => {
DatabaseConnectionFactory.forShared().then((db) => {
DatabaseConnector.forShared().then((db) => {
const {AccountToken, Account} = db;
const account = Account.build({
@ -90,6 +90,11 @@ module.exports = (server) => {
AccountToken.create({
AccountId: saved.id,
}).then((accountToken) => {
const client = PubsubConnector.broadcastClient();
client.lpushAsync('accounts:unclaimed', saved.id).catch((err) => {
console.error(`Auth: Could not queue account sync! ${err.message}`)
});
const response = saved.toJSON();
response.token = accountToken.value;
reply(Serialization.jsonStringify(response));

View file

@ -1,6 +1,6 @@
const Rx = require('rx')
const _ = require('underscore');
const {AccountPubsub} = require(`nylas-core`);
const {PubsubConnector} = require(`nylas-core`);
function keepAlive(request) {
const until = Rx.Observable.fromCallback(request.on)("disconnect")
@ -52,7 +52,7 @@ module.exports = (server) => {
request.getAccountDatabase().then((db) => {
const source = Rx.Observable.merge(
AccountPubsub.observableForAccountId(db.accountId),
PubsubConnector.observableForAccountDeltas(db.accountId),
initialTransactions(db, request),
keepAlive(request)
).subscribe(outputStream.pushJSON)

View file

@ -1,47 +0,0 @@
const Rx = require('rx')
const bluebird = require('bluebird')
const redis = require("redis");
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);
class AccountPubsub {
constructor() {
this._broadcastClient = null;
}
buildClient() {
const client = redis.createClient(process.env.REDIS_URL || null);
client.on("error", console.error);
return client;
}
keyForAccountId(accountId) {
return `delta-${accountId}`;
}
notify(accountId, data) {
if (!this._broadcastClient) {
this._broadcastClient = this.buildClient();
}
const key = this.keyForAccountId(accountId);
this._broadcastClient.publish(key, JSON.stringify(data))
}
observableForAccountId(accountId) {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
const key = this.keyForAccountId(accountId);
sub.on("message", (channel, message) => {
if (channel !== key) { return }
observer.onNext(message)
});
sub.subscribe(key);
return () => {
sub.unsubscribe()
sub.quit()
}
})
}
}
module.exports = new AccountPubsub()

View file

@ -10,7 +10,7 @@ if (!fs.existsSync(STORAGE_DIR)) {
fs.mkdirSync(STORAGE_DIR);
}
class DatabaseConnectionFactory {
class DatabaseConnector {
constructor() {
this._pools = {};
}
@ -86,4 +86,4 @@ class DatabaseConnectionFactory {
}
}
module.exports = new DatabaseConnectionFactory()
module.exports = new DatabaseConnector()

View file

@ -1,6 +1,6 @@
module.exports = {
DatabaseConnectionFactory: require('./database-connection-factory'),
AccountPubsub: require('./account-pubsub'),
DatabaseConnector: require('./database-connector'),
PubsubConnector: require('./pubsub-connector'),
IMAPConnection: require('./imap-connection'),
Config: require(`./config/${process.env.ENV || 'development'}`),
}

View file

@ -0,0 +1,91 @@
const Rx = require('rx')
const bluebird = require('bluebird')
const redis = require("redis");
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);
class PubsubConnector {
constructor() {
this._broadcastClient = null;
this._listenClient = null;
this._listenClientSubs = {};
}
buildClient() {
const client = redis.createClient(process.env.REDIS_URL || null);
client.on("error", console.error);
return client;
}
broadcastClient() {
if (!this._broadcastClient) {
this._broadcastClient = this.buildClient();
}
return this._broadcastClient;
}
channelForAccount(accountId) {
return `a-${accountId}`;
}
channelForAccountDeltas(accountId) {
return `a-${accountId}-deltas`;
}
// Shared channel
notifyAccountChange(accountId) {
const channel = this.channelForAccount(accountId);
this.broadcastClient().publish(channel, 'modified');
}
observableForAccountChanges(accountId) {
if (!this._listenClient) {
this._listenClient = this.buildClient();
this._listenClientSubs = {};
}
const channel = this.channelForAccount(accountId);
return Rx.Observable.create((observer) => {
this._listenClient.on("message", (msgChannel, message) => {
if (msgChannel !== channel) { return }
observer.onNext(message)
});
if (!this._listenClientSubs[channel]) {
this._listenClientSubs[channel] = 1;
this._listenClient.subscribe(channel);
} else {
this._listenClientSubs[channel] += 1;
}
return () => {
this._listenClientSubs[channel] -= 1;
if (this._listenClientSubs[channel] === 0) {
this._listenClient.unsubscribe(channel);
}
}
})
}
// Account (delta streaming) channels
notifyAccountDeltas(accountId, data) {
const channel = this.channelForAccountDeltas(accountId);
this.broadcastClient().publish(channel, JSON.stringify(data))
}
observableForAccountDeltas(accountId) {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
sub.on("message", (channel, message) => observer.onNext(message));
sub.subscribe(this.channelForAccountDeltas(accountId));
return () => {
sub.unsubscribe();
sub.quit();
}
})
}
}
module.exports = new PubsubConnector()

View file

@ -1,4 +1,4 @@
const AccountPubsub = require('./account-pubsub')
const PubsubConnector = require('./pubsub-connector')
class TransactionLog {
constructor(db) {
@ -25,7 +25,8 @@ class TransactionLog {
);
this.db.Transaction.create(transactionData);
transactionData.object = sequelizeHookData.dataValues;
AccountPubsub.notify(this.db.accountId, transactionData);
PubsubConnector.notifyAccountDeltas(this.db.accountId, transactionData);
}
}

View file

@ -1,4 +1,4 @@
const {DatabaseConnectionFactory} = require(`nylas-core`)
const {DatabaseConnector} = require(`nylas-core`)
const {processors} = require('./processors')
// List of the attributes of Message that the processor should be allowed to change.
@ -21,7 +21,7 @@ function saveMessage(message) {
}
function processMessage({messageId, accountId}) {
DatabaseConnectionFactory.forAccount(accountId)
DatabaseConnector.forAccount(accountId)
.then(({Message}) =>
Message.find({where: {id: messageId}}).then((message) =>
runPipeline(accountId, message)

View file

@ -1,24 +1,21 @@
global.Promise = require('bluebird');
const {DatabaseConnectionFactory} = require(`nylas-core`)
const SyncWorkerPool = require('./sync-worker-pool');
const workerPool = new SyncWorkerPool();
const {DatabaseConnector} = require(`nylas-core`)
const SyncProcessManager = require('./sync-process-manager');
const start = () => {
DatabaseConnectionFactory.forShared().then((db) => {
const {Account} = db;
Account.findAll().then((accounts) => {
if (accounts.length === 0) {
console.log(`Couldn't find any accounts to sync. Run this CURL command to auth one!`)
console.log(`curl -X POST -H "Content-Type: application/json" -d '{"email":"inboxapptest2@fastmail.fm", "name":"Ben Gotow", "provider":"imap", "settings":{"imap_username":"inboxapptest1@fastmail.fm","imap_host":"mail.amessagingengine.com","imap_port":993,"smtp_host":"mail.messagingengine.com","smtp_port":0,"smtp_username":"inboxapptest1@fastmail.fm", "smtp_password":"trar2e","imap_password":"trar2e","ssl_required":true}}' "http://localhost:5100/auth?client_id=123"`)
}
accounts.forEach((account) => {
workerPool.addWorkerForAccount(account);
});
});
const manager = new SyncProcessManager();
DatabaseConnector.forShared().then((db) => {
const {Account} = db;
Account.findAll().then((accounts) => {
if (accounts.length === 0) {
console.log(`Couldn't find any accounts to sync. Run this CURL command to auth one!`)
console.log(`curl -X POST -H "Content-Type: application/json" -d '{"email":"inboxapptest2@fastmail.fm", "name":"Ben Gotow", "provider":"imap", "settings":{"imap_username":"inboxapptest1@fastmail.fm","imap_host":"mail.amessagingengine.com","imap_port":993,"smtp_host":"mail.messagingengine.com","smtp_port":0,"smtp_username":"inboxapptest1@fastmail.fm", "smtp_password":"trar2e","imap_password":"trar2e","ssl_required":true}}' "http://localhost:5100/auth?client_id=123"`)
}
manager.ensureAccountIDsInRedis(accounts.map(a => a.id)).then(() => {
manager.start();
})
});
}
});
start();
global.workerPool = workerPool;
global.manager = manager;

View file

@ -0,0 +1,179 @@
const os = require('os');
const SyncWorker = require('./sync-worker');
const {DatabaseConnector, PubsubConnector} = require(`nylas-core`)
const CPU_COUNT = os.cpus().length;
const IDENTITY = `${os.hostname()}-${process.pid}`;
const ACCOUNTS_UNCLAIMED = 'accounts:unclaimed';
const ACCOUNTS_CLAIMED_PREFIX = 'accounts:id-';
const ACCOUNTS_FOR = (id) => `${ACCOUNTS_CLAIMED_PREFIX}${id}`;
const HEARTBEAT_FOR = (id) => `heartbeat:${id}`;
const HEARTBEAT_EXPIRES = 30; // 2 min in prod?
/*
Accounts ALWAYS exist in either `accounts:unclaimed` or an `accounts:{id}` list.
They are atomically moved between these sets as they are claimed and returned.
Periodically, each worker in the pool looks at all the `accounts:{id}` lists.
For each list it finds, it checks for the existence of `heartbeat:{id}`, a key
that expires quickly if the sync process doesn't refresh it.
If it does not find the key, it moves all of the accounts in the list back to
the unclaimed key.
*/
class SyncProcessManager {
constructor() {
this._workers = {};
this._listenForSyncsClient = null;
this._exiting = false;
}
start() {
console.log(`SyncWorkerPool: Starting with ID ${IDENTITY}`)
this.unassignAccountsAssignedTo(IDENTITY).then(() => {
this.unassignAccountsMissingHeartbeats();
this.update();
});
setInterval(() => this.updateHeartbeat(), HEARTBEAT_EXPIRES / 5.0 * 1000);
this.updateHeartbeat();
process.on('SIGINT', () => this.onSigInt());
}
updateHeartbeat() {
const key = HEARTBEAT_FOR(IDENTITY);
const client = PubsubConnector.broadcastClient();
client.setAsync(key, Date.now()).then(() =>
client.expireAsync(key, HEARTBEAT_EXPIRES)
).then(() =>
console.log("SyncWorkerPool: Published heartbeat.")
)
}
onSigInt() {
console.log(`SyncWorkerPool: Exiting...`)
this._exiting = true;
this.unassignAccountsAssignedTo(IDENTITY).then(() =>
PubsubConnector.broadcastClient().delAsync(ACCOUNTS_FOR(IDENTITY)).then(() =>
PubsubConnector.broadcastClient().delAsync(HEARTBEAT_FOR(IDENTITY))
)
).finally(() => {
process.exit(1);
});
}
ensureAccountIDsInRedis(accountIds) {
const client = PubsubConnector.broadcastClient();
let unseenIds = [].concat(accountIds);
return Promise.each(client.keysAsync(`accounts:*`), (key) =>
client.lrangeAsync(key, 0, 20000).then((foundIds) => {
unseenIds = unseenIds.filter((a) => !foundIds.includes(`${a}`))
})
).finally(() => {
if (unseenIds.length === 0) {
return;
}
console.log(`SyncWorkerPool: Adding account IDs ${unseenIds.join(',')} to redis.`)
unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id));
});
}
unassignAccountsMissingHeartbeats() {
const client = PubsubConnector.broadcastClient();
console.log("SyncWorkerPool: Starting unassignment for processes missing heartbeats.")
Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, '');
return client.existsAsync(HEARTBEAT_FOR(id)).then((exists) =>
(exists ? Promise.resolve() : this.unassignAccountsAssignedTo(id))
)
}).finally(() => {
const delay = HEARTBEAT_EXPIRES * 1000;
setTimeout(() => this.unassignAccountsMissingHeartbeats(), delay);
});
}
unassignAccountsAssignedTo(identity) {
const src = ACCOUNTS_FOR(identity);
const dst = ACCOUNTS_UNCLAIMED;
const unassignOne = (count) =>
PubsubConnector.broadcastClient().rpoplpushAsync(src, dst).then((val) =>
(val ? unassignOne(count + 1) : Promise.resolve(count))
)
return unassignOne(0).then((returned) => {
console.log(`SyncWorkerPool: Returned ${returned} accounts assigned to ${identity}.`)
});
}
update() {
this.ensureCapacity().then(() => {
console.log(`SyncWorkerPool: Voluntering to sync additional account.`)
this.acceptUnclaimedAccount().finally(() => {
this.update();
});
})
.catch((err) => {
console.log(`SyncWorkerPool: No capacity for additional accounts. ${err.message}`)
setTimeout(() => this.update(), 5000)
});
}
ensureCapacity() {
if (os.freemem() < 20 * 1024 * 1024) {
return Promise.reject(new Error(`<20MB RAM free (${os.freemem()} bytes)`));
}
const fiveMinuteLoadAvg = os.loadavg()[1];
if (fiveMinuteLoadAvg > CPU_COUNT * 0.9) {
return Promise.reject(new Error(`CPU load > 90% (${fiveMinuteLoadAvg} - ${CPU_COUNT} cores)`));
}
if (this._exiting) {
return Promise.reject(new Error('Quitting...'))
}
return Promise.resolve();
}
acceptUnclaimedAccount() {
if (!this._waitForAccountClient) {
this._waitForAccountClient = PubsubConnector.buildClient();
}
const src = ACCOUNTS_UNCLAIMED;
const dst = ACCOUNTS_FOR(IDENTITY);
return this._waitForAccountClient.brpoplpushAsync(src, dst, 10000)
.then((accountId) => {
if (accountId) {
this.addWorkerForAccountId(accountId);
}
});
}
addWorkerForAccountId(accountId) {
DatabaseConnector.forShared().then(({Account}) => {
Account.find({where: {id: accountId}}).then((account) => {
if (!account) {
return;
}
DatabaseConnector.forAccount(account.id).then((db) => {
console.log(`SyncWorkerPool: Starting worker for Account ${accountId}`)
this._workers[account.id] = new SyncWorker(account, db);
});
});
});
}
}
module.exports = SyncProcessManager;

View file

@ -1,16 +0,0 @@
const SyncWorker = require('./sync-worker');
const {DatabaseConnectionFactory} = require(`nylas-core`)
class SyncWorkerPool {
constructor() {
this._workers = {};
}
addWorkerForAccount(account) {
DatabaseConnectionFactory.forAccount(account.id).then((db) => {
this._workers[account.id] = new SyncWorker(account, db);
});
}
}
module.exports = SyncWorkerPool;

View file

@ -1,4 +1,8 @@
const {IMAPConnection} = require('nylas-core');
const {
IMAPConnection,
PubsubConnector,
DatabaseConnector,
} = require('nylas-core');
const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation')
const SyncMailboxOperation = require('./imap/sync-mailbox-operation')
//
@ -28,17 +32,28 @@ class SyncWorker {
this.syncNow();
this.scheduleExpiration();
this._listener = PubsubConnector.observableForAccountChanges(account.id).subscribe(() => {
this.onAccountChanged();
});
}
cleanup() {
this._listener.dispose();
}
// TODO: How does this get called?
onAccountChanged() {
this.syncNow();
this.scheduleExpiration();
DatabaseConnector.forShared().then(({Account}) => {
Account.find({where: {id: this._account.id}}).then((account) => {
this._account = account;
this.syncNow();
this.scheduleExpiration();
})
});
}
onExpired() {
// Returning syncs to the unclaimed queue every so often is healthy.
// TODO: That.
this.cleanup();
}
onSyncDidComplete() {