From 2aa4f7914203818cb3e750e2d78cab0bcb7c3a40 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Wed, 29 Mar 2017 20:06:09 +0300 Subject: [PATCH] Changed quota handling --- README.md | 31 +- api.js | 139 ++++- examples/push-message.js | 9 +- imap-core/examples/index.js | 543 ------------------- imap-core/index.js | 2 - imap-core/lib/commands/capability.js | 2 + imap-core/lib/commands/move.js | 74 +++ imap-core/lib/imap-command.js | 2 + imap-core/lib/indexer/parse-mime-tree.js | 2 +- imap-core/lib/memory-notifier/index.js | 150 ----- imap-core/lib/redis-notifier/add-entries.lua | 22 - imap-core/lib/redis-notifier/index.js | 244 --------- imap.js | 99 ++-- indexes.json | 5 + lib/imap-notifier.js | 7 +- lib/message-handler.js | 24 +- lib/tools.js | 42 +- package.json | 8 +- smtp.js | 12 +- 19 files changed, 367 insertions(+), 1050 deletions(-) delete mode 100644 imap-core/examples/index.js create mode 100644 imap-core/lib/commands/move.js delete mode 100644 imap-core/lib/memory-notifier/index.js delete mode 100644 imap-core/lib/redis-notifier/add-entries.lua delete mode 100644 imap-core/lib/redis-notifier/index.js diff --git a/README.md b/README.md index e6217b1..6510511 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Wild Duck IMAP server supports the following IMAP standards: ### Does it work? -Yes, it does. You can run the server and get a working IMAP server for mail store, SMTP server for pushing messages to the mail store and HTTP API server to create new users. All handled by Node.js, MongoDB and Redis, no additional dependencies needed. +Yes, it does. You can run the server and get a working IMAP server for mail store, SMTP server for pushing messages to the mail store and HTTP API server to create new users. All handled by Node.js, MongoDB and Redis, no additional dependencies needed. The IMAP server hosting уайлддак.орг uses a MongoDB replica set of 3 hosts. ### What are the killer features? @@ -217,6 +217,35 @@ The response for successful operation should look like this: Quota changes apply immediately. +### POST /user/quota/reset + +Recalculates used storage for an user. Use this when it seems that quota counters for an user do not match with reality. + +Arguments + +- **username** is the username of the user to check + +**Example** + +``` +curl -XPOST "http://localhost:8080/user/quota/reset" -H 'content-type: application/json' -d '{ + "username": "testuser" +}' +``` + +The response for successful operation should look like this: + +```json +{ + "success": true, + "username": "testuser", + "previousStorageUsed": 1000, + "storageUsed": 800 +} +``` + +Be aware though that this method is not atomic and should be done only if quota counters are way off. + ### POST /user/password Updates password for an user diff --git a/api.js b/api.js index be72ccc..ac796f2 100644 --- a/api.js +++ b/api.js @@ -82,7 +82,6 @@ server.post('/user/create', (req, res, next) => { password: hash, address: false, storageUsed: 0, - messages: 0, quota, created: new Date() }, (err, result) => { @@ -104,8 +103,6 @@ server.post('/user/create', (req, res, next) => { uidValidity, uidNext: 1, modifyIndex: 0, - storageUsed: 0, - messages: 0, subscribed: true }, { user, @@ -114,8 +111,6 @@ server.post('/user/create', (req, res, next) => { uidValidity, uidNext: 1, modifyIndex: 0, - storageUsed: 0, - messages: 0, subscribed: true }, { user, @@ -124,8 +119,6 @@ server.post('/user/create', (req, res, next) => { uidValidity, uidNext: 1, modifyIndex: 0, - storageUsed: 0, - messages: 0, subscribed: true }, { user, @@ -134,8 +127,6 @@ server.post('/user/create', (req, res, next) => { uidValidity, uidNext: 1, modifyIndex: 0, - storageUsed: 0, - messages: 0, subscribed: true }], { w: 1, @@ -339,6 +330,117 @@ server.post('/user/quota', (req, res, next) => { }); }); +server.post('/user/quota/reset', (req, res, next) => { + res.charSet('utf-8'); + + const schema = Joi.object().keys({ + username: Joi.string().alphanum().lowercase().min(3).max(30).required() + }); + + const result = Joi.validate({ + username: req.params.username + }, schema, { + abortEarly: false, + convert: true, + allowUnknown: true + }); + + if (result.error) { + res.json({ + error: result.error.message + }); + return next(); + } + + let username = result.value.username; + + db.database.collection('users').findOne({ + username + }, (err, user) => { + if (err) { + res.json({ + error: 'MongoDB Error: ' + err.message, + username + }); + return next(); + } + + if (!user) { + res.json({ + error: 'This user does not exist', + username + }); + return next(); + } + + + // calculate mailbox size by aggregating the size's of all messages + db.database.collection('messages').aggregate([{ + $match: { + user: user._id + } + }, { + $group: { + _id: { + user: '$user' + }, + storageUsed: { + $sum: '$size' + } + } + }], { + cursor: { + batchSize: 1 + } + }).toArray((err, result) => { + if (err) { + res.json({ + error: 'MongoDB Error: ' + err.message, + username + }); + return next(); + } + + let storageUsed = result && result[0] && result[0].storageUsed || 0; + + // update quota counter + db.database.collection('users').findOneAndUpdate({ + _id: user._id + }, { + $set: { + storageUsed: Number(storageUsed) || 0 + } + }, { + returnOriginal: false + }, (err, result) => { + if (err) { + res.json({ + error: 'MongoDB Error: ' + err.message, + username + }); + return next(); + } + + if (!result || !result.value) { + res.json({ + error: 'This user does not exist', + username + }); + return next(); + } + + res.json({ + success: true, + username, + previousStorageUsed: Number(result.value.storageUsed) || 0, + storageUsed: user.storageUsed + }); + return next(); + }); + }); + }); +}); + server.post('/user/password', (req, res, next) => { res.charSet('utf-8'); @@ -531,30 +633,33 @@ server.get('/user/mailboxes', (req, res, next) => { mailboxes = []; } + let priority = { + Inbox: 1, + Sent: 2, + Junk: 3, + Trash: 4 + }; + res.json({ success: true, username, mailboxes: mailboxes.map(mailbox => ({ id: mailbox._id.toString(), path: mailbox.path, - special: mailbox.path === 'INBOX' ? 'Inbox' : (mailbox.specialUse ? mailbox.specialUse.replace(/^\\/, '') : false), - messages: mailbox.messages + special: mailbox.path === 'INBOX' ? 'Inbox' : (mailbox.specialUse ? mailbox.specialUse.replace(/^\\/, '') : false) })).sort((a, b) => { if (a.special && !b.special) { return -1; } + if (b.special && !a.special) { return 1; } - let priority = { - Inbox: 1, - Sent: 2, - Junk: 3, - Trash: 4 - }; + if (a.special && b.special) { return (priority[a.special] || 5) - (priority[b.special] || 5); } + return a.path.localeCompare(b.path); }) }); diff --git a/examples/push-message.js b/examples/push-message.js index 6a9acbb..8ed5bb1 100644 --- a/examples/push-message.js +++ b/examples/push-message.js @@ -21,9 +21,8 @@ const transporter = nodemailer.createTransport({ debug: false }); - let sent = 0; -let total = 1; +let total = 10000; let startTime = Date.now(); function send() { @@ -75,10 +74,14 @@ function send() { if (sent >= total) { console.log('Sent %s messages in %s s', sent, (Date.now() - startTime) / 1000); return transporter.close(); + }else { + send(); } }); } - +send(); +/* for (let i = 0; i < total; i++) { send(); } +*/ diff --git a/imap-core/examples/index.js b/imap-core/examples/index.js deleted file mode 100644 index d20048a..0000000 --- a/imap-core/examples/index.js +++ /dev/null @@ -1,543 +0,0 @@ -'use strict'; - -// Replace '../index' with 'imap-core' when running this script outside this directory - -let IMAPServerModule = require('../index'); -let IMAPServer = IMAPServerModule.IMAPServer; -let MemoryNotifier = IMAPServerModule.MemoryNotifier; - -const SERVER_PORT = 9993; -const SERVER_HOST = '127.0.0.1'; - -// Connect to this example server by running -// openssl s_client -crlf -connect localhost:9993 -// Username is "testuser" and password is "pass" - -// This example uses global folders and subscriptions -let folders = new Map(); -let subscriptions = new WeakSet(); - -// configure initial mailbox state -[ - // INBOX - { - path: 'INBOX', - uidValidity: 123, - uidNext: 70, - modifyIndex: 6, - messages: [{ - uid: 45, - flags: [], - date: new Date(), - modseq: 1, - raw: Buffer.from('from: sender@example.com\r\nto: to@example.com\r\ncc: cc@example.com\r\nsubject: test') - }, { - uid: 49, - flags: ['\\Seen'], - date: new Date(), - modseq: 2 - }, { - uid: 50, - flags: ['\\Seen'], - date: new Date(), - modseq: 3 - }, { - uid: 52, - flags: [], - date: new Date(), - modseq: 4 - }, { - uid: 53, - flags: [], - date: new Date(), - modseq: 5 - }, { - uid: 60, - flags: [], - date: new Date(), - modseq: 6 - }], - journal: [] - }, - // [Gmail]/Sent Mail - { - path: '[Gmail]/Sent Mail', - specialUse: '\\Sent', - uidValidity: 123, - uidNext: 90, - modifyIndex: 0, - messages: [], - journal: [] - } -].forEach(folder => { - folders.set(folder.path, folder); - subscriptions.add(folder); -}); - -// Setup server -let server = new IMAPServer({ - secure: true, - id: { - name: 'test' - } -}); - -// setup notification system for updates -server.notifier = new MemoryNotifier({ - folders -}); - -server.onAuth = function (login, session, callback) { - if (login.username !== 'testuser' || login.password !== 'pass') { - return callback(); - } - - callback(null, { - user: { - username: login.username - } - }); -}; - -// LIST "" "*" -// Returns all folders, query is informational -// folders is either an Array or a Map -server.onList = function (query, session, callback) { - this.logger.debug('[%s] LIST for "%s"', session.id, query); - - callback(null, folders); -}; - -// LSUB "" "*" -// Returns all subscribed folders, query is informational -// folders is either an Array or a Map -server.onLsub = function (query, session, callback) { - this.logger.debug('[%s] LSUB for "%s"', session.id, query); - - let subscribed = []; - folders.forEach(folder => { - if (subscriptions.has(folder)) { - subscribed.push(folder); - } - }); - - callback(null, subscribed); -}; - -// SUBSCRIBE "path/to/mailbox" -server.onSubscribe = function (mailbox, session, callback) { - this.logger.debug('[%s] SUBSCRIBE to "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - subscriptions.add(folders.get(mailbox)); - callback(null, true); -}; - -// UNSUBSCRIBE "path/to/mailbox" -server.onUnsubscribe = function (mailbox, session, callback) { - this.logger.debug('[%s] UNSUBSCRIBE from "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - subscriptions.delete(folders.get(mailbox)); - callback(null, true); -}; - -// CREATE "path/to/mailbox" -server.onCreate = function (mailbox, session, callback) { - this.logger.debug('[%s] CREATE "%s"', session.id, mailbox); - - if (folders.has(mailbox)) { - return callback(null, 'ALREADYEXISTS'); - } - - folders.set(mailbox, { - path: mailbox, - uidValidity: Date.now(), - uidNext: 1, - modifyIndex: 0, - messages: [], - journal: [] - }); - - subscriptions.add(folders.get(mailbox)); - callback(null, true); -}; - -// RENAME "path/to/mailbox" "new/path" -// NB! RENAME affects child and hierarchy mailboxes as well, this example does not do this -server.onRename = function (mailbox, newname, session, callback) { - this.logger.debug('[%s] RENAME "%s" to "%s"', session.id, mailbox, newname); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - if (folders.has(newname)) { - return callback(null, 'ALREADYEXISTS'); - } - - let oldMailbox = folders.get(mailbox); - folders.delete(mailbox); - - oldMailbox.path = newname; - folders.set(newname, oldMailbox); - - callback(null, true); -}; - -// DELETE "path/to/mailbox" -server.onDelete = function (mailbox, session, callback) { - this.logger.debug('[%s] DELETE "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - // keep SPECIAL-USE folders - if (folders.get(mailbox).specialUse) { - return callback(null, 'CANNOT'); - } - - folders.delete(mailbox); - callback(null, true); -}; - -// SELECT/EXAMINE -server.onOpen = function (mailbox, session, callback) { - this.logger.debug('[%s] Opening "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - let folder = folders.get(mailbox); - - return callback(null, { - specialUse: folder.specialUse, - uidValidity: folder.uidValidity, - uidNext: folder.uidNext, - modifyIndex: folder.modifyIndex, - uidList: folder.messages.map(message => message.uid) - }); -}; - -// STATUS (X Y X) -server.onStatus = function (mailbox, session, callback) { - this.logger.debug('[%s] Requested status for "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - let folder = folders.get(mailbox); - - return callback(null, { - messages: folder.messages.length, - uidNext: folder.uidNext, - uidValidity: folder.uidValidity, - unseen: folder.messages.filter(message => message.flags.indexOf('\\Seen') < 0).length - }); -}; - -// APPEND mailbox (flags) date message -server.onAppend = function (mailbox, flags, date, raw, session, callback) { - this.logger.debug('[%s] Appending message to "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'TRYCREATE'); - } - - date = date && new Date(date) || new Date(); - - let folder = folders.get(mailbox); - let message = { - uid: folder.uidNext++, - date: date && new Date(date) || new Date(), - raw, - flags - }; - - folder.messages.push(message); - - // do not write directly to stream, use notifications as the currently selected mailbox might not be the one that receives the message - this.notifier.addEntries(session.user.username, mailbox, { - command: 'EXISTS', - uid: message.uid - }, () => { - this.notifier.fire(session.user.username, mailbox); - - return callback(null, true, { - uidValidity: folder.uidValidity, - uid: message.uid - }); - }); -}; - -// STORE / UID STORE, updates flags for selected UIDs -server.onStore = function (mailbox, update, session, callback) { - this.logger.debug('[%s] Updating messages in "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - let folder = folders.get(mailbox); - let i = 0; - - let processMessages = () => { - if (i >= folder.messages.length) { - this.notifier.fire(session.user.username, mailbox); - return callback(null, true); - } - - let message = folder.messages[i++]; - let updated = false; - - if (update.messages.indexOf(message.uid) < 0) { - return processMessages(); - } - - switch (update.action) { - case 'set': - // check if update set matches current or is different - if (message.flags.length !== update.value.length || update.value.filter(flag => message.flags.indexOf(flag) < 0).length) { - updated = true; - } - // set flags - message.flags = [].concat(update.value); - break; - - case 'add': - message.flags = message.flags.concat(update.value.filter(flag => { - if (message.flags.indexOf(flag) < 0) { - updated = true; - return true; - } - return false; - })); - break; - - case 'remove': - message.flags = message.flags.filter(flag => { - if (update.value.indexOf(flag) < 0) { - return true; - } - updated = true; - return false; - }); - break; - } - - // Onlsy show response if not silent - if (!update.silent) { - session.writeStream.write(session.formatResponse('FETCH', message.uid, { - uid: update.isUid ? message.uid : false, - flags: message.flags - })); - } - - // notifiy other clients only if something changed - if (updated) { - this.notifier.addEntries(session.user.username, mailbox, { - command: 'FETCH', - ignore: session.id, - uid: message.uid, - flags: message.flags - }, processMessages); - } else { - processMessages(); - } - }; - - processMessages(); -}; - -// EXPUNGE deletes all messages in selected mailbox marked with \Delete -// EXPUNGE deletes all messages in selected mailbox marked with \Delete -server.onExpunge = function (mailbox, update, session, callback) { - this.logger.debug('[%s] Deleting messages from "%s"', session.id, mailbox); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - let folder = folders.get(mailbox); - let deleted = []; - let i, len; - - for (i = folder.messages.length - 1; i >= 0; i--) { - if ( - ( - (update.isUid && update.messages.indexOf(folder.messages[i].uid) >= 0) || - !update.isUid - ) && folder.messages[i].flags.indexOf('\\Deleted') >= 0) { - - deleted.unshift(folder.messages[i].uid); - folder.messages.splice(i, 1); - } - } - - let entries = []; - for (i = 0, len = deleted.length; i < len; i++) { - entries.push({ - command: 'EXPUNGE', - ignore: session.id, - uid: deleted[i] - }); - if (!update.silent) { - session.writeStream.write(session.formatResponse('EXPUNGE', deleted[i])); - } - } - - this.notifier.addEntries(session.user.username, mailbox, entries, () => { - this.notifier.fire(session.user.username, mailbox); - return callback(null, true); - }); -}; - -// COPY / UID COPY sequence mailbox -server.onCopy = function (mailbox, update, session, callback) { - this.logger.debug('[%s] Copying messages from "%s" to "%s"', session.id, mailbox, update.destination); - - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - if (!folders.has(update.destination)) { - return callback(null, 'TRYCREATE'); - } - - let sourceFolder = folders.get(mailbox); - let destinationFolder = folders.get(update.destination); - - let messages = []; - let sourceUid = []; - let destinationUid = []; - let i, len; - let entries = []; - - for (i = sourceFolder.messages.length - 1; i >= 0; i--) { - if (update.messages.indexOf(sourceFolder.messages[i].uid) >= 0) { - messages.unshift(JSON.parse(JSON.stringify(sourceFolder.messages[i]))); - sourceUid.unshift(sourceFolder.messages[i].uid); - } - } - - for (i = 0, len = messages.length; i < len; i++) { - messages[i].uid = destinationFolder.uidNext++; - destinationUid.push(messages[i].uid); - destinationFolder.messages.push(messages[i]); - - // do not write directly to stream, use notifications as the currently selected mailbox might not be the one that receives the message - entries.push({ - command: 'EXISTS', - uid: messages[i].uid - }); - } - - this.notifier.addEntries(update.destination, session.user.username, entries, () => { - this.notifier.fire(update.destination, session.user.username); - - return callback(null, true, { - uidValidity: destinationFolder.uidValidity, - sourceUid, - destinationUid - }); - }); -}; - -// sends results to socket -server.onFetch = function (mailbox, options, session, callback) { - this.logger.debug('[%s] Requested FETCH for "%s"', session.id, mailbox); - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - let folder = folders.get(mailbox); - let entries = []; - - if (options.markAsSeen) { - // mark all matching messages as seen - folder.messages.forEach(message => { - if (options.messages.indexOf(message.uid) < 0) { - return; - } - - // if BODY[] is touched, then add \Seen flag and notify other clients - if (message.flags.indexOf('\\Seen') < 0) { - message.flags.unshift('\\Seen'); - entries.push({ - command: 'FETCH', - ignore: session.id, - uid: message.uid, - flags: message.flags - }); - } - - }); - } - - this.notifier.addEntries(session.user.username, mailbox, entries, () => { - - folder.messages.forEach(message => { - if (options.messages.indexOf(message.uid) < 0) { - return; - } - // send formatted response to socket - session.writeStream.write(session.formatResponse('FETCH', message.uid, { - query: options.query, - values: session.getQueryResponse(options.query, message) - })); - }); - - // once messages are processed show relevant updates - this.notifier.fire(session.user.username, mailbox); - - callback(null, true); - - }); -}; - -// returns an array of matching UID values -server.onSearch = function (mailbox, options, session, callback) { - if (!folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - let folder = folders.get(mailbox); - let highestModseq = 0; - - let uidList = folder.messages.filter(message => { - let match = session.matchSearchQuery(message, options.query); - if (match && highestModseq < message.modseq) { - highestModseq = message.modseq; - } - return match; - }).map(message => message.uid); - - callback(null, { - uidList, - highestModseq - }); -}; - -// ------- - -server.on('error', err => { - console.log('Error occurred\n%s', err.stack); // eslint-disable-line no-console -}); - -process.on('SIGINT', () => { - server.close(() => { - process.exit(); - }); -}); - -// start listening -server.listen(SERVER_PORT, SERVER_HOST); diff --git a/imap-core/index.js b/imap-core/index.js index 0caa9c1..c98723f 100644 --- a/imap-core/index.js +++ b/imap-core/index.js @@ -1,6 +1,4 @@ 'use strict'; module.exports.IMAPServer = require('./lib/imap-server').IMAPServer; -module.exports.RedisNotifier = require('./lib/redis-notifier'); -module.exports.MemoryNotifier = require('./lib/memory-notifier'); module.exports.imapHandler = require('./lib/handler/imap-handler'); diff --git a/imap-core/lib/commands/capability.js b/imap-core/lib/commands/capability.js index 42da2c4..715186d 100644 --- a/imap-core/lib/commands/capability.js +++ b/imap-core/lib/commands/capability.js @@ -30,6 +30,8 @@ module.exports = { capabilities.push('UTF8=ACCEPT'); capabilities.push('QUOTA'); + // capabilities.push('MOVE'); + if (this._server.options.maxMessage) { capabilities.push('APPENDLIMIT=' + this._server.options.maxMessage); } diff --git a/imap-core/lib/commands/move.js b/imap-core/lib/commands/move.js new file mode 100644 index 0000000..53a507a --- /dev/null +++ b/imap-core/lib/commands/move.js @@ -0,0 +1,74 @@ +'use strict'; + +let imapTools = require('../imap-tools'); +let imapHandler = require('../handler/imap-handler'); + +module.exports = { + state: 'Selected', + + schema: [{ + name: 'range', + type: 'sequence' + }, { + name: 'mailbox', + type: 'string' + }], + + handler(command, callback) { + + let cmd = (command.command || '').toString().toUpperCase(); + + // Check if MOVE method is set + if (typeof this._server.onMove !== 'function') { + return callback(null, { + response: 'NO', + message: cmd + ' not implemented' + }); + } + + let range = command.attributes[0] && command.attributes[0].value || ''; + let path = Buffer.from(command.attributes[1] && command.attributes[1].value || '', 'binary').toString(); + let mailbox = imapTools.normalizeMailbox(path, !this.acceptUTF8Enabled); + + if (!mailbox) { + return callback(new Error('Invalid mailbox argument for ' + cmd)); + } + + if (!imapTools.validateSequnce(range)) { + return callback(new Error('Invalid sequence set for ' + cmd)); + } + + let messages = imapTools.getMessageRange(this.selected.uidList, range, cmd === 'UID MOVE'); + + this._server.onMove(this.selected.mailbox, { + destination: mailbox, + messages + }, this.session, (err, success, info) => { + if (err) { + return callback(err); + } + + let code = typeof success === 'string' ? success.toUpperCase() : false; + + if (success === true) { + this.send(imapHandler.compiler({ + tag: '*', + command: 'OK', + attributes: [{ + type: 'SECTION', + section: [{ + type: 'TEXT', + value: 'COPYUID ' + info.uidValidity + ' ' + imapTools.packMessageRange(info.sourceUid) + ' ' + imapTools.packMessageRange(info.destinationUid) + }] + }] + })); + } + + callback(null, { + response: success === true ? 'OK' : 'NO', + code + }); + + }); + } +}; diff --git a/imap-core/lib/imap-command.js b/imap-core/lib/imap-command.js index 5a87bb0..0bc6c7b 100644 --- a/imap-core/lib/imap-command.js +++ b/imap-core/lib/imap-command.js @@ -36,6 +36,8 @@ let commands = new Map([ ['UNSELECT', require('./commands/unselect')], ['COPY', require('./commands/copy')], ['UID COPY', require('./commands/copy')], + ['MOVE', require('./commands/move')], + ['UID MOVE', require('./commands/move')], ['FETCH', require('./commands/fetch')], ['UID FETCH', require('./commands/fetch')], ['SEARCH', require('./commands/search')], diff --git a/imap-core/lib/indexer/parse-mime-tree.js b/imap-core/lib/indexer/parse-mime-tree.js index a1273d0..a6b40c7 100644 --- a/imap-core/lib/indexer/parse-mime-tree.js +++ b/imap-core/lib/indexer/parse-mime-tree.js @@ -1,6 +1,6 @@ 'use strict'; -let addressparser = require('addressparser'); +let addressparser = require('nodemailer/lib/addressparser'); /** * Parses a RFC822 message into a structured object (JSON compatible) diff --git a/imap-core/lib/memory-notifier/index.js b/imap-core/lib/memory-notifier/index.js deleted file mode 100644 index 331f755..0000000 --- a/imap-core/lib/memory-notifier/index.js +++ /dev/null @@ -1,150 +0,0 @@ -'use strict'; - -let crypto = require('crypto'); -let EventEmitter = require('events').EventEmitter; - -// Expects that the folder listing is a Map - -class MemoryNotifier extends EventEmitter { - - constructor(options) { - super(); - this.folders = options.folders || new Map(); - - let logfunc = (...args) => { - let level = args.shift() || 'DEBUG'; - let message = args.shift() || ''; - - console.log([level].concat(message || '').join(' '), ...args); // eslint-disable-line no-console - }; - - this.logger = options.logger || { - info: logfunc.bind(null, 'INFO'), - debug: logfunc.bind(null, 'DEBUG'), - error: logfunc.bind(null, 'ERROR') - }; - - this._listeners = new EventEmitter(); - this._listeners.setMaxListeners(0); - - EventEmitter.call(this); - } - - /** - * Generates hashed event names for mailbox:username pairs - * - * @param {String} mailbox - * @param {String} username - * @returns {String} md5 hex - */ - _eventName(mailbox, username) { - return crypto.createHash('md5').update(username + ':' + mailbox).digest('hex'); - } - - /** - * Registers an event handler for mailbox:username events - * - * @param {String} username - * @param {String} mailbox - * @param {Function} handler Function to run once there are new entries in the journal - */ - addListener(session, mailbox, handler) { - let eventName = this._eventName(session.user.username, mailbox); - this._listeners.addListener(eventName, handler); - - this.logger.debug('New journal listener for %s ("%s:%s")', eventName, session.user.username, mailbox); - } - - /** - * Unregisters an event handler for mailbox:username events - * - * @param {String} username - * @param {String} mailbox - * @param {Function} handler Function to run once there are new entries in the journal - */ - removeListener(session, mailbox, handler) { - let eventName = this._eventName(session.user.username, mailbox); - this._listeners.removeListener(eventName, handler); - - this.logger.debug('Removed journal listener from %s ("%s:%s")', eventName, session.user.username, mailbox); - } - - /** - * Stores multiple journal entries to db - * - * @param {String} username - * @param {String} mailbox - * @param {Array|Object} entries An array of entries to be journaled - * @param {Function} callback Runs once the entry is either stored or an error occurred - */ - addEntries(username, mailbox, entries, callback) { - let folder = this.folders.get(mailbox); - - if (!folder) { - return callback(null, new Error('Selected mailbox does not exist')); - } - - if (entries && !Array.isArray(entries)) { - entries = [entries]; - } else if (!entries || !entries.length) { - return callback(null, false); - } - - // store entires in the folder object - if (!folder.journal) { - folder.journal = []; - } - - entries.forEach(entry => { - entry.modseq = ++folder.modifyIndex; - folder.journal.push(entry); - }); - - setImmediate(callback); - } - - /** - * Sends a notification that there are new updates in the selected mailbox - * - * @param {String} username - * @param {String} mailbox - */ - fire(username, mailbox, payload) { - let eventName = this._eventName(username, mailbox); - setImmediate(() => { - this._listeners.emit(eventName, payload); - }); - } - - /** - * Returns all entries from the journal that have higher than provided modification index - * - * @param {String} username - * @param {String} mailbox - * @param {Number} modifyIndex Last known modification id - * @param {Function} callback Returns update entries as an array - */ - getUpdates(session, mailbox, modifyIndex, callback) { - modifyIndex = Number(modifyIndex) || 0; - - if (!this.folders.has(mailbox)) { - return callback(null, 'NONEXISTENT'); - } - - let folder = this.folders.get(mailbox); - let minIndex = folder.journal.length; - - for (let i = folder.journal.length - 1; i >= 0; i--) { - if (folder.journal[i].modseq > modifyIndex) { - minIndex = i; - } else { - break; - } - } - - return callback(null, folder.journal.slice(minIndex)); - } - -} - -module.exports = MemoryNotifier; diff --git a/imap-core/lib/redis-notifier/add-entries.lua b/imap-core/lib/redis-notifier/add-entries.lua deleted file mode 100644 index 6b4e4df..0000000 --- a/imap-core/lib/redis-notifier/add-entries.lua +++ /dev/null @@ -1,22 +0,0 @@ --- inserts JSON values to a sorted set where score value is an incrementing number resolved from a KEYS[1].modifyIndex - --- check if the mailbox even exists -if not redis.call('exists', KEYS[1]) then - return {err='Selected mailbox does not exist'} -end; - -local len = table.getn(ARGV); - --- do a single increment to get id values for all elements instead of incrementing it one by one -local score = redis.call('hincrby', KEYS[1], 'modifyIndex', len) - len; - -for i = 1, #ARGV do - -- we include modification index in the stored value to ensure that all values are always unique - -- otherwise adding new element with the same data does not insert a new entry but overrides - -- an existing one - - redis.call('zadd', KEYS[2], score + i, tostring(score + i) .. ':' .. ARGV[i]); -end; - --- return the largest modification index -return score + len; \ No newline at end of file diff --git a/imap-core/lib/redis-notifier/index.js b/imap-core/lib/redis-notifier/index.js deleted file mode 100644 index 849c0a7..0000000 --- a/imap-core/lib/redis-notifier/index.js +++ /dev/null @@ -1,244 +0,0 @@ -'use strict'; - -let redis = require('redis'); -let EventEmitter = require('events').EventEmitter; -let crypto = require('crypto'); -let fs = require('fs'); -let scripts = { - addEntries: fs.readFileSync(__dirname + '/add-entries.lua') -}; - -// Assumes that there are following hash keys in Redis: -// u:[username]:folder:[md5(path)] -// with the following key: -// modifyIndex: Number - -class RedisNotifier extends EventEmitter { - - constructor(options) { - super(); - - options = options || {}; - - this.options = { - port: options.port || 6379, - host: options.host || 'localhost', - db: options.db || 0, - prefix: options.prefix || 'imap:' - }; - - let logfunc = (...args) => { - let level = args.shift() || 'DEBUG'; - let message = args.shift() || ''; - - console.log([level].concat(message || '').join(' '), ...args); // eslint-disable-line no-console - }; - - this.logger = options.logger || { - info: logfunc.bind(null, 'INFO'), - debug: logfunc.bind(null, 'DEBUG'), - error: logfunc.bind(null, 'ERROR') - }; - - // we need two db connections as subscriber can't manage data - this._db = redis.createClient(options.port, options.host); - this._subscriber = redis.createClient(options.port, options.host); - - this._pubsubListeners = new Map(); - this._listeners = new EventEmitter(); - this._listeners.setMaxListeners(0); - - this._subscriber.on('message', (channel, message) => { - try { - message = JSON.parse(message); - } catch (E) { - // ignore - } - this.logger.debug( - 'Journal update notification for %s, updating %s subscribers', - channel.slice(this.options.prefix.length), - this._listeners.listenerCount(channel.slice(this.options.prefix.length))); - - this._listeners.emit(channel.slice(this.options.prefix.length), message); - }); - - EventEmitter.call(this); - } - - /** - * Generates hashed event names for mailbox:username pairs - * - * @param {String} username - * @param {String} mailbox - * @returns {String} md5 hex - */ - _eventName(username, mailbox) { - return crypto.createHash('md5').update(username + ':' + mailbox).digest('hex'); - } - - /** - * Registers an event handler for mailbox:username events - * - * @param {String} username - * @param {String} mailbox - * @param {Function} handler Function to run once there are new entries in the journal - */ - addListener(session, mailbox, handler) { - let eventName = this._eventName(session.user.username, mailbox); - this._listeners.addListener(eventName, handler); - - if (!this._pubsubListeners.has(eventName)) { - this._pubsubListeners.set(eventName, 1); - this._subscriber.subscribe(this.options.prefix + eventName); - } else { - this._pubsubListeners.set(eventName, this._pubsubListeners.get(eventName) + 1); - } - - this.logger.debug('New journal listener for %s ("%s:%s", total %s subscribers)', eventName, session.user.username, mailbox, this._listeners.listenerCount(eventName)); - } - - /** - * Unregisters an event handler for mailbox:username events - * - * @param {String} username - * @param {String} mailbox - * @param {Function} handler Function to run once there are new entries in the journal - */ - removeListener(session, mailbox, handler) { - let count, eventName = this._eventName(session.user.username, mailbox); - this._listeners.removeListener(eventName, handler); - - if (this._pubsubListeners.has(eventName) && (count = this._pubsubListeners.get(eventName)) && count > 0) { - count--; - if (!count) { - this._subscriber.unsubscribe(this.options.prefix + eventName); - this._pubsubListeners.delete(eventName); - } else { - this._pubsubListeners.set(eventName, 1); - } - - this.logger.debug('Removed journal listener from %s ("%s:%s", total %s subscribers)', eventName, session.user.username, mailbox, this._listeners.listenerCount(eventName)); - } - } - - /** - * Stores multiple journal entries to db - * - * @param {String} username - * @param {String} mailbox - * @param {Array|Object} entries An array of entries to be journaled - * @param {Function} callback Runs once the entry is either stored or an error occurred - */ - addEntries(username, mailbox, entries, callback) { - let mailboxHash = crypto.createHash('md5').update(mailbox).digest('hex'); - - if (entries && !Array.isArray(entries)) { - entries = [entries]; - } else if (!entries || !entries.length) { - return callback(null, false); - } - - entries = entries.map(entry => JSON.stringify(entry)); - - this.logger.debug('Adding journal entries for %s (%s)\n%s', mailbox, mailboxHash, entries.join('\n')); - - this._db.multi(). - select(this.options.db). - eval([ - scripts.addEntries, - 2, - 'u:' + username + ':folder:' + mailboxHash, - 'u:' + username + ':journal:' + mailboxHash - ].concat(entries)). - exec(err => { - if (err) { - return callback(err); - } - - return callback(null, true); - }); - } - - /** - * Sends a notification that there are new updates in the selected mailbox - * - * @param {String} username - * @param {String} mailbox - */ - fire(username, mailbox, payload) { - let eventName = this._eventName(username, mailbox); - - payload = payload || false; - - setImmediate(this._db.publish.bind(this._db, this.options.prefix + eventName, JSON.stringify(payload))); - } - - /** - * Returns all entries from the journal that have higher than provided modification index - * - * @param {String} username - * @param {String} mailbox - * @param {Number} modifyIndex Last known modification id - * @param {Function} callback Returns update entries as an array - */ - getUpdates(session, mailbox, modifyIndex, callback) { - modifyIndex = Number(modifyIndex) || 0; - - let mailboxHash = crypto.createHash('md5').update(mailbox).digest('hex'); - let username = session.user.username; - - this._db.multi(). - select(this.options.db). - exists('u:' + username + ':journal:' + mailboxHash). - zrangebyscore('u:' + username + ':journal:' + mailboxHash, modifyIndex + 1, Infinity). - exec((err, replies) => { - let updates; - - this.logger.debug('[%s] Loaded journal updates for "%s:%s" since %s', session.id, username, mailbox, modifyIndex + 1); - - if (err) { - return callback(err); - } - if (!replies || !replies[1]) { - return callback(null, 'NONEXISTENT'); - } - - updates = (replies[2] || []). - map(entry => { - let data; - let m = (entry || '').toString().match(/^(\d+)\:/); - - if (!m) { - // invalidly formatted entry - this.logger.debug('[%s] Invalidly formatted entry for "%s:%s" (%s)', session.id, username, mailbox, (entry).toString()); - return false; - } - - try { - data = JSON.parse(entry.substr(m[0].length)); - data.modseq = Number(m[1]) || false; - // we mess around with json in redis lua but lua does not make - // a distinction between an object and an array, if an array - // is empty then it will be invalidly detected as an object - if (data.flags && !Array.isArray(data.flags)) { - data.flags = []; - } - } catch (E) { - this.logger.error('[%s] Failed parsing journal update for "%s:%s" (%s): %s', session.id, username, mailbox, entry.substr(m[0].length), E.message); - } - - return data; - }).filter(entry => - // only include entries with data - (entry && entry.uid) - ); - - this.logger.debug('[%s] Processing journal updates for "%s:%s": %s', session.id, username, mailbox, JSON.stringify(updates)); - - callback(null, updates); - }); - } - -} - -module.exports = RedisNotifier; diff --git a/imap.js b/imap.js index 322d9c9..4d5c6e7 100644 --- a/imap.js +++ b/imap.js @@ -157,8 +157,6 @@ server.onCreate = function (path, session, callback) { uidValidity: Math.floor(Date.now() / 1000), uidNext: 1, modifyIndex: 0, - storageUsed: 0, - messages: 0, subscribed: true }; @@ -239,23 +237,54 @@ server.onDelete = function (path, session, callback) { return callback(err); } - // decrement quota counters - db.database.collection('users').findOneAndUpdate({ - _id: mailbox.user - }, { - $inc: { - storageUsed: -Number(mailbox.storageUsed) || 0, - messages: -Number(mailbox.messages) || 0 - } - }, () => { - db.database.collection('journal').deleteMany({ + // calculate mailbox size by aggregating the size's of all messages + db.database.collection('messages').aggregate([{ + $match: { mailbox: mailbox._id - }, err => { - if (err) { - return callback(err); + } + }, { + $group: { + _id: { + mailbox: '$mailbox' + }, + storageUsed: { + $sum: '$size' } - callback(null, true); - }); + } + }], { + cursor: { + batchSize: 1 + } + }).toArray((err, res) => { + if (err) { + return callback(err); + } + + let storageUsed = res && res[0] && res[0].storageUsed || 0; + + let done = () => { + db.database.collection('journal').deleteMany({ + mailbox: mailbox._id + }, err => { + if (err) { + return callback(err); + } + callback(null, true); + }); + }; + + if (!storageUsed) { + return done(); + } + + // decrement quota counters + db.database.collection('users').findOneAndUpdate({ + _id: mailbox.user + }, { + $inc: { + storageUsed: -Number(storageUsed) || 0 + } + }, done); }); }); }); @@ -553,23 +582,13 @@ server.onExpunge = function (path, update, session, callback) { return next(); } - db.database.collection('mailboxes').findOneAndUpdate({ - _id: mailbox._id + db.database.collection('users').findOneAndUpdate({ + _id: mailbox.user }, { $inc: { - storageUsed: -deletedStorage, - messages: -deletedMessages + storageUsed: -deletedStorage } - }, () => { - db.database.collection('users').findOneAndUpdate({ - _id: mailbox.user - }, { - $inc: { - storageUsed: -deletedStorage, - messages: -deletedMessages - } - }, next); - }); + }, next); }; let processNext = () => { @@ -680,23 +699,13 @@ server.onCopy = function (path, update, session, callback) { if (!copiedMessages) { return next(); } - db.database.collection('mailboxes').findOneAndUpdate({ - _id: target._id + db.database.collection('users').findOneAndUpdate({ + _id: mailbox.user }, { $inc: { - storageUsed: copiedStorage, - messages: copiedMessages + storageUsed: copiedStorage } - }, () => { - db.database.collection('users').findOneAndUpdate({ - _id: mailbox.user - }, { - $inc: { - storageUsed: copiedStorage, - messages: copiedMessages - } - }, next); - }); + }, next); }; let sourceUid = []; diff --git a/indexes.json b/indexes.json index 64b5b38..f244d39 100644 --- a/indexes.json +++ b/indexes.json @@ -54,6 +54,11 @@ "key": { "mailbox": 1 } + }, { + "name": "user_messages", + "key": { + "user": 1 + } }, { "name": "mailbox_uid_modseq", "key": { diff --git a/lib/imap-notifier.js b/lib/imap-notifier.js index fd09a69..20cd104 100644 --- a/lib/imap-notifier.js +++ b/lib/imap-notifier.js @@ -1,5 +1,7 @@ 'use strict'; +const config = require('config'); +const tools = require('./tools'); const crypto = require('crypto'); const EventEmitter = require('events').EventEmitter; const redis = require('redis'); @@ -10,7 +12,8 @@ class ImapNotifier extends EventEmitter { super(); this.database = options.database; - this.publisher = redis.createClient(); + this.publisher = redis.createClient(tools.redisConfig(config.redis)); + let logfunc = (...args) => { let level = args.shift() || 'DEBUG'; @@ -30,7 +33,7 @@ class ImapNotifier extends EventEmitter { return; } - this.subsriber = redis.createClient(); + this.subsriber = redis.createClient(tools.redisConfig(config.redis)); this._listeners = new EventEmitter(); this._listeners.setMaxListeners(0); diff --git a/lib/message-handler.js b/lib/message-handler.js index 07cf022..d0f4add 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -6,6 +6,7 @@ const ObjectID = require('mongodb').ObjectID; const RedFour = require('redfour'); const Indexer = require('../imap-core/lib/indexer/indexer'); const ImapNotifier = require('./imap-notifier'); +const tools = require('./tools'); class MessageHandler { @@ -19,7 +20,7 @@ class MessageHandler { pushOnly: true }); this.redlock = new RedFour({ - redis: config.redis, + redis: tools.redisConfig(config.redis), namespace: 'wildduck' }); } @@ -117,9 +118,7 @@ class MessageHandler { // allocate bot UID and MODSEQ values so when journal is later sorted by // modseq then UIDs are always in ascending order uidNext: 1, - modifyIndex: 1, - storageUsed: size, - messages: 1 + modifyIndex: 1 } }, (err, item) => { if (err) { @@ -144,14 +143,20 @@ class MessageHandler { let message = { _id: id, + mailbox: mailbox._id, + user: mailbox.user, + uid: mailbox.uidNext, + modseq: mailbox.modifyIndex + 1, + internaldate, headerdate, flags: [].concat(options.flags || []), size, + meta: options.meta || {}, - modseq: mailbox.modifyIndex + 1, + mimeTree, envelope, bodystructure, @@ -160,14 +165,7 @@ class MessageHandler { this.database.collection('messages').insertOne(message, err => { if (err) { - return this.database.collection('mailboxes').findOneAndUpdate({ - _id: mailbox._id - }, { - $inc: { - storageUsed: -size, - messages: -1 - } - }, () => rollback(err)); + return rollback(err); } let uidValidity = mailbox.uidValidity; diff --git a/lib/tools.js b/lib/tools.js index dc0dd0b..d20396d 100644 --- a/lib/tools.js +++ b/lib/tools.js @@ -25,6 +25,46 @@ function normalizeAddress(address, withNames) { return addr; } +// returns a redis config object with a retry strategy +function redisConfig(defaultConfig) { + let response = {}; + + if (typeof defaultConfig === 'string') { + defaultConfig = { + url: defaultConfig + }; + } + + Object.keys(defaultConfig || {}).forEach(key => { + response[key] = defaultConfig[key]; + }); + if (!response.hasOwnProperty('retry_strategy')) { + response.retry_strategy = options => { + + if (options.error && options.error.code === 'ECONNREFUSED') { + // End reconnecting on a specific error and flush all commands with a individual error + return new Error('The server refused the connection'); + } + + if (options.total_retry_time > 1000 * 60 * 60) { + // End reconnecting after a specific timeout and flush all commands with a individual error + return new Error('Retry time exhausted'); + } + + if (options.attempt > 10) { + // End reconnecting with built in error + return undefined; // eslint-disable-line no-undefined + } + + // reconnect after + return Math.min(options.attempt * 100, 3000); + }; + } + + return response; +} + module.exports = { - normalizeAddress + normalizeAddress, + redisConfig }; diff --git a/package.json b/package.json index 1c480ca..cd55d04 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "wildduck", - "version": "1.0.6", + "version": "1.0.7", "description": "IMAP server built with Node.js and MongoDB", "main": "server.js", "scripts": { @@ -16,11 +16,9 @@ "grunt-cli": "^1.2.0", "grunt-eslint": "^19.0.0", "grunt-mocha-test": "^0.13.2", - "mocha": "^3.2.0", - "nodemailer": "^3.1.8" + "mocha": "^3.2.0" }, "dependencies": { - "addressparser": "^1.0.1", "bcryptjs": "^2.4.3", "clone": "^2.1.1", "config": "^1.25.1", @@ -30,7 +28,7 @@ "libmime": "^3.1.0", "mailparser": "^2.0.2", "mongodb": "^2.2.25", - "nodemailer-fetch": "^2.1.0", + "nodemailer": "^3.1.8", "npmlog": "^4.0.2", "redfour": "^1.0.0", "redis": "^2.7.1", diff --git a/smtp.js b/smtp.js index 32d1d1e..a3e73a8 100644 --- a/smtp.js +++ b/smtp.js @@ -43,13 +43,23 @@ const server = new SMTPServer({ // Accept messages up to 10 MB size: maxMessageSize, + onMailFrom(address, session, callback) { + + // reset session entries + session.users = new Map(); + session.maxAllowedStorage = maxStorage; + + // accept sender address + return callback(); + }, + // Validate RCPT TO envelope address. Example allows all addresses that do not start with 'deny' // If this method is not set, all addresses are allowed onRcptTo(rcpt, session, callback) { let originalRecipient = tools.normalizeAddress(rcpt.address); let recipient = originalRecipient.replace(/\+[^@]*@/, '@'); - if (session.users && session.users.has(recipient)) { + if (session.users.has(recipient)) { return callback(); }