Changed quota handling

This commit is contained in:
Andris Reinman 2017-03-29 20:06:09 +03:00
parent 9174ae8c09
commit 2aa4f79142
19 changed files with 367 additions and 1050 deletions

View file

@ -41,7 +41,7 @@ Wild Duck IMAP server supports the following IMAP standards:
### Does it work?
Yes, it does. You can run the server and get a working IMAP server for mail store, SMTP server for pushing messages to the mail store and HTTP API server to create new users. All handled by Node.js, MongoDB and Redis, no additional dependencies needed.
Yes, it does. You can run the server and get a working IMAP server for mail store, SMTP server for pushing messages to the mail store and HTTP API server to create new users. All handled by Node.js, MongoDB and Redis, no additional dependencies needed. The IMAP server hosting уайлддак.орг uses a MongoDB replica set of 3 hosts.
### What are the killer features?
@ -217,6 +217,35 @@ The response for successful operation should look like this:
Quota changes apply immediately.
### POST /user/quota/reset
Recalculates used storage for an user. Use this when it seems that quota counters for an user do not match with reality.
Arguments
- **username** is the username of the user to check
**Example**
```
curl -XPOST "http://localhost:8080/user/quota/reset" -H 'content-type: application/json' -d '{
"username": "testuser"
}'
```
The response for successful operation should look like this:
```json
{
"success": true,
"username": "testuser",
"previousStorageUsed": 1000,
"storageUsed": 800
}
```
Be aware though that this method is not atomic and should be done only if quota counters are way off.
### POST /user/password
Updates password for an user

139
api.js
View file

@ -82,7 +82,6 @@ server.post('/user/create', (req, res, next) => {
password: hash,
address: false,
storageUsed: 0,
messages: 0,
quota,
created: new Date()
}, (err, result) => {
@ -104,8 +103,6 @@ server.post('/user/create', (req, res, next) => {
uidValidity,
uidNext: 1,
modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true
}, {
user,
@ -114,8 +111,6 @@ server.post('/user/create', (req, res, next) => {
uidValidity,
uidNext: 1,
modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true
}, {
user,
@ -124,8 +119,6 @@ server.post('/user/create', (req, res, next) => {
uidValidity,
uidNext: 1,
modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true
}, {
user,
@ -134,8 +127,6 @@ server.post('/user/create', (req, res, next) => {
uidValidity,
uidNext: 1,
modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true
}], {
w: 1,
@ -339,6 +330,117 @@ server.post('/user/quota', (req, res, next) => {
});
});
server.post('/user/quota/reset', (req, res, next) => {
res.charSet('utf-8');
const schema = Joi.object().keys({
username: Joi.string().alphanum().lowercase().min(3).max(30).required()
});
const result = Joi.validate({
username: req.params.username
}, schema, {
abortEarly: false,
convert: true,
allowUnknown: true
});
if (result.error) {
res.json({
error: result.error.message
});
return next();
}
let username = result.value.username;
db.database.collection('users').findOne({
username
}, (err, user) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message,
username
});
return next();
}
if (!user) {
res.json({
error: 'This user does not exist',
username
});
return next();
}
// calculate mailbox size by aggregating the size's of all messages
db.database.collection('messages').aggregate([{
$match: {
user: user._id
}
}, {
$group: {
_id: {
user: '$user'
},
storageUsed: {
$sum: '$size'
}
}
}], {
cursor: {
batchSize: 1
}
}).toArray((err, result) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message,
username
});
return next();
}
let storageUsed = result && result[0] && result[0].storageUsed || 0;
// update quota counter
db.database.collection('users').findOneAndUpdate({
_id: user._id
}, {
$set: {
storageUsed: Number(storageUsed) || 0
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message,
username
});
return next();
}
if (!result || !result.value) {
res.json({
error: 'This user does not exist',
username
});
return next();
}
res.json({
success: true,
username,
previousStorageUsed: Number(result.value.storageUsed) || 0,
storageUsed: user.storageUsed
});
return next();
});
});
});
});
server.post('/user/password', (req, res, next) => {
res.charSet('utf-8');
@ -531,30 +633,33 @@ server.get('/user/mailboxes', (req, res, next) => {
mailboxes = [];
}
let priority = {
Inbox: 1,
Sent: 2,
Junk: 3,
Trash: 4
};
res.json({
success: true,
username,
mailboxes: mailboxes.map(mailbox => ({
id: mailbox._id.toString(),
path: mailbox.path,
special: mailbox.path === 'INBOX' ? 'Inbox' : (mailbox.specialUse ? mailbox.specialUse.replace(/^\\/, '') : false),
messages: mailbox.messages
special: mailbox.path === 'INBOX' ? 'Inbox' : (mailbox.specialUse ? mailbox.specialUse.replace(/^\\/, '') : false)
})).sort((a, b) => {
if (a.special && !b.special) {
return -1;
}
if (b.special && !a.special) {
return 1;
}
let priority = {
Inbox: 1,
Sent: 2,
Junk: 3,
Trash: 4
};
if (a.special && b.special) {
return (priority[a.special] || 5) - (priority[b.special] || 5);
}
return a.path.localeCompare(b.path);
})
});

View file

@ -21,9 +21,8 @@ const transporter = nodemailer.createTransport({
debug: false
});
let sent = 0;
let total = 1;
let total = 10000;
let startTime = Date.now();
function send() {
@ -75,10 +74,14 @@ function send() {
if (sent >= total) {
console.log('Sent %s messages in %s s', sent, (Date.now() - startTime) / 1000);
return transporter.close();
}else {
send();
}
});
}
send();
/*
for (let i = 0; i < total; i++) {
send();
}
*/

View file

@ -1,543 +0,0 @@
'use strict';
// Replace '../index' with 'imap-core' when running this script outside this directory
let IMAPServerModule = require('../index');
let IMAPServer = IMAPServerModule.IMAPServer;
let MemoryNotifier = IMAPServerModule.MemoryNotifier;
const SERVER_PORT = 9993;
const SERVER_HOST = '127.0.0.1';
// Connect to this example server by running
// openssl s_client -crlf -connect localhost:9993
// Username is "testuser" and password is "pass"
// This example uses global folders and subscriptions
let folders = new Map();
let subscriptions = new WeakSet();
// configure initial mailbox state
[
// INBOX
{
path: 'INBOX',
uidValidity: 123,
uidNext: 70,
modifyIndex: 6,
messages: [{
uid: 45,
flags: [],
date: new Date(),
modseq: 1,
raw: Buffer.from('from: sender@example.com\r\nto: to@example.com\r\ncc: cc@example.com\r\nsubject: test')
}, {
uid: 49,
flags: ['\\Seen'],
date: new Date(),
modseq: 2
}, {
uid: 50,
flags: ['\\Seen'],
date: new Date(),
modseq: 3
}, {
uid: 52,
flags: [],
date: new Date(),
modseq: 4
}, {
uid: 53,
flags: [],
date: new Date(),
modseq: 5
}, {
uid: 60,
flags: [],
date: new Date(),
modseq: 6
}],
journal: []
},
// [Gmail]/Sent Mail
{
path: '[Gmail]/Sent Mail',
specialUse: '\\Sent',
uidValidity: 123,
uidNext: 90,
modifyIndex: 0,
messages: [],
journal: []
}
].forEach(folder => {
folders.set(folder.path, folder);
subscriptions.add(folder);
});
// Setup server
let server = new IMAPServer({
secure: true,
id: {
name: 'test'
}
});
// setup notification system for updates
server.notifier = new MemoryNotifier({
folders
});
server.onAuth = function (login, session, callback) {
if (login.username !== 'testuser' || login.password !== 'pass') {
return callback();
}
callback(null, {
user: {
username: login.username
}
});
};
// LIST "" "*"
// Returns all folders, query is informational
// folders is either an Array or a Map
server.onList = function (query, session, callback) {
this.logger.debug('[%s] LIST for "%s"', session.id, query);
callback(null, folders);
};
// LSUB "" "*"
// Returns all subscribed folders, query is informational
// folders is either an Array or a Map
server.onLsub = function (query, session, callback) {
this.logger.debug('[%s] LSUB for "%s"', session.id, query);
let subscribed = [];
folders.forEach(folder => {
if (subscriptions.has(folder)) {
subscribed.push(folder);
}
});
callback(null, subscribed);
};
// SUBSCRIBE "path/to/mailbox"
server.onSubscribe = function (mailbox, session, callback) {
this.logger.debug('[%s] SUBSCRIBE to "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
subscriptions.add(folders.get(mailbox));
callback(null, true);
};
// UNSUBSCRIBE "path/to/mailbox"
server.onUnsubscribe = function (mailbox, session, callback) {
this.logger.debug('[%s] UNSUBSCRIBE from "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
subscriptions.delete(folders.get(mailbox));
callback(null, true);
};
// CREATE "path/to/mailbox"
server.onCreate = function (mailbox, session, callback) {
this.logger.debug('[%s] CREATE "%s"', session.id, mailbox);
if (folders.has(mailbox)) {
return callback(null, 'ALREADYEXISTS');
}
folders.set(mailbox, {
path: mailbox,
uidValidity: Date.now(),
uidNext: 1,
modifyIndex: 0,
messages: [],
journal: []
});
subscriptions.add(folders.get(mailbox));
callback(null, true);
};
// RENAME "path/to/mailbox" "new/path"
// NB! RENAME affects child and hierarchy mailboxes as well, this example does not do this
server.onRename = function (mailbox, newname, session, callback) {
this.logger.debug('[%s] RENAME "%s" to "%s"', session.id, mailbox, newname);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
if (folders.has(newname)) {
return callback(null, 'ALREADYEXISTS');
}
let oldMailbox = folders.get(mailbox);
folders.delete(mailbox);
oldMailbox.path = newname;
folders.set(newname, oldMailbox);
callback(null, true);
};
// DELETE "path/to/mailbox"
server.onDelete = function (mailbox, session, callback) {
this.logger.debug('[%s] DELETE "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
// keep SPECIAL-USE folders
if (folders.get(mailbox).specialUse) {
return callback(null, 'CANNOT');
}
folders.delete(mailbox);
callback(null, true);
};
// SELECT/EXAMINE
server.onOpen = function (mailbox, session, callback) {
this.logger.debug('[%s] Opening "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
let folder = folders.get(mailbox);
return callback(null, {
specialUse: folder.specialUse,
uidValidity: folder.uidValidity,
uidNext: folder.uidNext,
modifyIndex: folder.modifyIndex,
uidList: folder.messages.map(message => message.uid)
});
};
// STATUS (X Y X)
server.onStatus = function (mailbox, session, callback) {
this.logger.debug('[%s] Requested status for "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
let folder = folders.get(mailbox);
return callback(null, {
messages: folder.messages.length,
uidNext: folder.uidNext,
uidValidity: folder.uidValidity,
unseen: folder.messages.filter(message => message.flags.indexOf('\\Seen') < 0).length
});
};
// APPEND mailbox (flags) date message
server.onAppend = function (mailbox, flags, date, raw, session, callback) {
this.logger.debug('[%s] Appending message to "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'TRYCREATE');
}
date = date && new Date(date) || new Date();
let folder = folders.get(mailbox);
let message = {
uid: folder.uidNext++,
date: date && new Date(date) || new Date(),
raw,
flags
};
folder.messages.push(message);
// do not write directly to stream, use notifications as the currently selected mailbox might not be the one that receives the message
this.notifier.addEntries(session.user.username, mailbox, {
command: 'EXISTS',
uid: message.uid
}, () => {
this.notifier.fire(session.user.username, mailbox);
return callback(null, true, {
uidValidity: folder.uidValidity,
uid: message.uid
});
});
};
// STORE / UID STORE, updates flags for selected UIDs
server.onStore = function (mailbox, update, session, callback) {
this.logger.debug('[%s] Updating messages in "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
let folder = folders.get(mailbox);
let i = 0;
let processMessages = () => {
if (i >= folder.messages.length) {
this.notifier.fire(session.user.username, mailbox);
return callback(null, true);
}
let message = folder.messages[i++];
let updated = false;
if (update.messages.indexOf(message.uid) < 0) {
return processMessages();
}
switch (update.action) {
case 'set':
// check if update set matches current or is different
if (message.flags.length !== update.value.length || update.value.filter(flag => message.flags.indexOf(flag) < 0).length) {
updated = true;
}
// set flags
message.flags = [].concat(update.value);
break;
case 'add':
message.flags = message.flags.concat(update.value.filter(flag => {
if (message.flags.indexOf(flag) < 0) {
updated = true;
return true;
}
return false;
}));
break;
case 'remove':
message.flags = message.flags.filter(flag => {
if (update.value.indexOf(flag) < 0) {
return true;
}
updated = true;
return false;
});
break;
}
// Onlsy show response if not silent
if (!update.silent) {
session.writeStream.write(session.formatResponse('FETCH', message.uid, {
uid: update.isUid ? message.uid : false,
flags: message.flags
}));
}
// notifiy other clients only if something changed
if (updated) {
this.notifier.addEntries(session.user.username, mailbox, {
command: 'FETCH',
ignore: session.id,
uid: message.uid,
flags: message.flags
}, processMessages);
} else {
processMessages();
}
};
processMessages();
};
// EXPUNGE deletes all messages in selected mailbox marked with \Delete
// EXPUNGE deletes all messages in selected mailbox marked with \Delete
server.onExpunge = function (mailbox, update, session, callback) {
this.logger.debug('[%s] Deleting messages from "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
let folder = folders.get(mailbox);
let deleted = [];
let i, len;
for (i = folder.messages.length - 1; i >= 0; i--) {
if (
(
(update.isUid && update.messages.indexOf(folder.messages[i].uid) >= 0) ||
!update.isUid
) && folder.messages[i].flags.indexOf('\\Deleted') >= 0) {
deleted.unshift(folder.messages[i].uid);
folder.messages.splice(i, 1);
}
}
let entries = [];
for (i = 0, len = deleted.length; i < len; i++) {
entries.push({
command: 'EXPUNGE',
ignore: session.id,
uid: deleted[i]
});
if (!update.silent) {
session.writeStream.write(session.formatResponse('EXPUNGE', deleted[i]));
}
}
this.notifier.addEntries(session.user.username, mailbox, entries, () => {
this.notifier.fire(session.user.username, mailbox);
return callback(null, true);
});
};
// COPY / UID COPY sequence mailbox
server.onCopy = function (mailbox, update, session, callback) {
this.logger.debug('[%s] Copying messages from "%s" to "%s"', session.id, mailbox, update.destination);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
if (!folders.has(update.destination)) {
return callback(null, 'TRYCREATE');
}
let sourceFolder = folders.get(mailbox);
let destinationFolder = folders.get(update.destination);
let messages = [];
let sourceUid = [];
let destinationUid = [];
let i, len;
let entries = [];
for (i = sourceFolder.messages.length - 1; i >= 0; i--) {
if (update.messages.indexOf(sourceFolder.messages[i].uid) >= 0) {
messages.unshift(JSON.parse(JSON.stringify(sourceFolder.messages[i])));
sourceUid.unshift(sourceFolder.messages[i].uid);
}
}
for (i = 0, len = messages.length; i < len; i++) {
messages[i].uid = destinationFolder.uidNext++;
destinationUid.push(messages[i].uid);
destinationFolder.messages.push(messages[i]);
// do not write directly to stream, use notifications as the currently selected mailbox might not be the one that receives the message
entries.push({
command: 'EXISTS',
uid: messages[i].uid
});
}
this.notifier.addEntries(update.destination, session.user.username, entries, () => {
this.notifier.fire(update.destination, session.user.username);
return callback(null, true, {
uidValidity: destinationFolder.uidValidity,
sourceUid,
destinationUid
});
});
};
// sends results to socket
server.onFetch = function (mailbox, options, session, callback) {
this.logger.debug('[%s] Requested FETCH for "%s"', session.id, mailbox);
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
let folder = folders.get(mailbox);
let entries = [];
if (options.markAsSeen) {
// mark all matching messages as seen
folder.messages.forEach(message => {
if (options.messages.indexOf(message.uid) < 0) {
return;
}
// if BODY[] is touched, then add \Seen flag and notify other clients
if (message.flags.indexOf('\\Seen') < 0) {
message.flags.unshift('\\Seen');
entries.push({
command: 'FETCH',
ignore: session.id,
uid: message.uid,
flags: message.flags
});
}
});
}
this.notifier.addEntries(session.user.username, mailbox, entries, () => {
folder.messages.forEach(message => {
if (options.messages.indexOf(message.uid) < 0) {
return;
}
// send formatted response to socket
session.writeStream.write(session.formatResponse('FETCH', message.uid, {
query: options.query,
values: session.getQueryResponse(options.query, message)
}));
});
// once messages are processed show relevant updates
this.notifier.fire(session.user.username, mailbox);
callback(null, true);
});
};
// returns an array of matching UID values
server.onSearch = function (mailbox, options, session, callback) {
if (!folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
let folder = folders.get(mailbox);
let highestModseq = 0;
let uidList = folder.messages.filter(message => {
let match = session.matchSearchQuery(message, options.query);
if (match && highestModseq < message.modseq) {
highestModseq = message.modseq;
}
return match;
}).map(message => message.uid);
callback(null, {
uidList,
highestModseq
});
};
// -------
server.on('error', err => {
console.log('Error occurred\n%s', err.stack); // eslint-disable-line no-console
});
process.on('SIGINT', () => {
server.close(() => {
process.exit();
});
});
// start listening
server.listen(SERVER_PORT, SERVER_HOST);

View file

@ -1,6 +1,4 @@
'use strict';
module.exports.IMAPServer = require('./lib/imap-server').IMAPServer;
module.exports.RedisNotifier = require('./lib/redis-notifier');
module.exports.MemoryNotifier = require('./lib/memory-notifier');
module.exports.imapHandler = require('./lib/handler/imap-handler');

View file

@ -30,6 +30,8 @@ module.exports = {
capabilities.push('UTF8=ACCEPT');
capabilities.push('QUOTA');
// capabilities.push('MOVE');
if (this._server.options.maxMessage) {
capabilities.push('APPENDLIMIT=' + this._server.options.maxMessage);
}

View file

@ -0,0 +1,74 @@
'use strict';
let imapTools = require('../imap-tools');
let imapHandler = require('../handler/imap-handler');
module.exports = {
state: 'Selected',
schema: [{
name: 'range',
type: 'sequence'
}, {
name: 'mailbox',
type: 'string'
}],
handler(command, callback) {
let cmd = (command.command || '').toString().toUpperCase();
// Check if MOVE method is set
if (typeof this._server.onMove !== 'function') {
return callback(null, {
response: 'NO',
message: cmd + ' not implemented'
});
}
let range = command.attributes[0] && command.attributes[0].value || '';
let path = Buffer.from(command.attributes[1] && command.attributes[1].value || '', 'binary').toString();
let mailbox = imapTools.normalizeMailbox(path, !this.acceptUTF8Enabled);
if (!mailbox) {
return callback(new Error('Invalid mailbox argument for ' + cmd));
}
if (!imapTools.validateSequnce(range)) {
return callback(new Error('Invalid sequence set for ' + cmd));
}
let messages = imapTools.getMessageRange(this.selected.uidList, range, cmd === 'UID MOVE');
this._server.onMove(this.selected.mailbox, {
destination: mailbox,
messages
}, this.session, (err, success, info) => {
if (err) {
return callback(err);
}
let code = typeof success === 'string' ? success.toUpperCase() : false;
if (success === true) {
this.send(imapHandler.compiler({
tag: '*',
command: 'OK',
attributes: [{
type: 'SECTION',
section: [{
type: 'TEXT',
value: 'COPYUID ' + info.uidValidity + ' ' + imapTools.packMessageRange(info.sourceUid) + ' ' + imapTools.packMessageRange(info.destinationUid)
}]
}]
}));
}
callback(null, {
response: success === true ? 'OK' : 'NO',
code
});
});
}
};

View file

@ -36,6 +36,8 @@ let commands = new Map([
['UNSELECT', require('./commands/unselect')],
['COPY', require('./commands/copy')],
['UID COPY', require('./commands/copy')],
['MOVE', require('./commands/move')],
['UID MOVE', require('./commands/move')],
['FETCH', require('./commands/fetch')],
['UID FETCH', require('./commands/fetch')],
['SEARCH', require('./commands/search')],

View file

@ -1,6 +1,6 @@
'use strict';
let addressparser = require('addressparser');
let addressparser = require('nodemailer/lib/addressparser');
/**
* Parses a RFC822 message into a structured object (JSON compatible)

View file

@ -1,150 +0,0 @@
'use strict';
let crypto = require('crypto');
let EventEmitter = require('events').EventEmitter;
// Expects that the folder listing is a Map
class MemoryNotifier extends EventEmitter {
constructor(options) {
super();
this.folders = options.folders || new Map();
let logfunc = (...args) => {
let level = args.shift() || 'DEBUG';
let message = args.shift() || '';
console.log([level].concat(message || '').join(' '), ...args); // eslint-disable-line no-console
};
this.logger = options.logger || {
info: logfunc.bind(null, 'INFO'),
debug: logfunc.bind(null, 'DEBUG'),
error: logfunc.bind(null, 'ERROR')
};
this._listeners = new EventEmitter();
this._listeners.setMaxListeners(0);
EventEmitter.call(this);
}
/**
* Generates hashed event names for mailbox:username pairs
*
* @param {String} mailbox
* @param {String} username
* @returns {String} md5 hex
*/
_eventName(mailbox, username) {
return crypto.createHash('md5').update(username + ':' + mailbox).digest('hex');
}
/**
* Registers an event handler for mailbox:username events
*
* @param {String} username
* @param {String} mailbox
* @param {Function} handler Function to run once there are new entries in the journal
*/
addListener(session, mailbox, handler) {
let eventName = this._eventName(session.user.username, mailbox);
this._listeners.addListener(eventName, handler);
this.logger.debug('New journal listener for %s ("%s:%s")', eventName, session.user.username, mailbox);
}
/**
* Unregisters an event handler for mailbox:username events
*
* @param {String} username
* @param {String} mailbox
* @param {Function} handler Function to run once there are new entries in the journal
*/
removeListener(session, mailbox, handler) {
let eventName = this._eventName(session.user.username, mailbox);
this._listeners.removeListener(eventName, handler);
this.logger.debug('Removed journal listener from %s ("%s:%s")', eventName, session.user.username, mailbox);
}
/**
* Stores multiple journal entries to db
*
* @param {String} username
* @param {String} mailbox
* @param {Array|Object} entries An array of entries to be journaled
* @param {Function} callback Runs once the entry is either stored or an error occurred
*/
addEntries(username, mailbox, entries, callback) {
let folder = this.folders.get(mailbox);
if (!folder) {
return callback(null, new Error('Selected mailbox does not exist'));
}
if (entries && !Array.isArray(entries)) {
entries = [entries];
} else if (!entries || !entries.length) {
return callback(null, false);
}
// store entires in the folder object
if (!folder.journal) {
folder.journal = [];
}
entries.forEach(entry => {
entry.modseq = ++folder.modifyIndex;
folder.journal.push(entry);
});
setImmediate(callback);
}
/**
* Sends a notification that there are new updates in the selected mailbox
*
* @param {String} username
* @param {String} mailbox
*/
fire(username, mailbox, payload) {
let eventName = this._eventName(username, mailbox);
setImmediate(() => {
this._listeners.emit(eventName, payload);
});
}
/**
* Returns all entries from the journal that have higher than provided modification index
*
* @param {String} username
* @param {String} mailbox
* @param {Number} modifyIndex Last known modification id
* @param {Function} callback Returns update entries as an array
*/
getUpdates(session, mailbox, modifyIndex, callback) {
modifyIndex = Number(modifyIndex) || 0;
if (!this.folders.has(mailbox)) {
return callback(null, 'NONEXISTENT');
}
let folder = this.folders.get(mailbox);
let minIndex = folder.journal.length;
for (let i = folder.journal.length - 1; i >= 0; i--) {
if (folder.journal[i].modseq > modifyIndex) {
minIndex = i;
} else {
break;
}
}
return callback(null, folder.journal.slice(minIndex));
}
}
module.exports = MemoryNotifier;

View file

@ -1,22 +0,0 @@
-- inserts JSON values to a sorted set where score value is an incrementing number resolved from a KEYS[1].modifyIndex
-- check if the mailbox even exists
if not redis.call('exists', KEYS[1]) then
return {err='Selected mailbox does not exist'}
end;
local len = table.getn(ARGV);
-- do a single increment to get id values for all elements instead of incrementing it one by one
local score = redis.call('hincrby', KEYS[1], 'modifyIndex', len) - len;
for i = 1, #ARGV do
-- we include modification index in the stored value to ensure that all values are always unique
-- otherwise adding new element with the same data does not insert a new entry but overrides
-- an existing one
redis.call('zadd', KEYS[2], score + i, tostring(score + i) .. ':' .. ARGV[i]);
end;
-- return the largest modification index
return score + len;

View file

@ -1,244 +0,0 @@
'use strict';
let redis = require('redis');
let EventEmitter = require('events').EventEmitter;
let crypto = require('crypto');
let fs = require('fs');
let scripts = {
addEntries: fs.readFileSync(__dirname + '/add-entries.lua')
};
// Assumes that there are following hash keys in Redis:
// u:[username]:folder:[md5(path)]
// with the following key:
// modifyIndex: Number
class RedisNotifier extends EventEmitter {
constructor(options) {
super();
options = options || {};
this.options = {
port: options.port || 6379,
host: options.host || 'localhost',
db: options.db || 0,
prefix: options.prefix || 'imap:'
};
let logfunc = (...args) => {
let level = args.shift() || 'DEBUG';
let message = args.shift() || '';
console.log([level].concat(message || '').join(' '), ...args); // eslint-disable-line no-console
};
this.logger = options.logger || {
info: logfunc.bind(null, 'INFO'),
debug: logfunc.bind(null, 'DEBUG'),
error: logfunc.bind(null, 'ERROR')
};
// we need two db connections as subscriber can't manage data
this._db = redis.createClient(options.port, options.host);
this._subscriber = redis.createClient(options.port, options.host);
this._pubsubListeners = new Map();
this._listeners = new EventEmitter();
this._listeners.setMaxListeners(0);
this._subscriber.on('message', (channel, message) => {
try {
message = JSON.parse(message);
} catch (E) {
// ignore
}
this.logger.debug(
'Journal update notification for %s, updating %s subscribers',
channel.slice(this.options.prefix.length),
this._listeners.listenerCount(channel.slice(this.options.prefix.length)));
this._listeners.emit(channel.slice(this.options.prefix.length), message);
});
EventEmitter.call(this);
}
/**
* Generates hashed event names for mailbox:username pairs
*
* @param {String} username
* @param {String} mailbox
* @returns {String} md5 hex
*/
_eventName(username, mailbox) {
return crypto.createHash('md5').update(username + ':' + mailbox).digest('hex');
}
/**
* Registers an event handler for mailbox:username events
*
* @param {String} username
* @param {String} mailbox
* @param {Function} handler Function to run once there are new entries in the journal
*/
addListener(session, mailbox, handler) {
let eventName = this._eventName(session.user.username, mailbox);
this._listeners.addListener(eventName, handler);
if (!this._pubsubListeners.has(eventName)) {
this._pubsubListeners.set(eventName, 1);
this._subscriber.subscribe(this.options.prefix + eventName);
} else {
this._pubsubListeners.set(eventName, this._pubsubListeners.get(eventName) + 1);
}
this.logger.debug('New journal listener for %s ("%s:%s", total %s subscribers)', eventName, session.user.username, mailbox, this._listeners.listenerCount(eventName));
}
/**
* Unregisters an event handler for mailbox:username events
*
* @param {String} username
* @param {String} mailbox
* @param {Function} handler Function to run once there are new entries in the journal
*/
removeListener(session, mailbox, handler) {
let count, eventName = this._eventName(session.user.username, mailbox);
this._listeners.removeListener(eventName, handler);
if (this._pubsubListeners.has(eventName) && (count = this._pubsubListeners.get(eventName)) && count > 0) {
count--;
if (!count) {
this._subscriber.unsubscribe(this.options.prefix + eventName);
this._pubsubListeners.delete(eventName);
} else {
this._pubsubListeners.set(eventName, 1);
}
this.logger.debug('Removed journal listener from %s ("%s:%s", total %s subscribers)', eventName, session.user.username, mailbox, this._listeners.listenerCount(eventName));
}
}
/**
* Stores multiple journal entries to db
*
* @param {String} username
* @param {String} mailbox
* @param {Array|Object} entries An array of entries to be journaled
* @param {Function} callback Runs once the entry is either stored or an error occurred
*/
addEntries(username, mailbox, entries, callback) {
let mailboxHash = crypto.createHash('md5').update(mailbox).digest('hex');
if (entries && !Array.isArray(entries)) {
entries = [entries];
} else if (!entries || !entries.length) {
return callback(null, false);
}
entries = entries.map(entry => JSON.stringify(entry));
this.logger.debug('Adding journal entries for %s (%s)\n%s', mailbox, mailboxHash, entries.join('\n'));
this._db.multi().
select(this.options.db).
eval([
scripts.addEntries,
2,
'u:' + username + ':folder:' + mailboxHash,
'u:' + username + ':journal:' + mailboxHash
].concat(entries)).
exec(err => {
if (err) {
return callback(err);
}
return callback(null, true);
});
}
/**
* Sends a notification that there are new updates in the selected mailbox
*
* @param {String} username
* @param {String} mailbox
*/
fire(username, mailbox, payload) {
let eventName = this._eventName(username, mailbox);
payload = payload || false;
setImmediate(this._db.publish.bind(this._db, this.options.prefix + eventName, JSON.stringify(payload)));
}
/**
* Returns all entries from the journal that have higher than provided modification index
*
* @param {String} username
* @param {String} mailbox
* @param {Number} modifyIndex Last known modification id
* @param {Function} callback Returns update entries as an array
*/
getUpdates(session, mailbox, modifyIndex, callback) {
modifyIndex = Number(modifyIndex) || 0;
let mailboxHash = crypto.createHash('md5').update(mailbox).digest('hex');
let username = session.user.username;
this._db.multi().
select(this.options.db).
exists('u:' + username + ':journal:' + mailboxHash).
zrangebyscore('u:' + username + ':journal:' + mailboxHash, modifyIndex + 1, Infinity).
exec((err, replies) => {
let updates;
this.logger.debug('[%s] Loaded journal updates for "%s:%s" since %s', session.id, username, mailbox, modifyIndex + 1);
if (err) {
return callback(err);
}
if (!replies || !replies[1]) {
return callback(null, 'NONEXISTENT');
}
updates = (replies[2] || []).
map(entry => {
let data;
let m = (entry || '').toString().match(/^(\d+)\:/);
if (!m) {
// invalidly formatted entry
this.logger.debug('[%s] Invalidly formatted entry for "%s:%s" (%s)', session.id, username, mailbox, (entry).toString());
return false;
}
try {
data = JSON.parse(entry.substr(m[0].length));
data.modseq = Number(m[1]) || false;
// we mess around with json in redis lua but lua does not make
// a distinction between an object and an array, if an array
// is empty then it will be invalidly detected as an object
if (data.flags && !Array.isArray(data.flags)) {
data.flags = [];
}
} catch (E) {
this.logger.error('[%s] Failed parsing journal update for "%s:%s" (%s): %s', session.id, username, mailbox, entry.substr(m[0].length), E.message);
}
return data;
}).filter(entry =>
// only include entries with data
(entry && entry.uid)
);
this.logger.debug('[%s] Processing journal updates for "%s:%s": %s', session.id, username, mailbox, JSON.stringify(updates));
callback(null, updates);
});
}
}
module.exports = RedisNotifier;

99
imap.js
View file

@ -157,8 +157,6 @@ server.onCreate = function (path, session, callback) {
uidValidity: Math.floor(Date.now() / 1000),
uidNext: 1,
modifyIndex: 0,
storageUsed: 0,
messages: 0,
subscribed: true
};
@ -239,23 +237,54 @@ server.onDelete = function (path, session, callback) {
return callback(err);
}
// decrement quota counters
db.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -Number(mailbox.storageUsed) || 0,
messages: -Number(mailbox.messages) || 0
}
}, () => {
db.database.collection('journal').deleteMany({
// calculate mailbox size by aggregating the size's of all messages
db.database.collection('messages').aggregate([{
$match: {
mailbox: mailbox._id
}, err => {
if (err) {
return callback(err);
}
}, {
$group: {
_id: {
mailbox: '$mailbox'
},
storageUsed: {
$sum: '$size'
}
callback(null, true);
});
}
}], {
cursor: {
batchSize: 1
}
}).toArray((err, res) => {
if (err) {
return callback(err);
}
let storageUsed = res && res[0] && res[0].storageUsed || 0;
let done = () => {
db.database.collection('journal').deleteMany({
mailbox: mailbox._id
}, err => {
if (err) {
return callback(err);
}
callback(null, true);
});
};
if (!storageUsed) {
return done();
}
// decrement quota counters
db.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -Number(storageUsed) || 0
}
}, done);
});
});
});
@ -553,23 +582,13 @@ server.onExpunge = function (path, update, session, callback) {
return next();
}
db.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
db.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -deletedStorage,
messages: -deletedMessages
storageUsed: -deletedStorage
}
}, () => {
db.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -deletedStorage,
messages: -deletedMessages
}
}, next);
});
}, next);
};
let processNext = () => {
@ -680,23 +699,13 @@ server.onCopy = function (path, update, session, callback) {
if (!copiedMessages) {
return next();
}
db.database.collection('mailboxes').findOneAndUpdate({
_id: target._id
db.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: copiedStorage,
messages: copiedMessages
storageUsed: copiedStorage
}
}, () => {
db.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: copiedStorage,
messages: copiedMessages
}
}, next);
});
}, next);
};
let sourceUid = [];

View file

@ -54,6 +54,11 @@
"key": {
"mailbox": 1
}
}, {
"name": "user_messages",
"key": {
"user": 1
}
}, {
"name": "mailbox_uid_modseq",
"key": {

View file

@ -1,5 +1,7 @@
'use strict';
const config = require('config');
const tools = require('./tools');
const crypto = require('crypto');
const EventEmitter = require('events').EventEmitter;
const redis = require('redis');
@ -10,7 +12,8 @@ class ImapNotifier extends EventEmitter {
super();
this.database = options.database;
this.publisher = redis.createClient();
this.publisher = redis.createClient(tools.redisConfig(config.redis));
let logfunc = (...args) => {
let level = args.shift() || 'DEBUG';
@ -30,7 +33,7 @@ class ImapNotifier extends EventEmitter {
return;
}
this.subsriber = redis.createClient();
this.subsriber = redis.createClient(tools.redisConfig(config.redis));
this._listeners = new EventEmitter();
this._listeners.setMaxListeners(0);

View file

@ -6,6 +6,7 @@ const ObjectID = require('mongodb').ObjectID;
const RedFour = require('redfour');
const Indexer = require('../imap-core/lib/indexer/indexer');
const ImapNotifier = require('./imap-notifier');
const tools = require('./tools');
class MessageHandler {
@ -19,7 +20,7 @@ class MessageHandler {
pushOnly: true
});
this.redlock = new RedFour({
redis: config.redis,
redis: tools.redisConfig(config.redis),
namespace: 'wildduck'
});
}
@ -117,9 +118,7 @@ class MessageHandler {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1,
storageUsed: size,
messages: 1
modifyIndex: 1
}
}, (err, item) => {
if (err) {
@ -144,14 +143,20 @@ class MessageHandler {
let message = {
_id: id,
mailbox: mailbox._id,
user: mailbox.user,
uid: mailbox.uidNext,
modseq: mailbox.modifyIndex + 1,
internaldate,
headerdate,
flags: [].concat(options.flags || []),
size,
meta: options.meta || {},
modseq: mailbox.modifyIndex + 1,
mimeTree,
envelope,
bodystructure,
@ -160,14 +165,7 @@ class MessageHandler {
this.database.collection('messages').insertOne(message, err => {
if (err) {
return this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
storageUsed: -size,
messages: -1
}
}, () => rollback(err));
return rollback(err);
}
let uidValidity = mailbox.uidValidity;

View file

@ -25,6 +25,46 @@ function normalizeAddress(address, withNames) {
return addr;
}
// returns a redis config object with a retry strategy
function redisConfig(defaultConfig) {
let response = {};
if (typeof defaultConfig === 'string') {
defaultConfig = {
url: defaultConfig
};
}
Object.keys(defaultConfig || {}).forEach(key => {
response[key] = defaultConfig[key];
});
if (!response.hasOwnProperty('retry_strategy')) {
response.retry_strategy = options => {
if (options.error && options.error.code === 'ECONNREFUSED') {
// End reconnecting on a specific error and flush all commands with a individual error
return new Error('The server refused the connection');
}
if (options.total_retry_time > 1000 * 60 * 60) {
// End reconnecting after a specific timeout and flush all commands with a individual error
return new Error('Retry time exhausted');
}
if (options.attempt > 10) {
// End reconnecting with built in error
return undefined; // eslint-disable-line no-undefined
}
// reconnect after
return Math.min(options.attempt * 100, 3000);
};
}
return response;
}
module.exports = {
normalizeAddress
normalizeAddress,
redisConfig
};

View file

@ -1,6 +1,6 @@
{
"name": "wildduck",
"version": "1.0.6",
"version": "1.0.7",
"description": "IMAP server built with Node.js and MongoDB",
"main": "server.js",
"scripts": {
@ -16,11 +16,9 @@
"grunt-cli": "^1.2.0",
"grunt-eslint": "^19.0.0",
"grunt-mocha-test": "^0.13.2",
"mocha": "^3.2.0",
"nodemailer": "^3.1.8"
"mocha": "^3.2.0"
},
"dependencies": {
"addressparser": "^1.0.1",
"bcryptjs": "^2.4.3",
"clone": "^2.1.1",
"config": "^1.25.1",
@ -30,7 +28,7 @@
"libmime": "^3.1.0",
"mailparser": "^2.0.2",
"mongodb": "^2.2.25",
"nodemailer-fetch": "^2.1.0",
"nodemailer": "^3.1.8",
"npmlog": "^4.0.2",
"redfour": "^1.0.0",
"redis": "^2.7.1",

12
smtp.js
View file

@ -43,13 +43,23 @@ const server = new SMTPServer({
// Accept messages up to 10 MB
size: maxMessageSize,
onMailFrom(address, session, callback) {
// reset session entries
session.users = new Map();
session.maxAllowedStorage = maxStorage;
// accept sender address
return callback();
},
// Validate RCPT TO envelope address. Example allows all addresses that do not start with 'deny'
// If this method is not set, all addresses are allowed
onRcptTo(rcpt, session, callback) {
let originalRecipient = tools.normalizeAddress(rcpt.address);
let recipient = originalRecipient.replace(/\+[^@]*@/, '@');
if (session.users && session.users.has(recipient)) {
if (session.users.has(recipient)) {
return callback();
}