Return syncs to unclaimed queue after CLAIM_DURATION, just because it's healthy

This commit is contained in:
Ben Gotow 2016-06-23 11:45:24 -07:00
parent c2e9093b42
commit f7c647f7ba
4 changed files with 85 additions and 31 deletions

View file

@ -75,6 +75,17 @@ class DatabaseConnector {
db.sequelize = sequelize;
db.Sequelize = Sequelize;
const changeObserver = ({dataValues, $modelOptions}) => {
if ($modelOptions.name.singular === 'Account') {
const PubsubConnector = require('./pubsub-connector');
PubsubConnector.notifyAccountChange(dataValues.id);
}
}
sequelize.addHook("afterCreate", changeObserver)
sequelize.addHook("afterUpdate", changeObserver)
sequelize.addHook("afterDelete", changeObserver)
return sequelize.authenticate().then(() =>
sequelize.sync()
).thenReturn(db);

View file

@ -1,6 +1,9 @@
const Rx = require('rx')
const bluebird = require('bluebird')
const redis = require("redis");
const SyncPolicy = require('./sync-policy');
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);
@ -34,6 +37,33 @@ class PubsubConnector {
// Shared channel
assignPolicy(accountId, policy) {
console.log(`Changing policy for ${accountId} to ${JSON.stringify(policy)}`)
const DatabaseConnector = require('./database-connector');
DatabaseConnector.forShared().then(({Account}) => {
Account.find({where: {id: accountId}}).then((account) => {
account.syncPolicy = policy;
account.save()
})
});
}
incrementActivePolicyLockForAccount(accountId) {
this.broadcastClient().incrAsync(`connections-${accountId}`).then((val) => {
if (val === 1) {
this.assignPolicy(accountId, SyncPolicy.activeUserPolicy())
}
})
}
decrementActivePolicyLockForAccount(accountId) {
this.broadcastClient().decrAsync(`connections-${accountId}`).then((val) => {
if (val === 0) {
this.assignPolicy(accountId, SyncPolicy.defaultPolicy())
}
});
}
notifyAccountChange(accountId) {
const channel = this.channelForAccount(accountId);
this.broadcastClient().publish(channel, 'modified');

View file

@ -10,6 +10,7 @@ const ACCOUNTS_CLAIMED_PREFIX = 'accounts:id-';
const ACCOUNTS_FOR = (id) => `${ACCOUNTS_CLAIMED_PREFIX}${id}`;
const HEARTBEAT_FOR = (id) => `heartbeat:${id}`;
const HEARTBEAT_EXPIRES = 30; // 2 min in prod?
const CLAIM_DURATION = 10 * 60 * 1000; // 2 hours on prod?
/*
Accounts ALWAYS exist in either `accounts:unclaimed` or an `accounts:{id}` list.
@ -31,7 +32,7 @@ class SyncProcessManager {
}
start() {
console.log(`SyncWorkerPool: Starting with ID ${IDENTITY}`)
console.log(`ProcessManager: Starting with ID ${IDENTITY}`)
this.unassignAccountsAssignedTo(IDENTITY).then(() => {
this.unassignAccountsMissingHeartbeats();
@ -50,12 +51,12 @@ class SyncProcessManager {
client.setAsync(key, Date.now()).then(() =>
client.expireAsync(key, HEARTBEAT_EXPIRES)
).then(() =>
console.log("SyncWorkerPool: Published heartbeat.")
console.log("ProcessManager: Published heartbeat.")
)
}
onSigInt() {
console.log(`SyncWorkerPool: Exiting...`)
console.log(`ProcessManager: Exiting...`)
this._exiting = true;
this.unassignAccountsAssignedTo(IDENTITY).then(() =>
@ -80,7 +81,7 @@ class SyncProcessManager {
if (unseenIds.length === 0) {
return;
}
console.log(`SyncWorkerPool: Adding account IDs ${unseenIds.join(',')} to redis.`)
console.log(`ProcessManager: Adding account IDs ${unseenIds.join(',')} to ${ACCOUNTS_UNCLAIMED}.`)
unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id));
});
}
@ -88,7 +89,7 @@ class SyncProcessManager {
unassignAccountsMissingHeartbeats() {
const client = PubsubConnector.broadcastClient();
console.log("SyncWorkerPool: Starting unassignment for processes missing heartbeats.")
console.log("ProcessManager: Starting unassignment for processes missing heartbeats.")
Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, '');
@ -111,19 +112,19 @@ class SyncProcessManager {
)
return unassignOne(0).then((returned) => {
console.log(`SyncWorkerPool: Returned ${returned} accounts assigned to ${identity}.`)
console.log(`ProcessManager: Returned ${returned} accounts assigned to ${identity}.`)
});
}
update() {
this.ensureCapacity().then(() => {
console.log(`SyncWorkerPool: Voluntering to sync additional account.`)
console.log(`ProcessManager: Voluntering to sync additional account.`)
this.acceptUnclaimedAccount().finally(() => {
this.update();
});
})
.catch((err) => {
console.log(`SyncWorkerPool: No capacity for additional accounts. ${err.message}`)
console.log(`ProcessManager: Decided not to sync additional account. ${err.message}`)
setTimeout(() => this.update(), 5000)
});
}
@ -139,7 +140,7 @@ class SyncProcessManager {
}
if (this._exiting) {
return Promise.reject(new Error('Quitting...'))
return Promise.reject(new Error('Process is exiting.'))
}
return Promise.resolve();
@ -153,11 +154,10 @@ class SyncProcessManager {
const src = ACCOUNTS_UNCLAIMED;
const dst = ACCOUNTS_FOR(IDENTITY);
return this._waitForAccountClient.brpoplpushAsync(src, dst, 10000)
.then((accountId) => {
if (accountId) {
this.addWorkerForAccountId(accountId);
}
return this._waitForAccountClient.brpoplpushAsync(src, dst, 10000).then((accountId) => {
if (!accountId) { return }
this.addWorkerForAccountId(accountId);
setTimeout(() => this.removeWorker(), CLAIM_DURATION);
});
}
@ -168,12 +168,33 @@ class SyncProcessManager {
return;
}
DatabaseConnector.forAccount(account.id).then((db) => {
console.log(`SyncWorkerPool: Starting worker for Account ${accountId}`)
if (this._exiting) {
return;
}
console.log(`ProcessManager: Starting worker for Account ${accountId}`)
this._workers[account.id] = new SyncWorker(account, db);
});
});
});
}
removeWorker() {
const src = ACCOUNTS_FOR(IDENTITY);
const dst = ACCOUNTS_UNCLAIMED;
return PubsubConnector.broadcastClient().rpoplpushAsync(src, dst).then((accountId) => {
if (!accountId) {
return;
}
console.log(`ProcessManager: Returning account ${accountId} to unclaimed pool.`)
if (this._workers[accountId]) {
this._workers[accountId].cleanup();
}
this._workers[accountId] = null;
});
}
}
module.exports = SyncProcessManager;

View file

@ -3,6 +3,7 @@ const {
PubsubConnector,
DatabaseConnector,
} = require('nylas-core');
const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation')
const SyncMailboxOperation = require('./imap/sync-mailbox-operation')
//
@ -29,9 +30,9 @@ class SyncWorker {
this._syncTimer = null;
this._expirationTimer = null;
this._destroyed = false;
this.syncNow();
this.scheduleExpiration();
this._listener = PubsubConnector.observableForAccountChanges(account.id).subscribe(() => {
this.onAccountChanged();
@ -39,34 +40,32 @@ class SyncWorker {
}
cleanup() {
this._destroyed = true;
this._listener.dispose();
this._conn.end();
}
onAccountChanged() {
console.log("SyncWorker: Detected change to account. Reloading and syncing now.")
DatabaseConnector.forShared().then(({Account}) => {
Account.find({where: {id: this._account.id}}).then((account) => {
this._account = account;
this.syncNow();
this.scheduleExpiration();
})
});
}
onExpired() {
this.cleanup();
}
onSyncDidComplete() {
const {afterSync} = this._account.syncPolicy;
if (afterSync === 'idle') {
this.getInboxCategory().then((inboxCategory) => {
this._conn.openBox(inboxCategory.name, true).then(() => {
console.log(" - Idling on inbox category");
console.log("SyncWorker: - Idling on inbox category");
});
});
} else if (afterSync === 'close') {
console.log(" - Closing connection");
console.log("SyncWorker: - Closing connection");
this._conn.end();
this._conn = null;
} else {
@ -155,19 +154,12 @@ class SyncWorker {
});
}
scheduleExpiration() {
const {expiration} = this._account.syncPolicy;
clearTimeout(this._expirationTimer);
this._expirationTimer = setTimeout(() => this.onExpired(), expiration);
}
scheduleNextSync() {
const {interval} = this._account.syncPolicy;
if (interval) {
const target = this._lastSyncTime + interval;
console.log(`Next sync scheduled for ${new Date(target).toLocaleString()}`);
console.log(`SyncWorker: Next sync scheduled for ${new Date(target).toLocaleString()}`);
this._syncTimer = setTimeout(() => {
this.syncNow();
}, target - Date.now());