wildduck/lib/message-handler.js

671 lines
28 KiB
JavaScript
Raw Normal View History

'use strict';
const config = require('config');
const uuidV1 = require('uuid/v1');
const ObjectID = require('mongodb').ObjectID;
const RedFour = require('redfour');
const Indexer = require('../imap-core/lib/indexer/indexer');
const ImapNotifier = require('./imap-notifier');
2017-03-30 01:06:09 +08:00
const tools = require('./tools');
2017-04-02 00:22:47 +08:00
const libmime = require('libmime');
2017-04-09 17:33:10 +08:00
// home many modifications to cache before writing
const BULK_BATCH_SIZE = 150;
class MessageHandler {
constructor(database) {
this.database = database;
this.indexer = new Indexer({
database
});
this.notifier = new ImapNotifier({
database,
pushOnly: true
});
this.redlock = new RedFour({
2017-03-30 01:06:09 +08:00
redis: tools.redisConfig(config.redis),
namespace: 'wildduck'
});
}
getMailbox(options, callback) {
let query = {};
if (options.mailbox) {
if (typeof options.mailbox === 'object' && options.mailbox._id) {
2017-04-10 22:12:47 +08:00
return setImmediate(() => callback(null, options.mailbox));
}
query._id = options.mailbox;
} else {
query.user = options.user;
2017-04-11 05:36:22 +08:00
if (options.specialUse) {
query.specialUse = options.specialUse;
} else {
query.path = options.path;
}
}
2017-04-11 05:36:22 +08:00
this.database.collection('mailboxes').findOne(query, (err, mailbox) => {
if (err) {
return callback(err);
}
if (!mailbox) {
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return callback(err);
}
callback(null, mailbox);
});
}
2017-04-11 05:36:22 +08:00
// Monster method for inserting new messages to a mailbox
// TODO: Refactor into smaller pieces
add(options, callback) {
let id = new ObjectID();
let mimeTree = this.indexer.parseMimeTree(options.raw);
let size = this.indexer.getSize(mimeTree);
let bodystructure = this.indexer.getBodyStructure(mimeTree);
let envelope = this.indexer.getEnvelope(mimeTree);
2017-04-10 22:12:47 +08:00
let internaldate = options.date && new Date(options.date) || new Date();
let hdate = mimeTree.parsedHeader.date && new Date(mimeTree.parsedHeader.date) || false;
let flags = [].concat(options.flags || []);
if (!hdate || hdate.toString() === 'Invalid Date') {
hdate = internaldate;
}
let msgid = envelope[9] || ('<' + uuidV1() + '@wildduck.email>');
2017-04-02 00:22:47 +08:00
let headers = (mimeTree.header || []).map(line => {
line = Buffer.from(line, 'binary').toString();
let key = line.substr(0, line.indexOf(':')).trim().toLowerCase();
let value = line.substr(line.indexOf(':') + 1).trim().toLowerCase().replace(/\s*\r?\n\s*/g, ' ');
try {
value = libmime.decodeWords(value);
} catch (E) {
// ignore
}
// trim long values as mongodb indexed fields can not be too long
if (Buffer.byteLength(key, 'utf-8') >= 255) {
key = Buffer.from(key).slice(0, 255).toString();
key = key.substr(0, key.length - 4);
}
if (Buffer.byteLength(value, 'utf-8') >= 880) {
// value exceeds MongoDB max indexed value length
value = Buffer.from(value).slice(0, 880).toString();
// remove last 4 chars to be sure we do not have any incomplete unicode sequences
value = value.substr(0, value.length - 4);
}
2017-04-02 00:22:47 +08:00
return {
key,
value
};
});
this.getMailbox(options, (err, mailbox) => {
if (err) {
return callback(err);
}
2017-04-11 05:36:22 +08:00
// if a similar message already exists then update existing one
2017-04-10 22:12:47 +08:00
let checkExisting = next => {
this.database.collection('messages').findOne({
mailbox: mailbox._id,
hdate,
msgid
}, (err, existing) => {
if (err) {
return callback(err);
}
2017-04-10 22:12:47 +08:00
if (!existing) {
2017-04-11 05:36:22 +08:00
// nothing to do here, continue adding message
2017-04-10 22:12:47 +08:00
return next();
}
2017-04-10 22:12:47 +08:00
if (options.skipExisting) {
// message already exists, just skip it
2017-04-12 19:52:29 +08:00
return callback(null, false, {
id: existing.id
});
2017-04-10 22:12:47 +08:00
}
2017-04-11 05:36:22 +08:00
// As duplicate message was found, update UID, MODSEQ and FLAGS
// Ensure sequential UID by locking mailbox
this.redlock.waitAcquireLock(mailbox._id.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
2017-04-10 22:12:47 +08:00
if (err) {
return callback(err);
}
2017-04-11 05:36:22 +08:00
if (!lock || !lock.success) {
// did not get a insert lock in 10 seconds
return callback(new Error('Failed to acquire lock'));
}
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
}, {
returnOriginal: true
}, (err, item) => {
if (err) {
return this.redlock.releaseLock(lock, () => callback(err));
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return this.redlock.releaseLock(lock, () => callback(err));
}
let mailbox = item.value;
let uid = mailbox.uidNext;
let modseq = mailbox.modifyIndex + 1;
this.database.collection('messages').findOneAndUpdate({
_id: existing._id
}, {
$set: {
uid,
modseq,
flags
}
}, {
returnOriginal: false
}, (err, item) => {
if (err) {
return this.redlock.releaseLock(lock, () => callback(err));
}
if (!item || !item.value) {
// message was not found for whatever reason
return this.redlock.releaseLock(lock, next);
}
let updated = item.value;
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', existing.uid));
}
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', updated.uid));
}
this.notifier.addEntries(mailbox, false, {
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: existing.uid,
message: existing._id
}, () => {
this.notifier.addEntries(mailbox, false, {
command: 'EXISTS',
uid: updated.uid,
ignore: options.session && options.session.id,
message: updated._id,
modseq: updated.modseq
}, () => {
this.notifier.fire(mailbox.user, mailbox.path);
return this.redlock.releaseLock(lock, () => callback(null, true, {
uidValidity: mailbox.uidValidity,
2017-04-12 19:52:29 +08:00
uid,
id: existing._id
2017-04-11 05:36:22 +08:00
}));
});
});
});
});
2017-04-10 22:12:47 +08:00
});
});
};
2017-04-10 22:12:47 +08:00
checkExisting(() => {
this.indexer.processContent(id, mimeTree, (err, maildata) => {
if (err) {
return callback(err);
}
2017-04-10 22:12:47 +08:00
// prepare message object
let message = {
_id: id,
internaldate,
hdate,
flags,
size,
meta: options.meta || {},
headers,
mimeTree,
envelope,
bodystructure,
msgid,
// use boolean for more common flags
seen: flags.includes('\\Seen'),
flagged: flags.includes('\\Flagged'),
deleted: flags.includes('\\Deleted')
};
if (maildata.attachments && maildata.attachments.length) {
message.attachments = maildata.attachments;
message.hasAttachments = true;
} else {
message.hasAttachments = false;
}
2017-04-10 22:12:47 +08:00
let maxTextLength = 300 * 1024;
2017-04-10 22:12:47 +08:00
if (maildata.text) {
message.text = maildata.text.replace(/\r\n/g, '\n').trim();
message.text = message.text.length <= maxTextLength ? message.text : message.text.substr(0, maxTextLength);
message.intro = message.text.replace(/\s+/g, ' ').trim();
if (message.intro.length > 128) {
message.intro = message.intro.substr(0, 128) + '…';
}
2017-04-10 22:12:47 +08:00
}
2017-04-10 22:12:47 +08:00
if (maildata.html && maildata.html.length) {
let htmlSize = 0;
message.html = maildata.html.map(html => {
if (htmlSize >= maxTextLength || !html) {
return '';
}
2017-04-10 22:12:47 +08:00
if (htmlSize + Buffer.byteLength(html) <= maxTextLength) {
htmlSize += Buffer.byteLength(html);
return html;
}
2017-04-10 22:12:47 +08:00
html = html.substr(0, htmlSize + Buffer.byteLength(html) - maxTextLength);
htmlSize += Buffer.byteLength(html);
return html;
}).filter(html => html);
}
2017-04-10 22:12:47 +08:00
// Another server might be waiting for the lock
this.redlock.waitAcquireLock(mailbox._id.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
if (err) {
return callback(err);
}
2017-04-10 22:12:47 +08:00
if (!lock || !lock.success) {
// did not get a insert lock in 10 seconds
return callback(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time'));
}
2017-04-10 22:12:47 +08:00
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
2017-03-27 04:58:05 +08:00
}, {
$inc: {
2017-04-10 22:12:47 +08:00
storageUsed: size
2017-03-27 04:58:05 +08:00
}
2017-04-10 22:12:47 +08:00
}, err => {
if (err) {
2017-04-10 22:12:47 +08:00
this.redlock.releaseLock(lock, () => false);
return callback(err);
}
2017-04-10 22:12:47 +08:00
let rollback = err => {
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -size
}
}, () => {
this.redlock.releaseLock(lock, () => callback(err));
});
};
2017-04-02 01:15:10 +08:00
2017-04-10 22:12:47 +08:00
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
}, (err, item) => {
2017-03-27 04:58:05 +08:00
if (err) {
2017-03-30 01:06:09 +08:00
return rollback(err);
2017-03-27 04:58:05 +08:00
}
2017-04-10 22:12:47 +08:00
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return rollback(err);
}
2017-03-27 04:58:05 +08:00
2017-04-10 22:12:47 +08:00
let mailbox = item.value;
// updated message object by setting mailbox specific values
message.mailbox = mailbox._id;
message.user = mailbox.user;
message.uid = mailbox.uidNext;
message.modseq = mailbox.modifyIndex + 1;
this.database.collection('messages').insertOne(message, err => {
if (err) {
return rollback(err);
}
let uidValidity = mailbox.uidValidity;
let uid = message.uid;
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', message.uid));
}
this.notifier.addEntries(mailbox, false, {
command: 'EXISTS',
uid: message.uid,
ignore: options.session && options.session.id,
message: message._id,
modseq: message.modseq
}, () => {
this.redlock.releaseLock(lock, () => {
this.notifier.fire(mailbox.user, mailbox.path);
return callback(null, true, {
uidValidity,
2017-04-12 19:52:29 +08:00
uid,
id: message._id
2017-04-10 22:12:47 +08:00
});
2017-03-27 04:58:05 +08:00
});
});
});
});
});
});
});
});
});
}
updateQuota(mailbox, inc, callback) {
inc = inc || {};
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
2017-04-04 18:30:38 +08:00
storageUsed: Number(inc.storageUsed) || 0
}
}, () => {
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
2017-04-04 18:30:38 +08:00
storageUsed: Number(inc.storageUsed) || 0
}
}, callback);
});
}
2017-04-10 22:12:47 +08:00
del(options, callback) {
this.database.collection('messages').findOne(options.query, (err, message) => {
if (err) {
return callback(err);
}
if (!message) {
return callback(new Error('Message does not exist'));
}
2017-04-10 22:12:47 +08:00
this.getMailbox({
mailbox: options.mailbox || message.mailbox
}, (err, mailbox) => {
if (err) {
return callback(err);
}
this.database.collection('messages').deleteOne({
_id: message._id
}, err => {
if (err) {
return callback(err);
}
this.updateQuota(mailbox, {
2017-04-04 18:30:38 +08:00
storageUsed: -message.size
}, () => {
// remove link to message from attachments (if any exist)
this.database.collection('attachments.files').updateMany({
'metadata.messages': message._id
}, {
$pull: {
'metadata.messages': message._id
}
}, {
multi: true,
w: 1
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
2017-04-10 22:12:47 +08:00
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', message.uid));
}
this.notifier.addEntries(mailbox, false, {
command: 'EXPUNGE',
2017-04-10 22:12:47 +08:00
ignore: options.session && options.session.id,
uid: message.uid,
message: message._id
}, () => {
this.notifier.fire(mailbox.user, mailbox.path);
// delete all attachments that do not have any active links to message objects
this.database.collection('attachments.files').deleteMany({
'metadata.messages': {
$size: 0
}
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
return callback(null, true);
});
});
});
});
});
});
});
}
2017-04-09 17:33:10 +08:00
move(options, callback) {
this.getMailbox(options.source, (err, mailbox) => {
if (err) {
return callback(err);
}
this.getMailbox(options.destination, (err, target) => {
if (err) {
return callback(err);
}
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// increase the mailbox modification index
// to indicate that something happened
modifyIndex: 1
}
}, {
uidNext: true
}, () => {
let cursor = this.database.collection('messages').find({
mailbox: mailbox._id,
uid: {
$in: options.messages || []
}
}).project({
uid: 1
}).sort([
['uid', 1]
]);
let sourceUid = [];
let destinationUid = [];
let removeEntries = [];
let existsEntries = [];
let done = err => {
let next = () => {
if (err) {
return callback(err);
}
return callback(null, true, {
uidValidity: target.uidValidity,
sourceUid,
destinationUid
});
};
if (existsEntries.length) {
// mark messages as deleted from old mailbox
return this.notifier.addEntries(mailbox, false, removeEntries, () => {
// mark messages as added to new mailbox
this.notifier.addEntries(target, false, existsEntries, () => {
this.notifier.fire(mailbox.user, mailbox.path);
this.notifier.fire(target.user, target.path);
next();
});
});
}
next();
};
let processNext = () => {
cursor.next((err, message) => {
if (err) {
return done(err);
}
if (!message) {
return cursor.close(done);
}
sourceUid.unshift(message.uid);
this.database.collection('mailboxes').findOneAndUpdate({
_id: target._id
}, {
$inc: {
uidNext: 1
}
}, {
uidNext: true
}, (err, item) => {
if (err) {
return done(err);
}
if (!item || !item.value) {
return done(new Error('Mailbox disappeared'));
}
let uidNext = item.value.uidNext;
destinationUid.unshift(uidNext);
let updateOptions = {
$set: {
mailbox: target._id,
// new mailbox means new UID
uid: uidNext,
// this will be changed later by the notification system
modseq: 0
}
};
if (options.markAsSeen) {
updateOptions.$set.seen = true;
updateOptions.$addToSet = {
flags: '\\Seen'
};
}
// update message, change mailbox from old to new one
this.database.collection('messages').findOneAndUpdate({
_id: message._id
}, updateOptions, err => {
if (err) {
return done(err);
}
if (options.session) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', message.uid));
}
removeEntries.push({
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: message.uid
});
existsEntries.push({
command: 'EXISTS',
uid: uidNext,
message: message._id
});
if (existsEntries.length >= BULK_BATCH_SIZE) {
// mark messages as deleted from old mailbox
return this.notifier.addEntries(mailbox, false, removeEntries, () => {
// mark messages as added to new mailbox
this.notifier.addEntries(target, false, existsEntries, () => {
removeEntries = [];
existsEntries = [];
this.notifier.fire(mailbox.user, mailbox.path);
this.notifier.fire(target.user, target.path);
processNext();
});
});
}
processNext();
});
});
});
};
processNext();
});
});
});
}
}
module.exports = MessageHandler;