wildduck/lib/message-handler.js
2021-09-04 10:30:50 +03:00

1802 lines
74 KiB
JavaScript

'use strict';
const crypto = require('crypto');
const { v1: uuidV1 } = require('uuid');
const ObjectId = require('mongodb').ObjectId;
const Indexer = require('../imap-core/lib/indexer/indexer');
const ImapNotifier = require('./imap-notifier');
const AttachmentStorage = require('./attachment-storage');
const AuditHandler = require('./audit-handler');
const libmime = require('libmime');
const counters = require('./counters');
const consts = require('./consts');
const tools = require('./tools');
const openpgp = require('openpgp');
const parseDate = require('../imap-core/lib/parse-date');
const log = require('npmlog');
const packageData = require('../package.json');
const { SettingsHandler } = require('./settings-handler');
// index only the following headers for SEARCH
const INDEXED_HEADERS = ['to', 'cc', 'subject', 'from', 'sender', 'reply-to', 'message-id', 'thread-index', 'list-id', 'delivered-to'];
openpgp.config.commentstring = 'Plaintext message encrypted by WildDuck Mail Server';
openpgp.config.versionString = `WildDuck v${packageData.version}`;
class MessageHandler {
constructor(options) {
this.database = options.database;
this.redis = options.redis;
this.loggelf = options.loggelf || (() => false);
this.attachmentStorage =
options.attachmentStorage ||
new AttachmentStorage({
gridfs: options.gridfs || options.database,
options: options.attachments,
redis: this.redis
});
this.indexer = new Indexer({
attachmentStorage: this.attachmentStorage,
loggelf: message => this.loggelf(message)
});
this.notifier = new ImapNotifier({
database: options.database,
redis: this.redis,
pushOnly: true
});
this.users = options.users || options.database;
this.counters = counters(this.redis);
this.auditHandler = new AuditHandler({
database: this.database,
users: this.users,
gridfs: options.gridfs || this.database,
bucket: 'audit',
loggelf: message => this.loggelf(message)
});
this.settingsHandler = new SettingsHandler({ db: this.database });
}
async getMailboxAsync(options) {
let query = options.query;
if (!query) {
query = {};
if (options.mailbox) {
if (tools.isId(options.mailbox._id)) {
return options.mailbox;
}
if (tools.isId(options.mailbox)) {
query._id = new ObjectId(options.mailbox);
} else {
throw new Error('Invalid mailbox ID');
}
if (options.user) {
query.user = options.user;
}
} else {
query.user = options.user;
if (options.specialUse) {
query.specialUse = options.specialUse;
} else if (options.path) {
query.path = options.path;
} else {
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
throw err;
}
}
}
let mailboxData = await this.database.collection('mailboxes').findOne(query);
if (!mailboxData) {
if (options.path !== 'INBOX' && options.inboxDefault) {
// fall back to INBOX if requested mailbox is missing
mailboxData = await this.database.collection('mailboxes').findOne({
user: options.user,
path: 'INBOX'
});
if (!mailboxData) {
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
throw err;
}
return mailboxData;
}
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
throw err;
}
return mailboxData;
}
getMailbox(options, callback) {
this.getMailboxAsync(options)
.then(mailboxData => callback(null, mailboxData))
.catch(err => callback(err));
}
/**
* Adds or updates messages in the address register that is needed for typeahead address search
* @param {ObjectId} user
* @param {Object[]} addresses
* @param {string} [addresses.name] Name from the address
* @param {string} [addresses.address] Email address
*/
async updateAddressRegister(user, addresses) {
if (!addresses || !addresses.length) {
return;
}
try {
for (let addr of addresses) {
if (!addr.address) {
continue;
}
addr = tools.normalizeAddress(addr, true);
addr.addrview = tools.uview(addr.address);
let updates = { updated: new Date() };
if (addr.name) {
updates.name = addr.name;
try {
// try to decode
updates.name = libmime.decodeWords(updates.name);
} catch (E) {
// ignore
}
}
await this.database.collection('addressregister').findOneAndUpdate(
{
user,
addrview: addr.addrview
},
{
$set: updates,
$setOnInsert: {
user,
address: addr.address,
addrview: addr.addrview
}
},
{ upsert: true, projection: { _id: true } }
);
}
} catch (err) {
// can ignore, not an important opration
}
}
// Monster method for inserting new messages to a mailbox
// TODO: Refactor into smaller pieces
add(options, callback) {
if (!options.prepared && options.raw && options.raw.length > consts.MAX_ALLOWED_MESSAGE_SIZE) {
return setImmediate(() => callback(new Error('Message size ' + options.raw.length + ' bytes is too large')));
}
this.prepareMessage(options, (err, prepared) => {
if (err) {
return callback(err);
}
let id = prepared.id;
let mimeTree = prepared.mimeTree;
let size = prepared.size;
let bodystructure = prepared.bodystructure;
let envelope = prepared.envelope;
let idate = prepared.idate;
let hdate = prepared.hdate;
let msgid = prepared.msgid;
let subject = prepared.subject;
let headers = prepared.headers;
let flags = Array.isArray(options.flags) ? options.flags : [].concat(options.flags || []);
let maildata = options.maildata || this.indexer.getMaildata(mimeTree);
this.getMailbox(options, (err, mailboxData) => {
if (err) {
return callback(err);
}
let cleanup = (...args) => {
if (!args[0]) {
return callback(...args);
}
let attachmentIds = Object.keys(mimeTree.attachmentMap || {}).map(key => mimeTree.attachmentMap[key]);
if (!attachmentIds.length) {
return callback(...args);
}
this.attachmentStorage.deleteMany(attachmentIds, maildata.magic, () => callback(...args));
};
this.indexer.storeNodeBodies(maildata, mimeTree, err => {
if (err) {
return cleanup(err);
}
// prepare message object
let messageData = {
_id: id,
// should be kept when COPY'ing or MOVE'ing
root: id,
v: consts.SCHEMA_VERSION,
// if true then expires after rdate + retention
exp: !!mailboxData.retention,
rdate: Date.now() + (mailboxData.retention || 0),
// make sure the field exists. it is set to true when user is deleted
userDeleted: false,
idate,
hdate,
flags,
size,
// some custom metadata about the delivery
meta: options.meta || {},
// list filter IDs that matched this message
filters: Array.isArray(options.filters) ? options.filters : [].concat(options.filters || []),
headers,
mimeTree,
envelope,
bodystructure,
msgid,
// use boolean for more commonly used (and searched for) flags
unseen: !flags.includes('\\Seen'),
flagged: flags.includes('\\Flagged'),
undeleted: !flags.includes('\\Deleted'),
draft: flags.includes('\\Draft'),
magic: maildata.magic,
subject,
// do not archive deleted messages that have been copied
copied: false
};
if (options.verificationResults) {
messageData.verificationResults = options.verificationResults;
}
if (options.outbound) {
messageData.outbound = [].concat(options.outbound || []);
}
if (options.forwardTargets) {
messageData.forwardTargets = [].concat(options.forwardTargets || []);
}
if (maildata.attachments && maildata.attachments.length) {
messageData.attachments = maildata.attachments;
messageData.ha = maildata.attachments.some(a => !a.related);
} else {
messageData.ha = false;
}
if (maildata.text) {
messageData.text = maildata.text.replace(/\r\n/g, '\n').trim();
// text is indexed with a fulltext index, so only store the beginning of it
if (messageData.text.length > consts.MAX_PLAINTEXT_INDEXED) {
messageData.textFooter = messageData.text.substr(consts.MAX_PLAINTEXT_INDEXED);
messageData.text = messageData.text.substr(0, consts.MAX_PLAINTEXT_INDEXED);
// truncate remaining text if total length exceeds maximum allowed
if (
consts.MAX_PLAINTEXT_CONTENT > consts.MAX_PLAINTEXT_INDEXED &&
messageData.textFooter.length > consts.MAX_PLAINTEXT_CONTENT - consts.MAX_PLAINTEXT_INDEXED
) {
messageData.textFooter = messageData.textFooter.substr(0, consts.MAX_PLAINTEXT_CONTENT - consts.MAX_PLAINTEXT_INDEXED);
}
}
messageData.text =
messageData.text.length <= consts.MAX_PLAINTEXT_CONTENT
? messageData.text
: messageData.text.substr(0, consts.MAX_PLAINTEXT_CONTENT);
messageData.intro = messageData.text
// assume we get the intro text from first 2 kB
.substr(0, 2 * 1024)
// remove markdown urls
.replace(/\[[^\]]*\]/g, ' ')
// remove quoted parts
// "> quote from previous message"
.replace(/^>.*$/gm, '')
// remove lines with repetetive chars
// "---------------------"
.replace(/^\s*(.)\1+\s*$/gm, '')
// join lines
.replace(/\s+/g, ' ')
.trim();
if (messageData.intro.length > 128) {
let intro = messageData.intro.substr(0, 128);
let lastSp = intro.lastIndexOf(' ');
if (lastSp > 0) {
intro = intro.substr(0, lastSp);
}
messageData.intro = intro + '…';
}
}
if (maildata.html && maildata.html.length) {
let htmlSize = 0;
messageData.html = maildata.html
.map(html => {
if (htmlSize >= consts.MAX_HTML_CONTENT || !html) {
return '';
}
if (htmlSize + Buffer.byteLength(html) <= consts.MAX_HTML_CONTENT) {
htmlSize += Buffer.byteLength(html);
return html;
}
html = html.substr(0, htmlSize + Buffer.byteLength(html) - consts.MAX_HTML_CONTENT);
htmlSize += Buffer.byteLength(html);
return html;
})
.filter(html => html);
}
this.users.collection('users').findOneAndUpdate(
{
_id: mailboxData.user
},
{
$inc: {
storageUsed: size
}
},
{
returnDocument: 'after',
projection: {
storageUsed: true
}
},
(err, r) => {
if (err) {
return cleanup(err);
}
if (r && r.value) {
this.loggelf({
short_message: '[QUOTA] +',
_mail_action: 'quota',
_user: mailboxData.user,
_inc: size,
_storage_used: r.value.storageUsed,
_sess: options.session && options.session.id,
_mailbox: mailboxData._id
});
}
let rollback = err => {
this.users.collection('users').findOneAndUpdate(
{
_id: mailboxData.user
},
{
$inc: {
storageUsed: -size
}
},
{
returnDocument: 'after',
projection: {
storageUsed: true
}
},
(...args) => {
let r = args && args[1];
if (r && r.value) {
this.loggelf({
short_message: '[QUOTA] -',
_mail_action: 'quota',
_user: mailboxData.user,
_inc: -size,
_storage_used: r.value.storageUsed,
_sess: options.session && options.session.id,
_mailbox: mailboxData._id,
_rollback: 'yes',
_error: err.message,
_code: err.code
});
}
cleanup(err);
}
);
};
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate(
{
_id: mailboxData._id
},
{
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
},
{
// use original value to get correct UIDNext
returnDocument: 'before'
},
(err, item) => {
if (err) {
return rollback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return rollback(err);
}
let mailboxData = item.value;
// updated message object by setting mailbox specific values
messageData.mailbox = mailboxData._id;
messageData.user = mailboxData.user;
messageData.uid = mailboxData.uidNext;
messageData.modseq = mailboxData.modifyIndex + 1;
if (!flags.includes('\\Deleted')) {
messageData.searchable = true;
}
if (mailboxData.specialUse === '\\Junk') {
messageData.junk = true;
}
this.getThreadId(mailboxData.user, subject, mimeTree, (err, thread) => {
if (err) {
return rollback(err);
}
messageData.thread = thread;
this.database.collection('messages').insertOne(messageData, { writeConcern: 'majority' }, (err, r) => {
if (err) {
return rollback(err);
}
if (!r || !r.acknowledged) {
let err = new Error('Failed to store message [1]');
err.responseCode = 500;
err.code = 'StoreError';
return rollback(err);
}
let logTime = messageData.meta.time || new Date();
if (typeof logTime === 'number') {
logTime = new Date(logTime);
}
let uidValidity = mailboxData.uidValidity;
let uid = messageData.uid;
if (
options.session &&
options.session.selected &&
options.session.selected.mailbox &&
options.session.selected.mailbox.toString() === mailboxData._id.toString()
) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', messageData.uid));
}
let updateAddressRegister = next => {
let addresses = [];
if (messageData.junk || flags.includes('\\Draft')) {
// skip junk and draft messages
return next();
}
let parsed = messageData.mimeTree && messageData.mimeTree.parsedHeader;
if (parsed) {
let keyList = mailboxData.specialUse === '\\Sent' ? ['to', 'cc', 'bcc'] : ['from'];
for (let key of keyList) {
if (parsed[key] && parsed[key].length) {
for (let addr of parsed[key]) {
if (!addresses.some(a => a.address === addr.address)) {
addresses.push(addr);
}
}
}
}
}
if (!addresses.length) {
return next();
}
this.updateAddressRegister(mailboxData.user, addresses)
.then(() => next())
.catch(err => next(err));
};
updateAddressRegister(() => {
this.notifier.addEntries(
mailboxData,
{
command: 'EXISTS',
uid: messageData.uid,
ignore: options.session && options.session.id,
message: messageData._id,
modseq: messageData.modseq,
unseen: messageData.unseen,
idate: messageData.idate
},
() => {
this.notifier.fire(mailboxData.user);
let raw = options.rawchunks || options.raw;
let processAudits = async () => {
let audits = await this.database
.collection('audits')
.find({ user: mailboxData.user, expires: { $gt: new Date() } })
.toArray();
let now = new Date();
for (let auditData of audits) {
if ((auditData.start && auditData.start > now) || (auditData.end && auditData.end < now)) {
// audit not active
continue;
}
await this.auditHandler.store(auditData._id, raw, {
date: messageData.idate,
msgid: messageData.msgid,
header: messageData.mimeTree && messageData.mimeTree.parsedHeader,
ha: messageData.ha,
mailbox: mailboxData._id,
mailboxPath: mailboxData.path,
info: Object.assign({ queueId: messageData.outbound }, messageData.meta)
});
}
};
let next = () => {
cleanup(null, true, {
uidValidity,
uid,
id: messageData._id.toString(),
mailbox: mailboxData._id.toString(),
mailboxPath: mailboxData.path,
status: 'new'
});
};
// do not use more suitable .finally() as it is not supported in Node v8
return processAudits().then(next).catch(next);
}
);
});
});
});
}
);
}
);
});
});
});
}
async updateQuotaAsync(user, inc, options) {
inc = inc || {};
if (options.delayNotifications) {
// quota change is handled at some later time
return;
}
let r = await this.users.collection('users').findOneAndUpdate(
{
_id: user
},
{
$inc: {
storageUsed: Number(inc.storageUsed) || 0
}
},
{
returnDocument: 'after',
projection: {
storageUsed: true
}
}
);
if (r && r.value) {
this.loggelf({
short_message: '[QUOTA] ' + (Number(inc.storageUsed) || 0 < 0 ? '-' : '+'),
_mail_action: 'quota',
_user: user,
_inc: inc.storageUsed,
_storage_used: r.value.storageUsed,
_sess: options.session && options.session.id,
_mailbox: inc.mailbox
});
}
return r;
}
updateQuota(user, inc, options, callback) {
this.updateQuotaAsync(user, inc, options)
.then(res => callback(null, res))
.catch(err => callback(err));
}
async delAsync(options) {
let messageData = options.messageData;
let curtime = new Date();
let mailboxData;
try {
mailboxData = await this.getMailboxAsync(
options.mailbox || {
mailbox: messageData.mailbox
}
);
} catch (err) {
if (!err.imapResponse) {
throw err;
}
}
if (options.archive) {
let archiveTime = await this.settingsHandler.get('const:archive:time', {});
messageData.archived = curtime;
messageData.exp = true;
messageData.rdate = curtime.getTime() + archiveTime;
let r;
try {
r = await this.database.collection('archived').insertOne(messageData, { writeConcern: 'majority' });
} catch (err) {
// if code is 11000 then message is already archived, probably the same message from another mailbox
if (err.code !== 11000) {
throw err;
}
}
if (r && r.acknowledged) {
this.loggelf({
short_message: '[ARCHIVED]',
_mail_action: 'archived',
_user: messageData.user,
_mailbox: messageData.mailbox,
_uid: messageData.uid,
_stored_id: messageData._id,
_expires: messageData.rdate,
_sess: options.session && options.session.id,
_size: messageData.size
});
}
}
let r = await this.database.collection('messages').deleteOne(
{
_id: messageData._id,
mailbox: messageData.mailbox,
uid: messageData.uid
},
{ writeConcern: 'majority' }
);
if (!r || !r.deletedCount) {
// nothing was deleted!
return false;
}
try {
await this.updateQuotaAsync(
messageData.user,
{
storageUsed: -messageData.size,
mailbox: messageData.mailbox
},
options
);
} catch (err) {
log.error('messagedel', err);
}
if (!mailboxData) {
// deleted an orphan message
return true;
}
if (!options.archive) {
// archived messages still need the attachments
let attachmentIds = Object.keys(messageData.mimeTree.attachmentMap || {}).map(key => messageData.mimeTree.attachmentMap[key]);
if (attachmentIds.length) {
try {
await new Promise((resolve, reject) => {
this.attachmentStorage.deleteMany(attachmentIds, messageData.magic, err => {
if (err) {
return reject(err);
}
resolve();
});
});
} catch (err) {
log.error('attachdel', err);
}
}
}
if (
options.session &&
options.session.selected &&
options.session.selected.mailbox &&
options.session.selected.mailbox.toString() === mailboxData._id.toString()
) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', messageData.uid));
}
try {
await new Promise((resolve, reject) => {
this.notifier.addEntries(
mailboxData,
{
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: messageData.uid,
message: messageData._id,
unseen: messageData.unseen
},
err => {
if (err) {
return reject(err);
}
resolve();
}
);
});
} catch (err) {
log.error('notify', err);
}
if (!options.delayNotifications) {
this.notifier.fire(mailboxData.user);
}
return true;
}
del(options, callback) {
this.delAsync(options)
.then(res => callback(null, res))
.catch(err => callback(err));
}
move(options, callback) {
this.getMailbox(options.source, (err, mailboxData) => {
if (err) {
return callback(err);
}
this.getMailbox(options.destination, (err, targetData) => {
if (err) {
return callback(err);
}
this.database.collection('mailboxes').findOneAndUpdate(
{
_id: mailboxData._id
},
{
$inc: {
// increase the mailbox modification index
// to indicate that something happened
modifyIndex: 1
}
},
{
returnDocument: 'after',
projection: {
_id: true,
uidNext: true,
modifyIndex: true
}
},
(err, item) => {
if (err) {
return callback(err);
}
let newModseq = (item && item.value && item.value.modifyIndex) || 1;
let cursor = this.database
.collection('messages')
.find({
mailbox: mailboxData._id,
uid: options.messageQuery ? options.messageQuery : tools.checkRangeQuery(options.messages)
})
// ordering is needed for IMAP UIDPLUS results
.sort({ uid: 1 });
let sourceUid = [];
let destinationUid = [];
let removeEntries = [];
let existsEntries = [];
let done = err => {
let next = () => {
if (err) {
return callback(err);
}
return callback(null, true, {
uidValidity: targetData.uidValidity,
sourceUid,
destinationUid,
mailbox: mailboxData._id,
target: targetData._id,
status: 'moved'
});
};
if (sourceUid.length && options.showExpunged) {
options.session.writeStream.write({
tag: '*',
command: String(options.session.selected.uidList.length),
attributes: [
{
type: 'atom',
value: 'EXISTS'
}
]
});
}
if (existsEntries.length) {
// mark messages as deleted from old mailbox
return this.notifier.addEntries(mailboxData, removeEntries, () => {
// mark messages as added to new mailbox
this.notifier.addEntries(targetData, existsEntries, () => {
this.notifier.fire(mailboxData.user);
next();
});
});
}
next();
};
let processNext = () => {
cursor.next((err, message) => {
if (err) {
return done(err);
}
if (!message) {
return cursor.close(done);
}
let messageId = message._id;
let messageUid = message.uid;
if (options.returnIds) {
sourceUid.push(message._id);
} else {
sourceUid.push(messageUid);
}
this.database.collection('mailboxes').findOneAndUpdate(
{
_id: targetData._id
},
{
$inc: {
uidNext: 1
}
},
{
projection: {
uidNext: true,
modifyIndex: true
},
returnDocument: 'before'
},
(err, item) => {
if (err) {
return cursor.close(() => done(err));
}
if (!item || !item.value) {
return cursor.close(() => done(new Error('Mailbox disappeared')));
}
message._id = new ObjectId();
let uidNext = item.value.uidNext;
let modifyIndex = item.value.modifyIndex;
if (options.returnIds) {
destinationUid.push(message._id);
} else {
destinationUid.push(uidNext);
}
// set new mailbox
message.mailbox = targetData._id;
// new mailbox means new UID
message.uid = uidNext;
// retention settings
message.exp = !!targetData.retention;
message.rdate = Date.now() + (targetData.retention || 0);
message.modseq = modifyIndex; // reset message modseq to whatever it is for the mailbox right now
let unseen = message.unseen;
if (!message.flags.includes('\\Deleted')) {
message.searchable = true;
} else {
delete message.searchable;
}
let junk = false;
if (targetData.specialUse === '\\Junk' && !message.junk) {
message.junk = true;
junk = 1;
} else if (targetData.specialUse !== '\\Trash' && message.junk) {
delete message.junk;
junk = -1;
}
Object.keys(options.updates || []).forEach(key => {
switch (key) {
case 'seen':
case 'deleted':
{
let fname = '\\' + key.charAt(0).toUpperCase() + key.substr(1);
if (!options.updates[key] && !message.flags.includes(fname)) {
// add missing flag
message.flags.push(fname);
} else if (options.updates[key] && message.flags.includes(fname)) {
// remove non-needed flag
let flags = new Set(message.flags);
flags.delete(fname);
message.flags = Array.from(flags);
}
message['un' + key] = options.updates[key];
}
break;
case 'flagged':
case 'draft':
{
let fname = '\\' + key.charAt(0).toUpperCase() + key.substr(1);
if (options.updates[key] && !message.flags.includes(fname)) {
// add missing flag
message.flags.push(fname);
} else if (!options.updates[key] && message.flags.includes(fname)) {
// remove non-needed flag
let flags = new Set(message.flags);
flags.delete(fname);
message.flags = Array.from(flags);
}
message[key] = options.updates[key];
}
break;
case 'expires':
{
if (options.updates.expires) {
message.exp = true;
message.rdate = options.updates.expires.getTime();
} else {
message.exp = false;
}
}
break;
case 'metaData':
message.meta = message.meta || {};
message.meta.custom = options.updates.metaData;
break;
case 'outbound':
message.outbound = [].concat(message.outbound || []).concat(options.updates.outbound || []);
break;
}
});
if (options.markAsSeen) {
message.unseen = false;
if (!message.flags.includes('\\Seen')) {
message.flags.push('\\Seen');
}
}
this.database.collection('messages').insertOne(message, { writeConcern: 'majority' }, (err, r) => {
if (err) {
return cursor.close(() => done(err));
}
if (!r || !r.acknowledged) {
let err = new Error('Failed to store message [2]');
err.responseCode = 500;
err.code = 'StoreError';
return cursor.close(() => done(err));
}
let insertId = r.insertedId;
// delete old message
this.database.collection('messages').deleteOne(
{
_id: messageId,
mailbox: mailboxData._id,
uid: messageUid
},
{ writeConcern: 'majority' },
(err, r) => {
if (err) {
return cursor.close(() => done(err));
}
if (r && r.deletedCount) {
if (options.session) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', sourceUid));
}
removeEntries.push({
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: messageUid,
message: messageId,
unseen,
// modseq is needed to avoid updating mailbox entry
modseq: newModseq
});
if (options.showExpunged) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', messageUid));
}
}
let entry = {
command: 'EXISTS',
uid: uidNext,
message: insertId,
unseen: message.unseen,
idate: message.idate
};
if (junk) {
entry.junk = junk;
}
existsEntries.push(entry);
if (existsEntries.length >= consts.BULK_BATCH_SIZE) {
// mark messages as deleted from old mailbox
return this.notifier.addEntries(mailboxData, removeEntries, () => {
// mark messages as added to new mailbox
this.notifier.addEntries(targetData, existsEntries, () => {
removeEntries = [];
existsEntries = [];
this.notifier.fire(mailboxData.user);
processNext();
});
});
}
processNext();
}
);
});
}
);
});
};
processNext();
}
);
});
});
}
// NB! does not update user quota
put(messageData, callback) {
let getMailbox = next => {
this.getMailbox({ mailbox: messageData.mailbox }, (err, mailboxData) => {
if (err && err.imapResponse !== 'TRYCREATE') {
return callback(err);
}
if (mailboxData) {
return next(null, mailboxData);
}
this.getMailbox(
{
query: {
user: messageData.user,
path: 'INBOX'
}
},
next
);
});
};
getMailbox((err, mailboxData) => {
if (err) {
return callback(err);
}
this.database.collection('mailboxes').findOneAndUpdate(
{
_id: mailboxData._id
},
{
$inc: {
uidNext: 1
}
},
{
uidNext: true
},
(err, item) => {
if (err) {
return callback(err);
}
if (!item || !item.value) {
return callback(new Error('Mailbox disappeared'));
}
let uidNext = item.value.uidNext;
// set new mailbox
messageData.mailbox = mailboxData._id;
// new mailbox means new UID
messageData.uid = uidNext;
// this will be changed later by the notification system
messageData.modseq = 0;
// retention settings
messageData.exp = !!mailboxData.retention;
messageData.rdate = Date.now() + (mailboxData.retention || 0);
if (!mailboxData.undeleted) {
delete messageData.searchable;
} else {
messageData.searchable = true;
}
let junk = false;
if (mailboxData.specialUse === '\\Junk' && !messageData.junk) {
messageData.junk = true;
junk = 1;
} else if (mailboxData.specialUse !== '\\Trash' && messageData.junk) {
delete messageData.junk;
junk = -1;
}
this.database.collection('messages').insertOne(messageData, { writeConcern: 'majority' }, (err, r) => {
if (err) {
if (err.code === 11000) {
// message already exists
return callback(null, false);
}
return callback(err);
}
if (!r || !r.acknowledged) {
let err = new Error('Failed to store message [3]');
err.responseCode = 500;
err.code = 'StoreError';
return callback(err);
}
let insertId = r.insertedId;
let entry = {
command: 'EXISTS',
uid: uidNext,
message: insertId,
unseen: messageData.unseen,
idate: messageData.idate
};
if (junk) {
entry.junk = junk;
}
// mark messages as added to new mailbox
this.notifier.addEntries(mailboxData, entry, () => {
this.notifier.fire(mailboxData.user);
return callback(null, {
mailbox: mailboxData._id,
message: insertId,
uid: uidNext
});
});
});
}
);
});
}
generateIndexedHeaders(headersArray) {
// allow configuring extra header keys that are indexed
return (headersArray || [])
.map(line => {
line = Buffer.from(line, 'binary').toString();
let key = line.substr(0, line.indexOf(':')).trim().toLowerCase();
if (!INDEXED_HEADERS.includes(key)) {
// do not index this header
return false;
}
let value = line
.substr(line.indexOf(':') + 1)
.trim()
.replace(/\s*\r?\n\s*/g, ' ');
try {
value = libmime.decodeWords(value);
} catch (E) {
// ignore
}
// store indexed value as lowercase for easier SEARCHing
value = value.toLowerCase();
switch (key) {
case 'list-id':
// only index the actual ID of the list
if (value.indexOf('<') >= 0) {
let m = value.match(/<([^>]+)/);
if (m && m[1] && m[1].trim()) {
value = m[1].trim();
}
}
break;
}
// trim long values as mongodb indexed fields can not be too long
if (Buffer.byteLength(key, 'utf-8') >= 255) {
key = Buffer.from(key).slice(0, 255).toString();
key = key.substr(0, key.length - 4);
}
if (Buffer.byteLength(value, 'utf-8') >= 880) {
// value exceeds MongoDB max indexed value length
value = Buffer.from(value).slice(0, 880).toString();
// remove last 4 chars to be sure we do not have any incomplete unicode sequences
value = value.substr(0, value.length - 4);
}
return {
key,
value
};
})
.filter(line => line);
}
prepareMessage(options, callback) {
if (options.prepared) {
return setImmediate(() => callback(null, options.prepared));
}
let id = new ObjectId();
let mimeTree = options.mimeTree || this.indexer.parseMimeTree(options.raw);
let size = this.indexer.getSize(mimeTree);
let bodystructure = this.indexer.getBodyStructure(mimeTree);
let envelope = this.indexer.getEnvelope(mimeTree);
let idate = (options.date && parseDate(options.date)) || new Date();
let hdate = (mimeTree.parsedHeader.date && parseDate([].concat(mimeTree.parsedHeader.date || []).pop() || '', idate)) || false;
let subject = ([].concat(mimeTree.parsedHeader.subject || []).pop() || '').trim();
try {
subject = libmime.decodeWords(subject);
} catch (E) {
// ignore
}
subject = this.normalizeSubject(subject, {
removePrefix: false
});
let flags = [].concat(options.flags || []);
if (!hdate || hdate.toString() === 'Invalid Date') {
hdate = idate;
}
let msgid = envelope[9] || '<' + uuidV1() + '@wildduck.email>';
let headers = this.generateIndexedHeaders(mimeTree.header);
let prepared = {
id,
mimeTree,
size,
bodystructure,
envelope,
idate,
hdate,
flags,
msgid,
headers,
subject
};
return setImmediate(() => callback(null, prepared));
}
// resolves or generates new thread id for a message
getThreadId(userId, subject, mimeTree, callback) {
let referenceIds = new Set(
[
[].concat(mimeTree.parsedHeader['message-id'] || []).pop() || '',
[].concat(mimeTree.parsedHeader['in-reply-to'] || []).pop() || '',
([].concat(mimeTree.parsedHeader['thread-index'] || []).pop() || '').substr(0, 22),
[].concat(mimeTree.parsedHeader.references || []).pop() || ''
]
.join(' ')
.split(/\s+/)
.map(id => id.replace(/[<>]/g, '').trim())
.filter(id => id)
.map(id => crypto.createHash('sha1').update(id).digest('base64').replace(/[=]+$/g, ''))
);
subject = this.normalizeSubject(subject, {
removePrefix: true
});
referenceIds = Array.from(referenceIds).slice(0, 10);
// most messages are not threaded, so an upsert call should be ok to make
this.database.collection('threads').findOneAndUpdate(
{
user: userId,
ids: { $in: referenceIds },
subject
},
{
$addToSet: {
ids: { $each: referenceIds }
},
$set: {
updated: new Date()
}
},
{
returnDocument: 'after'
},
(err, r) => {
if (err) {
return callback(err);
}
if (r.value) {
return callback(null, r.value._id);
}
// thread not found, create a new one
this.database.collection('threads').insertOne(
{
user: userId,
subject,
ids: referenceIds,
updated: new Date()
},
(err, r) => {
if (err) {
return callback(err);
}
return callback(null, r.insertedId);
}
);
}
);
}
normalizeSubject(subject, options) {
options = options || {};
subject = subject.replace(/\s+/g, ' ').trim();
if (options.removePrefix) {
let match = true;
while (match) {
match = false;
subject = subject
.replace(/^(re|fwd?)\s*:|\s*\(fwd\)\s*$/gi, () => {
match = true;
return '';
})
.trim();
}
}
return subject;
}
update(user, mailbox, messageQuery, changes, callback) {
let updates = { $set: {} };
let update = false;
let addFlags = [];
let removeFlags = [];
let notifyEntries = [];
Object.keys(changes || {}).forEach(key => {
switch (key) {
case 'seen':
updates.$set.unseen = !changes.seen;
if (changes.seen) {
addFlags.push('\\Seen');
} else {
removeFlags.push('\\Seen');
}
update = true;
break;
case 'deleted':
updates.$set.undeleted = !changes.deleted;
if (changes.deleted) {
addFlags.push('\\Deleted');
} else {
removeFlags.push('\\Deleted');
}
update = true;
break;
case 'flagged':
updates.$set.flagged = changes.flagged;
if (changes.flagged) {
addFlags.push('\\Flagged');
} else {
removeFlags.push('\\Flagged');
}
update = true;
break;
case 'draft':
updates.$set.flagged = changes.draft;
if (changes.draft) {
addFlags.push('\\Draft');
} else {
removeFlags.push('\\Draft');
}
update = true;
break;
case 'expires':
if (changes.expires) {
updates.$set.exp = true;
updates.$set.rdate = changes.expires.getTime();
} else {
updates.$set.exp = false;
}
update = true;
break;
case 'metaData':
updates.$set['meta.custom'] = changes.metaData;
update = true;
break;
}
});
if (!update) {
return callback(new Error('Nothing was changed'));
}
if (addFlags.length) {
if (!updates.$addToSet) {
updates.$addToSet = {};
}
updates.$addToSet.flags = { $each: addFlags };
}
if (removeFlags.length) {
if (!updates.$pull) {
updates.$pull = {};
}
updates.$pull.flags = { $in: removeFlags };
}
// acquire new MODSEQ
this.database.collection('mailboxes').findOneAndUpdate(
{
_id: mailbox,
user
},
{
$inc: {
// allocate new MODSEQ value
modifyIndex: 1
}
},
{
returnDocument: 'after'
},
(err, item) => {
if (err) {
return callback(err);
}
if (!item || !item.value) {
return callback(new Error('Mailbox is missing'));
}
let mailboxData = item.value;
updates.$set.modseq = mailboxData.modifyIndex;
let updatedCount = 0;
let cursor = this.database
.collection('messages')
.find({
mailbox: mailboxData._id,
uid: messageQuery
})
.project({
_id: true,
uid: true
});
let done = err => {
let next = () => {
if (err) {
return callback(err);
}
return callback(null, updatedCount);
};
if (notifyEntries.length) {
return this.notifier.addEntries(mailboxData, notifyEntries, () => {
notifyEntries = [];
this.notifier.fire(mailboxData.user);
next();
});
}
next();
};
let processNext = () => {
cursor.next((err, messageData) => {
if (err) {
return done(err);
}
if (!messageData) {
return cursor.close(done);
}
this.database.collection('messages').findOneAndUpdate(
{
_id: messageData._id,
// hash key
mailbox,
uid: messageData.uid
},
updates,
{
projection: {
_id: true,
uid: true,
flags: true
},
returnDocument: 'after'
},
(err, item) => {
if (err) {
return cursor.close(() => done(err));
}
if (!item || !item.value) {
return processNext();
}
let messageData = item.value;
updatedCount++;
notifyEntries.push({
command: 'FETCH',
uid: messageData.uid,
flags: messageData.flags,
message: messageData._id,
unseenChange: 'seen' in changes
});
if (notifyEntries.length >= consts.BULK_BATCH_SIZE) {
return this.notifier.addEntries(mailboxData, notifyEntries, () => {
notifyEntries = [];
this.notifier.fire(mailboxData.user);
processNext();
});
}
processNext();
}
);
});
};
processNext();
}
);
}
encryptMessage(pubKey, raw, callback) {
this.encryptMessageAsync(pubKey, raw)
.then(res => callback(null, res))
.catch(err => callback(err));
}
async encryptMessageAsync(pubKeyArmored, raw) {
if (!pubKeyArmored) {
return false;
}
if (raw && Array.isArray(raw.chunks) && raw.chunklen) {
raw = Buffer.concat(raw.chunks, raw.chunklen);
}
let lastBytes = [];
let headerEnd = raw.length;
let headerLength = 0;
// split the message into header and body
for (let i = 0, len = raw.length; i < len; i++) {
lastBytes.unshift(raw[i]);
if (lastBytes.length > 10) {
lastBytes.length = 4;
}
if (lastBytes.length < 2) {
continue;
}
let pos = 0;
if (lastBytes[pos] !== 0x0a) {
continue;
}
pos++;
if (lastBytes[pos] === 0x0d) {
pos++;
}
if (lastBytes[pos] !== 0x0a) {
continue;
}
pos++;
if (lastBytes[pos] === 0x0d) {
pos++;
}
// we have a match!'
headerEnd = i + 1 - pos;
headerLength = pos;
break;
}
let header = raw.slice(0, headerEnd);
let breaker = headerLength ? raw.slice(headerEnd, headerEnd + headerLength) : Buffer.alloc(0);
let body = headerEnd + headerLength < raw.length ? raw.slice(headerEnd + headerLength) : Buffer.alloc(0);
// modify headers
let headers = [];
let bodyHeaders = [];
let lastHeader = false;
let boundary = 'nm_' + crypto.randomBytes(14).toString('hex');
let headerLines = header.toString('binary').split('\r\n');
// use for, so we could escape from it if needed
for (let i = 0, len = headerLines.length; i < len; i++) {
let line = headerLines[i];
if (!i || !lastHeader || !/^\s/.test(line)) {
lastHeader = [line];
if (/^content-type:/i.test(line)) {
let parts = line.split(':');
let value = parts.slice(1).join(':');
if (value.split(';').shift().trim().toLowerCase() === 'multipart/encrypted') {
// message is already encrypted, do nothing
return false;
}
bodyHeaders.push(lastHeader);
} else if (/^content-transfer-encoding:/i.test(line)) {
bodyHeaders.push(lastHeader);
} else {
headers.push(lastHeader);
}
} else {
lastHeader.push(line);
}
}
headers.push(['Content-Type: multipart/encrypted; protocol="application/pgp-encrypted";'], [' boundary="' + boundary + '"']);
headers.push(['Content-Description: OpenPGP encrypted message']);
headers.push(['Content-Transfer-Encoding: 7bit']);
headers = Buffer.from(headers.map(line => line.join('\r\n')).join('\r\n'), 'binary');
bodyHeaders = Buffer.from(bodyHeaders.map(line => line.join('\r\n')).join('\r\n'), 'binary');
let pubKey;
try {
pubKey = await openpgp.readKey({ armoredKey: tools.prepareArmoredPubKey(pubKeyArmored), config: { tolerant: true } });
} catch (err) {
return false;
}
if (!pubKey) {
return false;
}
let ciphertext;
try {
ciphertext = await openpgp.encrypt({
message: await openpgp.createMessage({ binary: Buffer.concat([Buffer.from(bodyHeaders + '\r\n\r\n'), body]) }),
encryptionKeys: pubKey,
format: 'armored'
});
} catch (err) {
return false;
}
let text =
'This is an OpenPGP/MIME encrypted message\r\n\r\n' +
'--' +
boundary +
'\r\n' +
'Content-Type: application/pgp-encrypted\r\n' +
'Content-Transfer-Encoding: 7bit\r\n' +
'\r\n' +
'Version: 1\r\n' +
'\r\n' +
'--' +
boundary +
'\r\n' +
'Content-Type: application/octet-stream; name=encrypted.asc\r\n' +
'Content-Disposition: inline; filename=encrypted.asc\r\n' +
'Content-Transfer-Encoding: 7bit\r\n' +
'\r\n' +
ciphertext +
'\r\n--' +
boundary +
'--\r\n';
return Buffer.concat([headers, breaker, Buffer.from(text)]);
}
}
module.exports = MessageHandler;