Started with QUOTA handling

This commit is contained in:
Andris Reinman 2017-03-26 23:58:05 +03:00
parent 31dd36b78b
commit 845ea959be
7 changed files with 303 additions and 144 deletions

View file

@ -67,9 +67,7 @@ Actual update data (information about new and deleted messages, flag updates and
1. Add interoperability with current servers, for example by fetching authentication data from MySQL 1. Add interoperability with current servers, for example by fetching authentication data from MySQL
2. Maybe allow some kind of message manipulation through plugins? This would allow to turn Wild Duck for example into an encrypted mail server mail data would be encrypted using users public key before storing it to DB and decrypted with users private key whenever the user logs in and FETCHes or SEARCHes messages. Private key would be protected by users password. For the user the encryption layer would be invisible while guaranteeing that if the user is currently not logged in then there would be no way to read the messages as the private key is locked. 2. Maybe allow some kind of message manipulation through plugins? This would allow to turn Wild Duck for example into an encrypted mail server mail data would be encrypted using users public key before storing it to DB and decrypted with users private key whenever the user logs in and FETCHes or SEARCHes messages. Private key would be protected by users password. For the user the encryption layer would be invisible while guaranteeing that if the user is currently not logged in then there would be no way to read the messages as the private key is locked.
3. Add quota handling. Every time a user gets a new message added to storage, the quota counter should increase. If only a single quota root would be used per account then implementing rfc2087 should be fairly easy. What is not so easy is keeping count on copied and deleted messages (there's a great technique for this described in the [mail.ru blog](https://team.mail.ru/efficient-storage-how-we-went-down-from-50-pb-to-32-pb/)). 3. Add proper quota handling. Every time a user gets a new message added to storage, the quota counter should increase. If only a single quota root would be used per account then implementing rfc2087 should be fairly easy. Currently Wild Duck keeps a count on storage used by an user but this is not exposed to the client.
The problem with quota counters is that the actions (_store message + increment counter for mailbox_ or _delete message + decrement counter for mailbox_) are not transactional, so if something fails, the counter might end up in an invalid state. A possible fix would be to use fake transactions - set up a transaction with mailbox and counter data by storing a transaction entry, then process required actions and finally remove the transaction entry. If something fails and transaction is not completed, then the mailbox would be marked for reindexing which would mean that the mailbox quota is entirely re-calculated and quota counters are reset.
## Usage ## Usage
@ -124,6 +122,7 @@ Arguments
- **username** is the username of the user. This is not an email address but authentication username, use only letters and numbers - **username** is the username of the user. This is not an email address but authentication username, use only letters and numbers
- **password** is the password for the user - **password** is the password for the user
- **storage** (optional) is the maximum storage in bytes allowed for this user
**Example** **Example**

15
api.js
View file

@ -23,7 +23,8 @@ server.use(restify.bodyParser({
server.post('/user/create', (req, res, next) => { server.post('/user/create', (req, res, next) => {
const schema = Joi.object().keys({ const schema = Joi.object().keys({
username: Joi.string().alphanum().lowercase().min(3).max(30).required(), username: Joi.string().alphanum().lowercase().min(3).max(30).required(),
password: Joi.string().min(3).max(100).required() password: Joi.string().min(3).max(100).required(),
storage: Joi.number().default(0)
}); });
const result = Joi.validate({ const result = Joi.validate({
@ -44,6 +45,7 @@ server.post('/user/create', (req, res, next) => {
let username = result.value.username; let username = result.value.username;
let password = result.value.password; let password = result.value.password;
let storage = result.value.storage;
database.collection('users').findOne({ database.collection('users').findOne({
username username
@ -69,6 +71,9 @@ server.post('/user/create', (req, res, next) => {
username, username,
password: hash, password: hash,
address: false, address: false,
storageUsed: 0,
messages: 0,
storage,
created: new Date() created: new Date()
}, (err, result) => { }, (err, result) => {
if (err) { if (err) {
@ -89,6 +94,8 @@ server.post('/user/create', (req, res, next) => {
uidValidity, uidValidity,
uidNext: 1, uidNext: 1,
modifyIndex: 0, modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true subscribed: true
}, { }, {
user, user,
@ -97,6 +104,8 @@ server.post('/user/create', (req, res, next) => {
uidValidity, uidValidity,
uidNext: 1, uidNext: 1,
modifyIndex: 0, modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true subscribed: true
}, { }, {
user, user,
@ -105,6 +114,8 @@ server.post('/user/create', (req, res, next) => {
uidValidity, uidValidity,
uidNext: 1, uidNext: 1,
modifyIndex: 0, modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true subscribed: true
}, { }, {
user, user,
@ -113,6 +124,8 @@ server.post('/user/create', (req, res, next) => {
uidValidity, uidValidity,
uidNext: 1, uidNext: 1,
modifyIndex: 0, modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true subscribed: true
}], { }], {
w: 1, w: 1,

View file

@ -13,53 +13,72 @@ const config = require('config');
const nodemailer = require('nodemailer'); const nodemailer = require('nodemailer');
const transporter = nodemailer.createTransport({ const transporter = nodemailer.createTransport({
pool: true,
maxConnections: 10,
host: 'localhost', host: 'localhost',
port: config.smtp.port, port: config.smtp.port,
logger: false, logger: false,
debug: false debug: false
}); });
transporter.sendMail({
envelope: { let sent = 0;
let total = 1;
let startTime = Date.now();
function send() {
transporter.sendMail({
envelope: {
from: 'andris@kreata.ee',
to: [recipient]
},
from: 'andris@kreata.ee', from: 'andris@kreata.ee',
to: [recipient] to: recipient,
}, subject: 'Test ööö message [' + Date.now() + ']',
from: 'andris@kreata.ee', text: 'Hello world! Current time is ' + new Date().toString() + ' <img src="cid:note@example.com"/>',
to: recipient, html: '<p>Hello world! Current time is <em>' + new Date().toString() + '</em></p>',
subject: 'Test ööö message [' + Date.now() + ']', attachments: [
text: 'Hello world! Current time is ' + new Date().toString() + ' <img src="cid:note@example.com"/>',
html: '<p>Hello world! Current time is <em>' + new Date().toString() + '</em></p>',
attachments: [
// attachment as plaintext // attachment as plaintext
{ {
filename: 'notes.txt', filename: 'notes.txt',
content: 'Some notes about this e-mail', content: 'Some notes about this e-mail',
contentType: 'text/plain' // optional, would be detected from the filename contentType: 'text/plain' // optional, would be detected from the filename
}, },
// Small Binary Buffer attachment, should be kept with message // Small Binary Buffer attachment, should be kept with message
{ {
filename: 'image.png', filename: 'image.png',
content: new Buffer('iVBORw0KGgoAAAANSUhEUgAAABAAAAAQAQMAAAAlPW0iAAAABlBMVEUAAAD/' + content: new Buffer('iVBORw0KGgoAAAANSUhEUgAAABAAAAAQAQMAAAAlPW0iAAAABlBMVEUAAAD/' +
'//+l2Z/dAAAAM0lEQVR4nGP4/5/h/1+G/58ZDrAz3D/McH8yw83NDDeNGe4U' + '//+l2Z/dAAAAM0lEQVR4nGP4/5/h/1+G/58ZDrAz3D/McH8yw83NDDeNGe4U' +
'g9C9zwz3gVLMDA/A6P9/AFGGFyjOXZtQAAAAAElFTkSuQmCC', 'base64'), 'g9C9zwz3gVLMDA/A6P9/AFGGFyjOXZtQAAAAAElFTkSuQmCC', 'base64'),
cid: 'note@example.com' // should be as unique as possible cid: 'note@example.com' // should be as unique as possible
}, },
// Large Binary Buffer attachment, should be kept separately // Large Binary Buffer attachment, should be kept separately
{ {
path: __dirname + '/swan.jpg', path: __dirname + '/swan.jpg',
filename: 'swän.jpg' filename: 'swän.jpg'
}
]
}, (err, info) => {
if (err && err.response) {
console.log('Message failed: %s', err.response);
} else if (err) {
console.log(err);
} else {
console.log(info);
} }
] sent++;
}, (err, info) => { if (sent >= total) {
if (err && err.response) { console.log('Sent %s messages in %s s', sent, (Date.now() - startTime) / 1000);
console.log('Message failed: %s', err.response); return transporter.close();
} else if (err) { }
console.log(err); });
} else { }
console.log(info);
} for (let i = 0; i < total; i++) {
}); send();
}

View file

@ -468,6 +468,8 @@ module.exports.formatInternalDate = function (date) {
*/ */
module.exports.getQueryResponse = function (query, message, options) { module.exports.getQueryResponse = function (query, message, options) {
options = options || {};
// for optimization purposes try to use cached mimeTree etc. if available // for optimization purposes try to use cached mimeTree etc. if available
// If these values are missing then generate these when first time required // If these values are missing then generate these when first time required
// So if the query is for (UID FLAGS) then mimeTree is never generated // So if the query is for (UID FLAGS) then mimeTree is never generated

View file

@ -21,6 +21,8 @@ describe('IMAP Protocol integration tests', function () {
logger: false // remove to print IMAP traffic to console logger: false // remove to print IMAP traffic to console
}); });
server.acceptUTF8Enabled = false;
server.listen(function () { server.listen(function () {
port = server.server.address().port; port = server.server.address().port;
done(); done();

185
imap.js
View file

@ -158,6 +158,8 @@ server.onCreate = function (path, session, callback) {
uidValidity: Math.floor(Date.now() / 1000), uidValidity: Math.floor(Date.now() / 1000),
uidNext: 1, uidNext: 1,
modifyIndex: 0, modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true subscribed: true
}; };
@ -231,21 +233,30 @@ server.onDelete = function (path, session, callback) {
return callback(err); return callback(err);
} }
database.collection('journal').deleteMany({ database.collection('messages').deleteMany({
mailbox: mailbox._id mailbox: mailbox._id
}, err => { }, err => {
if (err) { if (err) {
return callback(err); return callback(err);
} }
database.collection('messages').deleteMany({ // decrement quota counters
mailbox: mailbox._id database.collection('users').findOneAndUpdate({
}, err => { _id: mailbox.user
if (err) { }, {
return callback(err); $inc: {
storageUsed: -Number(mailbox.storageUsed) || 0,
messages: -Number(mailbox.messages) || 0
} }
}, () => {
callback(null, true); database.collection('journal').deleteMany({
mailbox: mailbox._id
}, err => {
if (err) {
return callback(err);
}
callback(null, true);
});
}); });
}); });
}); });
@ -327,25 +338,41 @@ server.onStatus = function (path, session, callback) {
// APPEND mailbox (flags) date message // APPEND mailbox (flags) date message
server.onAppend = function (path, flags, date, raw, session, callback) { server.onAppend = function (path, flags, date, raw, session, callback) {
this.logger.debug('[%s] Appending message to "%s"', session.id, path); this.logger.debug('[%s] Appending message to "%s"', session.id, path);
messageHandler.add({
user: session.user.id, database.collection('users').findOne({
path, _id: session.user.id
meta: { }, (err, user) => {
source: 'IMAP',
to: session.user.username,
time: Date.now()
},
date,
flags,
raw
}, (err, status, data) => {
if (err) { if (err) {
if (err.imapResponse) {
return callback(null, err.imapResponse);
}
return callback(err); return callback(err);
} }
callback(null, status, data); if (!user) {
return callback(new Error('User not found'));
}
if (user.storage && user.storageUsed + raw.length > user.storage) {
return callback(false, 'OVERQUOTA');
}
messageHandler.add({
user: session.user.id,
path,
meta: {
source: 'IMAP',
to: session.user.username,
time: Date.now()
},
date,
flags,
raw
}, (err, status, data) => {
if (err) {
if (err.imapResponse) {
return callback(null, err.imapResponse);
}
return callback(err);
}
callback(null, status, data);
});
}); });
}; };
@ -513,31 +540,61 @@ server.onExpunge = function (path, update, session, callback) {
flags: '\\Deleted' flags: '\\Deleted'
}).project({ }).project({
_id: true, _id: true,
uid: true uid: true,
size: true
}).sort([ }).sort([
['uid', 1] ['uid', 1]
]); ]);
let deletedMessages = 0;
let deletedStorage = 0;
let updateQuota = next => {
if (!deletedMessages) {
return next();
}
database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
storageUsed: -deletedStorage,
messages: -deletedMessages
}
}, () => {
database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -deletedStorage,
messages: -deletedMessages
}
}, next);
});
};
let processNext = () => { let processNext = () => {
cursor.next((err, message) => { cursor.next((err, message) => {
if (err) { if (err) {
return callback(err); return updateQuota(() => callback(err));
} }
if (!message) { if (!message) {
return cursor.close(() => { return cursor.close(() => {
this.notifier.fire(session.user.id, path); updateQuota(() => {
this.notifier.fire(session.user.id, path);
// delete all attachments that do not have any active links to message objects // delete all attachments that do not have any active links to message objects
database.collection('attachments.files').deleteMany({ database.collection('attachments.files').deleteMany({
'metadata.messages': { 'metadata.messages': {
$size: 0 $size: 0
} }
}, err => { }, err => {
if (err) { if (err) {
// ignore as we don't really care if we have orphans or not // ignore as we don't really care if we have orphans or not
} }
return callback(null, true); return callback(null, true);
});
}); });
}); });
} }
@ -550,9 +607,12 @@ server.onExpunge = function (path, update, session, callback) {
_id: message._id _id: message._id
}, err => { }, err => {
if (err) { if (err) {
return cursor.close(() => callback(err)); return updateQuota(() => cursor.close(() => callback(err)));
} }
deletedMessages++;
deletedStorage += Number(message.size) || 0;
// remove link to message from attachments (if any exist) // remove link to message from attachments (if any exist)
database.collection('attachments.files').updateMany({ database.collection('attachments.files').updateMany({
'metadata.messages': message._id 'metadata.messages': message._id
@ -614,20 +674,48 @@ server.onCopy = function (path, update, session, callback) {
} }
}); // no projection as we need to copy the entire message }); // no projection as we need to copy the entire message
let copiedMessages = 0;
let copiedStorage = 0;
let updateQuota = next => {
if (!copiedMessages) {
return next();
}
database.collection('mailboxes').findOneAndUpdate({
_id: target._id
}, {
$inc: {
storageUsed: copiedStorage,
messages: copiedMessages
}
}, () => {
database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: copiedStorage,
messages: copiedMessages
}
}, next);
});
};
let sourceUid = []; let sourceUid = [];
let destinationUid = []; let destinationUid = [];
let processNext = () => { let processNext = () => {
cursor.next((err, message) => { cursor.next((err, message) => {
if (err) { if (err) {
return callback(err); return updateQuota(() => callback(err));
} }
if (!message) { if (!message) {
return cursor.close(() => { return cursor.close(() => {
this.notifier.fire(session.user.id, target.path); updateQuota(() => {
return callback(null, true, { this.notifier.fire(session.user.id, target.path);
uidValidity: target.uidValidity, return callback(null, true, {
sourceUid, uidValidity: target.uidValidity,
destinationUid sourceUid,
destinationUid
});
}); });
}); });
} }
@ -645,12 +733,12 @@ server.onCopy = function (path, update, session, callback) {
uidNext: true uidNext: true
}, (err, item) => { }, (err, item) => {
if (err) { if (err) {
return callback(err); return updateQuota(() => callback(err));
} }
if (!item || !item.value) { if (!item || !item.value) {
// was not able to acquire a lock // was not able to acquire a lock
return callback(null, 'TRYCREATE'); return updateQuota(() => callback(null, 'TRYCREATE'));
} }
let uidNext = item.value.uidNext; let uidNext = item.value.uidNext;
@ -667,9 +755,12 @@ server.onCopy = function (path, update, session, callback) {
database.collection('messages').insertOne(message, err => { database.collection('messages').insertOne(message, err => {
if (err) { if (err) {
return callback(err); return updateQuota(() => callback(err));
} }
copiedMessages++;
copiedStorage += Number(message.size) || 0;
// remove link to message from attachments (if any exist) // remove link to message from attachments (if any exist)
database.collection('attachments.files').updateMany({ database.collection('attachments.files').updateMany({
'metadata.messages': sourceId 'metadata.messages': sourceId

View file

@ -83,76 +83,109 @@ class MessageHandler {
return callback(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time')); return callback(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time'));
} }
// acquire new UID+MODSEQ this.database.collection('users').findOneAndUpdate({
this.database.collection('mailboxes').findOneAndUpdate({ _id: mailbox.user
_id: mailbox._id
}, { }, {
$inc: { $inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by storageUsed: size,
// modseq then UIDs are always in ascending order messages: 1
uidNext: 1,
modifyIndex: 1
} }
}, (err, item) => { }, err => {
if (err) { if (err) {
this.redlock.releaseLock(lock, () => false); this.redlock.releaseLock(lock, () => false);
return callback(err); return callback(err);
} }
if (!item || !item.value) { let rollback = err => {
// was not able to acquire a lock this.database.collection('users').findOneAndUpdate({
let err = new Error('Mailbox is missing'); _id: mailbox.user
err.imapResponse = 'TRYCREATE'; }, {
this.redlock.releaseLock(lock, () => false); $inc: {
return callback(err); storageUsed: -size,
} messages: -1
}
let mailbox = item.value; }, () => {
this.redlock.releaseLock(lock, () => callback(err));
let internaldate = options.date && new Date(options.date) || new Date(); });
let headerdate = mimeTree.parsedHeader.date && new Date(mimeTree.parsedHeader.date) || false;
if (!headerdate || headerdate.toString() === 'Invalid Date') {
headerdate = internaldate;
}
let message = {
_id: id,
mailbox: mailbox._id,
uid: mailbox.uidNext,
internaldate,
headerdate,
flags: [].concat(options.flags || []),
size,
meta: options.meta || {},
modseq: mailbox.modifyIndex + 1,
mimeTree,
envelope,
bodystructure,
messageId
}; };
this.database.collection('messages').insertOne(message, err => { // acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1,
storageUsed: size,
messages: 1
}
}, (err, item) => {
if (err) { if (err) {
this.redlock.releaseLock(lock, () => false); return rollback(err);
return callback(err);
} }
let uidValidity = mailbox.uidValidity; if (!item || !item.value) {
let uid = message.uid; // was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return rollback(err);
}
this.notifier.addEntries(mailbox, false, { let mailbox = item.value;
command: 'EXISTS',
uid: message.uid,
message: message._id,
modseq: message.modseq
}, () => {
this.redlock.releaseLock(lock, () => { let internaldate = options.date && new Date(options.date) || new Date();
this.notifier.fire(mailbox.user, mailbox.path); let headerdate = mimeTree.parsedHeader.date && new Date(mimeTree.parsedHeader.date) || false;
return callback(null, true, {
uidValidity, if (!headerdate || headerdate.toString() === 'Invalid Date') {
uid headerdate = internaldate;
}
let message = {
_id: id,
mailbox: mailbox._id,
uid: mailbox.uidNext,
internaldate,
headerdate,
flags: [].concat(options.flags || []),
size,
meta: options.meta || {},
modseq: mailbox.modifyIndex + 1,
mimeTree,
envelope,
bodystructure,
messageId
};
this.database.collection('messages').insertOne(message, err => {
if (err) {
return this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
storageUsed: -size,
messages: -1
}
}, () => rollback(err));
}
let uidValidity = mailbox.uidValidity;
let uid = message.uid;
this.notifier.addEntries(mailbox, false, {
command: 'EXISTS',
uid: message.uid,
message: message._id,
modseq: message.modseq
}, () => {
this.redlock.releaseLock(lock, () => {
this.notifier.fire(mailbox.user, mailbox.path);
return callback(null, true, {
uidValidity,
uid
});
}); });
}); });
}); });