use paging when FETCHing mailbox contents

Andris Reinman 2020-12-13 10:07:23 +02:00
parent 345ea114a7
commit 736ef7de44
3 changed files with 242 additions and 216 deletions


@@ -103,5 +103,8 @@ module.exports = {
     TOTP_WINDOW_SIZE: 6,
 
     // how often to send processing updates for long running commands
-    LONG_COMMAND_NOTIFY_TTL: 1 * 60 * 1000
+    LONG_COMMAND_NOTIFY_TTL: 1 * 60 * 1000,
+
+    // when paging through a large list, how many entries to request per page
+    CURSOR_MAX_PAGE_SIZE: 250
 };
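For orientation, the new CURSOR_MAX_PAGE_SIZE constant caps how many messages a single FETCH query asks for; the handler then re-issues the query for UIDs greater than the last one it processed until a short page signals the end of the mailbox. Below is a minimal standalone sketch of that paging pattern, assuming a connected MongoDB Db instance and a `messages` collection keyed by numeric `uid`; the function and parameter names are illustrative, not part of WildDuck's API.

// Minimal sketch of the uid-based paging pattern enabled by CURSOR_MAX_PAGE_SIZE.
// Assumes a connected MongoDB Db instance and a `messages` collection where every
// document carries a numeric `uid`; forEachMessagePaged is an illustrative name.
const CURSOR_MAX_PAGE_SIZE = 250;

async function forEachMessagePaged(db, mailbox, handler) {
    let lastUid = 0;

    for (;;) {
        // each page is a fresh, bounded query instead of one long-lived cursor
        const page = await db
            .collection('messages')
            .find({ mailbox, uid: { $gt: lastUid } })
            .project({ _id: 1, uid: 1, flags: 1, modseq: 1 })
            .sort({ uid: 1 })
            .limit(CURSOR_MAX_PAGE_SIZE)
            .toArray();

        for (const messageData of page) {
            lastUid = messageData.uid;
            await handler(messageData);
        }

        if (page.length < CURSOR_MAX_PAGE_SIZE) {
            // a short page means there is nothing left to fetch
            break;
        }
    }
}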


@@ -137,249 +137,271 @@ module.exports = (server, messageHandler, userCache) => (mailbox, options, sessi
             return callback(...args);
         };
 
-        let sort = { uid: 1 };
-        let cursor = db.database
-            .collection('messages')
-            .find(query)
-            .project(projection)
-            .sort(sort)
-            .setReadPreference('secondaryPreferred')
-            .maxTimeMS(consts.DB_MAX_TIME_MESSAGES);
-
-        let limitedKeys = ['_id', 'flags', 'modseq', 'uid'];
-        if (!Object.keys(projection).some(key => !limitedKeys.includes(key))) {
-            // limited query, use extra large batch size
-            cursor = cursor.batchSize(1000);
-        }
-
-        let startTime = Date.now();
-        let rowCount = 0;
-        let totalBytes = 0;
-        let processNext = () => {
-            cursor.next((err, messageData) => {
-                if (err) {
-                    server.logger.error(
-                        {
-                            tnx: 'fetch',
-                            cid: session.id,
-                            err
-                        },
-                        '[%s] FETCHERR error=%s query=%s',
-                        session.id,
-                        err.message,
-                        JSON.stringify(query)
-                    );
-                    return done(err);
-                }
-
-                try {
-                    // stop processing if IMAP socket is not open anymore
-                    tools.checkSocket(socket);
-                } catch (err) {
-                    server.logger.error(
-                        {
-                            tnx: 'fetch',
-                            cid: session.id,
-                            err
-                        },
-                        '[%s] FETCHERR error=%s query=%s',
-                        session.id,
-                        err.message,
-                        JSON.stringify(query)
-                    );
-                    return done(err);
-                }
-
-                if (!messageData) {
-                    return cursor.close(() => {
-                        server.logger.debug(
-                            {
-                                tnx: 'fetch',
-                                cid: session.id
-                            },
-                            '[%s] FETCHOK rows=%s user=%s mailbox=%s time=%s',
-                            session.id,
-                            rowCount,
-                            mailboxData.user,
-                            mailboxData._id,
-                            (Date.now() - startTime) / 1000
-                        );
-
-                        done(null, true, {
-                            rowCount,
-                            totalBytes
-                        });
-                    });
-                }
-
-                if (queryAll && !session.selected.uidList.includes(messageData.uid)) {
-                    // skip processing messages that we do not know about yet
-                    return processNext();
-                }
-
-                let markAsSeen = options.markAsSeen && !messageData.flags.includes('\\Seen');
-                if (markAsSeen) {
-                    messageData.flags.unshift('\\Seen');
-                }
-
-                if (options.metadataOnly && !markAsSeen) {
-                    // quick response
-                    const data = session.formatResponse('FETCH', messageData.uid, {
-                        query: options.query,
-                        values: session.getQueryResponse(options.query, messageData, {
-                            logger: server.logger,
-                            fetchOptions: {},
-                            database: db.database,
-                            attachmentStorage: messageHandler.attachmentStorage,
-                            acceptUTF8Enabled: session.isUTF8Enabled()
-                        })
-                    });
-                    const compiled = imapHandler.compiler(data);
-                    // `compiled` is a 'binary' string
-                    totalBytes += compiled.length;
-                    session.writeStream.write({ compiled });
-                    rowCount++;
-                    return setImmediate(processNext);
-                }
-
-                let stream = imapHandler.compileStream(
-                    session.formatResponse('FETCH', messageData.uid, {
-                        query: options.query,
-                        values: session.getQueryResponse(options.query, messageData, {
-                            logger: server.logger,
-                            fetchOptions: {},
-                            database: db.database,
-                            attachmentStorage: messageHandler.attachmentStorage,
-                            acceptUTF8Enabled: session.isUTF8Enabled()
-                        })
-                    })
-                );
-
-                rowCount++;
-
-                stream.once('error', err => {
-                    err.processed = true;
-                    server.logger.error(
-                        {
-                            err,
-                            tnx: 'fetch',
-                            cid: session.id,
-                            mid: messageData._id
-                        },
-                        '[%s] FETCHFAIL message=%s rows=%s user=%s mailbox=%s time=%s error=%s',
-                        session.id,
-                        messageData._id,
-                        rowCount,
-                        mailboxData.user,
-                        mailboxData._id,
-                        (Date.now() - startTime) / 1000,
-                        err.message
-                    );
-
-                    session.socket.end('\n* BYE Internal Server Error\n');
-                    return cursor.close(() =>
-                        done(err, false, {
-                            rowCount,
-                            totalBytes
-                        })
-                    );
-                });
-
-                let limiter = new LimitedFetch({
-                    key: 'idw:' + session.user.id,
-                    ttlcounter: messageHandler.counters.ttlcounter,
-                    maxBytes: limit
-                });
-                stream.pipe(limiter);
-                limiter._uid = messageData.uid;
-                limiter._message = messageData._id;
-                limiter._mailbox = mailbox;
-
-                // send formatted response to socket
-                session.writeStream.write(limiter, () => {
-                    totalBytes += limiter.bytes;
-
-                    if (!markAsSeen) {
-                        return processNext();
-                    }
-
-                    server.logger.debug(
-                        {
-                            tnx: 'flags',
-                            cid: session.id
-                        },
-                        '[%s] UPDATE FLAGS message=%s',
-                        session.id,
-                        messageData.uid
-                    );
-
-                    isUpdated = true;
-
-                    updateEntries.push({
-                        updateOne: {
-                            filter: {
-                                _id: messageData._id,
-                                // include sharding key in query
-                                mailbox: mailboxData._id,
-                                uid: messageData.uid
-                            },
-                            update: {
-                                $addToSet: {
-                                    flags: '\\Seen'
-                                },
-                                $set: {
-                                    unseen: false
-                                }
-                            }
-                        }
-                    });
-
-                    notifyEntries.push({
-                        command: 'FETCH',
-                        ignore: session.id,
-                        uid: messageData.uid,
-                        flags: messageData.flags,
-                        message: messageData._id,
-                        unseenChange: true
-                    });
-
-                    if (updateEntries.length >= consts.BULK_BATCH_SIZE) {
-                        return db.database.collection('messages').bulkWrite(
-                            updateEntries,
-                            {
-                                ordered: false,
-                                w: 1
-                            },
-                            err => {
-                                updateEntries = [];
-                                if (err) {
-                                    return cursor.close(() =>
-                                        done(err, false, {
-                                            rowCount,
-                                            totalBytes
-                                        })
-                                    );
-                                }
-
-                                server.notifier.addEntries(mailboxData, notifyEntries, () => {
-                                    notifyEntries = [];
-                                    server.notifier.fire(session.user.id);
-                                    processNext();
-                                });
-                            }
-                        );
-                    } else {
-                        processNext();
-                    }
-                });
-            });
-        };
-
-        processNext();
+        let lastUid = false;
+
+        // instead of fetching all messages at once from a large mailbox
+        // we page it into smaller queries
+        let processPage = () => {
+            if (lastUid) {
+                query.uid = { $gt: lastUid };
+            }
+
+            let sort = { uid: 1 };
+            let cursor = db.database
+                .collection('messages')
+                .find(query)
+                .project(projection)
+                .sort(sort)
+                .limit(consts.CURSOR_MAX_PAGE_SIZE)
+                .setReadPreference('secondaryPreferred')
+                .maxTimeMS(consts.DB_MAX_TIME_MESSAGES);
+
+            let limitedKeys = ['_id', 'flags', 'modseq', 'uid'];
+            if (!Object.keys(projection).some(key => !limitedKeys.includes(key))) {
+                // limited query, use extra large batch size
+                cursor = cursor.batchSize(1000);
+            }
+
+            let startTime = Date.now();
+            let rowCount = 0;
+            let totalBytes = 0;
+            let processedCount = 0;
+            let processNext = () => {
+                cursor.next((err, messageData) => {
+                    if (err) {
+                        server.logger.error(
+                            {
+                                tnx: 'fetch',
+                                cid: session.id,
+                                err
+                            },
+                            '[%s] FETCHERR error=%s query=%s',
+                            session.id,
+                            err.message,
+                            JSON.stringify(query)
+                        );
+                        return done(err);
+                    }
+
+                    try {
+                        // stop processing if IMAP socket is not open anymore
+                        tools.checkSocket(socket);
+                    } catch (err) {
+                        server.logger.error(
+                            {
+                                tnx: 'fetch',
+                                cid: session.id,
+                                err
+                            },
+                            '[%s] FETCHERR error=%s query=%s',
+                            session.id,
+                            err.message,
+                            JSON.stringify(query)
+                        );
+                        return done(err);
+                    }
+
+                    if (!messageData) {
+                        return cursor.close(() => {
+                            if (processedCount === consts.CURSOR_MAX_PAGE_SIZE) {
+                                // might have more entries, check next page
+                                return setTimeout(processPage, 10);
+                            }
+
+                            server.logger.debug(
+                                {
+                                    tnx: 'fetch',
+                                    cid: session.id
+                                },
+                                '[%s] FETCHOK rows=%s user=%s mailbox=%s time=%s',
+                                session.id,
+                                rowCount,
+                                mailboxData.user,
+                                mailboxData._id,
+                                (Date.now() - startTime) / 1000
+                            );
+
+                            done(null, true, {
+                                rowCount,
+                                totalBytes
+                            });
+                        });
+                    }
+
+                    if (queryAll && !session.selected.uidList.includes(messageData.uid)) {
+                        // skip processing messages that we do not know about yet
+                        return processNext();
+                    }
+
+                    processedCount++;
+                    lastUid = messageData.uid;
+
+                    let markAsSeen = options.markAsSeen && !messageData.flags.includes('\\Seen');
+                    if (markAsSeen) {
+                        messageData.flags.unshift('\\Seen');
+                    }
+
+                    if (options.metadataOnly && !markAsSeen) {
+                        // quick response
+                        const data = session.formatResponse('FETCH', messageData.uid, {
+                            query: options.query,
+                            values: session.getQueryResponse(options.query, messageData, {
+                                logger: server.logger,
+                                fetchOptions: {},
+                                database: db.database,
+                                attachmentStorage: messageHandler.attachmentStorage,
+                                acceptUTF8Enabled: session.isUTF8Enabled()
+                            })
+                        });
+                        const compiled = imapHandler.compiler(data);
+                        // `compiled` is a 'binary' string
+                        totalBytes += compiled.length;
+                        session.writeStream.write({ compiled });
+                        rowCount++;
+                        return setImmediate(processNext);
+                    }
+
+                    let stream = imapHandler.compileStream(
+                        session.formatResponse('FETCH', messageData.uid, {
+                            query: options.query,
+                            values: session.getQueryResponse(options.query, messageData, {
+                                logger: server.logger,
+                                fetchOptions: {},
+                                database: db.database,
+                                attachmentStorage: messageHandler.attachmentStorage,
+                                acceptUTF8Enabled: session.isUTF8Enabled()
+                            })
+                        })
+                    );
+
+                    rowCount++;
+
+                    stream.once('error', err => {
+                        err.processed = true;
+                        server.logger.error(
+                            {
+                                err,
+                                tnx: 'fetch',
+                                cid: session.id,
+                                mid: messageData._id
+                            },
+                            '[%s] FETCHFAIL message=%s rows=%s user=%s mailbox=%s time=%s error=%s',
+                            session.id,
+                            messageData._id,
+                            rowCount,
+                            mailboxData.user,
+                            mailboxData._id,
+                            (Date.now() - startTime) / 1000,
+                            err.message
+                        );
+
+                        session.socket.end('\n* BYE Internal Server Error\n');
+                        return cursor.close(() =>
+                            done(err, false, {
+                                rowCount,
+                                totalBytes
+                            })
+                        );
+                    });
+
+                    let limiter = new LimitedFetch({
+                        key: 'idw:' + session.user.id,
+                        ttlcounter: messageHandler.counters.ttlcounter,
+                        maxBytes: limit
+                    });
+                    stream.pipe(limiter);
+                    limiter._uid = messageData.uid;
+                    limiter._message = messageData._id;
+                    limiter._mailbox = mailbox;
+
+                    // send formatted response to socket
+                    session.writeStream.write(limiter, () => {
+                        totalBytes += limiter.bytes;
+
+                        if (!markAsSeen) {
+                            return processNext();
+                        }
+
+                        server.logger.debug(
+                            {
+                                tnx: 'flags',
+                                cid: session.id
+                            },
+                            '[%s] UPDATE FLAGS message=%s',
+                            session.id,
+                            messageData.uid
+                        );
+
+                        isUpdated = true;
+
+                        updateEntries.push({
+                            updateOne: {
+                                filter: {
+                                    _id: messageData._id,
+                                    // include sharding key in query
+                                    mailbox: mailboxData._id,
+                                    uid: messageData.uid
+                                },
+                                update: {
+                                    $addToSet: {
+                                        flags: '\\Seen'
+                                    },
+                                    $set: {
+                                        unseen: false
+                                    }
+                                }
+                            }
+                        });
+
+                        notifyEntries.push({
+                            command: 'FETCH',
+                            ignore: session.id,
+                            uid: messageData.uid,
+                            flags: messageData.flags,
+                            message: messageData._id,
+                            unseenChange: true
+                        });
+
+                        if (updateEntries.length >= consts.BULK_BATCH_SIZE) {
+                            return db.database.collection('messages').bulkWrite(
+                                updateEntries,
+                                {
+                                    ordered: false,
+                                    w: 1
+                                },
+                                err => {
+                                    updateEntries = [];
+                                    if (err) {
+                                        return cursor.close(() =>
+                                            done(err, false, {
+                                                rowCount,
+                                                totalBytes
+                                            })
+                                        );
+                                    }
+
+                                    server.notifier.addEntries(mailboxData, notifyEntries, () => {
+                                        notifyEntries = [];
+                                        server.notifier.fire(session.user.id);
+                                        processNext();
+                                    });
+                                }
+                            );
+                        } else {
+                            processNext();
+                        }
+                    });
+                });
+            };
+
+            processNext();
+        };
+
+        processPage();
         });
     });
 }
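Besides paging, the handler batches the \Seen flag updates it queues while streaming FETCH responses: entries accumulate in updateEntries and are flushed with one unordered bulkWrite once consts.BULK_BATCH_SIZE is reached. Below is a simplified sketch of that batching idea, assuming a connected MongoDB Db instance; the wrapper name and the batch size value are assumptions (WildDuck reads the real value from lib/consts.js).

// Simplified sketch of the batched \Seen update pattern (wrapper name and the
// batch size value are assumptions; WildDuck reads BULK_BATCH_SIZE from lib/consts.js).
const BULK_BATCH_SIZE = 150;

function createSeenFlagBatcher(db, mailboxId) {
    let updateEntries = [];

    const flush = async () => {
        if (!updateEntries.length) {
            return;
        }
        const batch = updateEntries;
        updateEntries = [];
        // one unordered bulk write instead of one update per message
        await db.collection('messages').bulkWrite(batch, { ordered: false, w: 1 });
    };

    const markSeen = async messageData => {
        updateEntries.push({
            updateOne: {
                // include the sharding key (mailbox) in the filter
                filter: { _id: messageData._id, mailbox: mailboxId, uid: messageData.uid },
                update: {
                    $addToSet: { flags: '\\Seen' },
                    $set: { unseen: false }
                }
            }
        });
        if (updateEntries.length >= BULK_BATCH_SIZE) {
            await flush();
        }
    };

    return { markSeen, flush };
}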


@@ -28,6 +28,7 @@ function checkRangeQuery(uids, ne) {
     for (let i = 1, len = uids.length; i < len; i++) {
         if (uids[i] !== uids[i - 1] + 1) {
+            // TODO: group into AND conditions, otherwise expands too much!
             return {
                 [!ne ? '$in' : '$nin']: uids
             };
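The new TODO refers to the fallback taken when the requested UID set is not one contiguous run: the helper gives up on a range condition and emits the whole list as $in/$nin, which can expand into a very large query document for sparse sets. A simplified sketch of the range-versus-list idea follows (illustrative only; the real checkRangeQuery may differ in details).

// Simplified sketch of the range-versus-list decision behind checkRangeQuery
// (illustrative only; the real helper may differ in details).
function uidQuery(uids, ne) {
    let contiguous = uids.length > 0;
    for (let i = 1, len = uids.length; i < len; i++) {
        if (uids[i] !== uids[i - 1] + 1) {
            contiguous = false;
            break;
        }
    }

    if (contiguous && !ne) {
        // a contiguous run compresses into a cheap range condition
        return { $gte: uids[0], $lte: uids[uids.length - 1] };
    }

    // sparse sets fall back to an explicit list; for large sparse sets this
    // array gets very big, which is what the new TODO comment warns about
    return { [!ne ? '$in' : '$nin']: uids };
}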