From 83ef8c12b34c3c39926121da272bae4118464763 Mon Sep 17 00:00:00 2001 From: Juan Tejada Date: Fri, 6 Jan 2017 14:28:28 -0800 Subject: [PATCH] [local-sync] Restore global queue for message processing to improve perf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Sync operations are mostly bound by I/O and the imap connection. What we believe that is mostly affecting cpu and battery life is that node’s event loop is being hosed with cpu intensive message processing operations. To alleviate this, we do a few things: - Restore a global message processing queue to process messages serially and meter cpu usage (message processing continues to be a fire and forget call from within sync operations) - Move actual cpu intensive work to the message processing queue, i.e. `MessageFactory.parseFromImap` - Keep track of message processing queue length, and skip sync operations if queue is too big to prevent massive memory consumption This commit also renames the package from new-message-processor to message-processor, given that now it processes both new and existing messages, and we like to minimize confusion. Test Plan: manual Reviewers: spang, khamidou, evan Reviewed By: evan Differential Revision: https://phab.nylas.com/D3602 --- packages/local-sync/spec/threading-spec.js | 2 +- .../imap/fetch-folder-list.js | 6 +- .../imap/fetch-messages-in-folder.js | 88 +++------ .../src/local-sync-worker/sync-operation.js | 19 ++ .../src/local-sync-worker/sync-worker.js | 6 +- .../detect-thread.js | 0 .../extract-contacts.js | 0 .../extract-files.js | 0 .../local-sync/src/message-processor/index.js | 167 ++++++++++++++++++ packages/local-sync/src/models/folder.js | 12 ++ .../src/new-message-processor/index.js | 74 -------- 11 files changed, 235 insertions(+), 139 deletions(-) create mode 100644 packages/local-sync/src/local-sync-worker/sync-operation.js rename packages/local-sync/src/{new-message-processor => message-processor}/detect-thread.js (100%) rename packages/local-sync/src/{new-message-processor => message-processor}/extract-contacts.js (100%) rename packages/local-sync/src/{new-message-processor => message-processor}/extract-files.js (100%) create mode 100644 packages/local-sync/src/message-processor/index.js delete mode 100644 packages/local-sync/src/new-message-processor/index.js diff --git a/packages/local-sync/spec/threading-spec.js b/packages/local-sync/spec/threading-spec.js index dfbdc4b93..d9572051d 100644 --- a/packages/local-sync/spec/threading-spec.js +++ b/packages/local-sync/spec/threading-spec.js @@ -1,6 +1,6 @@ /* eslint global-require: 0 */ /* eslint import/no-dynamic-require: 0 */ -const detectThread = require('../src/new-message-processor/detect-thread'); +const detectThread = require('../src/message-processor/detect-thread'); const LocalDatabaseConnector = require('../src/shared/local-database-connector'); const {FIXTURES_PATH, ACCOUNT_ID} = require('./helpers') diff --git a/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js b/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js index d9c46300d..fd5731e5e 100644 --- a/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js +++ b/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js @@ -1,11 +1,13 @@ const {Provider, PromiseUtils} = require('isomorphic-core'); +const SyncOperation = require('../sync-operation') const {localizedCategoryNames} = require('../sync-utils') const BASE_ROLES = ['inbox', 'sent', 'trash', 'spam']; const GMAIL_ROLES_WITH_FOLDERS = ['all', 'trash', 'spam']; -class FetchFolderList { +class FetchFolderList extends SyncOperation { constructor(account, logger) { + super() this._account = account; this._provider = account.provider; this._logger = logger; @@ -125,7 +127,7 @@ class FetchFolderList { return {next, created, deleted}; } - async run(db, imap) { + async runOperation(db, imap) { this._db = db; const boxes = await imap.getBoxes(); diff --git a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js index 73c113d8d..d04e5b837 100644 --- a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js +++ b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js @@ -1,22 +1,18 @@ const _ = require('underscore'); -const os = require('os'); -const fs = require('fs'); -const path = require('path') -const mkdirp = require('mkdirp'); - const {PromiseUtils, IMAPConnection} = require('isomorphic-core'); const {Capabilities} = IMAPConnection; -const MessageFactory = require('../../shared/message-factory') -const {processNewMessage, processExistingMessage} = require('../../new-message-processor') +const SyncOperation = require('../sync-operation') +const MessageProcessor = require('../../message-processor') + const MessageFlagAttributes = ['id', 'threadId', 'folderImapUID', 'unread', 'starred', 'folderImapXGMLabels'] - const SHALLOW_SCAN_UID_COUNT = 1000; const FETCH_MESSAGES_FIRST_COUNT = 100; const FETCH_MESSAGES_COUNT = 200; -class FetchMessagesInFolder { +class FetchMessagesInFolder extends SyncOperation { constructor(folder, options, logger) { + super() this._imap = null this._box = null this._db = null @@ -194,7 +190,7 @@ class FetchMessagesInFolder { return desired; } - async _fetchMessagesAndQueueForProcessing(range) { + async _fetchAndProcessMessages(range) { const uidsByPart = {}; await this._box.fetchEach(range, {struct: true}, ({attributes}) => { @@ -207,7 +203,9 @@ class FetchMessagesInFolder { uidsByPart[key].push(attributes.uid); }) - await PromiseUtils.each(Object.keys(uidsByPart), (key) => { + await PromiseUtils.each(Object.keys(uidsByPart), async (key) => { + // note: the order of UIDs in the array doesn't matter, Gmail always + // returns them in ascending (oldest => newest) order. const uids = uidsByPart[key]; const desiredParts = JSON.parse(key); const bodies = ['HEADER'].concat(desiredParts.map(p => p.id)); @@ -217,46 +215,25 @@ class FetchMessagesInFolder { // num_messages: uids.length, // }, `FetchMessagesInFolder: Fetching parts for messages`) - // note: the order of UIDs in the array doesn't matter, Gmail always - // returns them in ascending (oldest => newest) order. - - return this._box.fetchEach( + const promises = [] + await this._box.fetchEach( uids, {bodies, struct: true}, - (imapMessage) => this._processMessage(imapMessage, desiredParts) + (imapMessage) => promises.push(MessageProcessor.queueMessageForProcessing({ + imapMessage, + desiredParts, + folderId: this._folder.id, + accountId: this._db.accountId, + })) ); + + // We need to wait for all of the messages in the range to be processed + // before actually updating the folder sync state, otherwise we might skip + // messages. + return Promise.all(promises) }); } - async _processMessage(imapMessage, desiredParts) { - const {Message} = this._db - - try { - const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, { - db: this._db, - folder: this._folder, - accountId: this._db.accountId, - }); - const existingMessage = await Message.find({where: {id: messageValues.id}}); - if (existingMessage) { - await processExistingMessage(existingMessage, messageValues, imapMessage) - } else { - await processNewMessage(messageValues, imapMessage) - } - console.log(`πŸ”ƒ βœ‰οΈ "${messageValues.subject}" - ${messageValues.date}`) - } catch (err) { - this._logger.error(err, { - imapMessage, - desiredParts, - }, `FetchMessagesInFolder: Could not build message`) - const outJSON = JSON.stringify({imapMessage, desiredParts, result: {}}); - const outDir = path.join(os.tmpdir(), "k2-parse-errors", this._folder.name) - const outFile = path.join(outDir, imapMessage.attributes.uid.toString()); - mkdirp.sync(outDir); - fs.writeFileSync(outFile, outJSON); - } - } - async _openMailboxAndEnsureValidity() { const box = await this._imap.openBox(this._folder.name); @@ -317,9 +294,9 @@ class FetchMessagesInFolder { // range: `${min}:${max}`, // }, `FetchMessagesInFolder: Fetching range`); - await this._fetchMessagesAndQueueForProcessing(`${min}:${max}`); + await this._fetchAndProcessMessages(`${min}:${max}`); const {fetchedmin, fetchedmax} = this._folder.syncState; - return this.updateFolderSyncState({ + return this._folder.updateSyncState({ fetchedmin: fetchedmin ? Math.min(fetchedmin, min) : min, fetchedmax: fetchedmax ? Math.max(fetchedmax, max) : max, uidnext: boxUidnext, @@ -372,7 +349,7 @@ class FetchMessagesInFolder { await this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes) // this._logger.info(`FetchMessagesInFolder: finished fetching changes to messages`); - return this.updateFolderSyncState({ + return this._folder.updateSyncState({ highestmodseq: nextHighestmodseq, timeShallowScan: Date.now(), }); @@ -398,22 +375,14 @@ class FetchMessagesInFolder { // this._logger.info(`FetchMessagesInFolder: Deep scan finished.`); - return this.updateFolderSyncState({ + return this._folder.updateSyncState({ highestmodseq: this._box.highestmodseq, timeDeepScan: Date.now(), timeShallowScan: Date.now(), }); } - async updateFolderSyncState(newState) { - if (_.isMatch(this._folder.syncState, newState)) { - return Promise.resolve(); - } - this._folder.syncState = Object.assign(this._folder.syncState, newState); - return this._folder.save(); - } - - async run(db, imap) { + async runOperation(db, imap) { console.log(`πŸ”ƒ πŸ“‚ ${this._folder.name}`) this._db = db; this._imap = imap; @@ -423,11 +392,12 @@ class FetchMessagesInFolder { // If we haven't set any syncState at all, let's set it for the first time // to generate a delta for N1 if (_.isEmpty(this._folder.syncState)) { - await this.updateFolderSyncState({ + await this._folder.updateSyncState({ uidnext: this._box.uidnext, uidvalidity: this._box.uidvalidity, fetchedmin: null, fetchedmax: null, + failedUIDs: [], }) } await this._fetchUnsyncedMessages() diff --git a/packages/local-sync/src/local-sync-worker/sync-operation.js b/packages/local-sync/src/local-sync-worker/sync-operation.js new file mode 100644 index 000000000..d2083a8d3 --- /dev/null +++ b/packages/local-sync/src/local-sync-worker/sync-operation.js @@ -0,0 +1,19 @@ +const MessageProcessor = require('../message-processor') + + +class SyncOperation { + async run(...args) { + if (MessageProcessor.queueIsFull()) { + console.log(`πŸ”ƒ Skipping sync operation - Message processing queue is full`) + return Promise.resolve() + } + + return this.runOperation(...args) + } + + async runOperation() { + throw new Error('Must implement `SyncOperation::runOperation`') + } +} + +module.exports = SyncOperation diff --git a/packages/local-sync/src/local-sync-worker/sync-worker.js b/packages/local-sync/src/local-sync-worker/sync-worker.js index 6904b7954..ce176d54a 100644 --- a/packages/local-sync/src/local-sync-worker/sync-worker.js +++ b/packages/local-sync/src/local-sync-worker/sync-worker.js @@ -73,7 +73,7 @@ class SyncWorker { this.syncNow({reason: "You've got mail!"}); } - _getIdleFolder() { + _getInboxFolder() { return this._db.Folder.find({where: {role: ['all', 'inbox']}}) } @@ -394,8 +394,8 @@ class SyncWorker { // this._logger.info('Syncworker: Completed sync cycle'); // Start idling on the inbox - const idleFolder = await this._getIdleFolder(); - await this._conn.openBox(idleFolder.name); + const inbox = await this._getInboxFolder(); + await this._conn.openBox(inbox.name); // this._logger.info('SyncWorker: Idling on inbox folder'); } diff --git a/packages/local-sync/src/new-message-processor/detect-thread.js b/packages/local-sync/src/message-processor/detect-thread.js similarity index 100% rename from packages/local-sync/src/new-message-processor/detect-thread.js rename to packages/local-sync/src/message-processor/detect-thread.js diff --git a/packages/local-sync/src/new-message-processor/extract-contacts.js b/packages/local-sync/src/message-processor/extract-contacts.js similarity index 100% rename from packages/local-sync/src/new-message-processor/extract-contacts.js rename to packages/local-sync/src/message-processor/extract-contacts.js diff --git a/packages/local-sync/src/new-message-processor/extract-files.js b/packages/local-sync/src/message-processor/extract-files.js similarity index 100% rename from packages/local-sync/src/new-message-processor/extract-files.js rename to packages/local-sync/src/message-processor/extract-files.js diff --git a/packages/local-sync/src/message-processor/index.js b/packages/local-sync/src/message-processor/index.js new file mode 100644 index 000000000..c098d0c93 --- /dev/null +++ b/packages/local-sync/src/message-processor/index.js @@ -0,0 +1,167 @@ +const _ = require('underscore') +const os = require('os'); +const fs = require('fs'); +const path = require('path') +const mkdirp = require('mkdirp'); +const detectThread = require('./detect-thread'); +const extractFiles = require('./extract-files'); +const extractContacts = require('./extract-contacts'); +const MessageFactory = require('../shared/message-factory') +const LocalDatabaseConnector = require('../shared/local-database-connector'); + + +const MAX_QUEUE_LENGTH = 500 +const PROCESSING_DELAY = 0 + +class MessageProcessor { + + constructor() { + // The queue is a chain of Promises + this._queue = Promise.resolve() + this._queueLength = 0 + } + + queueLength() { + return this._queueLength + } + + queueIsFull() { + return this._queueLength >= MAX_QUEUE_LENGTH + } + + /** + * @returns Promise that resolves when message has been processed + * This promise will never reject, given that this function is meant to be + * called as a fire and forget operation + * If message processing fails, we will register the failure in the folder + * syncState + */ + queueMessageForProcessing({accountId, folderId, imapMessage, desiredParts}) { + return new Promise((resolve) => { + this._queueLength++ + this._queue = this._queue.then(async () => { + await this._processMessage({accountId, folderId, imapMessage, desiredParts}) + this._queueLength-- + + // To save memory, we reset the Promise chain if the queue reaches a + // length of 0, otherwise we will continue referencing the entire chain + // of promises that came before + if (this._queueLength === 0) { + this._queue = Promise.resolve() + } + resolve() + + // Throttle message processing to meter cpu usage + await new Promise(r => setTimeout(r, PROCESSING_DELAY)) + }) + }) + } + + async _processMessage({accountId, folderId, imapMessage, desiredParts}) { + const db = await LocalDatabaseConnector.forAccount(accountId); + const {Message, Folder} = db + const folder = await Folder.findById(folderId) + try { + const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, { + db, + folder, + accountId, + }); + const existingMessage = await Message.find({where: {id: messageValues.id}}); + let processedMessage; + if (existingMessage) { + processedMessage = await this._processExistingMessage(existingMessage, messageValues, imapMessage) + } else { + processedMessage = await this._processNewMessage(messageValues, imapMessage) + } + console.log(`πŸ”ƒ βœ‰οΈ "${messageValues.subject}" - ${messageValues.date}`) + return processedMessage + } catch (err) { + console.error(`FetchMessagesInFolder: Could not build message`, { + err, + imapMessage, + desiredParts, + }) + + // Keep track of uids we failed to fetch + const {failedUIDs = []} = folder.syncState + const {uid} = imapMessage.attributes + if (uid) { + await folder.updateSyncState({failedUIDs: _.uniq(failedUIDs.concat([uid]))}) + } + + // Save parse errors for future debugging + const outJSON = JSON.stringify({imapMessage, desiredParts, result: {}}); + const outDir = path.join(os.tmpdir(), "k2-parse-errors", folder.name) + const outFile = path.join(outDir, imapMessage.attributes.uid.toString()); + mkdirp.sync(outDir); + fs.writeFileSync(outFile, outJSON); + return null + } + } + + async _processNewMessage(message, imapMessage) { + const {accountId} = message; + const db = await LocalDatabaseConnector.forAccount(accountId); + const {Message} = db + + const existingMessage = await Message.findById(message.id) + if (existingMessage) { + // This is an extremely rare case when 2 or more /new/ messages with + // the exact same headers were queued for creation (same subject, + // participants, timestamp, and message-id header). In this case, we + // will ignore it and report the error + console.warn('MessageProcessor: Encountered 2 new messages with the same id', {message}) + return null + } + const thread = await detectThread({db, message}); + message.threadId = thread.id; + const createdMessage = await Message.create(message); + await extractFiles({db, message, imapMessage}); + await extractContacts({db, message}); + createdMessage.isProcessed = true; + await createdMessage.save() + return createdMessage + } + + /** + * When we send a message we store an incomplete copy in the local + * database while we wait for the sync loop to discover the actually + * delivered one. We store this to keep track of our delivered state and + * to ensure it's in the sent folder. + * + * We also get already processed messages because they may have had their + * folders or labels changed or had some other property updated with them. + * + * It'll have the basic ID, but no thread, labels, etc. + */ + async _processExistingMessage(existingMessage, parsedMessage, rawIMAPMessage) { + const {accountId} = parsedMessage; + const db = await LocalDatabaseConnector.forAccount(accountId); + await existingMessage.update(parsedMessage); + if (parsedMessage.labels && parsedMessage.labels.length > 0) { + await existingMessage.setLabels(parsedMessage.labels) + } + + let thread = await existingMessage.getThread(); + if (!existingMessage.isProcessed) { + if (!thread) { + thread = await detectThread({db, message: parsedMessage}); + existingMessage.threadId = thread.id; + } + await extractFiles({db, message: existingMessage, imapMessage: rawIMAPMessage}); + await extractContacts({db, message: existingMessage}); + existingMessage.isProcessed = true; + } else { + if (!thread) { + throw new Error(`Existing processed message ${existingMessage.id} doesn't have thread`) + } + } + + await existingMessage.save(); + await thread.updateLabelsAndFolders(); + return existingMessage + } +} + +module.exports = new MessageProcessor() diff --git a/packages/local-sync/src/models/folder.js b/packages/local-sync/src/models/folder.js index 1dc0f11f3..1ce56dc21 100644 --- a/packages/local-sync/src/models/folder.js +++ b/packages/local-sync/src/models/folder.js @@ -1,3 +1,4 @@ +const _ = require('underscore') const crypto = require('crypto') const {DatabaseTypes: {JSONColumn}} = require('isomorphic-core'); const {formatImapPath} = require('../shared/imap-paths-utils'); @@ -30,6 +31,9 @@ module.exports = (sequelize, Sequelize) => { * * // Timestamp when we last fetched unseen messages * timeFetchedUnseen, + * + * // UIDs that failed to be fetched + * failedUIDs, * } */ syncState: JSONColumn('syncState'), @@ -63,6 +67,14 @@ module.exports = (sequelize, Sequelize) => { ) }, + updateSyncState(nextSyncState = {}) { + if (_.isMatch(this.syncState, nextSyncState)) { + return Promise.resolve(); + } + this.syncState = Object.assign(this.syncState, nextSyncState); + return this.save(); + }, + toJSON() { return { id: `${this.id}`, diff --git a/packages/local-sync/src/new-message-processor/index.js b/packages/local-sync/src/new-message-processor/index.js deleted file mode 100644 index ac3996092..000000000 --- a/packages/local-sync/src/new-message-processor/index.js +++ /dev/null @@ -1,74 +0,0 @@ -const detectThread = require('./detect-thread'); -const extractFiles = require('./extract-files'); -const extractContacts = require('./extract-contacts'); -const LocalDatabaseConnector = require('../shared/local-database-connector'); - -async function processNewMessage(message, imapMessage) { - const {accountId} = message; - const logger = global.Logger.forAccount({id: accountId}).child({message}) - const db = await LocalDatabaseConnector.forAccount(accountId); - const {Message} = db - - const existingMessage = await Message.findById(message.id) - if (existingMessage) { - // This is an extremely rare case when 2 or more /new/ messages with - // the exact same headers were queued for creation (same subject, - // participants, timestamp, and message-id header). In this case, we - // will ignore it and report the error - logger.warn({message}, 'MessageProcessor: Encountered 2 new messages with the same id') - return - } - const thread = await detectThread({db, message}); - message.threadId = thread.id; - const createdMessage = await Message.create(message); - - if (message.labels) { - await createdMessage.addLabels(message.labels) - // Note that the labels aren't officially added until save() is called later - } - - await extractFiles({db, message, imapMessage}); - await extractContacts({db, message}); - createdMessage.isProcessed = true; - await createdMessage.save() -} - -/** - * When we send a message we store an incomplete copy in the local - * database while we wait for the sync loop to discover the actually - * delivered one. We store this to keep track of our delivered state and - * to ensure it's in the sent folder. - * - * We also get already processed messages because they may have had their - * folders or labels changed or had some other property updated with them. - * - * It'll have the basic ID, but no thread, labels, etc. - */ -async function processExistingMessage(existingMessage, parsedMessage, rawIMAPMessage) { - const {accountId} = parsedMessage; - const db = await LocalDatabaseConnector.forAccount(accountId); - await existingMessage.update(parsedMessage); - if (parsedMessage.labels && parsedMessage.labels.length > 0) { - await existingMessage.setLabels(parsedMessage.labels) - } - let thread = await existingMessage.getThread(); - - if (!existingMessage.isProcessed) { - if (!thread) { - thread = await detectThread({db, message: parsedMessage}); - existingMessage.threadId = thread.id; - } - await extractFiles({db, message: existingMessage, imapMessage: rawIMAPMessage}); - await extractContacts({db, message: existingMessage}); - existingMessage.isProcessed = true; - } else { - if (!thread) { - throw new Error(`Existing processed message ${existingMessage.id} doesn't have thread`) - } - } - - await existingMessage.save(); - await thread.updateLabelsAndFolders(); -} - -module.exports = {processNewMessage, processExistingMessage}