[local-sync] Restore global queue for message processing to improve perf

Summary:
Sync operations are mostly bound by I/O and the imap connection.
What we believe is mostly affecting CPU and battery life is that node’s event
loop is being hogged by CPU-intensive message processing operations.

To alleviate this, we do a few things:

- Restore a global message processing queue to process messages serially and meter cpu usage (message processing continues to be a fire and forget call from within sync operations)
- Move actual cpu intensive work to the message processing queue, i.e. `MessageFactory.parseFromImap`
- Keep track of message processing queue length, and skip sync operations if queue is too big to prevent massive memory consumption

This commit also renames the package from new-message-processor to
message-processor, given that it now processes both new and existing
messages, and we'd like to minimize confusion.

Test Plan: manual

Reviewers: spang, khamidou, evan

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D3602
This commit is contained in:
Juan Tejada 2017-01-06 14:28:28 -08:00
parent c7f8796409
commit 83ef8c12b3
11 changed files with 235 additions and 139 deletions

View file

@ -1,6 +1,6 @@
/* eslint global-require: 0 */
/* eslint import/no-dynamic-require: 0 */
const detectThread = require('../src/new-message-processor/detect-thread');
const detectThread = require('../src/message-processor/detect-thread');
const LocalDatabaseConnector = require('../src/shared/local-database-connector');
const {FIXTURES_PATH, ACCOUNT_ID} = require('./helpers')

View file

@ -1,11 +1,13 @@
const {Provider, PromiseUtils} = require('isomorphic-core');
const SyncOperation = require('../sync-operation')
const {localizedCategoryNames} = require('../sync-utils')
const BASE_ROLES = ['inbox', 'sent', 'trash', 'spam'];
const GMAIL_ROLES_WITH_FOLDERS = ['all', 'trash', 'spam'];
class FetchFolderList {
class FetchFolderList extends SyncOperation {
constructor(account, logger) {
super()
this._account = account;
this._provider = account.provider;
this._logger = logger;
@ -125,7 +127,7 @@ class FetchFolderList {
return {next, created, deleted};
}
async run(db, imap) {
async runOperation(db, imap) {
this._db = db;
const boxes = await imap.getBoxes();

View file

@ -1,22 +1,18 @@
const _ = require('underscore');
const os = require('os');
const fs = require('fs');
const path = require('path')
const mkdirp = require('mkdirp');
const {PromiseUtils, IMAPConnection} = require('isomorphic-core');
const {Capabilities} = IMAPConnection;
const MessageFactory = require('../../shared/message-factory')
const {processNewMessage, processExistingMessage} = require('../../new-message-processor')
const SyncOperation = require('../sync-operation')
const MessageProcessor = require('../../message-processor')
const MessageFlagAttributes = ['id', 'threadId', 'folderImapUID', 'unread', 'starred', 'folderImapXGMLabels']
const SHALLOW_SCAN_UID_COUNT = 1000;
const FETCH_MESSAGES_FIRST_COUNT = 100;
const FETCH_MESSAGES_COUNT = 200;
class FetchMessagesInFolder {
class FetchMessagesInFolder extends SyncOperation {
constructor(folder, options, logger) {
super()
this._imap = null
this._box = null
this._db = null
@ -194,7 +190,7 @@ class FetchMessagesInFolder {
return desired;
}
async _fetchMessagesAndQueueForProcessing(range) {
async _fetchAndProcessMessages(range) {
const uidsByPart = {};
await this._box.fetchEach(range, {struct: true}, ({attributes}) => {
@ -207,7 +203,9 @@ class FetchMessagesInFolder {
uidsByPart[key].push(attributes.uid);
})
await PromiseUtils.each(Object.keys(uidsByPart), (key) => {
await PromiseUtils.each(Object.keys(uidsByPart), async (key) => {
// note: the order of UIDs in the array doesn't matter, Gmail always
// returns them in ascending (oldest => newest) order.
const uids = uidsByPart[key];
const desiredParts = JSON.parse(key);
const bodies = ['HEADER'].concat(desiredParts.map(p => p.id));
@ -217,46 +215,25 @@ class FetchMessagesInFolder {
// num_messages: uids.length,
// }, `FetchMessagesInFolder: Fetching parts for messages`)
// note: the order of UIDs in the array doesn't matter, Gmail always
// returns them in ascending (oldest => newest) order.
return this._box.fetchEach(
const promises = []
await this._box.fetchEach(
uids,
{bodies, struct: true},
(imapMessage) => this._processMessage(imapMessage, desiredParts)
(imapMessage) => promises.push(MessageProcessor.queueMessageForProcessing({
imapMessage,
desiredParts,
folderId: this._folder.id,
accountId: this._db.accountId,
}))
);
// We need to wait for all of the messages in the range to be processed
// before actually updating the folder sync state, otherwise we might skip
// messages.
return Promise.all(promises)
});
}
async _processMessage(imapMessage, desiredParts) {
const {Message} = this._db
try {
const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, {
db: this._db,
folder: this._folder,
accountId: this._db.accountId,
});
const existingMessage = await Message.find({where: {id: messageValues.id}});
if (existingMessage) {
await processExistingMessage(existingMessage, messageValues, imapMessage)
} else {
await processNewMessage(messageValues, imapMessage)
}
console.log(`🔃 ✉️ "${messageValues.subject}" - ${messageValues.date}`)
} catch (err) {
this._logger.error(err, {
imapMessage,
desiredParts,
}, `FetchMessagesInFolder: Could not build message`)
const outJSON = JSON.stringify({imapMessage, desiredParts, result: {}});
const outDir = path.join(os.tmpdir(), "k2-parse-errors", this._folder.name)
const outFile = path.join(outDir, imapMessage.attributes.uid.toString());
mkdirp.sync(outDir);
fs.writeFileSync(outFile, outJSON);
}
}
async _openMailboxAndEnsureValidity() {
const box = await this._imap.openBox(this._folder.name);
@ -317,9 +294,9 @@ class FetchMessagesInFolder {
// range: `${min}:${max}`,
// }, `FetchMessagesInFolder: Fetching range`);
await this._fetchMessagesAndQueueForProcessing(`${min}:${max}`);
await this._fetchAndProcessMessages(`${min}:${max}`);
const {fetchedmin, fetchedmax} = this._folder.syncState;
return this.updateFolderSyncState({
return this._folder.updateSyncState({
fetchedmin: fetchedmin ? Math.min(fetchedmin, min) : min,
fetchedmax: fetchedmax ? Math.max(fetchedmax, max) : max,
uidnext: boxUidnext,
@ -372,7 +349,7 @@ class FetchMessagesInFolder {
await this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes)
// this._logger.info(`FetchMessagesInFolder: finished fetching changes to messages`);
return this.updateFolderSyncState({
return this._folder.updateSyncState({
highestmodseq: nextHighestmodseq,
timeShallowScan: Date.now(),
});
@ -398,22 +375,14 @@ class FetchMessagesInFolder {
// this._logger.info(`FetchMessagesInFolder: Deep scan finished.`);
return this.updateFolderSyncState({
return this._folder.updateSyncState({
highestmodseq: this._box.highestmodseq,
timeDeepScan: Date.now(),
timeShallowScan: Date.now(),
});
}
async updateFolderSyncState(newState) {
if (_.isMatch(this._folder.syncState, newState)) {
return Promise.resolve();
}
this._folder.syncState = Object.assign(this._folder.syncState, newState);
return this._folder.save();
}
async run(db, imap) {
async runOperation(db, imap) {
console.log(`🔃 📂 ${this._folder.name}`)
this._db = db;
this._imap = imap;
@ -423,11 +392,12 @@ class FetchMessagesInFolder {
// If we haven't set any syncState at all, let's set it for the first time
// to generate a delta for N1
if (_.isEmpty(this._folder.syncState)) {
await this.updateFolderSyncState({
await this._folder.updateSyncState({
uidnext: this._box.uidnext,
uidvalidity: this._box.uidvalidity,
fetchedmin: null,
fetchedmax: null,
failedUIDs: [],
})
}
await this._fetchUnsyncedMessages()

View file

@ -0,0 +1,19 @@
const MessageProcessor = require('../message-processor')
/**
 * Base class for sync operations.
 *
 * Subclasses implement `runOperation`; callers invoke `run`, which acts as a
 * gate: when the global message processing queue is saturated, the operation
 * is skipped entirely so the queue cannot grow without bound.
 */
class SyncOperation {
  /**
   * Run this operation unless the message processing queue is full.
   * @returns the result of `runOperation`, or a resolved promise when skipped.
   */
  async run(...args) {
    const hasCapacity = !MessageProcessor.queueIsFull()
    if (hasCapacity) {
      return this.runOperation(...args)
    }
    console.log(`🔃 Skipping sync operation - Message processing queue is full`)
    return Promise.resolve()
  }

  // Abstract: subclasses must provide the actual sync work here.
  async runOperation() {
    throw new Error('Must implement `SyncOperation::runOperation`')
  }
}
module.exports = SyncOperation

View file

@ -73,7 +73,7 @@ class SyncWorker {
this.syncNow({reason: "You've got mail!"});
}
_getIdleFolder() {
// Looks up the folder to IDLE on between sync cycles: the first folder whose
// role is 'all' or 'inbox'.
// NOTE(review): passing an array as the `role` value presumably matches either
// role (ORM IN-clause semantics) — confirm against the Folder model.
_getInboxFolder() {
return this._db.Folder.find({where: {role: ['all', 'inbox']}})
}
@ -394,8 +394,8 @@ class SyncWorker {
// this._logger.info('Syncworker: Completed sync cycle');
// Start idling on the inbox
const idleFolder = await this._getIdleFolder();
await this._conn.openBox(idleFolder.name);
const inbox = await this._getInboxFolder();
await this._conn.openBox(inbox.name);
// this._logger.info('SyncWorker: Idling on inbox folder');
}

View file

@ -0,0 +1,167 @@
const _ = require('underscore')
const os = require('os');
const fs = require('fs');
const path = require('path')
const mkdirp = require('mkdirp');
const detectThread = require('./detect-thread');
const extractFiles = require('./extract-files');
const extractContacts = require('./extract-contacts');
const MessageFactory = require('../shared/message-factory')
const LocalDatabaseConnector = require('../shared/local-database-connector');
const MAX_QUEUE_LENGTH = 500
const PROCESSING_DELAY = 0

/**
 * Global, serial message processing queue.
 *
 * CPU-intensive work (`MessageFactory.parseFromImap` plus downstream
 * thread/file/contact extraction) is funneled through a single promise chain
 * so sync operations can fire-and-forget messages without hosing node's event
 * loop. Queue length is tracked so sync operations can skip work while the
 * queue is full (see SyncOperation) to prevent massive memory consumption.
 */
class MessageProcessor {

  constructor() {
    // The queue is a chain of Promises
    this._queue = Promise.resolve()
    this._queueLength = 0
  }

  // Number of messages currently queued (including any in-flight message).
  queueLength() {
    return this._queueLength
  }

  // True when callers should stop enqueueing until the queue drains.
  queueIsFull() {
    return this._queueLength >= MAX_QUEUE_LENGTH
  }

  /**
   * @returns Promise that resolves when message has been processed
   * This promise will never reject, given that this function is meant to be
   * called as a fire and forget operation
   * If message processing fails, we will register the failure in the folder
   * syncState
   */
  queueMessageForProcessing({accountId, folderId, imapMessage, desiredParts}) {
    return new Promise((resolve) => {
      this._queueLength++
      this._queue = this._queue.then(async () => {
        try {
          await this._processMessage({accountId, folderId, imapMessage, desiredParts})
        } catch (err) {
          // `_processMessage` handles its own processing errors, but anything
          // that escapes (e.g. the database connector failing, or the
          // syncState update inside its catch block rejecting) must be
          // swallowed here — otherwise the rejection would poison the promise
          // chain, permanently stalling the queue, leaking _queueLength, and
          // leaving this promise unsettled forever.
          console.error(`MessageProcessor: unexpected error while processing message`, err)
        } finally {
          this._queueLength--
          // To save memory, we reset the Promise chain if the queue reaches a
          // length of 0, otherwise we will continue referencing the entire
          // chain of promises that came before
          if (this._queueLength === 0) {
            this._queue = Promise.resolve()
          }
          resolve()
        }
        // Throttle message processing to meter cpu usage
        await new Promise(r => setTimeout(r, PROCESSING_DELAY))
      })
    })
  }

  /**
   * Parses the raw IMAP message and routes it to new/existing message
   * processing. On failure the UID is recorded in the folder's syncState
   * (`failedUIDs`) and the raw input is dumped to a tmp dir for debugging;
   * returns null instead of throwing.
   */
  async _processMessage({accountId, folderId, imapMessage, desiredParts}) {
    const db = await LocalDatabaseConnector.forAccount(accountId);
    const {Message, Folder} = db
    const folder = await Folder.findById(folderId)
    try {
      const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, {
        db,
        folder,
        accountId,
      });
      const existingMessage = await Message.find({where: {id: messageValues.id}});
      let processedMessage;
      if (existingMessage) {
        processedMessage = await this._processExistingMessage(existingMessage, messageValues, imapMessage)
      } else {
        processedMessage = await this._processNewMessage(messageValues, imapMessage)
      }
      console.log(`🔃 ✉️ "${messageValues.subject}" - ${messageValues.date}`)
      return processedMessage
    } catch (err) {
      console.error(`FetchMessagesInFolder: Could not build message`, {
        err,
        imapMessage,
        desiredParts,
      })
      // Keep track of uids we failed to fetch
      const {failedUIDs = []} = folder.syncState
      const {uid} = imapMessage.attributes
      if (uid) {
        await folder.updateSyncState({failedUIDs: _.uniq(failedUIDs.concat([uid]))})
        // Save parse errors for future debugging. This is guarded by the uid
        // check above: previously `uid.toString()` ran unconditionally and a
        // missing uid threw inside this catch block, escaping instead of
        // being recorded.
        const outJSON = JSON.stringify({imapMessage, desiredParts, result: {}});
        const outDir = path.join(os.tmpdir(), "k2-parse-errors", folder.name)
        const outFile = path.join(outDir, uid.toString());
        mkdirp.sync(outDir);
        fs.writeFileSync(outFile, outJSON);
      }
      return null
    }
  }

  /**
   * Creates a brand-new Message row. Order matters: the thread is detected
   * first (the row needs threadId), then the row is created, then files and
   * contacts are extracted before the message is marked processed.
   * @returns the created Message, or null when a duplicate id already exists.
   */
  async _processNewMessage(message, imapMessage) {
    const {accountId} = message;
    const db = await LocalDatabaseConnector.forAccount(accountId);
    const {Message} = db
    const existingMessage = await Message.findById(message.id)
    if (existingMessage) {
      // This is an extremely rare case when 2 or more /new/ messages with
      // the exact same headers were queued for creation (same subject,
      // participants, timestamp, and message-id header). In this case, we
      // will ignore it and report the error
      console.warn('MessageProcessor: Encountered 2 new messages with the same id', {message})
      return null
    }
    const thread = await detectThread({db, message});
    message.threadId = thread.id;
    const createdMessage = await Message.create(message);
    await extractFiles({db, message, imapMessage});
    await extractContacts({db, message});
    createdMessage.isProcessed = true;
    await createdMessage.save()
    return createdMessage
  }

  /**
   * When we send a message we store an incomplete copy in the local
   * database while we wait for the sync loop to discover the actually
   * delivered one. We store this to keep track of our delivered state and
   * to ensure it's in the sent folder.
   *
   * We also get already processed messages because they may have had their
   * folders or labels changed or had some other property updated with them.
   *
   * It'll have the basic ID, but no thread, labels, etc.
   */
  async _processExistingMessage(existingMessage, parsedMessage, rawIMAPMessage) {
    const {accountId} = parsedMessage;
    const db = await LocalDatabaseConnector.forAccount(accountId);
    await existingMessage.update(parsedMessage);
    if (parsedMessage.labels && parsedMessage.labels.length > 0) {
      await existingMessage.setLabels(parsedMessage.labels)
    }
    let thread = await existingMessage.getThread();
    if (!existingMessage.isProcessed) {
      // First time we see the full message: resolve its thread and run the
      // one-time extraction passes.
      if (!thread) {
        thread = await detectThread({db, message: parsedMessage});
        existingMessage.threadId = thread.id;
      }
      await extractFiles({db, message: existingMessage, imapMessage: rawIMAPMessage});
      await extractContacts({db, message: existingMessage});
      existingMessage.isProcessed = true;
    } else {
      if (!thread) {
        throw new Error(`Existing processed message ${existingMessage.id} doesn't have thread`)
      }
    }
    await existingMessage.save();
    await thread.updateLabelsAndFolders();
    return existingMessage
  }
}
module.exports = new MessageProcessor()

View file

@ -1,3 +1,4 @@
const _ = require('underscore')
const crypto = require('crypto')
const {DatabaseTypes: {JSONColumn}} = require('isomorphic-core');
const {formatImapPath} = require('../shared/imap-paths-utils');
@ -30,6 +31,9 @@ module.exports = (sequelize, Sequelize) => {
*
* // Timestamp when we last fetched unseen messages
* timeFetchedUnseen,
*
* // UIDs that failed to be fetched
* failedUIDs,
* }
*/
syncState: JSONColumn('syncState'),
@ -63,6 +67,14 @@ module.exports = (sequelize, Sequelize) => {
)
},
// Merges `nextSyncState` into the folder's syncState JSON column and saves.
// No-op (resolved promise, no DB write) when every key/value in
// `nextSyncState` already matches the current syncState.
updateSyncState(nextSyncState = {}) {
if (_.isMatch(this.syncState, nextSyncState)) {
return Promise.resolve();
}
// Mutates the existing syncState object in place, then reassigns the
// property. NOTE(review): the reassignment is presumably required so the
// ORM detects the JSON column change before save() — confirm.
this.syncState = Object.assign(this.syncState, nextSyncState);
return this.save();
},
toJSON() {
return {
id: `${this.id}`,

View file

@ -1,74 +0,0 @@
const detectThread = require('./detect-thread');
const extractFiles = require('./extract-files');
const extractContacts = require('./extract-contacts');
const LocalDatabaseConnector = require('../shared/local-database-connector');
// Processes a newly-synced message (old, pre-queue implementation):
// detects the thread, creates the Message row, attaches labels, and runs the
// file/contact extraction passes before flagging the message as processed.
// Order matters: threadId must be set before Message.create, and isProcessed
// is only flipped after extraction succeeds.
async function processNewMessage(message, imapMessage) {
const {accountId} = message;
const logger = global.Logger.forAccount({id: accountId}).child({message})
const db = await LocalDatabaseConnector.forAccount(accountId);
const {Message} = db
const existingMessage = await Message.findById(message.id)
if (existingMessage) {
// This is an extremely rare case when 2 or more /new/ messages with
// the exact same headers were queued for creation (same subject,
// participants, timestamp, and message-id header). In this case, we
// will ignore it and report the error
logger.warn({message}, 'MessageProcessor: Encountered 2 new messages with the same id')
return
}
const thread = await detectThread({db, message});
message.threadId = thread.id;
const createdMessage = await Message.create(message);
if (message.labels) {
await createdMessage.addLabels(message.labels)
// Note that the labels aren't officially added until save() is called later
}
await extractFiles({db, message, imapMessage});
await extractContacts({db, message});
createdMessage.isProcessed = true;
await createdMessage.save()
}
/**
* When we send a message we store an incomplete copy in the local
* database while we wait for the sync loop to discover the actually
* delivered one. We store this to keep track of our delivered state and
* to ensure it's in the sent folder.
*
* We also get already processed messages because they may have had their
* folders or labels changed or had some other property updated with them.
*
* It'll have the basic ID, but no thread, labels, etc.
*/
// Updates an already-stored message (old, pre-queue implementation) with the
// freshly parsed IMAP values. For a not-yet-processed message it also resolves
// the thread and runs the one-time file/contact extraction; a processed
// message without a thread is an invariant violation and throws. Finally
// persists the row and recomputes the thread's labels/folders.
async function processExistingMessage(existingMessage, parsedMessage, rawIMAPMessage) {
const {accountId} = parsedMessage;
const db = await LocalDatabaseConnector.forAccount(accountId);
await existingMessage.update(parsedMessage);
if (parsedMessage.labels && parsedMessage.labels.length > 0) {
await existingMessage.setLabels(parsedMessage.labels)
}
let thread = await existingMessage.getThread();
if (!existingMessage.isProcessed) {
// First full sighting of the message: detect its thread if needed and run
// the extraction passes exactly once.
if (!thread) {
thread = await detectThread({db, message: parsedMessage});
existingMessage.threadId = thread.id;
}
await extractFiles({db, message: existingMessage, imapMessage: rawIMAPMessage});
await extractContacts({db, message: existingMessage});
existingMessage.isProcessed = true;
} else {
if (!thread) {
throw new Error(`Existing processed message ${existingMessage.id} doesn't have thread`)
}
}
await existingMessage.save();
await thread.updateLabelsAndFolders();
}
module.exports = {processNewMessage, processExistingMessage}