mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-03-01 10:33:14 +08:00
Remove concept of self-limiting workers, will use cloudwatch metrics collection to scale fleet instead of queue length
This commit is contained in:
parent
f7c647f7ba
commit
8e692982bb
1 changed files with 19 additions and 26 deletions
|
@ -22,6 +22,16 @@ that expires quickly if the sync process doesn't refresh it.
|
|||
|
||||
If it does not find the key, it moves all of the accounts in the list back to
|
||||
the unclaimed key.
|
||||
|
||||
Sync processes only claim an account for a fixed period of time. This means that
|
||||
an engineer can add new sync machines to the pool and the load across instances
|
||||
will balance on it's own. It also means one bad instance will not permanently
|
||||
disrupt sync for any accounts. (Eg: instance has faulty network connection.)
|
||||
|
||||
Sync processes periodically claim accounts when they can find them, regardless
|
||||
of how busy they are. A separate API (`/routes/monitoring`) allows CloudWatch
|
||||
to decide whether to spin up instances or take them offline based on CPU/RAM
|
||||
utilization across the pool.
|
||||
*/
|
||||
|
||||
class SyncProcessManager {
|
||||
|
@ -73,6 +83,8 @@ class SyncProcessManager {
|
|||
|
||||
let unseenIds = [].concat(accountIds);
|
||||
|
||||
console.log("ProcessManager: Starting scan for accountIds in database that are not present in Redis.")
|
||||
|
||||
return Promise.each(client.keysAsync(`accounts:*`), (key) =>
|
||||
client.lrangeAsync(key, 0, 20000).then((foundIds) => {
|
||||
unseenIds = unseenIds.filter((a) => !foundIds.includes(`${a}`))
|
||||
|
@ -117,35 +129,16 @@ class SyncProcessManager {
|
|||
}
|
||||
|
||||
update() {
|
||||
this.ensureCapacity().then(() => {
|
||||
console.log(`ProcessManager: Voluntering to sync additional account.`)
|
||||
this.acceptUnclaimedAccount().finally(() => {
|
||||
this.update();
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
console.log(`ProcessManager: Decided not to sync additional account. ${err.message}`)
|
||||
setTimeout(() => this.update(), 5000)
|
||||
console.log(`ProcessManager: Searching for an unclaimed account to sync.`)
|
||||
|
||||
this.acceptUnclaimedAccount().finally(() => {
|
||||
if (this._exiting) {
|
||||
return;
|
||||
}
|
||||
this.update();
|
||||
});
|
||||
}
|
||||
|
||||
ensureCapacity() {
|
||||
if (os.freemem() < 20 * 1024 * 1024) {
|
||||
return Promise.reject(new Error(`<20MB RAM free (${os.freemem()} bytes)`));
|
||||
}
|
||||
|
||||
const fiveMinuteLoadAvg = os.loadavg()[1];
|
||||
if (fiveMinuteLoadAvg > CPU_COUNT * 0.9) {
|
||||
return Promise.reject(new Error(`CPU load > 90% (${fiveMinuteLoadAvg} - ${CPU_COUNT} cores)`));
|
||||
}
|
||||
|
||||
if (this._exiting) {
|
||||
return Promise.reject(new Error('Process is exiting.'))
|
||||
}
|
||||
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
acceptUnclaimedAccount() {
|
||||
if (!this._waitForAccountClient) {
|
||||
this._waitForAccountClient = PubsubConnector.buildClient();
|
||||
|
|
Loading…
Reference in a new issue