wildduck/lib/message-handler.js

800 lines
32 KiB
JavaScript
Raw Normal View History

'use strict';
const config = require('config');
const redis = require('redis');
const uuidV1 = require('uuid/v1');
const ObjectID = require('mongodb').ObjectID;
const Indexer = require('../imap-core/lib/indexer/indexer');
const ImapNotifier = require('./imap-notifier');
2017-03-30 01:06:09 +08:00
const tools = require('./tools');
2017-04-02 00:22:47 +08:00
const libmime = require('libmime');
const counters = require('./counters');
2017-06-08 21:04:34 +08:00
const parseDate = require('../imap-core/lib/parse-date');
2017-05-23 22:17:54 +08:00
// how many modifications to cache before writing
2017-04-09 17:33:10 +08:00
const BULK_BATCH_SIZE = 150;
2017-05-23 22:17:54 +08:00
const SCHEMA_VERSION = '1.0';
2017-06-05 21:40:48 +08:00
// how much plaintext to store. this is indexed with a fulltext index
const MAX_PLAINTEXT_CONTENT = 2 * 1024;
// how much HTML content to store. not indexed
const MAX_HTML_CONTENT = 300 * 1024;
2017-05-23 22:17:54 +08:00
// index only the following headers for SEARCH
const INDEXED_HEADERS = ['to', 'cc', 'subject', 'from', 'sender', 'reply-to', 'message-id', 'thread-index'];
2017-04-09 17:33:10 +08:00
class MessageHandler {
constructor(database, redisConfig) {
this.database = database;
this.redis = redisConfig || tools.redisConfig(config.redis);
this.indexer = new Indexer({
database
});
this.notifier = new ImapNotifier({
database,
pushOnly: true
});
this.counters = counters(redis.createClient(this.redis));
}
getMailbox(options, callback) {
let query = {};
if (options.mailbox) {
if (typeof options.mailbox === 'object' && options.mailbox._id) {
2017-04-10 22:12:47 +08:00
return setImmediate(() => callback(null, options.mailbox));
}
query._id = options.mailbox;
} else {
query.user = options.user;
2017-04-11 05:36:22 +08:00
if (options.specialUse) {
query.specialUse = options.specialUse;
} else {
query.path = options.path;
}
}
2017-04-11 05:36:22 +08:00
this.database.collection('mailboxes').findOne(query, (err, mailbox) => {
if (err) {
return callback(err);
}
if (!mailbox) {
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return callback(err);
}
callback(null, mailbox);
});
}
2017-04-11 05:36:22 +08:00
// Monster method for inserting new messages to a mailbox
// TODO: Refactor into smaller pieces
add(options, callback) {
2017-04-13 16:35:39 +08:00
let prepared = options.prepared || this.prepareMessage(options);
let id = prepared.id;
let mimeTree = prepared.mimeTree;
let size = prepared.size;
let bodystructure = prepared.bodystructure;
let envelope = prepared.envelope;
let idate = prepared.idate;
let hdate = prepared.hdate;
let msgid = prepared.msgid;
let headers = prepared.headers;
2017-04-24 20:15:53 +08:00
let flags = Array.isArray(options.flags) ? options.flags : [].concat(options.flags || []);
2017-05-15 21:09:08 +08:00
let maildata = options.maildata || this.indexer.getMaildata(id, mimeTree);
2017-04-02 00:22:47 +08:00
this.getMailbox(options, (err, mailbox) => {
if (err) {
return callback(err);
}
this.checkExistingMessage(
mailbox._id,
{
hdate,
msgid,
flags
},
options,
(...args) => {
if (args[0] || args[1]) {
2017-05-15 21:09:08 +08:00
return callback(...args);
}
let cleanup = (...args) => {
if (!args[0]) {
return callback(...args);
2017-05-15 21:09:08 +08:00
}
let attachments = Object.keys(maildata.map || {}).map(key => maildata.map[key]);
if (!attachments.length) {
return callback(...args);
}
2017-05-15 21:09:08 +08:00
// error occured, remove attachments
this.database.collection('attachments.files').deleteMany({
_id: {
$in: attachments
}
}, () => callback(...args));
};
this.indexer.storeNodeBodies(id, maildata, mimeTree, err => {
if (err) {
return cleanup(err);
}
2017-04-10 22:12:47 +08:00
// prepare message object
let message = {
_id: id,
2017-05-23 22:17:54 +08:00
v: SCHEMA_VERSION,
// if true then expirest after rdate + retention
exp: !!mailbox.retention,
rdate: Date.now() + (mailbox.retention || 0),
2017-04-10 22:12:47 +08:00
idate,
hdate,
flags,
size,
2017-04-10 22:12:47 +08:00
// some custom metadata about the delivery
meta: options.meta || {},
2017-04-24 19:51:11 +08:00
// list filter IDs that matched this message
filters: Array.isArray(options.filters) ? options.filters : [].concat(options.filters || []),
2017-04-10 22:12:47 +08:00
headers,
mimeTree,
envelope,
bodystructure,
msgid,
2017-05-15 21:09:08 +08:00
// use boolean for more commonly used (and searched for) flags
seen: flags.includes('\\Seen'),
flagged: flags.includes('\\Flagged'),
deleted: flags.includes('\\Deleted'),
draft: flags.includes('\\Draft'),
2017-04-10 22:12:47 +08:00
magic: maildata.magic,
map: maildata.map
};
if (maildata.attachments && maildata.attachments.length) {
message.attachments = maildata.attachments;
message.ha = true;
} else {
message.ha = false;
}
if (maildata.text) {
message.text = maildata.text.replace(/\r\n/g, '\n').trim();
2017-06-05 21:40:48 +08:00
// text is indexed with a fulltext index, so only store the beginning of it
message.text = message.text.length <= MAX_PLAINTEXT_CONTENT ? message.text : message.text.substr(0, MAX_PLAINTEXT_CONTENT);
message.intro = message.text.replace(/\s+/g, ' ').trim();
if (message.intro.length > 128) {
let intro = message.intro.substr(0, 128);
let lastSp = intro.lastIndexOf(' ');
if (lastSp > 0) {
intro = intro.substr(0, lastSp);
}
message.intro = intro + '…';
2017-04-10 22:12:47 +08:00
}
}
if (maildata.html && maildata.html.length) {
let htmlSize = 0;
message.html = maildata.html
.map(html => {
2017-06-05 21:40:48 +08:00
if (htmlSize >= MAX_HTML_CONTENT || !html) {
return '';
}
2017-06-05 21:40:48 +08:00
if (htmlSize + Buffer.byteLength(html) <= MAX_HTML_CONTENT) {
htmlSize += Buffer.byteLength(html);
return html;
}
2017-06-05 21:40:48 +08:00
html = html.substr(0, htmlSize + Buffer.byteLength(html) - MAX_HTML_CONTENT);
htmlSize += Buffer.byteLength(html);
return html;
})
.filter(html => html);
}
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
2017-03-27 04:58:05 +08:00
}, {
$inc: {
storageUsed: size
2017-03-27 04:58:05 +08:00
}
}, err => {
if (err) {
return cleanup(err);
}
let rollback = err => {
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -size
}
}, () => {
cleanup(err);
});
};
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
}, (err, item) => {
2017-03-27 04:58:05 +08:00
if (err) {
2017-03-30 01:06:09 +08:00
return rollback(err);
2017-03-27 04:58:05 +08:00
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return rollback(err);
2017-04-10 22:12:47 +08:00
}
2017-03-27 04:58:05 +08:00
let mailbox = item.value;
// updated message object by setting mailbox specific values
message.mailbox = mailbox._id;
message.user = mailbox.user;
message.uid = mailbox.uidNext;
message.modseq = mailbox.modifyIndex + 1;
this.database.collection('messages').insertOne(message, err => {
if (err) {
return rollback(err);
}
let uidValidity = mailbox.uidValidity;
let uid = message.uid;
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', message.uid));
}
this.notifier.addEntries(
mailbox,
false,
{
command: 'EXISTS',
uid: message.uid,
ignore: options.session && options.session.id,
message: message._id,
modseq: message.modseq
},
() => {
this.notifier.fire(mailbox.user, mailbox.path);
return cleanup(null, true, {
uidValidity,
uid,
2017-06-14 14:35:30 +08:00
id: message._id,
status: 'new'
});
}
);
});
});
});
});
}
);
});
}
2017-05-23 22:17:54 +08:00
checkExistingMessage(mailboxId, message, options, callback) {
// if a similar message already exists then update existing one
this.database.collection('messages').findOne({
mailbox: mailboxId,
hdate: message.hdate,
msgid: message.msgid
}, (err, existing) => {
if (err) {
return callback(err);
}
if (!existing) {
// nothing to do here, continue adding message
return callback();
}
if (options.skipExisting) {
// message already exists, just skip it
2017-06-06 15:56:58 +08:00
return callback(null, true, {
uid: existing.uid,
2017-06-14 14:35:30 +08:00
id: existing._id,
status: 'skip'
2017-05-23 22:17:54 +08:00
});
}
// As duplicate message was found, update UID, MODSEQ and FLAGS
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailboxId
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
}, {
returnOriginal: true
}, (err, item) => {
2017-05-23 22:17:54 +08:00
if (err) {
return callback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return callback(err);
2017-05-23 22:17:54 +08:00
}
let mailbox = item.value;
let uid = mailbox.uidNext;
let modseq = mailbox.modifyIndex + 1;
this.database.collection('messages').findOneAndUpdate({
_id: existing._id
2017-05-23 22:17:54 +08:00
}, {
$set: {
uid,
modseq,
flags: message.flags
2017-05-23 22:17:54 +08:00
}
}, {
returnOriginal: false
2017-05-23 22:17:54 +08:00
}, (err, item) => {
if (err) {
return callback(err);
2017-05-23 22:17:54 +08:00
}
if (!item || !item.value) {
// message was not found for whatever reason
return callback();
2017-05-23 22:17:54 +08:00
}
let updated = item.value;
2017-05-23 22:17:54 +08:00
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', existing.uid));
}
2017-05-23 22:17:54 +08:00
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', updated.uid));
}
this.notifier.addEntries(
mailbox,
false,
{
command: 'EXPUNGE',
2017-05-23 22:17:54 +08:00
ignore: options.session && options.session.id,
uid: existing.uid,
message: existing._id
},
() => {
this.notifier.addEntries(
mailbox,
false,
{
command: 'EXISTS',
uid: updated.uid,
ignore: options.session && options.session.id,
message: updated._id,
modseq: updated.modseq
},
() => {
this.notifier.fire(mailbox.user, mailbox.path);
return callback(null, true, {
uidValidity: mailbox.uidValidity,
uid,
2017-06-14 14:35:30 +08:00
id: existing._id,
status: 'update'
});
}
);
}
);
2017-05-23 22:17:54 +08:00
});
});
});
}
updateQuota(mailbox, inc, callback) {
inc = inc || {};
this.database.collection('users').findOneAndUpdate(
{
_id: mailbox.user
},
{
$inc: {
storageUsed: Number(inc.storageUsed) || 0
}
},
callback
);
}
2017-04-10 22:12:47 +08:00
del(options, callback) {
let getMessage = next => {
if (options.message) {
return next(null, options.message);
}
this.database.collection('messages').findOne(
options.query,
{
fields: {
mailbox: true,
uid: true,
size: true,
map: true,
magic: true
}
},
next
);
};
getMessage((err, message) => {
if (err) {
return callback(err);
}
if (!message) {
return callback(new Error('Message does not exist'));
}
this.getMailbox(
{
mailbox: options.mailbox || message.mailbox
},
(err, mailbox) => {
if (err) {
return callback(err);
}
this.database.collection('messages').deleteOne({
_id: message._id
}, err => {
if (err) {
return callback(err);
}
2017-05-15 21:09:08 +08:00
this.updateQuota(
mailbox,
{
storageUsed: -message.size
},
() => {
let updateAttachments = next => {
let attachments = Object.keys(message.map || {}).map(key => message.map[key]);
if (!attachments.length) {
return next();
}
2017-04-10 22:12:47 +08:00
// remove link to message from attachments (if any exist)
this.database.collection('attachments.files').updateMany({
_id: {
$in: attachments
}
}, {
$inc: {
'metadata.c': -1,
'metadata.m': -message.magic
}
}, {
multi: true,
w: 1
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
next();
});
};
2017-05-15 21:09:08 +08:00
updateAttachments(() => {
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', message.uid));
}
2017-05-15 21:09:08 +08:00
this.notifier.addEntries(
mailbox,
false,
{
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: message.uid,
message: message._id
},
() => {
this.notifier.fire(mailbox.user, mailbox.path);
if (options.skipAttachments) {
return callback(null, true);
}
return callback(null, true);
}
);
});
2017-04-10 22:12:47 +08:00
}
);
});
}
);
});
}
2017-04-09 17:33:10 +08:00
move(options, callback) {
this.getMailbox(options.source, (err, mailbox) => {
if (err) {
return callback(err);
}
this.getMailbox(options.destination, (err, target) => {
if (err) {
return callback(err);
}
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// increase the mailbox modification index
// to indicate that something happened
modifyIndex: 1
}
}, {
uidNext: true
}, () => {
let cursor = this.database
.collection('messages')
.find({
mailbox: mailbox._id,
uid: {
$in: options.messages || []
}
})
.project({
uid: 1
})
.sort([['uid', 1]]);
2017-04-09 17:33:10 +08:00
let sourceUid = [];
let destinationUid = [];
let removeEntries = [];
let existsEntries = [];
let done = err => {
let next = () => {
if (err) {
return callback(err);
}
return callback(null, true, {
uidValidity: target.uidValidity,
sourceUid,
2017-06-14 14:35:30 +08:00
destinationUid,
status: 'moved'
2017-04-09 17:33:10 +08:00
});
};
if (existsEntries.length) {
// mark messages as deleted from old mailbox
return this.notifier.addEntries(mailbox, false, removeEntries, () => {
// mark messages as added to new mailbox
this.notifier.addEntries(target, false, existsEntries, () => {
this.notifier.fire(mailbox.user, mailbox.path);
this.notifier.fire(target.user, target.path);
next();
});
});
}
next();
};
let processNext = () => {
cursor.next((err, message) => {
if (err) {
return done(err);
}
if (!message) {
return cursor.close(done);
}
sourceUid.unshift(message.uid);
this.database.collection('mailboxes').findOneAndUpdate({
_id: target._id
}, {
$inc: {
uidNext: 1
}
}, {
uidNext: true
}, (err, item) => {
if (err) {
2017-05-25 15:45:16 +08:00
return cursor.close(() => done(err));
2017-04-09 17:33:10 +08:00
}
if (!item || !item.value) {
2017-05-25 15:45:16 +08:00
return cursor.close(() => done(new Error('Mailbox disappeared')));
2017-04-09 17:33:10 +08:00
}
let uidNext = item.value.uidNext;
destinationUid.unshift(uidNext);
let updateOptions = {
$set: {
mailbox: target._id,
// new mailbox means new UID
uid: uidNext,
// this will be changed later by the notification system
modseq: 0,
// retention settings
exp: !!target.retention,
rdate: Date.now() + (target.retention || 0)
2017-04-09 17:33:10 +08:00
}
};
if (options.markAsSeen) {
updateOptions.$set.seen = true;
updateOptions.$addToSet = {
flags: '\\Seen'
};
}
// update message, change mailbox from old to new one
this.database.collection('messages').findOneAndUpdate({
_id: message._id
}, updateOptions, err => {
if (err) {
2017-05-25 15:45:16 +08:00
return cursor.close(() => done(err));
2017-04-09 17:33:10 +08:00
}
if (options.session) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', message.uid));
}
removeEntries.push({
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: message.uid
});
existsEntries.push({
command: 'EXISTS',
uid: uidNext,
message: message._id
});
if (existsEntries.length >= BULK_BATCH_SIZE) {
// mark messages as deleted from old mailbox
return this.notifier.addEntries(mailbox, false, removeEntries, () => {
// mark messages as added to new mailbox
this.notifier.addEntries(target, false, existsEntries, () => {
removeEntries = [];
existsEntries = [];
this.notifier.fire(mailbox.user, mailbox.path);
this.notifier.fire(target.user, target.path);
processNext();
});
});
}
processNext();
});
});
});
};
processNext();
});
});
});
}
2017-04-13 16:35:39 +08:00
generateIndexedHeaders(headersArray) {
return (headersArray || [])
.map(line => {
line = Buffer.from(line, 'binary').toString();
2017-04-13 16:35:39 +08:00
let key = line.substr(0, line.indexOf(':')).trim().toLowerCase();
2017-05-23 22:17:54 +08:00
if (!INDEXED_HEADERS.includes(key)) {
// do not index this header
return false;
}
2017-05-23 22:17:54 +08:00
let value = line.substr(line.indexOf(':') + 1).trim().replace(/\s*\r?\n\s*/g, ' ');
2017-04-13 16:35:39 +08:00
try {
value = libmime.decodeWords(value);
} catch (E) {
// ignore
}
2017-04-13 16:35:39 +08:00
// store indexed value as lowercase for easier SEARCHing
value = value.toLowerCase();
2017-04-13 16:35:39 +08:00
// trim long values as mongodb indexed fields can not be too long
if (Buffer.byteLength(key, 'utf-8') >= 255) {
key = Buffer.from(key).slice(0, 255).toString();
key = key.substr(0, key.length - 4);
}
2017-04-13 16:35:39 +08:00
if (Buffer.byteLength(value, 'utf-8') >= 880) {
// value exceeds MongoDB max indexed value length
value = Buffer.from(value).slice(0, 880).toString();
// remove last 4 chars to be sure we do not have any incomplete unicode sequences
value = value.substr(0, value.length - 4);
}
2017-04-13 16:35:39 +08:00
return {
key,
value
};
})
.filter(line => line);
2017-04-13 16:35:39 +08:00
}
prepareMessage(options) {
let id = new ObjectID();
let mimeTree = this.indexer.parseMimeTree(options.raw);
let size = this.indexer.getSize(mimeTree);
let bodystructure = this.indexer.getBodyStructure(mimeTree);
let envelope = this.indexer.getEnvelope(mimeTree);
2017-06-08 21:04:34 +08:00
let idate = (options.date && parseDate(options.date)) || new Date();
let hdate = (mimeTree.parsedHeader.date && parseDate(mimeTree.parsedHeader.date, idate)) || false;
2017-04-13 16:35:39 +08:00
let flags = [].concat(options.flags || []);
if (!hdate || hdate.toString() === 'Invalid Date') {
hdate = idate;
}
let msgid = envelope[9] || '<' + uuidV1() + '@wildduck.email>';
2017-04-13 16:35:39 +08:00
let headers = this.generateIndexedHeaders(mimeTree.header);
return {
id,
mimeTree,
size,
bodystructure,
envelope,
idate,
hdate,
flags,
msgid,
headers
};
}
}
module.exports = MessageHandler;