mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-01-12 19:23:13 +08:00
208 lines
6.5 KiB
JavaScript
208 lines
6.5 KiB
JavaScript
const os = require('os');
|
|
const SyncWorker = require('./sync-worker');
|
|
const {DatabaseConnector, PubsubConnector, SchedulerUtils} = require(`nylas-core`)
|
|
|
|
// Identity of this sync process, "<hostname>-<pid>". It is used below to build
// this process's claimed-accounts list name and its heartbeat key.
const IDENTITY = [os.hostname(), process.pid].join('-');

// Names shared with the rest of the scheduler:
//   ACCOUNTS_FOR(identity)   -> list holding the accounts claimed by `identity`
//   ACCOUNTS_UNCLAIMED       -> list holding accounts nobody has claimed
//   ACCOUNTS_CLAIMED_PREFIX  -> prefix used to enumerate every claimed list
//   HEARTBEAT_FOR(identity)  -> heartbeat key for `identity`
//   HEARTBEAT_EXPIRES        -> heartbeat lifetime (multiplied by 1000 for timers)
//   forEachAccountList(cb)   -> calls cb(processIdentity, accountIds) per list
const {
  ACCOUNTS_FOR,
  ACCOUNTS_UNCLAIMED,
  ACCOUNTS_CLAIMED_PREFIX,
  HEARTBEAT_FOR,
  HEARTBEAT_EXPIRES,
  forEachAccountList,
} = SchedulerUtils;
|
|
|
|
/*
|
|
Accounts ALWAYS exist in either `accounts:unclaimed` or an `accounts:{id}` list.
|
|
They are atomically moved between these sets as they are claimed and returned.
|
|
|
|
Periodically, each worker in the pool looks at all the `accounts:{id}` lists.
|
|
For each list it finds, it checks for the existence of `heartbeat:{id}`, a key
|
|
that expires quickly if the sync process doesn't refresh it.
|
|
|
|
If it does not find the key, it moves all of the accounts in the list back to
|
|
the unclaimed key.
|
|
|
|
Sync processes only claim an account for a fixed period of time. This means that
|
|
an engineer can add new sync machines to the pool and the load across instances
|
|
will balance on its own. It also means one bad instance will not permanently
|
|
disrupt sync for any accounts. (Eg: instance has faulty network connection.)
|
|
|
|
Sync processes periodically claim accounts when they can find them, regardless
|
|
of how busy they are. A separate API (`/routes/monitoring`) allows CloudWatch
|
|
to decide whether to spin up instances or take them offline based on CPU/RAM
|
|
utilization across the pool.
|
|
*/
|
|
|
|
/**
 * Claims accounts from the shared pool and runs one SyncWorker per claimed
 * account inside this process.
 *
 * The manager keeps a heartbeat key alive for this process, returns the
 * accounts of processes whose heartbeat key is missing to the unclaimed list,
 * and hands its own accounts back when the process receives SIGINT/SIGTERM.
 */
class SyncProcessManager {
  constructor() {
    // accountId -> SyncWorker for every account currently syncing here.
    this._workers = {};
    this._listenForSyncsClient = null;
    // Dedicated client for the blocking pop in acceptUnclaimedAccount();
    // created lazily on first use.
    this._waitForAccountClient = null;
    // Set once a shutdown signal arrives; stops the claim loop.
    this._exiting = false;
    this._logger = global.Logger.child({identity: IDENTITY});
  }

  /**
   * Boots the manager: returns any accounts still listed under this identity,
   * starts the missing-heartbeat sweep and the claim loop, starts the
   * heartbeat timer and installs the signal handlers.
   */
  start() {
    this._logger.info(`ProcessManager: Starting with ID`);

    this.unassignAccountsAssignedTo(IDENTITY)
      .then(() => {
        this.unassignAccountsMissingHeartbeats();
        this.update();
      })
      .catch((err) => {
        this._logger.error({err}, `ProcessManager: Failed to start`);
      });

    // Refresh five times per expiry window so a single late refresh does not
    // let the heartbeat key lapse.
    setInterval(() => this.updateHeartbeat(), HEARTBEAT_EXPIRES / 5.0 * 1000);
    this.updateHeartbeat();

    process.on('SIGINT', () => this.onSigInt());
    process.on('SIGTERM', () => this.onSigInt());
  }

  /**
   * Writes this process's heartbeat key and (re)sets its expiry.
   *
   * @returns {Promise} settles once the heartbeat was written or the failure
   *   was logged; it does not reject for client errors.
   */
  updateHeartbeat() {
    const key = HEARTBEAT_FOR(IDENTITY);
    const client = PubsubConnector.broadcastClient();

    return client.setAsync(key, Date.now())
      .then(() => client.expireAsync(key, HEARTBEAT_EXPIRES))
      .then(() => {
        this._logger.info({
          accounts_syncing_count: Object.keys(this._workers).length,
        }, "ProcessManager: 💘");
      })
      .catch((err) => {
        this._logger.error({err}, "ProcessManager: Failed to update heartbeat");
      });
  }

  /**
   * Shutdown handler for SIGINT and SIGTERM: stops the claim loop, returns
   * this process's accounts to the unclaimed list, deletes its account list
   * and heartbeat keys, then exits whether or not the cleanup succeeded.
   */
  onSigInt() {
    this._logger.info(`ProcessManager: Exiting...`);
    this._exiting = true;

    this.unassignAccountsAssignedTo(IDENTITY).then(() =>
      PubsubConnector.broadcastClient().delAsync(ACCOUNTS_FOR(IDENTITY)).then(() =>
        PubsubConnector.broadcastClient().delAsync(HEARTBEAT_FOR(IDENTITY))
      )
    ).finally(() => {
      process.exit(1);
    });
  }

  /**
   * Pushes onto the unclaimed list every given account ID that does not
   * appear in any account list.
   *
   * IDs are only pushed after the scan completed successfully: after a failed
   * (partial) scan an ID may look unseen while it is in fact already listed,
   * and pushing it again would queue the account twice.
   *
   * @param {Array|*} accountIds - account IDs known to the database.
   * @returns {Promise} resolves with the scan's result once all pushes are
   *   done; rejects if the scan or a push fails.
   */
  ensureAccountIDsInRedis(accountIds) {
    const client = PubsubConnector.broadcastClient();

    let unseenIds = [].concat(accountIds);

    this._logger.info("ProcessManager: Starting scan for accountIds in database that are not present in Redis.");

    return forEachAccountList((foundProcessIdentity, foundIds) => {
      // IDs are compared as strings (the lists are matched against `${id}`).
      unseenIds = unseenIds.filter((id) => !foundIds.includes(`${id}`));
    }).then((scanResult) => {
      if (unseenIds.length === 0) {
        return scanResult;
      }
      this._logger.info({
        unseen_ids: unseenIds.join(', '),
        channel: ACCOUNTS_UNCLAIMED,
      }, `ProcessManager: Adding unseen account IDs to ACCOUNTS_UNCLAIMED channel.`);

      const pushes = unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id));
      return Promise.all(pushes).then(() => scanResult);
    });
  }

  /**
   * Sweeps every claimed-accounts list and returns the accounts of any
   * process whose heartbeat key no longer exists. Reschedules itself after
   * each pass, whether the pass succeeded or not.
   */
  unassignAccountsMissingHeartbeats() {
    const client = PubsubConnector.broadcastClient();

    this._logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.");

    Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
      const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, '');
      return client.existsAsync(HEARTBEAT_FOR(id)).then((exists) =>
        (exists ? Promise.resolve() : this.unassignAccountsAssignedTo(id))
      );
    }).catch((err) => {
      this._logger.error({err}, "ProcessManager: Failed to unassign accounts for processes missing heartbeats");
    }).finally(() => {
      const delay = HEARTBEAT_EXPIRES * 1000;
      setTimeout(() => this.unassignAccountsMissingHeartbeats(), delay);
    });
  }

  /**
   * Moves every account listed under `identity` back to the unclaimed list,
   * one element at a time, until the source list is empty.
   *
   * @param {string} identity - process identity whose accounts are returned.
   * @returns {Promise} resolves after the number of returned accounts has
   *   been logged.
   */
  unassignAccountsAssignedTo(identity) {
    const src = ACCOUNTS_FOR(identity);
    const dst = ACCOUNTS_UNCLAIMED;

    // Each step moves one element; a falsy reply means the list is drained.
    const unassignOne = (count) =>
      PubsubConnector.broadcastClient().rpoplpushAsync(src, dst).then((val) =>
        (val ? unassignOne(count + 1) : Promise.resolve(count))
      );

    return unassignOne(0).then((returned) => {
      this._logger.info({
        returned,
        assigned_to: identity,
      }, `ProcessManager: Returned accounts`);
    });
  }

  /**
   * Claim loop: waits for one unclaimed account, then calls itself again
   * unless the process is exiting. Failures are logged and the loop continues.
   */
  update() {
    this._logger.info(`ProcessManager: Searching for an unclaimed account to sync.`);

    this.acceptUnclaimedAccount()
      .catch((err) => {
        this._logger.error({err}, `ProcessManager: Failed to accept an unclaimed account`);
      })
      .finally(() => {
        if (this._exiting) {
          return;
        }
        this.update();
      });
  }

  /**
   * Blocks until an account can be moved from the unclaimed list onto this
   * process's list (or the blocking call times out), then starts a worker
   * for it.
   *
   * @returns {Promise} resolves immediately when nothing was claimed, or two
   *   seconds after a claim.
   */
  acceptUnclaimedAccount() {
    if (!this._waitForAccountClient) {
      this._waitForAccountClient = PubsubConnector.buildClient();
    }

    const src = ACCOUNTS_UNCLAIMED;
    const dst = ACCOUNTS_FOR(IDENTITY);

    // NOTE(review): Redis BRPOPLPUSH takes its timeout in seconds, which would
    // make 10000 roughly 2.8 hours - confirm this is the intended wait.
    return this._waitForAccountClient.brpoplpushAsync(src, dst, 10000).then((accountId) => {
      if (!accountId) {
        return Promise.resolve();
      }
      this.addWorkerForAccountId(accountId);

      // If we've added an account, wait two seconds before asking for another
      // one. Spacing them out is probably healthy.
      return Promise.delay(2000);
    });
  }

  /**
   * Loads the account and its database, then starts a SyncWorker for it
   * unless the account is unknown, a worker already exists, or the process is
   * exiting.
   *
   * @param {string|number} accountId - ID of the account that was claimed.
   * @returns {Promise} settles once the worker was started, skipped, or the
   *   failure was logged; it does not reject.
   */
  addWorkerForAccountId(accountId) {
    return DatabaseConnector.forShared()
      .then(({Account}) => Account.find({where: {id: accountId}}))
      .then((account) => {
        if (!account || this._workers[account.id]) {
          return undefined;
        }
        return DatabaseConnector.forAccount(account.id).then((db) => {
          // Re-check: state may have changed while the database was opening.
          if (this._exiting || this._workers[account.id]) {
            return;
          }
          this._logger.info({account_id: accountId}, `ProcessManager: Starting worker for Account`);

          this._workers[account.id] = new SyncWorker(account, db, () => {
            this.removeWorkerForAccountId(accountId).catch((err) => {
              this._logger.error({err, account_id: accountId}, `ProcessManager: Failed to return Account to the pool`);
            });
          });
        });
      })
      .catch((err) => {
        this._logger.error({err, account_id: accountId}, `ProcessManager: Failed to start worker for Account`);
      });
  }

  /**
   * Gives an account back to the pool: removes it from this process's list,
   * appends it to the unclaimed list and forgets its worker. If this process
   * no longer holds the claim, the problem is logged and nothing else changes.
   *
   * @param {string|number} accountId - ID of the account to return.
   * @returns {Promise} resolves once the account was re-queued (or the missing
   *   claim was logged); rejects on client errors.
   */
  removeWorkerForAccountId(accountId) {
    const src = ACCOUNTS_FOR(IDENTITY);
    const dst = ACCOUNTS_UNCLAIMED;
    const client = PubsubConnector.broadcastClient();

    return client.lremAsync(src, 1, accountId).then((didRemove) => {
      if (!didRemove) {
        this._logger.error("Wanted to return item to pool, but didn't have claim on it.");
        return undefined;
      }
      // Remove the key entirely (not just null it) so the worker count
      // reported by updateHeartbeat() only includes live workers.
      delete this._workers[accountId];
      return client.rpushAsync(dst, accountId).then(() => undefined);
    });
  }
}
|
|
|
|
module.exports = SyncProcessManager;
|