From 736ef7de44462ceb23d0969b8507f96fa63cd15b Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Sun, 13 Dec 2020 10:07:23 +0200 Subject: [PATCH] use paging when FETCHing mailbox contents --- lib/consts.js | 5 +- lib/handlers/on-fetch.js | 452 ++++++++++++++++++++------------------- lib/tools.js | 1 + 3 files changed, 242 insertions(+), 216 deletions(-) diff --git a/lib/consts.js b/lib/consts.js index 784266ac..998fd23d 100644 --- a/lib/consts.js +++ b/lib/consts.js @@ -103,5 +103,8 @@ module.exports = { TOTP_WINDOW_SIZE: 6, // how often to send processing updates for long running commands - LONG_COMMAND_NOTIFY_TTL: 1 * 60 * 1000 + LONG_COMMAND_NOTIFY_TTL: 1 * 60 * 1000, + + // when paging through a large list, how many entries to request per page + CURSOR_MAX_PAGE_SIZE: 250 }; diff --git a/lib/handlers/on-fetch.js b/lib/handlers/on-fetch.js index a66833ad..7760c626 100644 --- a/lib/handlers/on-fetch.js +++ b/lib/handlers/on-fetch.js @@ -137,249 +137,271 @@ module.exports = (server, messageHandler, userCache) => (mailbox, options, sessi return callback(...args); }; - let sort = { uid: 1 }; - let cursor = db.database - .collection('messages') - .find(query) - .project(projection) - .sort(sort) - .setReadPreference('secondaryPreferred') - .maxTimeMS(consts.DB_MAX_TIME_MESSAGES); + let lastUid = false; - let limitedKeys = ['_id', 'flags', 'modseq', 'uid']; - if (!Object.keys(projection).some(key => !limitedKeys.includes(key))) { - // limited query, use extra large batch size - cursor = cursor.batchSize(1000); - } + // instead of fetching all messages at once from a large mailbox + // we page it into smaller queries + let processPage = () => { + if (lastUid) { + query.uid = { $gt: lastUid }; + } - let startTime = Date.now(); - let rowCount = 0; - let totalBytes = 0; - let processNext = () => { - cursor.next((err, messageData) => { - if (err) { - server.logger.error( - { - tnx: 'fetch', - cid: session.id, - err - }, - '[%s] FETCHERR error=%s query=%s', - session.id, - err.message, - JSON.stringify(query) - ); - return done(err); - } + let sort = { uid: 1 }; + let cursor = db.database + .collection('messages') + .find(query) + .project(projection) + .sort(sort) + .limit(consts.CURSOR_MAX_PAGE_SIZE) + .setReadPreference('secondaryPreferred') + .maxTimeMS(consts.DB_MAX_TIME_MESSAGES); - try { - // stop processing if IMAP socket is not open anymore - tools.checkSocket(socket); - } catch (err) { - server.logger.error( - { - tnx: 'fetch', - cid: session.id, - err - }, - '[%s] FETCHERR error=%s query=%s', - session.id, - err.message, - JSON.stringify(query) - ); - return done(err); - } + let limitedKeys = ['_id', 'flags', 'modseq', 'uid']; + if (!Object.keys(projection).some(key => !limitedKeys.includes(key))) { + // limited query, use extra large batch size + cursor = cursor.batchSize(1000); + } - if (!messageData) { - return cursor.close(() => { - server.logger.debug( + let startTime = Date.now(); + let rowCount = 0; + let totalBytes = 0; + let processedCount = 0; + let processNext = () => { + cursor.next((err, messageData) => { + if (err) { + server.logger.error( { tnx: 'fetch', - cid: session.id + cid: session.id, + err }, - '[%s] FETCHOK rows=%s user=%s mailbox=%s time=%s', + '[%s] FETCHERR error=%s query=%s', session.id, - rowCount, - mailboxData.user, - mailboxData._id, - (Date.now() - startTime) / 1000 + err.message, + JSON.stringify(query) ); + return done(err); + } - done(null, true, { - rowCount, - totalBytes + try { + // stop processing if IMAP socket is not open anymore + tools.checkSocket(socket); + } catch (err) { + server.logger.error( + { + tnx: 'fetch', + cid: session.id, + err + }, + '[%s] FETCHERR error=%s query=%s', + session.id, + err.message, + JSON.stringify(query) + ); + return done(err); + } + + if (!messageData) { + return cursor.close(() => { + if (processedCount === consts.CURSOR_MAX_PAGE_SIZE) { + // might have more entries, check next page + return setTimeout(processPage, 10); + } + + server.logger.debug( + { + tnx: 'fetch', + cid: session.id + }, + '[%s] FETCHOK rows=%s user=%s mailbox=%s time=%s', + session.id, + rowCount, + mailboxData.user, + mailboxData._id, + (Date.now() - startTime) / 1000 + ); + + done(null, true, { + rowCount, + totalBytes + }); }); - }); - } + } - if (queryAll && !session.selected.uidList.includes(messageData.uid)) { - // skip processing messages that we do not know about yet - return processNext(); - } + processedCount++; + lastUid = messageData.uid; - let markAsSeen = options.markAsSeen && !messageData.flags.includes('\\Seen'); - if (markAsSeen) { - messageData.flags.unshift('\\Seen'); - } - - if (options.metadataOnly && !markAsSeen) { - // quick response - const data = session.formatResponse('FETCH', messageData.uid, { - query: options.query, - values: session.getQueryResponse(options.query, messageData, { - logger: server.logger, - fetchOptions: {}, - database: db.database, - attachmentStorage: messageHandler.attachmentStorage, - acceptUTF8Enabled: session.isUTF8Enabled() - }) - }); - - const compiled = imapHandler.compiler(data); - - // `compiled` is a 'binary' string - totalBytes += compiled.length; - session.writeStream.write({ compiled }); - - rowCount++; - return setImmediate(processNext); - } - - let stream = imapHandler.compileStream( - session.formatResponse('FETCH', messageData.uid, { - query: options.query, - values: session.getQueryResponse(options.query, messageData, { - logger: server.logger, - fetchOptions: {}, - database: db.database, - attachmentStorage: messageHandler.attachmentStorage, - acceptUTF8Enabled: session.isUTF8Enabled() - }) - }) - ); - - rowCount++; - - stream.once('error', err => { - err.processed = true; - server.logger.error( - { - err, - tnx: 'fetch', - cid: session.id, - mid: messageData._id - }, - '[%s] FETCHFAIL message=%s rows=%s user=%s mailbox=%s time=%s error=%s', - session.id, - messageData._id, - rowCount, - mailboxData.user, - mailboxData._id, - (Date.now() - startTime) / 1000, - err.message - ); - - session.socket.end('\n* BYE Internal Server Error\n'); - return cursor.close(() => - done(err, false, { - rowCount, - totalBytes - }) - ); - }); - - let limiter = new LimitedFetch({ - key: 'idw:' + session.user.id, - ttlcounter: messageHandler.counters.ttlcounter, - maxBytes: limit - }); - stream.pipe(limiter); - - limiter._uid = messageData.uid; - limiter._message = messageData._id; - limiter._mailbox = mailbox; - - // send formatted response to socket - session.writeStream.write(limiter, () => { - totalBytes += limiter.bytes; - - if (!markAsSeen) { + if (queryAll && !session.selected.uidList.includes(messageData.uid)) { + // skip processing messages that we do not know about yet return processNext(); } - server.logger.debug( - { - tnx: 'flags', - cid: session.id - }, - '[%s] UPDATE FLAGS message=%s', - session.id, - messageData.uid + let markAsSeen = options.markAsSeen && !messageData.flags.includes('\\Seen'); + if (markAsSeen) { + messageData.flags.unshift('\\Seen'); + } + + if (options.metadataOnly && !markAsSeen) { + // quick response + const data = session.formatResponse('FETCH', messageData.uid, { + query: options.query, + values: session.getQueryResponse(options.query, messageData, { + logger: server.logger, + fetchOptions: {}, + database: db.database, + attachmentStorage: messageHandler.attachmentStorage, + acceptUTF8Enabled: session.isUTF8Enabled() + }) + }); + + const compiled = imapHandler.compiler(data); + + // `compiled` is a 'binary' string + totalBytes += compiled.length; + session.writeStream.write({ compiled }); + + rowCount++; + return setImmediate(processNext); + } + + let stream = imapHandler.compileStream( + session.formatResponse('FETCH', messageData.uid, { + query: options.query, + values: session.getQueryResponse(options.query, messageData, { + logger: server.logger, + fetchOptions: {}, + database: db.database, + attachmentStorage: messageHandler.attachmentStorage, + acceptUTF8Enabled: session.isUTF8Enabled() + }) + }) ); - isUpdated = true; + rowCount++; - updateEntries.push({ - updateOne: { - filter: { - _id: messageData._id, - // include sharding key in query - mailbox: mailboxData._id, - uid: messageData.uid + stream.once('error', err => { + err.processed = true; + server.logger.error( + { + err, + tnx: 'fetch', + cid: session.id, + mid: messageData._id }, - update: { - $addToSet: { - flags: '\\Seen' + '[%s] FETCHFAIL message=%s rows=%s user=%s mailbox=%s time=%s error=%s', + session.id, + messageData._id, + rowCount, + mailboxData.user, + mailboxData._id, + (Date.now() - startTime) / 1000, + err.message + ); + + session.socket.end('\n* BYE Internal Server Error\n'); + return cursor.close(() => + done(err, false, { + rowCount, + totalBytes + }) + ); + }); + + let limiter = new LimitedFetch({ + key: 'idw:' + session.user.id, + ttlcounter: messageHandler.counters.ttlcounter, + maxBytes: limit + }); + stream.pipe(limiter); + + limiter._uid = messageData.uid; + limiter._message = messageData._id; + limiter._mailbox = mailbox; + + // send formatted response to socket + session.writeStream.write(limiter, () => { + totalBytes += limiter.bytes; + + if (!markAsSeen) { + return processNext(); + } + + server.logger.debug( + { + tnx: 'flags', + cid: session.id + }, + '[%s] UPDATE FLAGS message=%s', + session.id, + messageData.uid + ); + + isUpdated = true; + + updateEntries.push({ + updateOne: { + filter: { + _id: messageData._id, + // include sharding key in query + mailbox: mailboxData._id, + uid: messageData.uid }, - $set: { - unseen: false + update: { + $addToSet: { + flags: '\\Seen' + }, + $set: { + unseen: false + } } } + }); + + notifyEntries.push({ + command: 'FETCH', + ignore: session.id, + uid: messageData.uid, + flags: messageData.flags, + message: messageData._id, + unseenChange: true + }); + + if (updateEntries.length >= consts.BULK_BATCH_SIZE) { + return db.database.collection('messages').bulkWrite( + updateEntries, + { + ordered: false, + w: 1 + }, + err => { + updateEntries = []; + if (err) { + return cursor.close(() => + done(err, false, { + rowCount, + totalBytes + }) + ); + } + + server.notifier.addEntries(mailboxData, notifyEntries, () => { + notifyEntries = []; + server.notifier.fire(session.user.id); + processNext(); + }); + } + ); + } else { + processNext(); } }); - - notifyEntries.push({ - command: 'FETCH', - ignore: session.id, - uid: messageData.uid, - flags: messageData.flags, - message: messageData._id, - unseenChange: true - }); - - if (updateEntries.length >= consts.BULK_BATCH_SIZE) { - return db.database.collection('messages').bulkWrite( - updateEntries, - { - ordered: false, - w: 1 - }, - err => { - updateEntries = []; - if (err) { - return cursor.close(() => - done(err, false, { - rowCount, - totalBytes - }) - ); - } - - server.notifier.addEntries(mailboxData, notifyEntries, () => { - notifyEntries = []; - server.notifier.fire(session.user.id); - processNext(); - }); - } - ); - } else { - processNext(); - } }); - }); + }; + + processNext(); }; - processNext(); + processPage(); }); }); } diff --git a/lib/tools.js b/lib/tools.js index 11d9cb6c..a5d968b7 100644 --- a/lib/tools.js +++ b/lib/tools.js @@ -28,6 +28,7 @@ function checkRangeQuery(uids, ne) { for (let i = 1, len = uids.length; i < len; i++) { if (uids[i] !== uids[i - 1] + 1) { + // TODO: group into AND conditions, otherwise expands too much! return { [!ne ? '$in' : '$nin']: uids };