[client-sync] Refactor MessageProcessor to be more robust to errors

Summary:
Errors in the MessageProcessor were causing sync to get stuck
occasionally. This diff refactors queueMessageForProcessing and friends
so that they're more robust to errors during promise construction.

Test Plan: Run locally

Reviewers: juan, spang, evan

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D4190
This commit is contained in:
Mark Hahnenberg 2017-03-10 11:07:11 -08:00
parent c971ed03e2
commit 792994d95c

View file

@ -47,52 +47,61 @@ class MessageProcessor {
} }
/** /**
* @returns Promise that resolves when message has been processed * @returns Promise that resolves when message has been processed. This
* This promise will never reject, given that this function is meant to be * promise will never reject. If message processing fails, we will register
* called as a fire and forget operation * the failure in the folder syncState.
* If message processing fails, we will register the failure in the folder
* syncState
*/ */
queueMessageForProcessing({accountId, folderId, imapMessage, struct, desiredParts, throttle = true} = {}) { queueMessageForProcessing({accountId, folderId, imapMessage, struct, desiredParts, throttle = true} = {}) {
return new Promise((resolve) => { return new Promise(async (resolve) => {
this._queueLength++ let logger;
this._queue = this._queue.then(async () => { let folder;
if (this._currentChunkSize === 0) { try {
this._currentChunkStart = Date.now(); const accountDb = await LocalDatabaseConnector.forShared()
} const account = await accountDb.Account.findById(accountId)
this._currentChunkSize++; const db = await LocalDatabaseConnector.forAccount(accountId);
const {Folder} = db
folder = await Folder.findById(folderId)
logger = global.Logger.forAccount(account)
await this._processMessage({accountId, folderId, imapMessage, struct, desiredParts}) this._queueLength++
this._queueLength-- this._queue = this._queue.then(async () => {
if (this._currentChunkSize === 0) {
// Throttle message processing to meter cpu usage this._currentChunkStart = Date.now();
if (this._currentChunkSize === MAX_CHUNK_SIZE) {
if (throttle) {
await new Promise(r => setTimeout(r, this._computeThrottlingTimeout()));
} }
this._currentChunkSize = 0; this._currentChunkSize++;
}
// To save memory, we reset the Promise chain if the queue reaches a await this._processMessage({db, accountId, folder, imapMessage, struct, desiredParts, logger})
// length of 0, otherwise we will continue referencing the entire chain this._queueLength--
// of promises that came before
if (this._queueLength === 0) { // Throttle message processing to meter cpu usage
this._queue = Promise.resolve() if (this._currentChunkSize === MAX_CHUNK_SIZE) {
if (throttle) {
await new Promise(r => setTimeout(r, this._computeThrottlingTimeout()));
}
this._currentChunkSize = 0;
}
// To save memory, we reset the Promise chain if the queue reaches a
// length of 0, otherwise we will continue referencing the entire chain
// of promises that came before
if (this._queueLength === 0) {
this._queue = Promise.resolve()
}
});
} catch (err) {
if (logger && folder) {
await this._onError({imapMessage, desiredParts, folder, err, logger});
} else {
NylasEnv.reportError(err);
} }
resolve() }
}) resolve();
}) })
} }
async _processMessage({accountId, folderId, imapMessage, struct, desiredParts}) { async _processMessage({db, accountId, folder, imapMessage, struct, desiredParts, logger}) {
const db = await LocalDatabaseConnector.forAccount(accountId);
const {Message, Folder, Label} = db
const folder = await Folder.findById(folderId)
const accountDb = await LocalDatabaseConnector.forShared()
const account = await accountDb.Account.findById(accountId)
const logger = global.Logger.forAccount(account)
try { try {
const {Message, Folder, Label} = db;
const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, { const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, {
db, db,
folder, folder,
@ -147,38 +156,42 @@ class MessageProcessor {
logger.log(`🔃 ✉️ (${folder.name}) "${messageValues.subject}" - ${messageValues.date}`) logger.log(`🔃 ✉️ (${folder.name}) "${messageValues.subject}" - ${messageValues.date}`)
return processedMessage return processedMessage
} catch (err) { } catch (err) {
logger.error(`MessageProcessor: Could not build message`, { await this._onError({imapMessage, desiredParts, folder, err, logger});
err,
imapMessage,
desiredParts,
})
const fingerprint = ["{{ default }}", "message processor", err.message];
NylasEnv.reportError(err, {fingerprint,
rateLimit: {
ratePerHour: 30,
key: `MessageProcessorError:${err.message}`,
},
})
// Keep track of uids we failed to fetch
const {failedUIDs = []} = folder.syncState
const {uid} = imapMessage.attributes
if (uid) {
await folder.updateSyncState({failedUIDs: _.uniq(failedUIDs.concat([uid]))})
}
// Save parse errors for future debugging
if (process.env.NYLAS_DEBUG) {
const outJSON = JSON.stringify({imapMessage, desiredParts, result: {}});
const outDir = path.join(os.tmpdir(), "k2-parse-errors", folder.name)
const outFile = path.join(outDir, imapMessage.attributes.uid.toString());
mkdirp.sync(outDir);
fs.writeFileSync(outFile, outJSON);
}
return null return null
} }
} }
async _onError({imapMessage, desiredParts, folder, err, logger}) {
logger.error(`MessageProcessor: Could not build message`, {
err,
imapMessage,
desiredParts,
})
const fingerprint = ["{{ default }}", "message processor", err.message];
NylasEnv.reportError(err, {fingerprint,
rateLimit: {
ratePerHour: 30,
key: `MessageProcessorError:${err.message}`,
},
})
// Keep track of uids we failed to fetch
const {failedUIDs = []} = folder.syncState
const {uid} = imapMessage.attributes
if (uid) {
await folder.updateSyncState({failedUIDs: _.uniq(failedUIDs.concat([uid]))})
}
// Save parse errors for future debugging
if (process.env.NYLAS_DEBUG) {
const outJSON = JSON.stringify({imapMessage, desiredParts, result: {}});
const outDir = path.join(os.tmpdir(), "k2-parse-errors", folder.name)
const outFile = path.join(outDir, imapMessage.attributes.uid.toString());
mkdirp.sync(outDir);
fs.writeFileSync(outFile, outJSON);
}
}
// Replaces ["<rfc2822messageid>", ...] with [[object Reference], ...] // Replaces ["<rfc2822messageid>", ...] with [[object Reference], ...]
// Creates references that do not yet exist, and adds the correct // Creates references that do not yet exist, and adds the correct
// associations as well // associations as well