Merge pull request #18 from nodemailer/attachment_storage

Attachment storage
This commit is contained in:
Andris Reinman 2017-08-07 13:12:28 +03:00 committed by GitHub
commit 1def9f1e5e
16 changed files with 446 additions and 307 deletions

View file

@ -193,7 +193,7 @@ Shard the following collections by these keys:
sh.enableSharding('wildduck'); sh.enableSharding('wildduck');
sh.shardCollection('wildduck.messages', { mailbox: 1, uid: 1 }); sh.shardCollection('wildduck.messages', { mailbox: 1, uid: 1 });
sh.shardCollection('wildduck.threads', { user: 'hashed' }); sh.shardCollection('wildduck.threads', { user: 'hashed' });
sh.shardCollection('wildduck.attachments.files', { 'metadata.h': 'hashed' }); sh.shardCollection('wildduck.attachments.files', { _id: 'hashed' });
sh.shardCollection('wildduck.attachments.chunks', { files_id: 'hashed' }); sh.shardCollection('wildduck.attachments.chunks', { files_id: 'hashed' });
``` ```

6
api.js
View file

@ -100,18 +100,22 @@ module.exports = done => {
database: db.database, database: db.database,
redis: db.redis redis: db.redis
}); });
messageHandler = new MessageHandler({ messageHandler = new MessageHandler({
database: db.database, database: db.database,
users: db.users, users: db.users,
redis: db.redis,
gridfs: db.gridfs, gridfs: db.gridfs,
redis: db.redis attachments: config.attachments
}); });
userHandler = new UserHandler({ userHandler = new UserHandler({
database: db.database, database: db.database,
users: db.users, users: db.users,
redis: db.redis, redis: db.redis,
messageHandler messageHandler
}); });
mailboxHandler = new MailboxHandler({ mailboxHandler = new MailboxHandler({
database: db.database, database: db.database,
users: db.users, users: db.users,

View file

@ -52,6 +52,10 @@ maxForwards=2000
# used to push outbound emails to the sending queue # used to push outbound emails to the sending queue
#sender="zone-mta" #sender="zone-mta"
[attachments]
type="gridstore"
bucket="attachments"
[log] [log]
level="silly" level="silly"
# log to syslog if true # log to syslog if true

View file

@ -6,8 +6,6 @@ const PassThrough = stream.PassThrough;
const BodyStructure = require('./body-structure'); const BodyStructure = require('./body-structure');
const createEnvelope = require('./create-envelope'); const createEnvelope = require('./create-envelope');
const parseMimeTree = require('./parse-mime-tree'); const parseMimeTree = require('./parse-mime-tree');
const ObjectID = require('mongodb').ObjectID;
const GridFSBucket = require('mongodb').GridFSBucket;
const libmime = require('libmime'); const libmime = require('libmime');
const libqp = require('libqp'); const libqp = require('libqp');
const libbase64 = require('libbase64'); const libbase64 = require('libbase64');
@ -16,25 +14,12 @@ const he = require('he');
const htmlToText = require('html-to-text'); const htmlToText = require('html-to-text');
const crypto = require('crypto'); const crypto = require('crypto');
let cryptoAsync;
try {
cryptoAsync = require('@ronomon/crypto-async'); // eslint-disable-line global-require
} catch (E) {
// ignore
}
class Indexer { class Indexer {
constructor(options) { constructor(options) {
this.options = options || {}; this.options = options || {};
this.fetchOptions = this.options.fetchOptions || {}; this.fetchOptions = this.options.fetchOptions || {};
this.database = this.options.database; this.attachmentStorage = this.options.attachmentStorage;
this.gridfs = this.options.gridfs || this.options.database;
if (this.gridfs) {
this.gridstore = new GridFSBucket(this.gridfs, {
bucketName: 'attachments'
});
}
// create logger // create logger
this.logger = this.options.logger || { this.logger = this.options.logger || {
@ -195,7 +180,11 @@ class Indexer {
} else if (node.attachmentId && !skipExternal) { } else if (node.attachmentId && !skipExternal) {
append(false, true); // force newline between header and contents append(false, true); // force newline between header and contents
let attachmentStream = this.gridstore.openDownloadStream(node.attachmentId); let attachmentId = node.attachmentId;
if (mimeTree.attachmentMap && mimeTree.attachmentMap[node.attachmentId]) {
attachmentId = mimeTree.attachmentMap[node.attachmentId];
}
let attachmentStream = this.attachmentStorage.createReadStream(attachmentId);
attachmentStream.once('error', err => { attachmentStream.once('error', err => {
res.emit('error', err); res.emit('error', err);
@ -267,16 +256,13 @@ class Indexer {
*/ */
getMaildata(messageId, mimeTree) { getMaildata(messageId, mimeTree) {
let magic = parseInt(crypto.randomBytes(2).toString('hex'), 16); let magic = parseInt(crypto.randomBytes(2).toString('hex'), 16);
let map = {};
let maildata = { let maildata = {
nodes: [], nodes: [],
attachments: [], attachments: [],
text: '', text: '',
html: [], html: [],
// magic number to append to increment stored attachment object counter // magic number to append to increment stored attachment object counter
magic, magic
// match ids referenced in document to actual attachment ids
map
}; };
let idcount = 0; let idcount = 0;
@ -363,7 +349,6 @@ class Indexer {
// remove attachments and very large text nodes from the mime tree // remove attachments and very large text nodes from the mime tree
if (!isMultipart && node.body && node.body.length && (!isInlineText || node.size > 300 * 1024)) { if (!isMultipart && node.body && node.body.length && (!isInlineText || node.size > 300 * 1024)) {
let attachmentId = 'ATT' + leftPad(++idcount, '0', 5); let attachmentId = 'ATT' + leftPad(++idcount, '0', 5);
map[attachmentId] = new ObjectID();
let fileName = let fileName =
(node.parsedHeader['content-disposition'] && (node.parsedHeader['content-disposition'] &&
@ -371,6 +356,7 @@ class Indexer {
node.parsedHeader['content-disposition'].params.filename) || node.parsedHeader['content-disposition'].params.filename) ||
(node.parsedHeader['content-type'] && node.parsedHeader['content-type'].params && node.parsedHeader['content-type'].params.name) || (node.parsedHeader['content-type'] && node.parsedHeader['content-type'].params && node.parsedHeader['content-type'].params.name) ||
false; false;
let contentId = (node.parsedHeader['content-id'] || '').toString().replace(/<|>/g, '').trim(); let contentId = (node.parsedHeader['content-id'] || '').toString().replace(/<|>/g, '').trim();
if (fileName) { if (fileName) {
@ -391,21 +377,9 @@ class Indexer {
// push to queue // push to queue
maildata.nodes.push({ maildata.nodes.push({
attachmentId, attachmentId,
options: { magic: maildata.magic,
fsync: true,
contentType, contentType,
// metadata should include only minimally required information, this would allow transferEncoding,
// to share attachments between different messages if the content is exactly the same
// even though metadata (filename, content-disposition etc) might not
metadata: {
// values to detect if there are messages that reference to this attachment or not
m: maildata.magic,
c: 1,
// how to decode contents if a webclient or API asks for the attachment
transferEncoding
}
},
body: node.body body: node.body
}); });
@ -463,82 +437,19 @@ class Indexer {
storeNodeBodies(messageId, maildata, mimeTree, callback) { storeNodeBodies(messageId, maildata, mimeTree, callback) {
let pos = 0; let pos = 0;
let nodes = maildata.nodes; let nodes = maildata.nodes;
mimeTree.attachmentMap = {};
let storeNode = () => { let storeNode = () => {
if (pos >= nodes.length) { if (pos >= nodes.length) {
// replace attachment IDs with ObjectIDs in the mimeTree return callback(null, true);
let walk = (node, next) => {
if (node.attachmentId && maildata.map[node.attachmentId]) {
node.attachmentId = maildata.map[node.attachmentId];
}
if (Array.isArray(node.childNodes)) {
let pos = 0;
let processChildNodes = () => {
if (pos >= node.childNodes.length) {
return next();
}
let childNode = node.childNodes[pos++];
walk(childNode, () => processChildNodes());
};
processChildNodes();
} else {
next();
}
};
return walk(mimeTree, () => callback(null, true));
} }
let node = nodes[pos++]; let node = nodes[pos++];
this.attachmentStorage.create(node, (err, id) => {
calculateHash(node.body, (err, hash) => {
if (err) { if (err) {
return callback(err); return callback(err);
} }
mimeTree.attachmentMap[node.attachmentId] = id;
this.gridfs.collection('attachments.files').findOneAndUpdate({
'metadata.h': hash
}, {
$inc: {
'metadata.c': 1,
'metadata.m': maildata.magic
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
if (result && result.value) {
maildata.map[node.attachmentId] = result.value._id;
return storeNode(); return storeNode();
}
let returned = false;
node.options.metadata.h = hash;
let store = this.gridstore.openUploadStreamWithId(maildata.map[node.attachmentId], null, node.options);
store.once('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
store.once('finish', () => {
if (returned) {
return;
}
returned = true;
return storeNode();
});
store.end(node.body);
});
}); });
}; };
@ -800,20 +711,4 @@ function leftPad(val, chr, len) {
return chr.repeat(len - val.toString().length) + val; return chr.repeat(len - val.toString().length) + val;
} }
function calculateHash(input, callback) {
let algo = 'sha256';
if (!cryptoAsync) {
setImmediate(() => callback(null, crypto.createHash(algo).update(input).digest('hex')));
return;
}
cryptoAsync.hash(algo, input, (err, hash) => {
if (err) {
return callback(err);
}
return callback(null, hash.toString('hex'));
});
}
module.exports = Indexer; module.exports = Indexer;

88
imap.js
View file

@ -84,60 +84,6 @@ let mailboxHandler;
let gcTimeout; let gcTimeout;
let gcLock; let gcLock;
function deleteOrphanedAttachments(callback) {
// NB! scattered query
let cursor = db.gridfs.collection('attachments.files').find({
'metadata.c': 0,
'metadata.m': 0
});
let deleted = 0;
let processNext = () => {
cursor.next((err, attachment) => {
if (err) {
return callback(err);
}
if (!attachment) {
return cursor.close(() => {
// delete all attachments that do not have any active links to message objects
callback(null, deleted);
});
}
if (!attachment || (attachment.metadata && attachment.metadata.c)) {
// skip
return processNext();
}
// delete file entry first
db.gridfs.collection('attachments.files').deleteOne({
_id: attachment._id,
// make sure that we do not delete a message that is already re-used
'metadata.c': 0,
'metadata.m': 0
}, (err, result) => {
if (err || !result.deletedCount) {
return processNext();
}
// delete data chunks
db.gridfs.collection('attachments.chunks').deleteMany({
files_id: attachment._id
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
deleted++;
processNext();
});
});
});
};
processNext();
}
function clearExpiredMessages() { function clearExpiredMessages() {
clearTimeout(gcTimeout); clearTimeout(gcTimeout);
let startTime = Date.now(); let startTime = Date.now();
@ -181,7 +127,7 @@ function clearExpiredMessages() {
if (config.imap.disableRetention) { if (config.imap.disableRetention) {
// delete all attachments that do not have any active links to message objects // delete all attachments that do not have any active links to message objects
return deleteOrphanedAttachments(() => done(null, true)); return messageHandler.attachmentStorage.deleteOrphaned(() => done(null, true));
} }
// find and delete all messages that are expired // find and delete all messages that are expired
@ -199,7 +145,7 @@ function clearExpiredMessages() {
mailbox: true, mailbox: true,
uid: true, uid: true,
size: true, size: true,
map: true, 'mimeTree.attachmentMap': true,
magic: true, magic: true,
unseen: true unseen: true
}); });
@ -208,7 +154,7 @@ function clearExpiredMessages() {
let clear = () => let clear = () =>
cursor.close(() => { cursor.close(() => {
// delete all attachments that do not have any active links to message objects // delete all attachments that do not have any active links to message objects
deleteOrphanedAttachments(() => { messageHandler.attachmentStorage.deleteOrphaned(() => {
if (deleted) { if (deleted) {
server.logger.debug( server.logger.debug(
{ {
@ -295,9 +241,25 @@ module.exports = done => {
redis: db.redis redis: db.redis
}); });
messageHandler = new MessageHandler({ database: db.database, gridfs: db.gridfs, redis: db.redis }); messageHandler = new MessageHandler({
userHandler = new UserHandler({ database: db.database, users: db.users, redis: db.redis }); database: db.database,
mailboxHandler = new MailboxHandler({ database: db.database, users: db.users, redis: db.redis, notifier: server.notifier }); redis: db.redis,
gridfs: db.gridfs,
attachments: config.attachments
});
userHandler = new UserHandler({
database: db.database,
users: db.users,
redis: db.redis
});
mailboxHandler = new MailboxHandler({
database: db.database,
users: db.users,
redis: db.redis,
notifier: server.notifier
});
let started = false; let started = false;
@ -324,7 +286,7 @@ module.exports = done => {
}); });
// setup command handlers for the server instance // setup command handlers for the server instance
server.onFetch = onFetch(server); server.onFetch = onFetch(server, messageHandler);
server.onAuth = onAuth(server, userHandler); server.onAuth = onAuth(server, userHandler);
server.onList = onList(server); server.onList = onList(server);
server.onLsub = onLsub(server); server.onLsub = onLsub(server);
@ -337,8 +299,8 @@ module.exports = done => {
server.onStatus = onStatus(server); server.onStatus = onStatus(server);
server.onAppend = onAppend(server, messageHandler); server.onAppend = onAppend(server, messageHandler);
server.onStore = onStore(server); server.onStore = onStore(server);
server.onExpunge = onExpunge(server); server.onExpunge = onExpunge(server, messageHandler);
server.onCopy = onCopy(server); server.onCopy = onCopy(server, messageHandler);
server.onMove = onMove(server, messageHandler); server.onMove = onMove(server, messageHandler);
server.onSearch = onSearch(server); server.onSearch = onSearch(server);
server.onGetQuotaRoot = onGetQuotaRoot(server); server.onGetQuotaRoot = onGetQuotaRoot(server);

View file

@ -253,6 +253,12 @@ indexes:
# attachments.files collection should be sharded by _id (hash) # attachments.files collection should be sharded by _id (hash)
# attachments.chunks collection should be sharded by files_id (hash) # attachments.chunks collection should be sharded by files_id (hash)
- collection: attachments.files
type: gridfs # index applies to gridfs database
index:
name: attachment_id_hashed
key:
_id: hashed
- collection: attachments.files - collection: attachments.files
type: gridfs # index applies to gridfs database type: gridfs # index applies to gridfs database
index: index:

View file

@ -5,7 +5,6 @@ const MongoPaging = require('mongo-cursor-pagination');
const addressparser = require('addressparser'); const addressparser = require('addressparser');
const ObjectID = require('mongodb').ObjectID; const ObjectID = require('mongodb').ObjectID;
const tools = require('../tools'); const tools = require('../tools');
const GridFSBucket = require('mongodb').GridFSBucket;
const libbase64 = require('libbase64'); const libbase64 = require('libbase64');
const libqp = require('libqp'); const libqp = require('libqp');
@ -667,7 +666,7 @@ module.exports = (db, server, messageHandler) => {
_id: true, _id: true,
user: true, user: true,
attachments: true, attachments: true,
map: true 'mimeTree.attachmentMap': true
} }
}, (err, messageData) => { }, (err, messageData) => {
if (err) { if (err) {
@ -683,7 +682,7 @@ module.exports = (db, server, messageHandler) => {
return next(); return next();
} }
let attachmentId = messageData.map[attachment]; let attachmentId = messageData.mimeTree.attachmentMap && messageData.mimeTree.attachmentMap[attachment];
if (!attachmentId) { if (!attachmentId) {
res.json({ res.json({
error: 'This attachment does not exist' error: 'This attachment does not exist'
@ -691,36 +690,25 @@ module.exports = (db, server, messageHandler) => {
return next(); return next();
} }
db.gridfs.collection('attachments.files').findOne({ messageHandler.attachmentStorage.get(attachmentId, (err, attachmentData) => {
_id: attachmentId
}, (err, attachmentData) => {
if (err) { if (err) {
res.json({ res.json({
error: err.message error: err.message
}); });
return next(); return next();
} }
if (!attachmentData) {
res.json({
error: 'This attachment does not exist'
});
return next();
}
res.writeHead(200, { res.writeHead(200, {
'Content-Type': attachmentData.contentType || 'application/octet-stream' 'Content-Type': attachmentData.contentType || 'application/octet-stream'
}); });
let bucket = new GridFSBucket(db.gridfs, { let attachmentStream = messageHandler.attachmentStorage.createReadStream(attachmentId);
bucketName: 'attachments'
});
let attachmentStream = bucket.openDownloadStream(attachmentId);
attachmentStream.once('error', err => res.emit('error', err)); attachmentStream.once('error', err => res.emit('error', err));
if (attachmentData.metadata.transferEncoding === 'base64') { if (attachmentData.transferEncoding === 'base64') {
attachmentStream.pipe(new libbase64.Decoder()).pipe(res); attachmentStream.pipe(new libbase64.Decoder()).pipe(res);
} else if (attachmentData.metadata.transferEncoding === 'quoted-printable') { } else if (attachmentData.transferEncoding === 'quoted-printable') {
attachmentStream.pipe(new libqp.Decoder()).pipe(res); attachmentStream.pipe(new libqp.Decoder()).pipe(res);
} else { } else {
attachmentStream.pipe(res); attachmentStream.pipe(res);
@ -876,7 +864,7 @@ module.exports = (db, server, messageHandler) => {
mailbox: true, mailbox: true,
uid: true, uid: true,
size: true, size: true,
map: true, 'mimeTree.attachmentMap': true,
magic: true, magic: true,
unseen: true unseen: true
} }

84
lib/attachment-storage.js Normal file
View file

@ -0,0 +1,84 @@
'use strict';
const GridstoreStorage = require('./attachments/gridstore-storage.js');
const crypto = require('crypto');
let cryptoAsync;
try {
cryptoAsync = require('@ronomon/crypto-async'); // eslint-disable-line global-require
} catch (E) {
// ignore
}
class AttachmentStorage {
constructor(options) {
this.options = options || {};
let type = (options.options && options.options.type) || 'gridstore';
switch (type) {
case 'gridstore':
default:
this.storage = new GridstoreStorage(this.options);
break;
}
}
get(attachmentId, callback) {
return this.storage.get(attachmentId, callback);
}
create(attachment, callback) {
this.calculateHash(attachment.body, (err, hash) => {
if (err) {
return callback(err);
}
return this.storage.create(attachment, hash, callback);
});
}
createReadStream(id) {
return this.storage.createReadStream(id);
}
deleteMany(ids, magic, callback) {
let pos = 0;
let deleteNext = () => {
if (pos >= ids.length) {
return callback(null, true);
}
let id = ids[pos++];
this.delete(id, magic, deleteNext);
};
deleteNext();
}
updateMany(ids, count, magic, callback) {
this.storage.update(ids, count, magic, callback);
}
delete(id, magic, callback) {
this.storage.delete(id, magic, callback);
}
deleteOrphaned(callback) {
this.storage.deleteOrphaned(callback);
}
calculateHash(input, callback) {
let algo = 'sha256';
if (!cryptoAsync) {
setImmediate(() => callback(null, crypto.createHash(algo).update(input).digest('hex')));
return;
}
cryptoAsync.hash(algo, input, (err, hash) => {
if (err) {
return callback(err);
}
return callback(null, hash.toString('hex'));
});
}
}
module.exports = AttachmentStorage;

View file

@ -0,0 +1,217 @@
'use strict';
const ObjectID = require('mongodb').ObjectID;
const GridFSBucket = require('mongodb').GridFSBucket;
class GridstoreStorage {
constructor(options) {
this.bucketName = (options.options && options.options.bucket) || 'attachments';
this.gridfs = options.gridfs;
this.gridstore = new GridFSBucket(this.gridfs, {
bucketName: this.bucketName
});
}
get(attachmentId, callback) {
this.gridfs.collection(this.bucketName + '.files').findOne({
_id: attachmentId
}, (err, attachmentData) => {
if (err) {
return callback(err);
}
if (!attachmentData) {
return callback(new Error('This attachment does not exist'));
}
return callback(null, {
contentType: attachmentData.contentType,
transferEncoding: attachmentData.metadata.transferEncoding,
length: attachmentData.length,
count: attachmentData.metadata.c,
hash: attachmentData.metadata.h,
metadata: attachmentData.metadata
});
});
}
create(attachment, hash, callback) {
this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({
'metadata.h': hash
}, {
$inc: {
'metadata.c': 1,
'metadata.m': attachment.magic
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
if (result && result.value) {
return callback(null, result.value._id);
}
let returned = false;
let id = new ObjectID();
let metadata = {
h: hash,
m: attachment.magic,
c: 1,
transferEncoding: attachment.transferEncoding
};
Object.keys(attachment.metadata || {}).forEach(key => {
if (!(key in attachment.metadata)) {
metadata[key] = attachment.metadata[key];
}
});
let store = this.gridstore.openUploadStreamWithId(id, null, {
contentType: attachment.contentType,
metadata
});
store.once('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
store.once('finish', () => {
if (returned) {
return;
}
returned = true;
return callback(null, id);
});
store.end(attachment.body);
});
}
createReadStream(id) {
return this.gridstore.openDownloadStream(id);
}
delete(id, magic, callback) {
this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({
_id: id
}, {
$inc: {
'metadata.c': -1,
'metadata.m': -magic
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
if (!result || !result.value) {
return callback(null, false);
}
/*
// disabled as it is preferred that attachments are not deleted immediately but
// after a while by a cleanup process. This gives the opportunity to reuse the
// attachment
if (result.value.metadata.c === 0 && result.value.metadata.m === 0) {
return this.gridstore.delete(id, err => {
if (err) {
return callback(err);
}
callback(null, 1);
});
}
*/
return callback(null, true);
});
}
update(ids, count, magic, callback) {
// update attachments
this.gridfs.collection(this.bucketName + '.files').updateMany(
{
_id: Array.isArray(ids)
? {
$in: ids
}
: ids
},
{
$inc: {
'metadata.c': count,
'metadata.m': magic
}
},
{
multi: true,
w: 1
},
callback
);
}
deleteOrphaned(callback) {
// NB! scattered query
let cursor = this.gridfs.collection(this.bucketName + '.files').find({
'metadata.c': 0,
'metadata.m': 0
});
let deleted = 0;
let processNext = () => {
cursor.next((err, attachment) => {
if (err) {
return callback(err);
}
if (!attachment) {
return cursor.close(() => {
// delete all attachments that do not have any active links to message objects
callback(null, deleted);
});
}
if (!attachment || (attachment.metadata && attachment.metadata.c)) {
// skip
return processNext();
}
// delete file entry first
this.gridfs.collection('attachments.files').deleteOne({
_id: attachment._id,
// make sure that we do not delete a message that is already re-used
'metadata.c': 0,
'metadata.m': 0
}, (err, result) => {
if (err || !result.deletedCount) {
return processNext();
}
// delete data chunks
this.gridfs.collection('attachments.chunks').deleteMany({
files_id: attachment._id
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
deleted++;
processNext();
});
});
});
};
processNext();
}
}
module.exports = GridstoreStorage;

View file

@ -5,7 +5,7 @@ const db = require('../db');
const tools = require('../tools'); const tools = require('../tools');
// COPY / UID COPY sequence mailbox // COPY / UID COPY sequence mailbox
module.exports = server => (path, update, session, callback) => { module.exports = (server, messageHandler) => (path, update, session, callback) => {
server.logger.debug( server.logger.debug(
{ {
tnx: 'copy', tnx: 'copy',
@ -16,6 +16,7 @@ module.exports = server => (path, update, session, callback) => {
path, path,
update.destination update.destination
); );
db.database.collection('mailboxes').findOne({ db.database.collection('mailboxes').findOne({
user: session.user.id, user: session.user.id,
path path
@ -151,8 +152,9 @@ module.exports = server => (path, update, session, callback) => {
copiedMessages++; copiedMessages++;
copiedStorage += Number(message.size) || 0; copiedStorage += Number(message.size) || 0;
let attachments = Object.keys(message.map || {}).map(key => message.map[key]); let attachmentIds = Object.keys(message.mimetree.attachmentMap || {}).map(key => message.mimetree.attachmentMap[key]);
if (!attachments.length) {
if (!attachmentIds.length) {
let entry = { let entry = {
command: 'EXISTS', command: 'EXISTS',
uid: message.uid, uid: message.uid,
@ -165,20 +167,7 @@ module.exports = server => (path, update, session, callback) => {
return server.notifier.addEntries(session.user.id, target.path, entry, processNext); return server.notifier.addEntries(session.user.id, target.path, entry, processNext);
} }
// update attachments messageHandler.attachmentStorage.updateMany(attachmentIds, 1, message.magic, err => {
db.gridfs.collection('attachments.files').updateMany({
_id: {
$in: attachments
}
}, {
$inc: {
'metadata.c': 1,
'metadata.m': message.magic
}
}, {
multi: true,
w: 1
}, err => {
if (err) { if (err) {
// should we care about this error? // should we care about this error?
} }

View file

@ -3,7 +3,7 @@
const db = require('../db'); const db = require('../db');
// EXPUNGE deletes all messages in selected mailbox marked with \Delete // EXPUNGE deletes all messages in selected mailbox marked with \Delete
module.exports = server => (path, update, session, callback) => { module.exports = (server, messageHandler) => (path, update, session, callback) => {
server.logger.debug( server.logger.debug(
{ {
tnx: 'expunge', tnx: 'expunge',
@ -35,7 +35,7 @@ module.exports = server => (path, update, session, callback) => {
_id: true, _id: true,
uid: true, uid: true,
size: true, size: true,
map: true, 'mimeTree.attachmentMap': true,
magic: true, magic: true,
unseen: true unseen: true
}) })
@ -92,9 +92,9 @@ module.exports = server => (path, update, session, callback) => {
deletedMessages++; deletedMessages++;
deletedStorage += Number(message.size) || 0; deletedStorage += Number(message.size) || 0;
let attachments = Object.keys(message.map || {}).map(key => message.map[key]); let attachmentIds = Object.keys(message.mimeTree.attachmentMap || {}).map(key => message.mimeTree.attachmentMap[key]);
if (!attachments.length) { if (!attachmentIds.length) {
// not stored attachments // not stored attachments
return server.notifier.addEntries( return server.notifier.addEntries(
session.user.id, session.user.id,
@ -110,22 +110,9 @@ module.exports = server => (path, update, session, callback) => {
); );
} }
// remove references to attachments (if any exist) messageHandler.attachmentStorage.updateMany(attachmentIds, -1, -message.magic, err => {
db.gridfs.collection('attachments.files').updateMany({
_id: {
$in: attachments
}
}, {
$inc: {
'metadata.c': -1,
'metadata.m': -message.magic
}
}, {
multi: true,
w: 1
}, err => {
if (err) { if (err) {
// ignore as we don't really care if we have orphans or not // should we care about this error?
} }
server.notifier.addEntries( server.notifier.addEntries(
session.user.id, session.user.id,

View file

@ -7,7 +7,7 @@ const db = require('../db');
const tools = require('../tools'); const tools = require('../tools');
const consts = require('../consts'); const consts = require('../consts');
module.exports = server => (path, options, session, callback) => { module.exports = (server, messageHandler) => (path, options, session, callback) => {
server.logger.debug( server.logger.debug(
{ {
tnx: 'fetch', tnx: 'fetch',
@ -119,7 +119,7 @@ module.exports = server => (path, options, session, callback) => {
logger: server.logger, logger: server.logger,
fetchOptions: {}, fetchOptions: {},
database: db.database, database: db.database,
gridfs: db.gridfs, attachmentStorage: messageHandler.attachmentStorage,
acceptUTF8Enabled: session.isUTF8Enabled() acceptUTF8Enabled: session.isUTF8Enabled()
}) })
}) })

View file

@ -5,6 +5,7 @@ const uuidV1 = require('uuid/v1');
const ObjectID = require('mongodb').ObjectID; const ObjectID = require('mongodb').ObjectID;
const Indexer = require('../imap-core/lib/indexer/indexer'); const Indexer = require('../imap-core/lib/indexer/indexer');
const ImapNotifier = require('./imap-notifier'); const ImapNotifier = require('./imap-notifier');
const AttachmentStorage = require('./attachment-storage');
const libmime = require('libmime'); const libmime = require('libmime');
const counters = require('./counters'); const counters = require('./counters');
const consts = require('./consts'); const consts = require('./consts');
@ -21,17 +22,25 @@ class MessageHandler {
constructor(options) { constructor(options) {
this.database = options.database; this.database = options.database;
this.redis = options.redis; this.redis = options.redis;
this.indexer = new Indexer({
database: options.database, this.attachmentStorage =
gridfs: options.gridfs options.attachmentStorage ||
new AttachmentStorage({
gridfs: options.gridfs || options.database,
options: options.attachments
}); });
this.indexer = new Indexer({
attachmentStorage: this.attachmentStorage
});
this.notifier = new ImapNotifier({ this.notifier = new ImapNotifier({
database: options.database, database: options.database,
redis: this.redis, redis: this.redis,
pushOnly: true pushOnly: true
}); });
this.users = options.users || options.database; this.users = options.users || options.database;
this.gridfs = options.gridfs || options.database;
this.counters = counters(this.redis); this.counters = counters(this.redis);
} }
@ -111,17 +120,12 @@ class MessageHandler {
return callback(...args); return callback(...args);
} }
let attachments = Object.keys(maildata.map || {}).map(key => maildata.map[key]); let attachmentIds = Object.keys(mimeTree.attachmentMap || {}).map(key => mimeTree.attachmentMap[key]);
if (!attachments.length) { if (!attachmentIds.length) {
return callback(...args); return callback(...args);
} }
// error occured, remove attachments this.attachmentStorage.deleteMany(attachmentIds, maildata.magic, () => callback(...args));
this.gridfs.collection('attachments.files').deleteMany({
_id: {
$in: attachments
}
}, () => callback(...args));
}; };
this.indexer.storeNodeBodies(id, maildata, mimeTree, err => { this.indexer.storeNodeBodies(id, maildata, mimeTree, err => {
@ -130,7 +134,7 @@ class MessageHandler {
} }
// prepare message object // prepare message object
let message = { let messageData = {
_id: id, _id: id,
// should be kept when COPY'ing or MOVE'ing // should be kept when COPY'ing or MOVE'ing
@ -166,37 +170,38 @@ class MessageHandler {
draft: flags.includes('\\Draft'), draft: flags.includes('\\Draft'),
magic: maildata.magic, magic: maildata.magic,
map: maildata.map,
subject subject
}; };
if (maildata.attachments && maildata.attachments.length) { if (maildata.attachments && maildata.attachments.length) {
message.attachments = maildata.attachments; messageData.attachments = maildata.attachments;
message.ha = true; messageData.ha = true;
} else { } else {
message.ha = false; messageData.ha = false;
} }
if (maildata.text) { if (maildata.text) {
message.text = maildata.text.replace(/\r\n/g, '\n').trim(); messageData.text = maildata.text.replace(/\r\n/g, '\n').trim();
// text is indexed with a fulltext index, so only store the beginning of it // text is indexed with a fulltext index, so only store the beginning of it
message.text = messageData.text =
message.text.length <= consts.MAX_PLAINTEXT_CONTENT ? message.text : message.text.substr(0, consts.MAX_PLAINTEXT_CONTENT); messageData.text.length <= consts.MAX_PLAINTEXT_CONTENT
message.intro = message.text.replace(/\s+/g, ' ').trim(); ? messageData.text
if (message.intro.length > 128) { : messageData.text.substr(0, consts.MAX_PLAINTEXT_CONTENT);
let intro = message.intro.substr(0, 128); messageData.intro = messageData.text.replace(/\s+/g, ' ').trim();
if (messageData.intro.length > 128) {
let intro = messageData.intro.substr(0, 128);
let lastSp = intro.lastIndexOf(' '); let lastSp = intro.lastIndexOf(' ');
if (lastSp > 0) { if (lastSp > 0) {
intro = intro.substr(0, lastSp); intro = intro.substr(0, lastSp);
} }
message.intro = intro + '…'; messageData.intro = intro + '…';
} }
} }
if (maildata.html && maildata.html.length) { if (maildata.html && maildata.html.length) {
let htmlSize = 0; let htmlSize = 0;
message.html = maildata.html messageData.html = maildata.html
.map(html => { .map(html => {
if (htmlSize >= consts.MAX_HTML_CONTENT || !html) { if (htmlSize >= consts.MAX_HTML_CONTENT || !html) {
return ''; return '';
@ -262,17 +267,17 @@ class MessageHandler {
let mailboxData = item.value; let mailboxData = item.value;
// updated message object by setting mailbox specific values // updated message object by setting mailbox specific values
message.mailbox = mailboxData._id; messageData.mailbox = mailboxData._id;
message.user = mailboxData.user; messageData.user = mailboxData.user;
message.uid = mailboxData.uidNext; messageData.uid = mailboxData.uidNext;
message.modseq = mailboxData.modifyIndex + 1; messageData.modseq = mailboxData.modifyIndex + 1;
if (!['\\Junk', '\\Trash'].includes(mailboxData.specialUse)) { if (!['\\Junk', '\\Trash'].includes(mailboxData.specialUse)) {
message.searchable = true; messageData.searchable = true;
} }
if (mailboxData.specialUse === '\\Junk') { if (mailboxData.specialUse === '\\Junk') {
message.junk = true; messageData.junk = true;
} }
this.getThreadId(mailboxData.user, subject, mimeTree, (err, thread) => { this.getThreadId(mailboxData.user, subject, mimeTree, (err, thread) => {
@ -280,18 +285,18 @@ class MessageHandler {
return rollback(err); return rollback(err);
} }
message.thread = thread; messageData.thread = thread;
this.database.collection('messages').insertOne(message, err => { this.database.collection('messages').insertOne(messageData, err => {
if (err) { if (err) {
return rollback(err); return rollback(err);
} }
let uidValidity = mailboxData.uidValidity; let uidValidity = mailboxData.uidValidity;
let uid = message.uid; let uid = messageData.uid;
if (options.session && options.session.selected && options.session.selected.mailbox === mailboxData.path) { if (options.session && options.session.selected && options.session.selected.mailbox === mailboxData.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', message.uid)); options.session.writeStream.write(options.session.formatResponse('EXISTS', messageData.uid));
} }
this.notifier.addEntries( this.notifier.addEntries(
@ -299,18 +304,18 @@ class MessageHandler {
false, false,
{ {
command: 'EXISTS', command: 'EXISTS',
uid: message.uid, uid: messageData.uid,
ignore: options.session && options.session.id, ignore: options.session && options.session.id,
message: message._id, message: messageData._id,
modseq: message.modseq, modseq: messageData.modseq,
unseen: message.unseen unseen: messageData.unseen
}, },
() => { () => {
this.notifier.fire(mailboxData.user, mailboxData.path); this.notifier.fire(mailboxData.user, mailboxData.path);
return cleanup(null, true, { return cleanup(null, true, {
uidValidity, uidValidity,
uid, uid,
id: message._id, id: messageData._id,
status: 'new' status: 'new'
}); });
} }
@ -494,30 +499,12 @@ class MessageHandler {
}, },
() => { () => {
let updateAttachments = next => { let updateAttachments = next => {
let attachments = Object.keys(message.map || {}).map(key => message.map[key]); let attachmentIds = Object.keys(message.mimeTree.attachmentMap || {}).map(key => message.mimeTree.attachmentMap[key]);
if (!attachments.length) { if (!attachmentIds.length) {
return next(); return next();
} }
// remove link to message from attachments (if any exist) this.attachmentStorage.deleteMany(attachmentIds, next);
this.gridfs.collection('attachments.files').updateMany({
_id: {
$in: attachments
}
}, {
$inc: {
'metadata.c': -1,
'metadata.m': -message.magic
}
}, {
multi: true,
w: 1
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
next();
});
}; };
updateAttachments(() => { updateAttachments(() => {

View file

@ -432,7 +432,13 @@ module.exports = done => {
return setImmediate(() => done(null, false)); return setImmediate(() => done(null, false));
} }
messageHandler = new MessageHandler({ database: db.database, gridfs: db.gridfs, users: db.users, redis: db.redis }); messageHandler = new MessageHandler({
database: db.database,
users: db.users,
redis: db.redis,
gridfs: db.gridfs,
attachments: config.attachments
});
let started = false; let started = false;

View file

@ -1,6 +1,6 @@
{ {
"name": "wildduck", "name": "wildduck",
"version": "1.0.66", "version": "1.0.67",
"description": "IMAP server built with Node.js and MongoDB", "description": "IMAP server built with Node.js and MongoDB",
"main": "server.js", "main": "server.js",
"scripts": { "scripts": {

14
pop3.js
View file

@ -307,8 +307,18 @@ module.exports = done => {
let started = false; let started = false;
messageHandler = new MessageHandler({ database: db.database, gridfs: db.gridfs, redis: db.redis }); messageHandler = new MessageHandler({
userHandler = new UserHandler({ database: db.database, users: db.users, redis: db.redis }); database: db.database,
redis: db.redis,
gridfs: db.gridfs,
attachments: config.attachments
});
userHandler = new UserHandler({
database: db.database,
users: db.users,
redis: db.redis
});
server.on('error', err => { server.on('error', err => {
if (!started) { if (!started) {