Update logger for sync process manager

- Log identity data always
This commit is contained in:
Juan Tejada 2016-07-11 15:40:03 -07:00
parent 5085730902
commit 960dbdeb8f

View file

@ -40,10 +40,11 @@ class SyncProcessManager {
this._workers = {}; this._workers = {};
this._listenForSyncsClient = null; this._listenForSyncsClient = null;
this._exiting = false; this._exiting = false;
this._logger = global.Logger.child({identity: IDENTITY})
} }
start() { start() {
global.Logger.info(`ProcessManager: Starting with ID ${IDENTITY}`) this._logger.info(`ProcessManager: Starting with ID`)
this.unassignAccountsAssignedTo(IDENTITY).then(() => { this.unassignAccountsAssignedTo(IDENTITY).then(() => {
this.unassignAccountsMissingHeartbeats(); this.unassignAccountsMissingHeartbeats();
@ -63,12 +64,12 @@ class SyncProcessManager {
client.setAsync(key, Date.now()).then(() => client.setAsync(key, Date.now()).then(() =>
client.expireAsync(key, HEARTBEAT_EXPIRES) client.expireAsync(key, HEARTBEAT_EXPIRES)
).then(() => ).then(() =>
global.Logger.info("ProcessManager: 💘") this._logger.info("ProcessManager: 💘")
) )
} }
onSigInt() { onSigInt() {
global.Logger.info(`ProcessManager: Exiting...`) this._logger.info(`ProcessManager: Exiting...`)
this._exiting = true; this._exiting = true;
this.unassignAccountsAssignedTo(IDENTITY).then(() => this.unassignAccountsAssignedTo(IDENTITY).then(() =>
@ -85,7 +86,7 @@ class SyncProcessManager {
let unseenIds = [].concat(accountIds); let unseenIds = [].concat(accountIds);
global.Logger.info("ProcessManager: Starting scan for accountIds in database that are not present in Redis.") this._logger.info("ProcessManager: Starting scan for accountIds in database that are not present in Redis.")
return forEachAccountList((foundProcessIdentity, foundIds) => { return forEachAccountList((foundProcessIdentity, foundIds) => {
unseenIds = unseenIds.filter((a) => !foundIds.includes(`${a}`)) unseenIds = unseenIds.filter((a) => !foundIds.includes(`${a}`))
@ -94,7 +95,10 @@ class SyncProcessManager {
if (unseenIds.length === 0) { if (unseenIds.length === 0) {
return; return;
} }
global.Logger.info(`ProcessManager: Adding account IDs ${unseenIds.join(',')} to ${ACCOUNTS_UNCLAIMED}.`) this._logger.info({
unseen_ids: unseenIds.join(', '),
channel: ACCOUNTS_UNCLAIMED,
}, `ProcessManager: Adding unseen account IDs to ACCOUNTS_UNCLAIMED channel.`)
unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id)); unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id));
}); });
} }
@ -102,7 +106,7 @@ class SyncProcessManager {
unassignAccountsMissingHeartbeats() { unassignAccountsMissingHeartbeats() {
const client = PubsubConnector.broadcastClient(); const client = PubsubConnector.broadcastClient();
global.Logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.") this._logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.")
Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => { Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, ''); const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, '');
@ -125,12 +129,15 @@ class SyncProcessManager {
) )
return unassignOne(0).then((returned) => { return unassignOne(0).then((returned) => {
global.Logger.info(`ProcessManager: Returned ${returned} accounts assigned to ${identity}.`) this._logger.info({
returned,
assigned_to: identity,
}, `ProcessManager: Returned accounts`)
}); });
} }
update() { update() {
global.Logger.info(`ProcessManager: Searching for an unclaimed account to sync.`) this._logger.info(`ProcessManager: Searching for an unclaimed account to sync.`)
this.acceptUnclaimedAccount().finally(() => { this.acceptUnclaimedAccount().finally(() => {
if (this._exiting) { if (this._exiting) {
@ -170,7 +177,7 @@ class SyncProcessManager {
if (this._exiting || this._workers[account.id]) { if (this._exiting || this._workers[account.id]) {
return; return;
} }
global.Logger.info(`ProcessManager: Starting worker for Account ${accountId}`) this._logger.info({account_id: accountId}, `ProcessManager: Starting worker for Account`)
this._workers[account.id] = new SyncWorker(account, db, () => { this._workers[account.id] = new SyncWorker(account, db, () => {
this.removeWorkerForAccountId(accountId) this.removeWorkerForAccountId(accountId)
}); });
@ -187,7 +194,8 @@ class SyncProcessManager {
if (didRemove) { if (didRemove) {
PubsubConnector.broadcastClient().rpushAsync(dst, accountId) PubsubConnector.broadcastClient().rpushAsync(dst, accountId)
} else { } else {
throw new Error("Wanted to return item to pool, but didn't have claim on it.") this._logger.error("Wanted to return item to pool, but didn't have claim on it.")
return
} }
this._workers[accountId] = null; this._workers[accountId] = null;
}); });