diff --git a/packages/nylas-core/database-connector.js b/packages/nylas-core/database-connector.js index 8c573431b..812f6427b 100644 --- a/packages/nylas-core/database-connector.js +++ b/packages/nylas-core/database-connector.js @@ -75,6 +75,17 @@ class DatabaseConnector { db.sequelize = sequelize; db.Sequelize = Sequelize; + const changeObserver = ({dataValues, $modelOptions}) => { + if ($modelOptions.name.singular === 'Account') { + const PubsubConnector = require('./pubsub-connector'); + PubsubConnector.notifyAccountChange(dataValues.id); + } + } + + sequelize.addHook("afterCreate", changeObserver) + sequelize.addHook("afterUpdate", changeObserver) + sequelize.addHook("afterDelete", changeObserver) + return sequelize.authenticate().then(() => sequelize.sync() ).thenReturn(db); diff --git a/packages/nylas-core/pubsub-connector.js b/packages/nylas-core/pubsub-connector.js index 533b30a5f..e251811f7 100644 --- a/packages/nylas-core/pubsub-connector.js +++ b/packages/nylas-core/pubsub-connector.js @@ -1,6 +1,9 @@ const Rx = require('rx') const bluebird = require('bluebird') const redis = require("redis"); + +const SyncPolicy = require('./sync-policy'); + bluebird.promisifyAll(redis.RedisClient.prototype); bluebird.promisifyAll(redis.Multi.prototype); @@ -34,6 +37,33 @@ class PubsubConnector { // Shared channel + assignPolicy(accountId, policy) { + console.log(`Changing policy for ${accountId} to ${JSON.stringify(policy)}`) + const DatabaseConnector = require('./database-connector'); + DatabaseConnector.forShared().then(({Account}) => { + Account.find({where: {id: accountId}}).then((account) => { + account.syncPolicy = policy; + account.save() + }) + }); + } + + incrementActivePolicyLockForAccount(accountId) { + this.broadcastClient().incrAsync(`connections-${accountId}`).then((val) => { + if (val === 1) { + this.assignPolicy(accountId, SyncPolicy.activeUserPolicy()) + } + }) + } + + decrementActivePolicyLockForAccount(accountId) { + this.broadcastClient().decrAsync(`connections-${accountId}`).then((val) => { + if (val === 0) { + this.assignPolicy(accountId, SyncPolicy.defaultPolicy()) + } + }); + } + notifyAccountChange(accountId) { const channel = this.channelForAccount(accountId); this.broadcastClient().publish(channel, 'modified'); diff --git a/packages/nylas-sync/sync-process-manager.js b/packages/nylas-sync/sync-process-manager.js index a6bace14f..3eeccb074 100644 --- a/packages/nylas-sync/sync-process-manager.js +++ b/packages/nylas-sync/sync-process-manager.js @@ -10,6 +10,7 @@ const ACCOUNTS_CLAIMED_PREFIX = 'accounts:id-'; const ACCOUNTS_FOR = (id) => `${ACCOUNTS_CLAIMED_PREFIX}${id}`; const HEARTBEAT_FOR = (id) => `heartbeat:${id}`; const HEARTBEAT_EXPIRES = 30; // 2 min in prod? +const CLAIM_DURATION = 10 * 60 * 1000; // 2 hours on prod? /* Accounts ALWAYS exist in either `accounts:unclaimed` or an `accounts:{id}` list. @@ -31,7 +32,7 @@ class SyncProcessManager { } start() { - console.log(`SyncWorkerPool: Starting with ID ${IDENTITY}`) + console.log(`ProcessManager: Starting with ID ${IDENTITY}`) this.unassignAccountsAssignedTo(IDENTITY).then(() => { this.unassignAccountsMissingHeartbeats(); @@ -50,12 +51,12 @@ class SyncProcessManager { client.setAsync(key, Date.now()).then(() => client.expireAsync(key, HEARTBEAT_EXPIRES) ).then(() => - console.log("SyncWorkerPool: Published heartbeat.") + console.log("ProcessManager: Published heartbeat.") ) } onSigInt() { - console.log(`SyncWorkerPool: Exiting...`) + console.log(`ProcessManager: Exiting...`) this._exiting = true; this.unassignAccountsAssignedTo(IDENTITY).then(() => @@ -80,7 +81,7 @@ class SyncProcessManager { if (unseenIds.length === 0) { return; } - console.log(`SyncWorkerPool: Adding account IDs ${unseenIds.join(',')} to redis.`) + console.log(`ProcessManager: Adding account IDs ${unseenIds.join(',')} to ${ACCOUNTS_UNCLAIMED}.`) unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id)); }); } @@ -88,7 +89,7 @@ class SyncProcessManager { unassignAccountsMissingHeartbeats() { const client = PubsubConnector.broadcastClient(); - console.log("SyncWorkerPool: Starting unassignment for processes missing heartbeats.") + console.log("ProcessManager: Starting unassignment for processes missing heartbeats.") Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => { const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, ''); @@ -111,19 +112,19 @@ class SyncProcessManager { ) return unassignOne(0).then((returned) => { - console.log(`SyncWorkerPool: Returned ${returned} accounts assigned to ${identity}.`) + console.log(`ProcessManager: Returned ${returned} accounts assigned to ${identity}.`) }); } update() { this.ensureCapacity().then(() => { - console.log(`SyncWorkerPool: Voluntering to sync additional account.`) + console.log(`ProcessManager: Voluntering to sync additional account.`) this.acceptUnclaimedAccount().finally(() => { this.update(); }); }) .catch((err) => { - console.log(`SyncWorkerPool: No capacity for additional accounts. ${err.message}`) + console.log(`ProcessManager: Decided not to sync additional account. ${err.message}`) setTimeout(() => this.update(), 5000) }); } @@ -139,7 +140,7 @@ class SyncProcessManager { } if (this._exiting) { - return Promise.reject(new Error('Quitting...')) + return Promise.reject(new Error('Process is exiting.')) } return Promise.resolve(); @@ -153,11 +154,10 @@ class SyncProcessManager { const src = ACCOUNTS_UNCLAIMED; const dst = ACCOUNTS_FOR(IDENTITY); - return this._waitForAccountClient.brpoplpushAsync(src, dst, 10000) - .then((accountId) => { - if (accountId) { - this.addWorkerForAccountId(accountId); - } + return this._waitForAccountClient.brpoplpushAsync(src, dst, 10000).then((accountId) => { + if (!accountId) { return } + this.addWorkerForAccountId(accountId); + setTimeout(() => this.removeWorker(), CLAIM_DURATION); }); } @@ -168,12 +168,33 @@ class SyncProcessManager { return; } DatabaseConnector.forAccount(account.id).then((db) => { - console.log(`SyncWorkerPool: Starting worker for Account ${accountId}`) + if (this._exiting) { + return; + } + console.log(`ProcessManager: Starting worker for Account ${accountId}`) this._workers[account.id] = new SyncWorker(account, db); }); }); }); } + + removeWorker() { + const src = ACCOUNTS_FOR(IDENTITY); + const dst = ACCOUNTS_UNCLAIMED; + + return PubsubConnector.broadcastClient().rpoplpushAsync(src, dst).then((accountId) => { + if (!accountId) { + return; + } + + console.log(`ProcessManager: Returning account ${accountId} to unclaimed pool.`) + + if (this._workers[accountId]) { + this._workers[accountId].cleanup(); + } + this._workers[accountId] = null; + }); + } } module.exports = SyncProcessManager; diff --git a/packages/nylas-sync/sync-worker.js b/packages/nylas-sync/sync-worker.js index 819cd0bb6..d0c18886c 100644 --- a/packages/nylas-sync/sync-worker.js +++ b/packages/nylas-sync/sync-worker.js @@ -3,6 +3,7 @@ const { PubsubConnector, DatabaseConnector, } = require('nylas-core'); + const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation') const SyncMailboxOperation = require('./imap/sync-mailbox-operation') // @@ -29,9 +30,9 @@ class SyncWorker { this._syncTimer = null; this._expirationTimer = null; + this._destroyed = false; this.syncNow(); - this.scheduleExpiration(); this._listener = PubsubConnector.observableForAccountChanges(account.id).subscribe(() => { this.onAccountChanged(); @@ -39,34 +40,32 @@ class SyncWorker { } cleanup() { + this._destroyed = true; this._listener.dispose(); + this._conn.end(); } onAccountChanged() { + console.log("SyncWorker: Detected change to account. Reloading and syncing now.") DatabaseConnector.forShared().then(({Account}) => { Account.find({where: {id: this._account.id}}).then((account) => { this._account = account; this.syncNow(); - this.scheduleExpiration(); }) }); } - onExpired() { - this.cleanup(); - } - onSyncDidComplete() { const {afterSync} = this._account.syncPolicy; if (afterSync === 'idle') { this.getInboxCategory().then((inboxCategory) => { this._conn.openBox(inboxCategory.name, true).then(() => { - console.log(" - Idling on inbox category"); + console.log("SyncWorker: - Idling on inbox category"); }); }); } else if (afterSync === 'close') { - console.log(" - Closing connection"); + console.log("SyncWorker: - Closing connection"); this._conn.end(); this._conn = null; } else { @@ -155,19 +154,12 @@ class SyncWorker { }); } - scheduleExpiration() { - const {expiration} = this._account.syncPolicy; - - clearTimeout(this._expirationTimer); - this._expirationTimer = setTimeout(() => this.onExpired(), expiration); - } - scheduleNextSync() { const {interval} = this._account.syncPolicy; if (interval) { const target = this._lastSyncTime + interval; - console.log(`Next sync scheduled for ${new Date(target).toLocaleString()}`); + console.log(`SyncWorker: Next sync scheduled for ${new Date(target).toLocaleString()}`); this._syncTimer = setTimeout(() => { this.syncNow(); }, target - Date.now());