deduplicate attachments

This commit is contained in:
Andris Reinman 2017-05-15 16:09:08 +03:00
parent 9e488397cc
commit fddccd79eb
10 changed files with 353 additions and 153 deletions

View file

@ -88,6 +88,7 @@ Yes, it does. You can run the server and get working IMAP and POP3 servers for m
4. Focus on internationalization, ie. supporting email addresses with non-ascii characters
5. `+`-labels: _андрис+ööö@уайлддак.орг_ is delivered to _андрис@уайлддак.орг_
6. Access messages both using IMAP and HTTP API. The latter serves parsed data, so no need to fetch RFC822 messages and parse out html, plaintext content or attachments. It is super easy to create a webmail interface on top of this.
7. De-duplication of attachments. If the same attachment is referenced by different messages, then only a single copy of the attachment is stored. Attachments are stored in their encoded form (e.g. base64) so that message signatures are not broken, which means two attachments are treated as the same only if their encoded forms match as well.
### Isn't it bad to use a database as a mail store?
@ -513,7 +514,7 @@ Response message includes the following fields
- **attachments** is an array of attachment objects. Attachments can be shared between messages.
- **id** is the id of the attachment
- **id** is the id of the attachment in the form of "ATT00001"
- **fileName** is the name of the attachment. Autogenerated from Content-Type if not set in source
- **contentType** is the MIME type of the attachment
- **disposition** defines Content-Disposition and is either 'inline', 'attachment' or _false_
@ -563,7 +564,7 @@ The response for successful operation should look like this:
"html": ["<p>Hello world!</p>"],
"attachments": [
{
"id": "58e2254289cccb742fd6c015",
"id": "ATT00001",
"fileName": "image.png",
"contentType": "image/png",
"disposition": "attachment",
@ -588,7 +589,7 @@ Parameters
**Example**
```
curl "http://localhost:8080/message/58d8299c5195c38e77c2daa5/attachment/58e2254289cccb742fd6c015"
curl "http://localhost:8080/message/58d8299c5195c38e77c2daa5/attachment/ATT00001"
```
### GET /message/:id/raw

88
api.js
View file

@ -970,6 +970,7 @@ server.get('/message/:id/raw', (req, res, next) => {
db.database.collection('messages').findOne(query, {
mimeTree: true,
map: true,
size: true
}, (err, message) => {
if (err) {
@ -1008,7 +1009,7 @@ server.get('/message/:message/attachment/:attachment', (req, res, next) => {
const schema = Joi.object().keys({
message: Joi.string().hex().lowercase().length(24).required(),
attachment: Joi.string().hex().lowercase().length(24).required()
attachment: Joi.string().regex(/^ATT\d+$/i).uppercase().required()
});
const result = Joi.validate({
@ -1027,47 +1028,80 @@ server.get('/message/:message/attachment/:attachment', (req, res, next) => {
return next();
}
let message = result.value.message;
let attachment = result.value.attachment;
let messageId = result.value.message;
let attachmentMid = result.value.attachment;
let query = {
_id: new ObjectID(attachment),
'metadata.messages': new ObjectID(message)
};
db.database.collection('attachments.files').findOne(query, (err, messageData) => {
db.database.collection('messages').findOne({
_id: new ObjectID(messageId)
}, {
fields: {
map: true
}
}, (err, message) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message,
attachment,
message
attachment: attachmentMid,
message: messageId
});
return next();
}
if (!messageData) {
if (!message) {
res.json({
error: 'This message does not exist',
attachment,
message
attachment: attachmentMid,
message: messageId
});
return next();
}
res.writeHead(200, {
'Content-Type': messageData.contentType || 'application/octet-stream'
});
let attachmentId = message.map && message.map[attachmentMid];
let attachmentStream = messageHandler.indexer.gridstore.createReadStream(messageData._id);
attachmentStream.once('error', err => res.emit('error', err));
if (messageData.metadata.transferEncoding === 'base64') {
attachmentStream.pipe(new libbase64.Decoder()).pipe(res);
} else if (messageData.metadata.transferEncoding === 'quoted-printable') {
attachmentStream.pipe(new libqp.Decoder()).pipe(res);
} else {
attachmentStream.pipe(res);
if (!attachmentId) {
res.json({
error: 'This attachment does not exist',
attachment: attachmentMid,
message: messageId
});
return next();
}
db.database.collection('attachments.files').findOne({
_id: new ObjectID(attachmentId)
}, (err, messageData) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message,
attachment: attachmentMid,
message: messageId
});
return next();
}
if (!messageData) {
res.json({
error: 'This message does not exist',
attachment: attachmentMid,
message: messageId
});
return next();
}
res.writeHead(200, {
'Content-Type': messageData.contentType || 'application/octet-stream'
});
let attachmentStream = messageHandler.indexer.gridstore.openDownloadStream(messageData._id);
attachmentStream.once('error', err => res.emit('error', err));
if (messageData.metadata.transferEncoding === 'base64') {
attachmentStream.pipe(new libbase64.Decoder()).pipe(res);
} else if (messageData.metadata.transferEncoding === 'quoted-printable') {
attachmentStream.pipe(new libqp.Decoder()).pipe(res);
} else {
attachmentStream.pipe(res);
}
});
});
});

View file

@ -7,7 +7,7 @@ const BodyStructure = require('./body-structure');
const createEnvelope = require('./create-envelope');
const parseMimeTree = require('./parse-mime-tree');
const ObjectID = require('mongodb').ObjectID;
const GridFs = require('grid-fs');
const GridFSBucket = require('mongodb').GridFSBucket;
const libmime = require('libmime');
const libqp = require('libqp');
const libbase64 = require('libbase64');
@ -24,7 +24,9 @@ class Indexer {
this.database = this.options.database;
if (this.database) {
this.gridstore = new GridFs(this.database, 'attachments');
this.gridstore = new GridFSBucket(this.database, {
bucketName: 'attachments'
});
}
// create logger
@ -187,7 +189,7 @@ class Indexer {
} else if (node.attachmentId && !skipExternal) {
append(false, true); // force newline between header and contents
let attachmentStream = this.gridstore.createReadStream(node.attachmentId);
let attachmentStream = this.gridstore.openDownloadStream(node.attachmentId);
attachmentStream.once('error', err => {
res.emit('error', err);
@ -255,14 +257,21 @@ class Indexer {
/**
* Decode text/plain and text/html parts, separate node bodies from the tree
*/
processContent(messageId, mimeTree) {
let response = {
getMaildata(messageId, mimeTree) {
let magic = parseInt(crypto.randomBytes(2).toString('hex'), 16);
let map = {};
let maildata = {
nodes: [],
attachments: [],
text: '',
html: []
html: [],
// magic number to append to increment stored attachment object counter
magic,
// match ids referenced in document to actual attachment ids
map
};
let idcount = 0;
let htmlContent = [];
let textContent = [];
let cidMap = new Map();
@ -335,7 +344,8 @@ class Indexer {
// remove attachments and very large text nodes from the mime tree
if (!isMultipart && node.body && node.body.length && (!isInlineText || node.size > 300 * 1024)) {
let attachmentId = new ObjectID();
let attachmentId = 'ATT' + leftPad(++idcount, '0', 5);
map[attachmentId] = new ObjectID();
let fileName = (node.parsedHeader['content-disposition'] && node.parsedHeader['content-disposition'].params && node.parsedHeader['content-disposition'].params.filename) || (node.parsedHeader['content-type'] && node.parsedHeader['content-type'].params && node.parsedHeader['content-type'].params.name) || false;
let contentId = (node.parsedHeader['content-id'] || '').toString().replace(/<|>/g, '').trim();
@ -356,18 +366,19 @@ class Indexer {
});
// push to queue
response.nodes.push({
maildata.nodes.push({
attachmentId,
options: {
fsync: true,
content_type: contentType,
contentType,
// metadata should include only minimally required information, this would allow
// to share attachments between different messages if the content is exactly the same
// even though metadata (filename, content-disposition etc) might not
metadata: {
// if we copy the same message to other mailboxes then instead
// of copying attachments we add a pointer to the new message here
messages: [messageId],
// values to detect if there are messages that reference to this attachment or not
m: maildata.magic,
c: 1,
// how to decode contents if a webclient or API asks for the attachment
transferEncoding
}
@ -378,7 +389,7 @@ class Indexer {
// do not include text content, multipart elements and embedded messages in the attachment list
if (!isInlineText && !(contentType === 'message/rfc822' && (!disposition || disposition === 'inline'))) {
// list in the attachments array
response.attachments.push({
maildata.attachments.push({
id: attachmentId,
fileName,
contentType,
@ -416,43 +427,93 @@ class Indexer {
return match;
});
response.html = htmlContent.filter(str => str.trim()).map(updateCidLinks);
response.text = textContent.filter(str => str.trim()).map(updateCidLinks).join('\n').trim();
maildata.html = htmlContent.filter(str => str.trim()).map(updateCidLinks);
maildata.text = textContent.filter(str => str.trim()).map(updateCidLinks).join('\n').trim();
return response;
return maildata;
}
/**
* Stores attachments to GridStore
*/
storeNodeBodies(messageId, nodes, callback) {
storeNodeBodies(messageId, maildata, mimeTree, callback) {
let pos = 0;
let nodes = maildata.nodes;
let storeNode = () => {
if (pos >= nodes.length) {
return callback(null, true);
// replace attachment IDs with ObjectIDs in the mimeTree
// Visits `node` and all of its descendants depth-first. Whenever a node's
// attachmentId has an entry in maildata.map, the id is replaced with the
// mapped value. `next` is called once, after the whole subtree was visited.
let walk = (node, next) => {
    let mappedId = node.attachmentId && maildata.map[node.attachmentId];
    if (mappedId) {
        node.attachmentId = mappedId;
    }

    if (Array.isArray(node.childNodes)) {
        // the traversal is synchronous, so siblings can be handled in a plain loop;
        // length is re-read on every pass, same as the callback based iteration did
        for (let i = 0; i < node.childNodes.length; i++) {
            walk(node.childNodes[i], () => false);
        }
    }

    next();
};
return walk(mimeTree, () => callback(null, true));
}
let nodeData = nodes[pos++];
let returned = false;
let store = this.gridstore.createWriteStream(nodeData.attachmentId, nodeData.options);
let node = nodes[pos++];
store.once('error', err => {
if (returned) {
return;
let hash = crypto.createHash('sha256').update(node.body).digest('hex');
this.database.collection('attachments.files').findOneAndUpdate({
'metadata.h': hash
}, {
$inc: {
'metadata.c': 1,
'metadata.m': maildata.magic
}
returned = true;
callback(err);
});
store.on('close', () => {
if (returned) {
return;
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
returned = true;
return storeNode();
});
store.end(nodeData.body);
if (result && result.value) {
maildata.map[node.attachmentId] = result.value._id;
return storeNode();
}
let returned = false;
node.options.metadata.h = hash;
let store = this.gridstore.openUploadStreamWithId(maildata.map[node.attachmentId], null, node.options);
store.once('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
store.once('finish', () => {
if (returned) {
return;
}
returned = true;
return storeNode();
});
store.end(node.body);
});
};
storeNode();
@ -701,4 +762,8 @@ function textToHtml(str) {
return text;
}
/**
 * Left-pads a value with a fill string until it is at least `len` characters long.
 * Values that are already `len` characters or longer are returned unpadded
 * (as a string) instead of triggering a RangeError from String#repeat.
 *
 * @param {Number|String} val Value to pad
 * @param {String} chr Fill string, repeated once for every missing position
 * @param {Number} len Minimum length of the result
 * @returns {String} Padded value
 */
function leftPad(val, chr, len) {
    // repeat() throws on negative counts, so never ask for less than zero repetitions
    let missing = Math.max(0, len - val.toString().length);
    return chr.repeat(missing) + val;
}
module.exports = Indexer;

View file

@ -593,7 +593,7 @@ describe('Search term match tests', function () {
describe('INTERNALDATE', function () {
it('should match <', function (done) {
matchSearchQuery({
internaldate: new Date('1999-01-01')
idate: new Date('1999-01-01')
}, {
key: 'internaldate',
value: '2001-01-01',
@ -607,7 +607,7 @@ describe('Search term match tests', function () {
it('should not match <', function (done) {
matchSearchQuery({
internaldate: new Date('1999-01-01')
idate: new Date('1999-01-01')
}, {
key: 'internaldate',
value: '1998-01-01',
@ -621,7 +621,7 @@ describe('Search term match tests', function () {
it('should match =', function (done) {
matchSearchQuery({
internaldate: new Date('1999-01-01')
idate: new Date('1999-01-01')
}, {
key: 'internaldate',
value: '1999-01-01',
@ -635,7 +635,7 @@ describe('Search term match tests', function () {
it('should not match <', function (done) {
matchSearchQuery({
internaldate: new Date('1999-01-01')
idate: new Date('1999-01-01')
}, {
key: 'internaldate',
value: '1999-01-02',
@ -649,7 +649,7 @@ describe('Search term match tests', function () {
it('should match >=', function (done) {
matchSearchQuery({
internaldate: new Date('1999-01-01')
idate: new Date('1999-01-01')
}, {
key: 'internaldate',
value: '1999-01-01',
@ -674,7 +674,7 @@ describe('Search term match tests', function () {
it('should not match >=', function (done) {
matchSearchQuery({
internaldate: new Date('1999-01-01')
idate: new Date('1999-01-01')
}, {
key: 'internaldate',
value: '1999-01-02',

126
imap.js
View file

@ -773,7 +773,9 @@ server.onExpunge = function (path, update, session, callback) {
}).project({
_id: true,
uid: true,
size: true
size: true,
map: true,
magic: true
}).sort([
['uid', 1]
]);
@ -804,19 +806,7 @@ server.onExpunge = function (path, update, session, callback) {
return cursor.close(() => {
updateQuota(() => {
this.notifier.fire(session.user.id, path);
// delete all attachments that do not have any active links to message objects
db.database.collection('attachments.files').deleteMany({
'metadata.messages': {
$size: 0
}
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
return callback(null, true);
});
return callback(null, true);
});
});
}
@ -835,12 +825,27 @@ server.onExpunge = function (path, update, session, callback) {
deletedMessages++;
deletedStorage += Number(message.size) || 0;
// remove link to message from attachments (if any exist)
let attachments = Object.keys(message.map || {}).map(key => message.map[key]);
if (!attachments.length) {
// not stored attachments
return this.notifier.addEntries(session.user.id, path, {
command: 'EXPUNGE',
ignore: session.id,
uid: message.uid,
message: message._id
}, processNext);
}
// remove references to attachments (if any exist)
db.database.collection('attachments.files').updateMany({
'metadata.messages': message._id
_id: {
$in: attachments
}
}, {
$pull: {
'metadata.messages': message._id
$inc: {
'metadata.c': -1,
'metadata.m': -message.magic
}
}, {
multi: true,
@ -934,10 +939,7 @@ server.onCopy = function (path, update, session, callback) {
});
}
let sourceId = message._id;
// Copying is not done in bulk to minimize risk of going out of sync with incremental UIDs
sourceUid.unshift(message.uid);
db.database.collection('mailboxes').findOneAndUpdate({
_id: target._id
@ -980,12 +982,24 @@ server.onCopy = function (path, update, session, callback) {
copiedMessages++;
copiedStorage += Number(message.size) || 0;
// remove link to message from attachments (if any exist)
let attachments = Object.keys(message.map || {}).map(key => message.map[key]);
if (!attachments.length) {
return this.notifier.addEntries(session.user.id, target.path, {
command: 'EXISTS',
uid: message.uid,
message: message._id
}, processNext);
}
// update attachments
db.database.collection('attachments.files').updateMany({
'metadata.messages': sourceId
_id: {
$in: attachments
}
}, {
$push: {
'metadata.messages': message._id
$inc: {
'metadata.c': 1,
'metadata.m': message.magic
}
}, {
multi: true,
@ -1610,6 +1624,59 @@ server.onGetQuota = function (quotaRoot, session, callback) {
});
};
/**
 * Deletes stored attachments whose reference counters have dropped to zero.
 *
 * Iterates over all entries in `attachments.files` where both `metadata.c` and
 * `metadata.m` are 0. For every such entry the file document is deleted first
 * and, only if that deletion actually removed a document, the matching
 * `attachments.chunks` documents are deleted as well.
 *
 * @param {Function} callback Called as callback(err) if reading from the cursor
 *        fails, otherwise as callback(null, deleted) where `deleted` is the
 *        number of attachments removed
 */
function deleteOrphanedAttachments(callback) {
    let cursor = db.database.collection('attachments.files').find({
        'metadata.c': 0,
        'metadata.m': 0
    });

    let deleted = 0;
    let processNext = () => {
        cursor.next((err, attachment) => {
            if (err) {
                // release the cursor before reporting the failure
                return cursor.close(() => callback(err));
            }

            if (!attachment) {
                // cursor is exhausted
                return cursor.close(() => callback(null, deleted));
            }

            if (attachment.metadata && attachment.metadata.c) {
                // counter is not zero in the fetched document, skip
                return processNext();
            }

            // delete file entry first
            db.database.collection('attachments.files').deleteOne({
                _id: attachment._id,
                // repeat the counter conditions so that an attachment that was
                // re-used after the cursor fetched it is not deleted
                'metadata.c': 0,
                'metadata.m': 0
            }, (err, result) => {
                if (err || !result.deletedCount) {
                    // nothing was removed, keep the chunks and move on
                    return processNext();
                }

                // delete data chunks
                db.database.collection('attachments.chunks').deleteMany({
                    files_id: attachment._id
                }, () => {
                    // errors are ignored as we don't really care if we have orphans or not
                    deleted++;
                    processNext();
                });
            });
        });
    };

    processNext();
}
function clearExpiredMessages() {
clearTimeout(gcTimeout);
@ -1669,14 +1736,7 @@ function clearExpiredMessages() {
if (!message) {
return cursor.close(() => {
// delete all attachments that do not have any active links to message objects
db.database.collection('attachments.files').deleteMany({
'metadata.messages': {
$size: 0
}
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
deleteOrphanedAttachments(() => {
server.logger.debug({
tnx: 'gc'
}, 'Deleted %s messages', deleted);

View file

@ -161,11 +161,17 @@
}
}]
}, {
"collection": "attachment.files",
"collection": "attachments.files",
"indexes": [{
"name": "related_messages",
"name": "attachment_hash",
"key": {
"metadata.messages": 1
"metadata.h": 1
}
}, {
"name": "related_attachments",
"key": {
"metadata.c": 1,
"metadata.m": 1
}
}]
}, {

View file

@ -6,7 +6,7 @@ const SeqIndex = require('seq-index');
const DkimStream = require('./dkim-stream');
const MessageSplitter = require('./message-splitter');
const seqIndex = new SeqIndex();
const GridFs = require('grid-fs');
const GridFSBucket = require('mongodb').GridFSBucket;
const uuid = require('uuid');
const os = require('os');
const hostname = os.hostname();
@ -249,12 +249,14 @@ module.exports = (options, callback) => {
};
function store(id, stream, callback) {
gridstore = gridstore || new GridFs(db.senderDb, config.sender.gfs);
// lazily create the bucket on first use; the bucket name is the configured
// value (config.sender.gfs), not a string literal
gridstore = gridstore || new GridFSBucket(db.senderDb, {
    bucketName: config.sender.gfs
});
let returned = false;
let store = gridstore.createWriteStream('message ' + id, {
let store = gridstore.openUploadStream('message ' + id, {
fsync: true,
content_type: 'message/rfc822',
contentType: 'message/rfc822',
metadata: {
created: new Date()
}
@ -266,7 +268,7 @@ function store(id, stream, callback) {
}
returned = true;
store.once('close', () => {
store.once('finish', () => {
removeMessage(id, () => callback(err));
});
@ -281,7 +283,7 @@ function store(id, stream, callback) {
callback(err);
});
store.on('close', () => {
store.once('finish', () => {
if (returned) {
return;
}

View file

@ -81,7 +81,7 @@ class MessageHandler {
let headers = prepared.headers;
let flags = Array.isArray(options.flags) ? options.flags : [].concat(options.flags || []);
let maildata = options.maildata || this.indexer.processContent(id, mimeTree);
let maildata = options.maildata || this.indexer.getMaildata(id, mimeTree);
this.getMailbox(options, (err, mailbox) => {
if (err) {
@ -210,9 +210,35 @@ class MessageHandler {
};
checkExisting(() => {
this.indexer.storeNodeBodies(id, maildata.nodes, err => {
// Final step of the store operation. Without an error (falsy first argument)
// the arguments are passed to `callback` unchanged. With an error the
// references taken on the attachments listed in maildata.map are released
// first and then the ORIGINAL arguments (including the error) are passed on,
// so a failed store is never reported as a success.
let cleanup = (...args) => {
    if (!args[0]) {
        return callback(...args);
    }

    let attachments = Object.keys(maildata.map || {}).map(key => maildata.map[key]);
    if (!attachments.length) {
        return callback(...args);
    }

    // error occurred, release the references instead of deleting the files:
    // a de-duplicated attachment can be shared with other messages, so only the
    // counters are decremented (same update the expunge code uses)
    // NOTE(review): assumes entries with zeroed counters are later removed by
    // the orphaned attachments cleanup - confirm that it runs for this database
    this.database.collection('attachments.files').updateMany({
        _id: {
            $in: attachments
        }
    }, {
        $inc: {
            'metadata.c': -1,
            'metadata.m': -maildata.magic
        }
    }, {
        multi: true,
        w: 1
    }, () => {
        // update errors are ignored as we don't really care if we have orphans or not
        return callback(...args);
    });
};
this.indexer.storeNodeBodies(id, maildata, mimeTree, err => {
if (err) {
return callback(err);
return cleanup(err);
}
// prepare message object
@ -244,7 +270,10 @@ class MessageHandler {
seen: flags.includes('\\Seen'),
flagged: flags.includes('\\Flagged'),
deleted: flags.includes('\\Deleted'),
draft: flags.includes('\\Draft')
draft: flags.includes('\\Draft'),
magic: maildata.magic,
map: maildata.map
};
if (maildata.attachments && maildata.attachments.length) {
@ -286,12 +315,12 @@ class MessageHandler {
// Another server might be waiting for the lock
this.redlock.waitAcquireLock(mailbox._id.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
if (err) {
return callback(err);
return cleanup(err);
}
if (!lock || !lock.success) {
// did not get a insert lock in 10 seconds
return callback(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time'));
return cleanup(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time'));
}
this.database.collection('users').findOneAndUpdate({
@ -303,7 +332,7 @@ class MessageHandler {
}, err => {
if (err) {
this.redlock.releaseLock(lock, () => false);
return callback(err);
return cleanup(err);
}
let rollback = err => {
@ -314,7 +343,7 @@ class MessageHandler {
storageUsed: -size
}
}, () => {
this.redlock.releaseLock(lock, () => callback(err));
this.redlock.releaseLock(lock, () => cleanup(err));
});
};
@ -370,7 +399,7 @@ class MessageHandler {
this.redlock.releaseLock(lock, () => {
this.notifier.fire(mailbox.user, mailbox.path);
return callback(null, true, {
return cleanup(null, true, {
uidValidity,
uid,
id: message._id
@ -441,21 +470,36 @@ class MessageHandler {
this.updateQuota(mailbox, {
storageUsed: -message.size
}, () => {
// remove link to message from attachments (if any exist)
this.database.collection('attachments.files').updateMany({
'metadata.messages': message._id
}, {
$pull: {
'metadata.messages': message._id
}
}, {
multi: true,
w: 1
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
// Decrements the reference counters (metadata.c and metadata.m) of every
// attachment listed in message.map and calls next() afterwards. If the
// message has no mapped attachments then next() is called right away.
let updateAttachments = next => {
    let attachmentIds = Object.keys(message.map || {}).map(mid => message.map[mid]);

    if (!attachmentIds.length) {
        return next();
    }

    let filter = {
        _id: {
            $in: attachmentIds
        }
    };

    // remove link to message from attachments
    let update = {
        $inc: {
            'metadata.c': -1,
            'metadata.m': -message.magic
        }
    };

    let updateOptions = {
        multi: true,
        w: 1
    };

    this.database.collection('attachments.files').updateMany(filter, update, updateOptions, () => {
        // update errors are ignored as we don't really care if we have orphans or not
        next();
    });
};
updateAttachments(() => {
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', message.uid));
}
@ -472,18 +516,7 @@ class MessageHandler {
return callback(null, true);
}
// delete all attachments that do not have any active links to message objects
this.database.collection('attachments.files').deleteMany({
'metadata.messages': {
$size: 0
}
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
return callback(null, true);
});
return callback(null, true);
});
});
});

View file

@ -155,7 +155,7 @@ const serverOptions = {
let prepared = messageHandler.prepareMessage({
raw
});
let maildata = messageHandler.indexer.processContent(prepared.id, prepared.mimeTree);
let maildata = messageHandler.indexer.getMaildata(prepared.id, prepared.mimeTree);
// default flags are empty
let flags = [];

View file

@ -16,17 +16,16 @@
"grunt-cli": "^1.2.0",
"grunt-eslint": "^19.0.0",
"grunt-mocha-test": "^0.13.2",
"mocha": "^3.3.0"
"mocha": "^3.4.1"
},
"dependencies": {
"addressparser": "^1.0.1",
"bcryptjs": "^2.4.3",
"config": "^1.25.1",
"config": "^1.26.1",
"generate-password": "^1.3.0",
"grid-fs": "^1.0.1",
"html-to-text": "^3.2.0",
"iconv-lite": "^0.4.17",
"joi": "^10.4.1",
"joi": "^10.4.2",
"libbase64": "^0.1.0",
"libmime": "^3.1.0",
"libqp": "^1.1.0",
@ -34,7 +33,7 @@
"mongodb": "^2.2.26",
"node-redis-scripty": "0.0.5",
"nodemailer": "^4.0.1",
"npmlog": "^4.0.2",
"npmlog": "^4.1.0",
"qrcode": "^0.8.1",
"redfour": "^1.0.0",
"redis": "^2.7.1",