From 12d9db8dd924fffa9bd8bd209843fca805f66942 Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Thu, 23 Jun 2016 00:49:22 -0700 Subject: [PATCH] Redis coordination of sync processes / assignment --- packages/nylas-api/app.js | 4 +- packages/nylas-api/decorators/connections.js | 4 +- packages/nylas-api/routes/auth.js | 9 +- packages/nylas-api/routes/delta.js | 4 +- packages/nylas-core/account-pubsub.js | 47 ----- ...ction-factory.js => database-connector.js} | 4 +- packages/nylas-core/index.js | 4 +- packages/nylas-core/pubsub-connector.js | 91 +++++++++ packages/nylas-core/transaction-log.js | 5 +- packages/nylas-message-processor/index.js | 4 +- packages/nylas-sync/app.js | 35 ++-- packages/nylas-sync/sync-process-manager.js | 179 ++++++++++++++++++ packages/nylas-sync/sync-worker-pool.js | 16 -- packages/nylas-sync/sync-worker.js | 27 ++- 14 files changed, 329 insertions(+), 104 deletions(-) delete mode 100644 packages/nylas-core/account-pubsub.js rename packages/nylas-core/{database-connection-factory.js => database-connector.js} (96%) create mode 100644 packages/nylas-core/pubsub-connector.js create mode 100644 packages/nylas-sync/sync-process-manager.js delete mode 100644 packages/nylas-sync/sync-worker-pool.js diff --git a/packages/nylas-api/app.js b/packages/nylas-api/app.js index d752b9bb9..365e53904 100644 --- a/packages/nylas-api/app.js +++ b/packages/nylas-api/app.js @@ -21,8 +21,8 @@ const plugins = [Inert, Vision, HapiBasicAuth, { }]; let sharedDb = null; -const {DatabaseConnectionFactory} = require(`nylas-core`) -DatabaseConnectionFactory.forShared().then((db) => { +const {DatabaseConnector} = require(`nylas-core`) +DatabaseConnector.forShared().then((db) => { sharedDb = db; }); diff --git a/packages/nylas-api/decorators/connections.js b/packages/nylas-api/decorators/connections.js index 2190f2c67..2003696cf 100644 --- a/packages/nylas-api/decorators/connections.js +++ b/packages/nylas-api/decorators/connections.js @@ -1,10 +1,10 @@ /* eslint func-names:0 */ -const {DatabaseConnectionFactory} = require(`nylas-core`); +const {DatabaseConnector} = require(`nylas-core`); module.exports = (server) => { server.decorate('request', 'getAccountDatabase', function () { const account = this.auth.credentials; - return DatabaseConnectionFactory.forAccount(account.id); + return DatabaseConnector.forAccount(account.id); }); } diff --git a/packages/nylas-api/routes/auth.js b/packages/nylas-api/routes/auth.js index 7e76ff6de..6ccb02118 100644 --- a/packages/nylas-api/routes/auth.js +++ b/packages/nylas-api/routes/auth.js @@ -2,7 +2,7 @@ const Joi = require('Joi'); const _ = require('underscore'); const Serialization = require('../serialization'); -const {IMAPConnection, DatabaseConnectionFactory} = require('nylas-core'); +const {IMAPConnection, PubsubConnector, DatabaseConnector} = require('nylas-core'); const imapSmtpSettings = Joi.object().keys({ imap_host: [Joi.string().ip().required(), Joi.string().hostname().required()], @@ -69,7 +69,7 @@ module.exports = (server) => { } Promise.all(connectionChecks).then(() => { - DatabaseConnectionFactory.forShared().then((db) => { + DatabaseConnector.forShared().then((db) => { const {AccountToken, Account} = db; const account = Account.build({ @@ -90,6 +90,11 @@ module.exports = (server) => { AccountToken.create({ AccountId: saved.id, }).then((accountToken) => { + const client = PubsubConnector.broadcastClient(); + client.lpushAsync('accounts:unclaimed', saved.id).catch((err) => { + console.error(`Auth: Could not queue account sync! ${err.message}`) + }); + const response = saved.toJSON(); response.token = accountToken.value; reply(Serialization.jsonStringify(response)); diff --git a/packages/nylas-api/routes/delta.js b/packages/nylas-api/routes/delta.js index 93497ba6e..fc5e603bf 100644 --- a/packages/nylas-api/routes/delta.js +++ b/packages/nylas-api/routes/delta.js @@ -1,6 +1,6 @@ const Rx = require('rx') const _ = require('underscore'); -const {AccountPubsub} = require(`nylas-core`); +const {PubsubConnector} = require(`nylas-core`); function keepAlive(request) { const until = Rx.Observable.fromCallback(request.on)("disconnect") @@ -52,7 +52,7 @@ module.exports = (server) => { request.getAccountDatabase().then((db) => { const source = Rx.Observable.merge( - AccountPubsub.observableForAccountId(db.accountId), + PubsubConnector.observableForAccountDeltas(db.accountId), initialTransactions(db, request), keepAlive(request) ).subscribe(outputStream.pushJSON) diff --git a/packages/nylas-core/account-pubsub.js b/packages/nylas-core/account-pubsub.js deleted file mode 100644 index 57642565b..000000000 --- a/packages/nylas-core/account-pubsub.js +++ /dev/null @@ -1,47 +0,0 @@ -const Rx = require('rx') -const bluebird = require('bluebird') -const redis = require("redis"); -bluebird.promisifyAll(redis.RedisClient.prototype); -bluebird.promisifyAll(redis.Multi.prototype); - -class AccountPubsub { - constructor() { - this._broadcastClient = null; - } - - buildClient() { - const client = redis.createClient(process.env.REDIS_URL || null); - client.on("error", console.error); - return client; - } - - keyForAccountId(accountId) { - return `delta-${accountId}`; - } - - notify(accountId, data) { - if (!this._broadcastClient) { - this._broadcastClient = this.buildClient(); - } - const key = this.keyForAccountId(accountId); - this._broadcastClient.publish(key, JSON.stringify(data)) - } - - observableForAccountId(accountId) { - return Rx.Observable.create((observer) => { - const sub = this.buildClient(); - const key = this.keyForAccountId(accountId); - sub.on("message", (channel, message) => { - if (channel !== key) { return } - observer.onNext(message) - }); - sub.subscribe(key); - return () => { - sub.unsubscribe() - sub.quit() - } - }) - } -} - -module.exports = new AccountPubsub() diff --git a/packages/nylas-core/database-connection-factory.js b/packages/nylas-core/database-connector.js similarity index 96% rename from packages/nylas-core/database-connection-factory.js rename to packages/nylas-core/database-connector.js index 6e0e9f3bd..8c573431b 100644 --- a/packages/nylas-core/database-connection-factory.js +++ b/packages/nylas-core/database-connector.js @@ -10,7 +10,7 @@ if (!fs.existsSync(STORAGE_DIR)) { fs.mkdirSync(STORAGE_DIR); } -class DatabaseConnectionFactory { +class DatabaseConnector { constructor() { this._pools = {}; } @@ -86,4 +86,4 @@ class DatabaseConnectionFactory { } } -module.exports = new DatabaseConnectionFactory() +module.exports = new DatabaseConnector() diff --git a/packages/nylas-core/index.js b/packages/nylas-core/index.js index 3accf1084..f5ebeae89 100644 --- a/packages/nylas-core/index.js +++ b/packages/nylas-core/index.js @@ -1,6 +1,6 @@ module.exports = { - DatabaseConnectionFactory: require('./database-connection-factory'), - AccountPubsub: require('./account-pubsub'), + DatabaseConnector: require('./database-connector'), + PubsubConnector: require('./pubsub-connector'), IMAPConnection: require('./imap-connection'), Config: require(`./config/${process.env.ENV || 'development'}`), } diff --git a/packages/nylas-core/pubsub-connector.js b/packages/nylas-core/pubsub-connector.js new file mode 100644 index 000000000..533b30a5f --- /dev/null +++ b/packages/nylas-core/pubsub-connector.js @@ -0,0 +1,91 @@ +const Rx = require('rx') +const bluebird = require('bluebird') +const redis = require("redis"); +bluebird.promisifyAll(redis.RedisClient.prototype); +bluebird.promisifyAll(redis.Multi.prototype); + +class PubsubConnector { + constructor() { + this._broadcastClient = null; + this._listenClient = null; + this._listenClientSubs = {}; + } + + buildClient() { + const client = redis.createClient(process.env.REDIS_URL || null); + client.on("error", console.error); + return client; + } + + broadcastClient() { + if (!this._broadcastClient) { + this._broadcastClient = this.buildClient(); + } + return this._broadcastClient; + } + + channelForAccount(accountId) { + return `a-${accountId}`; + } + + channelForAccountDeltas(accountId) { + return `a-${accountId}-deltas`; + } + + // Shared channel + + notifyAccountChange(accountId) { + const channel = this.channelForAccount(accountId); + this.broadcastClient().publish(channel, 'modified'); + } + + observableForAccountChanges(accountId) { + if (!this._listenClient) { + this._listenClient = this.buildClient(); + this._listenClientSubs = {}; + } + + const channel = this.channelForAccount(accountId); + return Rx.Observable.create((observer) => { + this._listenClient.on("message", (msgChannel, message) => { + if (msgChannel !== channel) { return } + observer.onNext(message) + }); + + if (!this._listenClientSubs[channel]) { + this._listenClientSubs[channel] = 1; + this._listenClient.subscribe(channel); + } else { + this._listenClientSubs[channel] += 1; + } + return () => { + this._listenClientSubs[channel] -= 1; + if (this._listenClientSubs[channel] === 0) { + this._listenClient.unsubscribe(channel); + } + } + }) + } + + + // Account (delta streaming) channels + + notifyAccountDeltas(accountId, data) { + const channel = this.channelForAccountDeltas(accountId); + this.broadcastClient().publish(channel, JSON.stringify(data)) + } + + observableForAccountDeltas(accountId) { + return Rx.Observable.create((observer) => { + const sub = this.buildClient(); + sub.on("message", (channel, message) => observer.onNext(message)); + sub.subscribe(this.channelForAccountDeltas(accountId)); + return () => { + sub.unsubscribe(); + sub.quit(); + } + }) + } +} + +module.exports = new PubsubConnector() diff --git a/packages/nylas-core/transaction-log.js b/packages/nylas-core/transaction-log.js index 77a3a9474..133efad70 100644 --- a/packages/nylas-core/transaction-log.js +++ b/packages/nylas-core/transaction-log.js @@ -1,4 +1,4 @@ -const AccountPubsub = require('./account-pubsub') +const PubsubConnector = require('./pubsub-connector') class TransactionLog { constructor(db) { @@ -25,7 +25,8 @@ class TransactionLog { ); this.db.Transaction.create(transactionData); transactionData.object = sequelizeHookData.dataValues; - AccountPubsub.notify(this.db.accountId, transactionData); + + PubsubConnector.notifyAccountDeltas(this.db.accountId, transactionData); } } diff --git a/packages/nylas-message-processor/index.js b/packages/nylas-message-processor/index.js index b50d83cd1..40a3269fd 100644 --- a/packages/nylas-message-processor/index.js +++ b/packages/nylas-message-processor/index.js @@ -1,4 +1,4 @@ -const {DatabaseConnectionFactory} = require(`nylas-core`) +const {DatabaseConnector} = require(`nylas-core`) const {processors} = require('./processors') // List of the attributes of Message that the processor should be allowed to change. @@ -21,7 +21,7 @@ function saveMessage(message) { } function processMessage({messageId, accountId}) { - DatabaseConnectionFactory.forAccount(accountId) + DatabaseConnector.forAccount(accountId) .then(({Message}) => Message.find({where: {id: messageId}}).then((message) => runPipeline(accountId, message) diff --git a/packages/nylas-sync/app.js b/packages/nylas-sync/app.js index 65adcd614..dc6692444 100644 --- a/packages/nylas-sync/app.js +++ b/packages/nylas-sync/app.js @@ -1,24 +1,21 @@ global.Promise = require('bluebird'); -const {DatabaseConnectionFactory} = require(`nylas-core`) -const SyncWorkerPool = require('./sync-worker-pool'); -const workerPool = new SyncWorkerPool(); +const {DatabaseConnector} = require(`nylas-core`) +const SyncProcessManager = require('./sync-process-manager'); -const start = () => { - DatabaseConnectionFactory.forShared().then((db) => { - const {Account} = db; - Account.findAll().then((accounts) => { - if (accounts.length === 0) { - console.log(`Couldn't find any accounts to sync. Run this CURL command to auth one!`) - console.log(`curl -X POST -H "Content-Type: application/json" -d '{"email":"inboxapptest2@fastmail.fm", "name":"Ben Gotow", "provider":"imap", "settings":{"imap_username":"inboxapptest1@fastmail.fm","imap_host":"mail.amessagingengine.com","imap_port":993,"smtp_host":"mail.messagingengine.com","smtp_port":0,"smtp_username":"inboxapptest1@fastmail.fm", "smtp_password":"trar2e","imap_password":"trar2e","ssl_required":true}}' "http://localhost:5100/auth?client_id=123"`) - } - accounts.forEach((account) => { - workerPool.addWorkerForAccount(account); - }); - }); +const manager = new SyncProcessManager(); + +DatabaseConnector.forShared().then((db) => { + const {Account} = db; + Account.findAll().then((accounts) => { + if (accounts.length === 0) { + console.log(`Couldn't find any accounts to sync. Run this CURL command to auth one!`) + console.log(`curl -X POST -H "Content-Type: application/json" -d '{"email":"inboxapptest2@fastmail.fm", "name":"Ben Gotow", "provider":"imap", "settings":{"imap_username":"inboxapptest1@fastmail.fm","imap_host":"mail.amessagingengine.com","imap_port":993,"smtp_host":"mail.messagingengine.com","smtp_port":0,"smtp_username":"inboxapptest1@fastmail.fm", "smtp_password":"trar2e","imap_password":"trar2e","ssl_required":true}}' "http://localhost:5100/auth?client_id=123"`) + } + manager.ensureAccountIDsInRedis(accounts.map(a => a.id)).then(() => { + manager.start(); + }) }); -} +}); -start(); - -global.workerPool = workerPool; +global.manager = manager; diff --git a/packages/nylas-sync/sync-process-manager.js b/packages/nylas-sync/sync-process-manager.js new file mode 100644 index 000000000..a6bace14f --- /dev/null +++ b/packages/nylas-sync/sync-process-manager.js @@ -0,0 +1,179 @@ +const os = require('os'); +const SyncWorker = require('./sync-worker'); +const {DatabaseConnector, PubsubConnector} = require(`nylas-core`) + +const CPU_COUNT = os.cpus().length; +const IDENTITY = `${os.hostname()}-${process.pid}`; + +const ACCOUNTS_UNCLAIMED = 'accounts:unclaimed'; +const ACCOUNTS_CLAIMED_PREFIX = 'accounts:id-'; +const ACCOUNTS_FOR = (id) => `${ACCOUNTS_CLAIMED_PREFIX}${id}`; +const HEARTBEAT_FOR = (id) => `heartbeat:${id}`; +const HEARTBEAT_EXPIRES = 30; // 2 min in prod? + +/* +Accounts ALWAYS exist in either `accounts:unclaimed` or an `accounts:{id}` list. +They are atomically moved between these sets as they are claimed and returned. + +Periodically, each worker in the pool looks at all the `accounts:{id}` lists. +For each list it finds, it checks for the existence of `heartbeat:{id}`, a key +that expires quickly if the sync process doesn't refresh it. + +If it does not find the key, it moves all of the accounts in the list back to +the unclaimed key. +*/ + +class SyncProcessManager { + constructor() { + this._workers = {}; + this._listenForSyncsClient = null; + this._exiting = false; + } + + start() { + console.log(`SyncWorkerPool: Starting with ID ${IDENTITY}`) + + this.unassignAccountsAssignedTo(IDENTITY).then(() => { + this.unassignAccountsMissingHeartbeats(); + this.update(); + }); + + setInterval(() => this.updateHeartbeat(), HEARTBEAT_EXPIRES / 5.0 * 1000); + this.updateHeartbeat(); + + process.on('SIGINT', () => this.onSigInt()); + } + + updateHeartbeat() { + const key = HEARTBEAT_FOR(IDENTITY); + const client = PubsubConnector.broadcastClient(); + client.setAsync(key, Date.now()).then(() => + client.expireAsync(key, HEARTBEAT_EXPIRES) + ).then(() => + console.log("SyncWorkerPool: Published heartbeat.") + ) + } + + onSigInt() { + console.log(`SyncWorkerPool: Exiting...`) + this._exiting = true; + + this.unassignAccountsAssignedTo(IDENTITY).then(() => + PubsubConnector.broadcastClient().delAsync(ACCOUNTS_FOR(IDENTITY)).then(() => + PubsubConnector.broadcastClient().delAsync(HEARTBEAT_FOR(IDENTITY)) + ) + ).finally(() => { + process.exit(1); + }); + } + + ensureAccountIDsInRedis(accountIds) { + const client = PubsubConnector.broadcastClient(); + + let unseenIds = [].concat(accountIds); + + return Promise.each(client.keysAsync(`accounts:*`), (key) => + client.lrangeAsync(key, 0, 20000).then((foundIds) => { + unseenIds = unseenIds.filter((a) => !foundIds.includes(`${a}`)) + }) + ).finally(() => { + if (unseenIds.length === 0) { + return; + } + console.log(`SyncWorkerPool: Adding account IDs ${unseenIds.join(',')} to redis.`) + unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id)); + }); + } + + unassignAccountsMissingHeartbeats() { + const client = PubsubConnector.broadcastClient(); + + console.log("SyncWorkerPool: Starting unassignment for processes missing heartbeats.") + + Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => { + const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, ''); + return client.existsAsync(HEARTBEAT_FOR(id)).then((exists) => + (exists ? Promise.resolve() : this.unassignAccountsAssignedTo(id)) + ) + }).finally(() => { + const delay = HEARTBEAT_EXPIRES * 1000; + setTimeout(() => this.unassignAccountsMissingHeartbeats(), delay); + }); + } + + unassignAccountsAssignedTo(identity) { + const src = ACCOUNTS_FOR(identity); + const dst = ACCOUNTS_UNCLAIMED; + + const unassignOne = (count) => + PubsubConnector.broadcastClient().rpoplpushAsync(src, dst).then((val) => + (val ? unassignOne(count + 1) : Promise.resolve(count)) + ) + + return unassignOne(0).then((returned) => { + console.log(`SyncWorkerPool: Returned ${returned} accounts assigned to ${identity}.`) + }); + } + + update() { + this.ensureCapacity().then(() => { + console.log(`SyncWorkerPool: Voluntering to sync additional account.`) + this.acceptUnclaimedAccount().finally(() => { + this.update(); + }); + }) + .catch((err) => { + console.log(`SyncWorkerPool: No capacity for additional accounts. ${err.message}`) + setTimeout(() => this.update(), 5000) + }); + } + + ensureCapacity() { + if (os.freemem() < 20 * 1024 * 1024) { + return Promise.reject(new Error(`<20MB RAM free (${os.freemem()} bytes)`)); + } + + const fiveMinuteLoadAvg = os.loadavg()[1]; + if (fiveMinuteLoadAvg > CPU_COUNT * 0.9) { + return Promise.reject(new Error(`CPU load > 90% (${fiveMinuteLoadAvg} - ${CPU_COUNT} cores)`)); + } + + if (this._exiting) { + return Promise.reject(new Error('Quitting...')) + } + + return Promise.resolve(); + } + + acceptUnclaimedAccount() { + if (!this._waitForAccountClient) { + this._waitForAccountClient = PubsubConnector.buildClient(); + } + + const src = ACCOUNTS_UNCLAIMED; + const dst = ACCOUNTS_FOR(IDENTITY); + + return this._waitForAccountClient.brpoplpushAsync(src, dst, 10000) + .then((accountId) => { + if (accountId) { + this.addWorkerForAccountId(accountId); + } + }); + } + + addWorkerForAccountId(accountId) { + DatabaseConnector.forShared().then(({Account}) => { + Account.find({where: {id: accountId}}).then((account) => { + if (!account) { + return; + } + DatabaseConnector.forAccount(account.id).then((db) => { + console.log(`SyncWorkerPool: Starting worker for Account ${accountId}`) + this._workers[account.id] = new SyncWorker(account, db); + }); + }); + }); + } +} + +module.exports = SyncProcessManager; diff --git a/packages/nylas-sync/sync-worker-pool.js b/packages/nylas-sync/sync-worker-pool.js deleted file mode 100644 index aeca79a18..000000000 --- a/packages/nylas-sync/sync-worker-pool.js +++ /dev/null @@ -1,16 +0,0 @@ -const SyncWorker = require('./sync-worker'); -const {DatabaseConnectionFactory} = require(`nylas-core`) - -class SyncWorkerPool { - constructor() { - this._workers = {}; - } - - addWorkerForAccount(account) { - DatabaseConnectionFactory.forAccount(account.id).then((db) => { - this._workers[account.id] = new SyncWorker(account, db); - }); - } -} - -module.exports = SyncWorkerPool; diff --git a/packages/nylas-sync/sync-worker.js b/packages/nylas-sync/sync-worker.js index 1e3566c26..819cd0bb6 100644 --- a/packages/nylas-sync/sync-worker.js +++ b/packages/nylas-sync/sync-worker.js @@ -1,4 +1,8 @@ -const {IMAPConnection} = require('nylas-core'); +const { + IMAPConnection, + PubsubConnector, + DatabaseConnector, +} = require('nylas-core'); const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation') const SyncMailboxOperation = require('./imap/sync-mailbox-operation') // @@ -28,17 +32,28 @@ class SyncWorker { this.syncNow(); this.scheduleExpiration(); + + this._listener = PubsubConnector.observableForAccountChanges(account.id).subscribe(() => { + this.onAccountChanged(); + }); + } + + cleanup() { + this._listener.dispose(); } - // TODO: How does this get called? onAccountChanged() { - this.syncNow(); - this.scheduleExpiration(); + DatabaseConnector.forShared().then(({Account}) => { + Account.find({where: {id: this._account.id}}).then((account) => { + this._account = account; + this.syncNow(); + this.scheduleExpiration(); + }) + }); } onExpired() { - // Returning syncs to the unclaimed queue every so often is healthy. - // TODO: That. + this.cleanup(); } onSyncDidComplete() {