diff --git a/packages/nylas-sync/sync-process-manager.js b/packages/nylas-sync/sync-process-manager.js index d9cab0829..560bc1fad 100644 --- a/packages/nylas-sync/sync-process-manager.js +++ b/packages/nylas-sync/sync-process-manager.js @@ -40,10 +40,11 @@ class SyncProcessManager { this._workers = {}; this._listenForSyncsClient = null; this._exiting = false; + this._logger = global.Logger.child({identity: IDENTITY}) } start() { - global.Logger.info(`ProcessManager: Starting with ID ${IDENTITY}`) + this._logger.info(`ProcessManager: Starting with ID`) this.unassignAccountsAssignedTo(IDENTITY).then(() => { this.unassignAccountsMissingHeartbeats(); @@ -63,12 +64,12 @@ class SyncProcessManager { client.setAsync(key, Date.now()).then(() => client.expireAsync(key, HEARTBEAT_EXPIRES) ).then(() => - global.Logger.info("ProcessManager: 💘") + this._logger.info("ProcessManager: 💘") ) } onSigInt() { - global.Logger.info(`ProcessManager: Exiting...`) + this._logger.info(`ProcessManager: Exiting...`) this._exiting = true; this.unassignAccountsAssignedTo(IDENTITY).then(() => @@ -85,7 +86,7 @@ class SyncProcessManager { let unseenIds = [].concat(accountIds); - global.Logger.info("ProcessManager: Starting scan for accountIds in database that are not present in Redis.") + this._logger.info("ProcessManager: Starting scan for accountIds in database that are not present in Redis.") return forEachAccountList((foundProcessIdentity, foundIds) => { unseenIds = unseenIds.filter((a) => !foundIds.includes(`${a}`)) @@ -94,7 +95,10 @@ class SyncProcessManager { if (unseenIds.length === 0) { return; } - global.Logger.info(`ProcessManager: Adding account IDs ${unseenIds.join(',')} to ${ACCOUNTS_UNCLAIMED}.`) + this._logger.info({ + unseen_ids: unseenIds.join(', '), + channel: ACCOUNTS_UNCLAIMED, + }, `ProcessManager: Adding unseen account IDs to ACCOUNTS_UNCLAIMED channel.`) unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id)); }); } @@ -102,7 +106,7 @@ class SyncProcessManager { unassignAccountsMissingHeartbeats() { const client = PubsubConnector.broadcastClient(); - global.Logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.") + this._logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.") Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => { const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, ''); @@ -125,12 +129,15 @@ class SyncProcessManager { ) return unassignOne(0).then((returned) => { - global.Logger.info(`ProcessManager: Returned ${returned} accounts assigned to ${identity}.`) + this._logger.info({ + returned, + assigned_to: identity, + }, `ProcessManager: Returned accounts`) }); } update() { - global.Logger.info(`ProcessManager: Searching for an unclaimed account to sync.`) + this._logger.info(`ProcessManager: Searching for an unclaimed account to sync.`) this.acceptUnclaimedAccount().finally(() => { if (this._exiting) { @@ -170,7 +177,7 @@ class SyncProcessManager { if (this._exiting || this._workers[account.id]) { return; } - global.Logger.info(`ProcessManager: Starting worker for Account ${accountId}`) + this._logger.info({account_id: accountId}, `ProcessManager: Starting worker for Account`) this._workers[account.id] = new SyncWorker(account, db, () => { this.removeWorkerForAccountId(accountId) }); @@ -187,7 +194,8 @@ class SyncProcessManager { if (didRemove) { PubsubConnector.broadcastClient().rpushAsync(dst, accountId) } else { - throw new Error("Wanted to return item to pool, but didn't have claim on it.") + this._logger.error("Wanted to return item to pool, but didn't have claim on it.") + return } this._workers[accountId] = null; });