Optimized partial fetches. Does not work correctly with mime torture message

This commit is contained in:
Andris Reinman 2019-08-08 01:12:00 +03:00
parent 87b27ef0ab
commit f9ee7d6200
9 changed files with 112 additions and 35 deletions

View file

@ -230,7 +230,7 @@ module.exports = {
};
if (item.partial.maxLength && item.partial.maxLength < 1024 * 1024) {
item.partial.maxLength = 1024 * 1024;
//item.partial.maxLength = 1024 * 1024;
}
}
if (!imapTools.fetchSchema.hasOwnProperty(item.item) || !checkSchema(imapTools.fetchSchema[item.item], item)) {

View file

@ -171,7 +171,7 @@ module.exports = (response, isLogging) => {
if (isLogging) {
resp.push(Buffer.from('"(* ' + len + 'B literal *)"'));
} else {
resp.push(Buffer.from('{' + len + '}\r\n'));
resp.push(Buffer.from('{' + Math.max(len, 0) + '}\r\n'));
if (nodeValue && typeof nodeValue.pipe === 'function') {
//value is a stream object

View file

@ -79,7 +79,7 @@ module.exports = function(response, asArray, isLogging) {
if (!node.value) {
resp += '{0}\r\n';
} else {
resp += '{' + node.value.length + '}\r\n';
resp += '{' + Math.max(node.value.length, 0) + '}\r\n';
}
respParts.push(resp);
resp = (node.value || '').toString('binary');

View file

@ -656,17 +656,18 @@ module.exports.getQueryResponse = function(query, message, options) {
mimeTree = indexer.parseMimeTree(message.raw);
}
value = indexer.getContents(mimeTree, false, {
//startFrom: item.partial && item.partial.startFrom,
//maxLength: item.partial && item.partial.maxLength
startFrom: item.partial && item.partial.startFrom,
maxLength: item.partial && item.partial.maxLength
});
} else {
// BODY[SELECTOR]
if (!mimeTree) {
mimeTree = indexer.parseMimeTree(message.raw);
}
console.log(item);
value = indexer.getContents(mimeTree, item, {
//startFrom: item.partial && item.partial.startFrom,
//maxLength: item.partial && item.partial.maxLength
startFrom: item.partial && item.partial.startFrom,
maxLength: item.partial && item.partial.maxLength
});
}

View file

@ -135,18 +135,29 @@ class Indexer {
let curWritePos = 0;
let writeLength = 0;
let canWrite = size => {
if (curWritePos + size <= startFrom) {
let getCurrentBounds = size => {
if (curWritePos + size < startFrom) {
curWritePos += size;
return false;
}
if (maxLength && writeLength >= maxLength) {
writeLength += size;
return false;
}
return true;
let startFromBounds = curWritePos < startFrom ? startFrom - curWritePos : 0;
let maxLengthBounds = maxLength ? maxLength - writeLength : 0;
maxLengthBounds = Math.min(size - startFromBounds, maxLengthBounds);
if (maxLengthBounds < 0) {
maxLengthBounds = 0;
}
return {
startFrom: startFromBounds,
maxLength: maxLengthBounds
};
};
let write = async chunk => {
@ -245,29 +256,50 @@ class Indexer {
} else if (node.attachmentId && !options.skipExternal) {
await emit(false, true); // force newline between header and contents
if (canWrite(node.size)) {
let readBounds = getCurrentBounds(node.size);
if (readBounds) {
let attachmentId = node.attachmentId;
if (mimeTree.attachmentMap && mimeTree.attachmentMap[node.attachmentId]) {
attachmentId = mimeTree.attachmentMap[node.attachmentId];
}
let attachmentData = await this.getAttachment(attachmentId);
let attachmentStream = this.attachmentStorage.createReadStream(attachmentId, attachmentData);
await new Promise((resolve, reject) => {
attachmentStream.once('error', reject);
// move write pointer ahead by skipped base64 bytes
let bytes = Math.min(readBounds.startFrom, node.size);
curWritePos += bytes;
attachmentStream.once('end', () => {
resolve();
// only process attachment if we are reading inside existing bounds
if (attachmentData.length > readBounds.startFrom) {
let attachmentStream = this.attachmentStorage.createReadStream(attachmentId, attachmentData, readBounds);
await new Promise((resolve, reject) => {
attachmentStream.once('error', err => {
reject(err);
});
attachmentStream.once('end', () => {
// update read offset counters
let bytes = 'outputBytes' in attachmentStream ? attachmentStream.outputBytes : readBounds.maxLength;
if (bytes) {
curWritePos += bytes;
if (maxLength) {
writeLength += bytes;
}
}
resolve();
});
attachmentStream.pipe(
output,
{
end: false
}
);
});
attachmentStream.pipe(
output,
{
end: false
}
);
});
}
}
}
@ -296,6 +328,10 @@ class Indexer {
await walk(mimeTree);
if (mimeTree.lineCount > 1) {
await write(NEWLINE);
}
output.end();
};

View file

@ -1398,7 +1398,7 @@ describe('IMAP Protocol integration tests', function() {
});
it('should return partial BODY[]', function(done) {
let cmds = ['T1 LOGIN testuser pass', 'T2 SELECT INBOX', 'T3 FETCH 4 BODY.PEEK[]<0>', 'T4 FETCH 4 BODY.PEEK[]<4.10000>', 'T5 LOGOUT'];
let cmds = ['T1 LOGIN testuser pass', 'T2 SELECT INBOX', 'T3 FETCH 4 BODY.PEEK[]<4.5>', 'T4 FETCH 4 BODY.PEEK[]<4.10000>', 'T5 LOGOUT'];
testClient(
{
@ -1408,8 +1408,7 @@ describe('IMAP Protocol integration tests', function() {
},
function(resp) {
resp = resp.toString();
// we asked for 5 bytes but will get all bytes due to WildDuck returning minimum chunks of 1MB
//expect(resp.indexOf('\n* 4 FETCH (BODY[]<4> {93}\r\n: sen)\r\n') >= 0).to.be.true;
expect(resp.indexOf('\n* 4 FETCH (BODY[]<4> {5}\r\n: sen)\r\n') >= 0).to.be.true;
expect(
resp.indexOf(
'\n* 4 FETCH (BODY[]<4> {93}\r\n: sender@example.com\r\nto: to@example.com\r\ncc: cc@example.com\r\nsubject: test\r\n\r\nHello World!\r\n)\r\n'
@ -1433,7 +1432,6 @@ describe('IMAP Protocol integration tests', function() {
},
function(resp) {
resp = resp.toString();
expect(
resp.indexOf(
'\r\n* 3 FETCH (BODY[1] {107}\r\nMIME-Version: 1.0\r\nFrom: andris@kreata.ee\r\nTo: andris@pangalink.net\r\nIn-Reply-To: <test1>\r\n\r\nHello world 1!)\r\n'

View file

@ -52,7 +52,7 @@ const base64Offset = (lineLength, base64StartOffset, base64MaxLength) => {
let binaryEndOffset = 0;
if (base64MaxLength) {
let binaryMaxLength = Math.ceil(base64MaxLength / 4) * 3;
binaryEndOffset = binaryStartOffset + binaryMaxLength;
binaryEndOffset = binaryStartOffset + binaryMaxLength + 2;
}
return {

View file

@ -6,6 +6,7 @@ const RedFour = require('ioredfour');
const errors = require('../errors');
const log = require('npmlog');
const crypto = require('crypto');
const base64Offset = require('./base64-offset');
// Set to false to disable base64 decoding feature
const FEATURE_DECODE_ATTACHMENTS = true;
@ -289,12 +290,53 @@ class GridstoreStorage {
tryStore();
}
createReadStream(id, attachmentData) {
let stream = this.gridstore.openDownloadStream(id);
createReadStream(id, attachmentData, options) {
let encoderOptions = {};
let streamOptions = {};
if (attachmentData && attachmentData.metadata) {
encoderOptions.lineLength = attachmentData.metadata.lineLen;
if (options && attachmentData.metadata.decoded) {
let offsetOptions = base64Offset(attachmentData.metadata.lineLen, options.startFrom, options.maxLength);
encoderOptions.skipStartBytes = offsetOptions.base64SkipStartBytes;
encoderOptions.limitOutbutBytes = offsetOptions.base64LimitBytes;
encoderOptions.startPadding = offsetOptions.base64Padding;
streamOptions.start = offsetOptions.binaryStartOffset;
if (offsetOptions.binaryEndOffset) {
streamOptions.end = offsetOptions.binaryEndOffset;
}
} else if (options && !attachmentData.metadata.decoded) {
streamOptions.start = options.startFrom;
streamOptions.end = options.startFrom + options.maxLength;
}
if (streamOptions.start && streamOptions.start > attachmentData.length) {
streamOptions.start = attachmentData.length;
}
if (streamOptions.end && streamOptions.end > attachmentData.length) {
streamOptions.end = attachmentData.length;
}
}
/*
log.silly(
'GridStore',
'STREAM id=%s src_len=%s src_start=%s src_end=%s dst_start=%s dst_end=%s',
id.toString('hex'),
attachmentData && attachmentData.length,
streamOptions.start,
streamOptions.end,
options.startFrom,
options.startFrom + options.maxLength
);
*/
let stream = this.gridstore.openDownloadStream(id, streamOptions);
if (attachmentData && attachmentData.metadata.decoded) {
let encoder = new libbase64.Encoder({
lineLength: attachmentData.metadata.lineLen
});
let encoder = new libbase64.Encoder(encoderOptions);
stream.once('error', err => {
// pass error forward

View file

@ -50,7 +50,7 @@
"joi": "14.3.1",
"js-yaml": "3.13.1",
"key-fingerprint": "1.1.0",
"libbase64": "1.2.0",
"libbase64": "1.2.1",
"libmime": "4.1.3",
"libqp": "1.1.0",
"mailsplit": "4.4.1",