From ed75658f80b6f364b230a5123e03b03da324d917 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Sat, 15 Jul 2017 19:08:33 +0300 Subject: [PATCH] Changed messages shard key to mailbox+uid --- README.md | 2 +- imap.js | 1931 +---------------------------- indexes.yaml | 64 +- lib/consts.js | 12 + lib/handlers/on-append.js | 56 + lib/handlers/on-auth.js | 34 + lib/handlers/on-copy.js | 189 +++ lib/handlers/on-create.js | 57 + lib/handlers/on-delete.js | 109 ++ lib/handlers/on-expunge.js | 146 +++ lib/handlers/on-fetch.js | 207 ++++ lib/handlers/on-get-quota-root.js | 44 + lib/handlers/on-get-quota.js | 36 + lib/handlers/on-list.js | 25 + lib/handlers/on-lsub.js | 25 + lib/handlers/on-move.js | 43 + lib/handlers/on-open.js | 44 + lib/handlers/on-rename.js | 49 + lib/handlers/on-search.js | 366 ++++++ lib/handlers/on-status.js | 56 + lib/handlers/on-store.js | 355 ++++++ lib/handlers/on-subscribe.js | 35 + lib/handlers/on-unsubscribe.js | 35 + lib/imap-notifier.js | 2 +- lib/message-handler.js | 119 +- lib/pop3-connection.js | 4 +- lib/pop3-server.js | 2 +- pop3.js | 11 +- 28 files changed, 2067 insertions(+), 1991 deletions(-) create mode 100644 lib/consts.js create mode 100644 lib/handlers/on-append.js create mode 100644 lib/handlers/on-auth.js create mode 100644 lib/handlers/on-copy.js create mode 100644 lib/handlers/on-create.js create mode 100644 lib/handlers/on-delete.js create mode 100644 lib/handlers/on-expunge.js create mode 100644 lib/handlers/on-fetch.js create mode 100644 lib/handlers/on-get-quota-root.js create mode 100644 lib/handlers/on-get-quota.js create mode 100644 lib/handlers/on-list.js create mode 100644 lib/handlers/on-lsub.js create mode 100644 lib/handlers/on-move.js create mode 100644 lib/handlers/on-open.js create mode 100644 lib/handlers/on-rename.js create mode 100644 lib/handlers/on-search.js create mode 100644 lib/handlers/on-status.js create mode 100644 lib/handlers/on-store.js create mode 100644 lib/handlers/on-subscribe.js create mode 100644 lib/handlers/on-unsubscribe.js diff --git a/README.md b/README.md index 3fdf8beb..0204933f 100644 --- a/README.md +++ b/README.md @@ -446,7 +446,7 @@ Shard the following collections by these keys: ```javascript sh.enableSharding('wildduck'); -sh.shardCollection('wildduck.messages', { user: 'hashed' }); +sh.shardCollection('wildduck.messages', { mailbox: 1, uid: 1 }); sh.shardCollection('wildduck.threads', { user: 'hashed' }); sh.shardCollection('wildduck.attachments.files', { 'metadata.h': 'hashed' }); sh.shardCollection('wildduck.attachments.chunks', { files_id: 'hashed' }); diff --git a/imap.js b/imap.js index 9ef39fe1..097056bc 100644 --- a/imap.js +++ b/imap.js @@ -1,32 +1,40 @@ 'use strict'; const log = require('npmlog'); -const util = require('util'); const config = require('config'); const IMAPServerModule = require('./imap-core'); const IMAPServer = IMAPServerModule.IMAPServer; const ImapNotifier = require('./lib/imap-notifier'); -const imapHandler = IMAPServerModule.imapHandler; -const ObjectID = require('mongodb').ObjectID; const Indexer = require('./imap-core/lib/indexer/indexer'); -const imapTools = require('./imap-core/lib/imap-tools'); const MessageHandler = require('./lib/message-handler'); const UserHandler = require('./lib/user-handler'); const db = require('./lib/db'); +const consts = require('./lib/consts'); const RedFour = require('redfour'); const packageData = require('./package.json'); const yaml = require('js-yaml'); const fs = require('fs'); const setupIndexes = yaml.safeLoad(fs.readFileSync(__dirname + '/indexes.yaml', 'utf8')).indexes; -// home many modifications to cache before writing -const BULK_BATCH_SIZE = 150; - -// how often to clear expired messages -const GC_INTERVAL = 10 * 60 * 1000; - -// artificail delay between deleting next expired message in ms -const GC_DELAY_DELETE = 100; +const onFetch = require('./lib/handlers/on-fetch'); +const onAuth = require('./lib/handlers/on-auth'); +const onList = require('./lib/handlers/on-list'); +const onLsub = require('./lib/handlers/on-lsub'); +const onSubscribe = require('./lib/handlers/on-subscribe'); +const onUnsubscribe = require('./lib/handlers/on-unsubscribe'); +const onCreate = require('./lib/handlers/on-create'); +const onRename = require('./lib/handlers/on-rename'); +const onDelete = require('./lib/handlers/on-delete'); +const onOpen = require('./lib/handlers/on-open'); +const onStatus = require('./lib/handlers/on-status'); +const onAppend = require('./lib/handlers/on-append'); +const onStore = require('./lib/handlers/on-store'); +const onExpunge = require('./lib/handlers/on-expunge'); +const onCopy = require('./lib/handlers/on-copy'); +const onMove = require('./lib/handlers/on-move'); +const onSearch = require('./lib/handlers/on-search'); +const onGetQuotaRoot = require('./lib/handlers/on-get-quota-root'); +const onGetQuota = require('./lib/handlers/on-get-quota'); // Setup server const serverOptions = { @@ -89,1866 +97,6 @@ let userHandler; let gcTimeout; let gcLock; -server.onAuth = function(login, session, callback) { - let username = (login.username || '').toString().trim(); - - userHandler.authenticate( - username, - login.password, - { - protocol: 'IMAP', - ip: session.remoteAddress - }, - (err, result) => { - if (err) { - return callback(err); - } - if (!result) { - return callback(); - } - - if (result.scope === 'master' && result.enabled2fa) { - // master password not allowed if 2fa is enabled! - return callback(); - } - - callback(null, { - user: { - id: result.user, - username: result.username - } - }); - } - ); -}; - -// LIST "" "*" -// Returns all folders, query is informational -// folders is either an Array or a Map -server.onList = function(query, session, callback) { - this.logger.debug( - { - tnx: 'list', - cid: session.id - }, - '[%s] LIST for "%s"', - session.id, - query - ); - db.database - .collection('mailboxes') - .find({ - user: session.user.id - }) - .toArray(callback); -}; - -// LSUB "" "*" -// Returns all subscribed folders, query is informational -// folders is either an Array or a Map -server.onLsub = function(query, session, callback) { - this.logger.debug( - { - tnx: 'lsub', - cid: session.id - }, - '[%s] LSUB for "%s"', - session.id, - query - ); - db.database - .collection('mailboxes') - .find({ - user: session.user.id, - subscribed: true - }) - .toArray(callback); -}; - -// SUBSCRIBE "path/to/mailbox" -server.onSubscribe = function(path, session, callback) { - this.logger.debug( - { - tnx: 'subscribe', - cid: session.id - }, - '[%s] SUBSCRIBE to "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOneAndUpdate({ - user: session.user.id, - path - }, { - $set: { - subscribed: true - } - }, {}, (err, item) => { - if (err) { - return callback(err); - } - - if (!item || !item.value) { - // was not able to acquire a lock - return callback(null, 'NONEXISTENT'); - } - - callback(null, true); - }); -}; - -// UNSUBSCRIBE "path/to/mailbox" -server.onUnsubscribe = function(path, session, callback) { - this.logger.debug( - { - tnx: 'unsubscribe', - cid: session.id - }, - '[%s] UNSUBSCRIBE from "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOneAndUpdate({ - user: session.user.id, - path - }, { - $set: { - subscribed: false - } - }, {}, (err, item) => { - if (err) { - return callback(err); - } - - if (!item || !item.value) { - // was not able to acquire a lock - return callback(null, 'NONEXISTENT'); - } - - callback(null, true); - }); -}; - -// CREATE "path/to/mailbox" -server.onCreate = function(path, session, callback) { - this.logger.debug( - { - tnx: 'create', - cid: session.id - }, - '[%s] CREATE "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (mailbox) { - return callback(null, 'ALREADYEXISTS'); - } - - db.users.collection('users').findOne({ - _id: session.user.id - }, { - fields: { - retention: true - } - }, (err, user) => { - if (err) { - return callback(err); - } - - mailbox = { - user: session.user.id, - path, - uidValidity: Math.floor(Date.now() / 1000), - uidNext: 1, - modifyIndex: 0, - subscribed: true, - flags: [], - retention: user.retention - }; - - db.database.collection('mailboxes').insertOne(mailbox, err => { - if (err) { - return callback(err); - } - return callback(null, true); - }); - }); - }); -}; - -// RENAME "path/to/mailbox" "new/path" -// NB! RENAME affects child and hierarchy mailboxes as well, this example does not do this -server.onRename = function(path, newname, session, callback) { - this.logger.debug( - { - tnx: 'rename', - cid: session.id - }, - '[%s] RENAME "%s" to "%s"', - session.id, - path, - newname - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path: newname - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (mailbox) { - return callback(null, 'ALREADYEXISTS'); - } - - db.database.collection('mailboxes').findOneAndUpdate({ - user: session.user.id, - path - }, { - $set: { - path: newname - } - }, {}, (err, item) => { - if (err) { - return callback(err); - } - - if (!item || !item.value) { - // was not able to acquire a lock - return callback(null, 'NONEXISTENT'); - } - - callback(null, true); - }); - }); -}; - -// DELETE "path/to/mailbox" -server.onDelete = function(path, session, callback) { - this.logger.debug( - { - tnx: 'delete', - cid: session.id - }, - '[%s] DELETE "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - if (mailbox.specialUse) { - return callback(null, 'CANNOT'); - } - - db.database.collection('mailboxes').deleteOne({ - _id: mailbox._id - }, err => { - if (err) { - return callback(err); - } - - // calculate mailbox size by aggregating the size's of all messages - db.database - .collection('messages') - .aggregate( - [ - { - $match: { - user: session.user.id, - mailbox: mailbox._id - } - }, - { - $group: { - _id: { - mailbox: '$mailbox' - }, - storageUsed: { - $sum: '$size' - } - } - } - ], - { - cursor: { - batchSize: 1 - } - } - ) - .toArray((err, res) => { - if (err) { - return callback(err); - } - - let storageUsed = (res && res[0] && res[0].storageUsed) || 0; - - db.database.collection('messages').deleteMany({ - user: session.user.id, - mailbox: mailbox._id - }, err => { - if (err) { - return callback(err); - } - - let done = () => { - db.database.collection('journal').deleteMany({ - mailbox: mailbox._id - }, err => { - if (err) { - return callback(err); - } - callback(null, true); - }); - }; - - if (!storageUsed) { - return done(); - } - - // decrement quota counters - db.users.collection('users').findOneAndUpdate( - { - _id: mailbox.user - }, - { - $inc: { - storageUsed: -Number(storageUsed) || 0 - } - }, - done - ); - }); - }); - }); - }); -}; - -// SELECT/EXAMINE -server.onOpen = function(path, session, callback) { - this.logger.debug( - { - tnx: 'open', - cid: session.id - }, - '[%s] Opening "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - db.database - .collection('messages') - .find({ - user: session.user.id, - mailbox: mailbox._id - }) - .project({ - uid: true - }) - .sort([['uid', 1]]) - .toArray((err, messages) => { - if (err) { - return callback(err); - } - mailbox.uidList = messages.map(message => message.uid); - callback(null, mailbox); - }); - }); -}; - -// STATUS (X Y X) -server.onStatus = function(path, session, callback) { - this.logger.debug( - { - tnx: 'status', - cid: session.id - }, - '[%s] Requested status for "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - db.database - .collection('messages') - .find({ - user: session.user.id, - mailbox: mailbox._id - }) - .count((err, total) => { - if (err) { - return callback(err); - } - db.database - .collection('messages') - .find({ - user: session.user.id, - mailbox: mailbox._id, - seen: false - }) - .count((err, unseen) => { - if (err) { - return callback(err); - } - - return callback(null, { - messages: total, - uidNext: mailbox.uidNext, - uidValidity: mailbox.uidValidity, - unseen - }); - }); - }); - }); -}; - -// APPEND mailbox (flags) date message -server.onAppend = function(path, flags, date, raw, session, callback) { - this.logger.debug( - { - tnx: 'append', - cid: session.id - }, - '[%s] Appending message to "%s"', - session.id, - path - ); - - db.users.collection('users').findOne({ - _id: session.user.id - }, (err, user) => { - if (err) { - return callback(err); - } - if (!user) { - return callback(new Error('User not found')); - } - - if (user.quota && user.storageUsed > user.quota) { - return callback(false, 'OVERQUOTA'); - } - - messageHandler.add( - { - user: session.user.id, - path, - meta: { - source: 'IMAP', - to: session.user.username, - time: Date.now() - }, - session, - date, - flags, - raw - }, - (err, status, data) => { - if (err) { - if (err.imapResponse) { - return callback(null, err.imapResponse); - } - return callback(err); - } - callback(null, status, data); - } - ); - }); -}; - -server.updateMailboxFlags = function(mailbox, update, callback) { - if (update.action === 'remove') { - // we didn't add any new flags, so there's nothing to update - return callback(); - } - - let mailboxFlags = imapTools.systemFlags.concat(mailbox.flags || []).map(flag => flag.trim().toLowerCase()); - let newFlags = []; - - // find flags that are not listed with mailbox - update.value.forEach(flag => { - // limit mailbox flags by 100 - if (mailboxFlags.length + newFlags.length >= 100) { - return; - } - // if mailbox does not have such flag, then add it - if (!mailboxFlags.includes(flag.toLowerCase().trim())) { - newFlags.push(flag); - } - }); - - // nothing new found - if (!newFlags.length) { - return callback(); - } - - // found some new flags not yet set for mailbox - // FIXME: Should we send unsolicited FLAGS and PERMANENTFLAGS notifications? Probably not - return db.database.collection('mailboxes').findOneAndUpdate( - { - _id: mailbox._id - }, - { - $addToSet: { - flags: { - $each: newFlags - } - } - }, - {}, - callback - ); -}; - -// STORE / UID STORE, updates flags for selected UIDs -server.onStore = function(path, update, session, callback) { - this.logger.debug( - { - tnx: 'store', - cid: session.id - }, - '[%s] Updating messages in "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - let query = { - user: session.user.id, - mailbox: mailbox._id - }; - - if (update.unchangedSince) { - query = { - user: session.user.id, - mailbox: mailbox._id, - modseq: { - $lte: update.unchangedSince - } - }; - } - - let queryAll = false; - if (update.messages.length !== session.selected.uidList.length) { - // do not use uid selector for 1:* - query.uid = { - $in: update.messages - }; - } else { - // 1:* - queryAll = true; - } - - let cursor = db.database - .collection('messages') - .find(query) - .project({ - _id: true, - uid: true, - flags: true - }) - .sort([['uid', 1]]); - - let updateEntries = []; - let notifyEntries = []; - - let done = (...args) => { - if (updateEntries.length) { - return db.database.collection('messages').bulkWrite(updateEntries, { - ordered: false, - w: 1 - }, () => { - updateEntries = []; - this.notifier.addEntries(session.user.id, path, notifyEntries, () => { - notifyEntries = []; - this.notifier.fire(session.user.id, path); - if (args[0]) { - // first argument is an error - return callback(...args); - } else { - server.updateMailboxFlags(mailbox, update, () => callback(...args)); - } - }); - }); - } - this.notifier.fire(session.user.id, path); - if (args[0]) { - // first argument is an error - return callback(...args); - } else { - server.updateMailboxFlags(mailbox, update, () => callback(...args)); - } - }; - - // We have to process all messages one by one instead of just calling an update - // for all messages as we need to know which messages were exactly modified, - // otherwise we can't send flag update notifications and modify modseq values - let processNext = () => { - cursor.next((err, message) => { - if (err) { - return done(err); - } - if (!message) { - return cursor.close(() => done(null, true)); - } - if (queryAll && !session.selected.uidList.includes(message.uid)) { - // skip processing messages that we do not know about yet - return processNext(); - } - - let flagsupdate = false; // query object for updates - - let updated = false; - let existingFlags = message.flags.map(flag => flag.toLowerCase().trim()); - switch (update.action) { - case 'set': - // check if update set matches current or is different - if ( - // if length does not match - existingFlags.length !== update.value.length || - // or a new flag was found - update.value.filter(flag => !existingFlags.includes(flag.toLowerCase().trim())).length - ) { - updated = true; - } - - message.flags = [].concat(update.value); - - // set flags - if (updated) { - flagsupdate = { - $set: { - flags: message.flags, - seen: message.flags.includes('\\Seen'), - flagged: message.flags.includes('\\Flagged'), - deleted: message.flags.includes('\\Deleted'), - draft: message.flags.includes('\\Draft') - } - }; - } - break; - - case 'add': { - let newFlags = []; - message.flags = message.flags.concat( - update.value.filter(flag => { - if (!existingFlags.includes(flag.toLowerCase().trim())) { - updated = true; - newFlags.push(flag); - return true; - } - return false; - }) - ); - - // add flags - if (updated) { - flagsupdate = { - $addToSet: { - flags: { - $each: newFlags - } - } - }; - - if ( - newFlags.includes('\\Seen') || - newFlags.includes('\\Flagged') || - newFlags.includes('\\Deleted') || - newFlags.includes('\\Draft') - ) { - flagsupdate.$set = {}; - if (newFlags.includes('\\Seen')) { - flagsupdate.$set = { - seen: true - }; - } - if (newFlags.includes('\\Flagged')) { - flagsupdate.$set = { - flagged: true - }; - } - if (newFlags.includes('\\Deleted')) { - flagsupdate.$set = { - deleted: true - }; - } - if (newFlags.includes('\\Draft')) { - flagsupdate.$set = { - draft: true - }; - } - } - } - break; - } - - case 'remove': { - // We need to use the case of existing flags when removing - let oldFlags = []; - let flagsUpdates = update.value.map(flag => flag.toLowerCase().trim()); - message.flags = message.flags.filter(flag => { - if (!flagsUpdates.includes(flag.toLowerCase().trim())) { - return true; - } - oldFlags.push(flag); - updated = true; - return false; - }); - - // remove flags - if (updated) { - flagsupdate = { - $pull: { - flags: { - $in: oldFlags - } - } - }; - if ( - oldFlags.includes('\\Seen') || - oldFlags.includes('\\Flagged') || - oldFlags.includes('\\Deleted') || - oldFlags.includes('\\Draft') - ) { - flagsupdate.$set = {}; - if (oldFlags.includes('\\Seen')) { - flagsupdate.$set = { - seen: false - }; - } - if (oldFlags.includes('\\Flagged')) { - flagsupdate.$set = { - flagged: false - }; - } - if (oldFlags.includes('\\Deleted')) { - flagsupdate.$set = { - deleted: false - }; - } - if (oldFlags.includes('\\Draft')) { - flagsupdate.$set = { - draft: false - }; - } - } - } - break; - } - } - - if (!update.silent) { - // print updated state of the message - session.writeStream.write( - session.formatResponse('FETCH', message.uid, { - uid: update.isUid ? message.uid : false, - flags: message.flags - }) - ); - } - - if (updated) { - updateEntries.push({ - updateOne: { - filter: { - _id: message._id, - user: session.user.id - }, - update: flagsupdate - } - }); - - notifyEntries.push({ - command: 'FETCH', - ignore: session.id, - uid: message.uid, - flags: message.flags, - message: message._id - }); - - if (updateEntries.length >= BULK_BATCH_SIZE) { - return db.database.collection('messages').bulkWrite(updateEntries, { - ordered: false, - w: 1 - }, err => { - updateEntries = []; - if (err) { - return cursor.close(() => done(err)); - } - - this.notifier.addEntries(session.user.id, path, notifyEntries, () => { - notifyEntries = []; - this.notifier.fire(session.user.id, path); - processNext(); - }); - }); - } else { - processNext(); - } - } else { - processNext(); - } - }); - }; - - processNext(); - }); -}; - -// EXPUNGE deletes all messages in selected mailbox marked with \Delete -server.onExpunge = function(path, update, session, callback) { - this.logger.debug( - { - tnx: 'expunge', - cid: session.id - }, - '[%s] Deleting messages from "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - let cursor = db.database - .collection('messages') - .find({ - user: session.user.id, - mailbox: mailbox._id, - deleted: true - }) - .project({ - _id: true, - uid: true, - size: true, - map: true, - magic: true - }) - .sort([['uid', 1]]); - - let deletedMessages = 0; - let deletedStorage = 0; - - let updateQuota = next => { - if (!deletedMessages) { - return next(); - } - - db.users.collection('users').findOneAndUpdate( - { - _id: mailbox.user - }, - { - $inc: { - storageUsed: -deletedStorage - } - }, - next - ); - }; - - let processNext = () => { - cursor.next((err, message) => { - if (err) { - return updateQuota(() => callback(err)); - } - if (!message) { - return cursor.close(() => { - updateQuota(() => { - this.notifier.fire(session.user.id, path); - return callback(null, true); - }); - }); - } - - if (!update.silent) { - session.writeStream.write(session.formatResponse('EXPUNGE', message.uid)); - } - - db.database.collection('messages').deleteOne({ - _id: message._id, - user: session.user.id - }, err => { - if (err) { - return updateQuota(() => cursor.close(() => callback(err))); - } - - deletedMessages++; - deletedStorage += Number(message.size) || 0; - - let attachments = Object.keys(message.map || {}).map(key => message.map[key]); - - if (!attachments.length) { - // not stored attachments - return this.notifier.addEntries( - session.user.id, - path, - { - command: 'EXPUNGE', - ignore: session.id, - uid: message.uid, - message: message._id - }, - processNext - ); - } - - // remove references to attachments (if any exist) - db.gridfs.collection('attachments.files').updateMany({ - _id: { - $in: attachments - } - }, { - $inc: { - 'metadata.c': -1, - 'metadata.m': -message.magic - } - }, { - multi: true, - w: 1 - }, err => { - if (err) { - // ignore as we don't really care if we have orphans or not - } - this.notifier.addEntries( - session.user.id, - path, - { - command: 'EXPUNGE', - ignore: session.id, - uid: message.uid, - message: message._id - }, - processNext - ); - }); - }); - }); - }; - - processNext(); - }); -}; - -// COPY / UID COPY sequence mailbox -server.onCopy = function(path, update, session, callback) { - this.logger.debug( - { - tnx: 'copy', - cid: session.id - }, - '[%s] Copying messages from "%s" to "%s"', - session.id, - path, - update.destination - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path: update.destination - }, (err, target) => { - if (err) { - return callback(err); - } - if (!target) { - return callback(null, 'TRYCREATE'); - } - - let cursor = db.database - .collection('messages') - .find({ - user: session.user.id, - mailbox: mailbox._id, - uid: { - $in: update.messages - } - }) - .sort([['uid', 1]]); // no projection as we need to copy the entire message - - let copiedMessages = 0; - let copiedStorage = 0; - - let updateQuota = next => { - if (!copiedMessages) { - return next(); - } - db.users.collection('users').findOneAndUpdate( - { - _id: mailbox.user - }, - { - $inc: { - storageUsed: copiedStorage - } - }, - next - ); - }; - - let sourceUid = []; - let destinationUid = []; - let processNext = () => { - cursor.next((err, message) => { - if (err) { - return updateQuota(() => callback(err)); - } - if (!message) { - return cursor.close(() => { - updateQuota(() => { - this.notifier.fire(session.user.id, target.path); - return callback(null, true, { - uidValidity: target.uidValidity, - sourceUid, - destinationUid - }); - }); - }); - } - - // Copying is not done in bulk to minimize risk of going out of sync with incremental UIDs - sourceUid.unshift(message.uid); - db.database.collection('mailboxes').findOneAndUpdate({ - _id: target._id - }, { - $inc: { - uidNext: 1 - } - }, { - uidNext: true - }, (err, item) => { - if (err) { - return cursor.close(() => { - updateQuota(() => callback(err)); - }); - } - - if (!item || !item.value) { - // was not able to acquire a lock - return cursor.close(() => { - updateQuota(() => callback(null, 'TRYCREATE')); - }); - } - - let uidNext = item.value.uidNext; - destinationUid.unshift(uidNext); - - message._id = new ObjectID(); - message.mailbox = target._id; - message.uid = uidNext; - - // retention settings - message.exp = !!target.retention; - message.rdate = Date.now() + (target.retention || 0); - - if (!message.meta) { - message.meta = {}; - } - message.meta.source = 'IMAPCOPY'; - - db.database.collection('messages').insertOne(message, err => { - if (err) { - return cursor.close(() => { - updateQuota(() => callback(err)); - }); - } - - copiedMessages++; - copiedStorage += Number(message.size) || 0; - - let attachments = Object.keys(message.map || {}).map(key => message.map[key]); - if (!attachments.length) { - return this.notifier.addEntries( - session.user.id, - target.path, - { - command: 'EXISTS', - uid: message.uid, - message: message._id - }, - processNext - ); - } - - // update attachments - db.gridfs.collection('attachments.files').updateMany({ - _id: { - $in: attachments - } - }, { - $inc: { - 'metadata.c': 1, - 'metadata.m': message.magic - } - }, { - multi: true, - w: 1 - }, err => { - if (err) { - // should we care about this error? - } - this.notifier.addEntries( - session.user.id, - target.path, - { - command: 'EXISTS', - uid: message.uid, - message: message._id - }, - processNext - ); - }); - }); - }); - }); - }; - processNext(); - }); - }); -}; - -// MOVE / UID MOVE sequence mailbox -server.onMove = function(path, update, session, callback) { - this.logger.debug( - { - tnx: 'move', - cid: session.id - }, - '[%s] Moving messages from "%s" to "%s"', - session.id, - path, - update.destination - ); - - messageHandler.move( - { - user: session.user.id, - // folder to move messages from - source: { - user: session.user.id, - path - }, - // folder to move messages to - destination: { - user: session.user.id, - path: update.destination - }, - session, - // list of UIDs to move - messages: update.messages - }, - (...args) => { - if (args[0]) { - if (args[0].imapResponse) { - return callback(null, args[0].imapResponse); - } - return callback(args[0]); - } - callback(...args); - } - ); -}; - -// sends results to socket -server.onFetch = function(path, options, session, callback) { - this.logger.debug( - { - tnx: 'fetch', - cid: session.id - }, - '[%s] Requested FETCH for "%s"', - session.id, - path - ); - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - let projection = { - uid: true, - modseq: true, - idate: true, - flags: true, - envelope: true, - bodystructure: true, - size: true - }; - - if (!options.metadataOnly) { - projection.mimeTree = true; - } - - let query = { - user: session.user.id, - mailbox: mailbox._id - }; - - if (options.changedSince) { - query = { - user: session.user.id, - mailbox: mailbox._id, - modseq: { - $gt: options.changedSince - } - }; - } - - let queryAll = false; - if (options.messages.length !== session.selected.uidList.length) { - // do not use uid selector for 1:* - query.uid = { - $in: options.messages - }; - } else { - // 1:* - queryAll = true; - } - - let isUpdated = false; - let updateEntries = []; - let notifyEntries = []; - - let done = (...args) => { - if (updateEntries.length) { - return db.database.collection('messages').bulkWrite(updateEntries, { - ordered: false, - w: 1 - }, () => { - updateEntries = []; - this.notifier.addEntries(session.user.id, path, notifyEntries, () => { - notifyEntries = []; - this.notifier.fire(session.user.id, path); - return callback(...args); - }); - }); - } - if (isUpdated) { - this.notifier.fire(session.user.id, path); - } - return callback(...args); - }; - - let cursor = db.database.collection('messages').find(query).project(projection).sort([['uid', 1]]); - - let rowCount = 0; - let processNext = () => { - cursor.next((err, message) => { - if (err) { - return done(err); - } - if (!message) { - return cursor.close(() => { - done(null, true); - }); - } - - if (queryAll && !session.selected.uidList.includes(message.uid)) { - // skip processing messages that we do not know about yet - return processNext(); - } - - let markAsSeen = options.markAsSeen && !message.flags.includes('\\Seen'); - if (markAsSeen) { - message.flags.unshift('\\Seen'); - } - - let stream = imapHandler.compileStream( - session.formatResponse('FETCH', message.uid, { - query: options.query, - values: session.getQueryResponse(options.query, message, { - logger: this.logger, - fetchOptions: {}, - database: db.database, - gridfs: db.gridfs, - acceptUTF8Enabled: session.isUTF8Enabled() - }) - }) - ); - - stream.description = util.format('* FETCH #%s uid=%s size=%sB ', ++rowCount, message.uid, message.size); - - stream.on('error', err => { - session.socket.write('* BYE INTERNAL ERROR\n'); - session.socket.destroy(); // ended up in erroneus state, kill the connection to abort - return cursor.close(() => done(err)); - }); - - // send formatted response to socket - session.writeStream.write(stream, () => { - if (!markAsSeen) { - return processNext(); - } - - this.logger.debug( - { - tnx: 'flags', - cid: session.id - }, - '[%s] UPDATE FLAGS for "%s"', - session.id, - message.uid - ); - - isUpdated = true; - - updateEntries.push({ - updateOne: { - filter: { - _id: message._id, - user: session.user.id - }, - update: { - $addToSet: { - flags: '\\Seen' - }, - $set: { - seen: true - } - } - } - }); - - notifyEntries.push({ - command: 'FETCH', - ignore: session.id, - uid: message.uid, - flags: message.flags, - message: message._id - }); - - if (updateEntries.length >= BULK_BATCH_SIZE) { - return db.database.collection('messages').bulkWrite(updateEntries, { - ordered: false, - w: 1 - }, err => { - updateEntries = []; - if (err) { - return cursor.close(() => done(err)); - } - - this.notifier.addEntries(session.user.id, path, notifyEntries, () => { - notifyEntries = []; - this.notifier.fire(session.user.id, path); - processNext(); - }); - }); - } else { - processNext(); - } - }); - }); - }; - - processNext(); - }); -}; - -/** - * Returns an array of matching UID values - * - * IMAP search can be quite complex, so we optimize here for most common queries to be handled - * by MongoDB and then do the final filtering on the client side. This allows - */ -server.onSearch = function(path, options, session, callback) { - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - // prepare query - - let query = { - user: session.user.id, - mailbox: mailbox._id - }; - - let walkQuery = (parent, ne, node) => { - node.forEach(term => { - switch (term.key) { - case 'all': - if (ne) { - parent.push({ - // should not match anything - _id: -1 - }); - } - break; - - case 'not': - walkQuery(parent, !ne, [].concat(term.value || [])); - break; - - case 'or': { - let $or = []; - - [].concat(term.value || []).forEach(entry => { - walkQuery($or, false, [].concat(entry || [])); - }); - - if ($or.length) { - parent.push({ - $or - }); - } - - break; - } - - case 'text': // search over entire email - case 'body': // search over email body - if (term.value && !ne) { - // fulltext can only be in the root of the query, not in $not, $or expressions - // https://docs.mongodb.com/v3.4/tutorial/text-search-in-aggregation/#restrictions - query.$text = { - $search: term.value - }; - } else { - // can not search by text - parent.push({ - // should not match anything - _id: -1 - }); - } - break; - - case 'modseq': - parent.push({ - modseq: { - [!ne ? '$gte' : '$lt']: term.value - } - }); - break; - - case 'uid': - if (Array.isArray(term.value)) { - if (!term.value.length) { - // trying to find a message that does not exist - return callback(null, { - uidList: [], - highestModseq: 0 - }); - } - if (term.value.length !== session.selected.uidList.length) { - // not 1:* - parent.push({ - uid: { - [!ne ? '$in' : '$nin']: term.value - } - }); - } else if (ne) { - parent.push({ - // should not match anything - _id: -1 - }); - } - } else { - parent.push({ - uid: { - [!ne ? '$eq' : '$ne']: term.value - } - }); - } - break; - - case 'flag': - { - switch (term.value) { - case '\\Seen': - case '\\Deleted': - case '\\Flagged': - case '\\Draft': - if (term.exists) { - parent.push({ - [term.value.toLowerCase().substr(1)]: !ne - }); - } else { - parent.push({ - [term.value.toLowerCase().substr(1)]: ne - }); - } - break; - default: - if (term.exists) { - parent.push({ - flags: { - [!ne ? '$eq' : '$ne']: term.value - } - }); - } else { - parent.push({ - flags: { - [!ne ? '$ne' : '$eq']: term.value - } - }); - } - } - } - break; - - case 'header': - { - // FIXME: this does not match unicode symbols for whatever reason - let regex = Buffer.from(term.value, 'binary').toString().replace(/[-/\\^$*+?.()|[\]{}]/g, '\\$&'); - let entry = term.value - ? { - headers: { - $elemMatch: { - key: term.header, - value: !ne - ? { - $regex: regex, - $options: 'i' - } - : { - $not: { - $regex: regex, - $options: 'i' - } - } - } - } - } - : { - 'headers.key': !ne - ? term.header - : { - $ne: term.header - } - }; - parent.push(entry); - } - break; - - case 'internaldate': - { - let op = false; - let value = new Date(term.value + ' GMT'); - switch (term.operator) { - case '<': - op = '$lt'; - break; - case '<=': - op = '$lte'; - break; - case '>': - op = '$gt'; - break; - case '>=': - op = '$gte'; - break; - } - let entry = !op - ? [ - { - $gte: value - }, - { - $lt: new Date(value.getTime() + 24 * 3600 * 1000) - } - ] - : { - [op]: value - }; - - entry = { - idate: !ne - ? entry - : { - $not: entry - } - }; - - parent.push(entry); - } - break; - - case 'headerdate': - { - let op = false; - let value = new Date(term.value + ' GMT'); - switch (term.operator) { - case '<': - op = '$lt'; - break; - case '<=': - op = '$lte'; - break; - case '>': - op = '$gt'; - break; - case '>=': - op = '$gte'; - break; - } - let entry = !op - ? [ - { - $gte: value - }, - { - $lt: new Date(value.getTime() + 24 * 3600 * 1000) - } - ] - : { - [op]: value - }; - - entry = { - hdate: !ne - ? entry - : { - $not: entry - } - }; - - parent.push(entry); - } - break; - - case 'size': - { - let op = '$eq'; - let value = Number(term.value) || 0; - switch (term.operator) { - case '<': - op = '$lt'; - break; - case '<=': - op = '$lte'; - break; - case '>': - op = '$gt'; - break; - case '>=': - op = '$gte'; - break; - } - - let entry = { - [op]: value - }; - - entry = { - size: !ne - ? entry - : { - $not: entry - } - }; - - parent.push(entry); - } - break; - } - }); - }; - - let $and = []; - walkQuery($and, false, options.query); - if ($and.length) { - query.$and = $and; - } - - this.logger.info( - { - tnx: 'search', - cid: session.id - }, - '[%s] SEARCH %s', - session.id, - JSON.stringify(query) - ); - - let cursor = db.database.collection('messages').find(query).project({ - uid: true, - modseq: true - }); - - let highestModseq = 0; - let uidList = []; - - let processNext = () => { - cursor.next((err, message) => { - if (err) { - this.logger.error( - { - tnx: 'search', - cid: session.id - }, - '[%s] SEARCHFAIL %s error="%s"', - session.id, - JSON.stringify(query), - err.message - ); - return callback(new Error('Can not make requested search query')); - } - if (!message) { - return cursor.close(() => - callback(null, { - uidList, - highestModseq - }) - ); - } - - if (highestModseq < message.modseq) { - highestModseq = message.modseq; - } - - uidList.push(message.uid); - processNext(); - }); - }; - - processNext(); - }); -}; - -server.onGetQuotaRoot = function(path, session, callback) { - this.logger.debug( - { - tnx: 'quota', - cid: session.id - }, - '[%s] Requested quota root info for "%s"', - session.id, - path - ); - - db.database.collection('mailboxes').findOne({ - user: session.user.id, - path - }, (err, mailbox) => { - if (err) { - return callback(err); - } - if (!mailbox) { - return callback(null, 'NONEXISTENT'); - } - - db.users.collection('users').findOne({ - _id: session.user.id - }, (err, user) => { - if (err) { - return callback(err); - } - if (!user) { - return callback(new Error('User data not found')); - } - - return callback(null, { - root: '', - quota: user.quota || server.options.maxStorage || 0, - storageUsed: Math.max(user.storageUsed || 0, 0) - }); - }); - }); -}; - -server.onGetQuota = function(quotaRoot, session, callback) { - this.logger.debug( - { - tnx: 'quota', - cid: session.id - }, - '[%s] Requested quota info for "%s"', - session.id, - quotaRoot - ); - - if (quotaRoot !== '') { - return callback(null, 'NONEXISTENT'); - } - - db.users.collection('users').findOne({ - _id: session.user.id - }, (err, user) => { - if (err) { - return callback(err); - } - if (!user) { - return callback(new Error('User data not found')); - } - - return callback(null, { - root: '', - quota: user.quota || server.options.maxStorage || 0, - storageUsed: Math.max(user.storageUsed || 0, 0) - }); - }); -}; - function deleteOrphanedAttachments(callback) { let cursor = db.gridfs.collection('attachments.files').find({ 'metadata.c': 0, @@ -2007,7 +155,7 @@ function clearExpiredMessages() { let startTime = Date.now(); // First, acquire the lock. This prevents multiple connected clients for deleting the same messages - gcLock.acquireLock('gc_expired', Math.round(GC_INTERVAL * 1.2) /* Lock expires if not released */, (err, lock) => { + gcLock.acquireLock('gc_expired', Math.round(consts.GC_INTERVAL * 1.2) /* Lock expires if not released */, (err, lock) => { if (err) { server.logger.error( { @@ -2017,11 +165,11 @@ function clearExpiredMessages() { 'Failed to acquire lock error=%s', err.message ); - gcTimeout = setTimeout(clearExpiredMessages, GC_INTERVAL); + gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL); gcTimeout.unref(); return; } else if (!lock.success) { - gcTimeout = setTimeout(clearExpiredMessages, GC_INTERVAL); + gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL); gcTimeout.unref(); return; } @@ -2038,7 +186,7 @@ function clearExpiredMessages() { err.message ); } - gcTimeout = setTimeout(clearExpiredMessages, GC_INTERVAL); + gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL); gcTimeout.unref(); }); }; @@ -2084,7 +232,7 @@ function clearExpiredMessages() { }); let processNext = () => { - if (Date.now() - startTime > GC_INTERVAL * 0.8) { + if (Date.now() - startTime > consts.GC_INTERVAL * 0.8) { return clear(); } @@ -2105,7 +253,7 @@ function clearExpiredMessages() { message._id ); - gcTimeout = setTimeout(clearExpiredMessages, GC_INTERVAL); + gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL); messageHandler.del( { @@ -2117,8 +265,8 @@ function clearExpiredMessages() { return cursor.close(() => done(err)); } deleted++; - if (GC_DELAY_DELETE) { - setTimeout(processNext, GC_DELAY_DELETE); + if (consts.GC_DELAY_DELETE) { + setTimeout(processNext, consts.GC_DELAY_DELETE); } else { setImmediate(processNext); } @@ -2141,7 +289,7 @@ module.exports = done => { namespace: 'wildduck' }); - gcTimeout = setTimeout(clearExpiredMessages, GC_INTERVAL); + gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL); gcTimeout.unref(); let start = () => { @@ -2180,6 +328,27 @@ module.exports = done => { started = true; done(null, server); }); + + // setup command handlers for the server instance + server.onFetch = onFetch(server); + server.onAuth = onAuth(server, userHandler); + server.onList = onList(server); + server.onLsub = onLsub(server); + server.onSubscribe = onSubscribe(server); + server.onUnsubscribe = onUnsubscribe(server); + server.onCreate = onCreate(server); + server.onRename = onRename(server); + server.onDelete = onDelete(server); + server.onOpen = onOpen(server); + server.onStatus = onStatus(server); + server.onAppend = onAppend(server, messageHandler); + server.onStore = onStore(server); + server.onExpunge = onExpunge(server); + server.onCopy = onCopy(server); + server.onMove = onMove(server, messageHandler); + server.onSearch = onSearch(server); + server.onGetQuotaRoot = onGetQuotaRoot(server); + server.onGetQuota = onGetQuota(server); }; let indexpos = 0; diff --git a/indexes.yaml b/indexes.yaml index a5e81510..1f813cef 100644 --- a/indexes.yaml +++ b/indexes.yaml @@ -55,29 +55,7 @@ indexes: specialUse: 1 # Indexes for the messages collection -# NB! this is a sharded collection and the shard -# key should be 'user' so keep this field as the first one -# in indexes -- collection: messages - index: - # hashed index needed for sharding - name: messages_shard - key: - user: hashed - -- collection: messages - index: - name: mailbox_by_id - key: - _id: 1 - user: 1 -- collection: messages - index: - name: mailbox_messages - key: - user: 1 - mailbox: 1 - collection: messages index: name: user_messages_by_thread @@ -86,45 +64,50 @@ indexes: thread: 1 - collection: messages index: + # use also as sharding key name: mailbox_uid key: - user: 1 mailbox: 1 uid: 1 + +- collection: messages + index: + # several message related queries include the shard key values + name: mailbox_uid_shard + key: + _id: 1 + mailbox: 1 + uid: 1 + +- collection: messages + index: + name: newer_first + key: + mailbox: 1 + uid: -1 - collection: messages index: name: mailbox_modseq_uid key: - user: 1 mailbox: 1 modseq: 1 uid: 1 -- collection: messages - index: - name: newer_first - key: - user: 1 - mailbox: 1 - uid: -1 - collection: messages index: name: mailbox_flags key: - user: 1 mailbox: 1 flags: 1 - collection: messages index: name: by_modseq key: - user: 1 mailbox: 1 modseq: 1 - collection: messages index: name: by_idate key: - user: 1 mailbox: 1 idate: 1 _id: 1 @@ -132,7 +115,6 @@ indexes: index: name: by_idate_newer key: - user: 1 mailbox: 1 idate: -1 _id: -1 @@ -140,7 +122,6 @@ indexes: index: name: by_hdate key: - user: 1 mailbox: 1 hdate: 1 msgid: 1 @@ -148,22 +129,18 @@ indexes: index: name: by_size key: - user: 1 mailbox: 1 size: 1 - collection: messages index: name: by_headers key: - user: 1 mailbox: 1 headers.key: 1 headers.value: 1 - collection: messages index: - # there can be only one $text index per collection, so in order to make - # account wide searches we do not use mailbox as compound key element here. - # IMAP TEXT and BODY searches might be slower though + # there can be only one $text index per collection name: fulltext key: user: 1 @@ -176,35 +153,30 @@ indexes: index: name: mailbox_seen_flag key: - user: 1 mailbox: 1 seen: 1 - collection: messages index: name: mailbox_deleted_flag key: - user: 1 mailbox: 1 deleted: 1 - collection: messages index: name: mailbox_flagged_flag key: - user: 1 mailbox: 1 flagged: 1 - collection: messages index: name: mailbox_draft_flag key: - user: 1 mailbox: 1 draft: 1 - collection: messages index: name: has_attachment key: - user: 1 mailbox: 1 ha: 1 - collection: messages diff --git a/lib/consts.js b/lib/consts.js new file mode 100644 index 00000000..b36e6920 --- /dev/null +++ b/lib/consts.js @@ -0,0 +1,12 @@ +'use strict'; + +module.exports = { + // home many modifications to cache before writing + BULK_BATCH_SIZE: 150, + + // how often to clear expired messages + GC_INTERVAL: 10 * 60 * 1000, + + // artificail delay between deleting next expired message in ms + GC_DELAY_DELETE: 100 +}; diff --git a/lib/handlers/on-append.js b/lib/handlers/on-append.js new file mode 100644 index 00000000..e9ff3df0 --- /dev/null +++ b/lib/handlers/on-append.js @@ -0,0 +1,56 @@ +'use strict'; + +const db = require('../db'); + +// APPEND mailbox (flags) date message +module.exports = (server, messageHandler) => (path, flags, date, raw, session, callback) => { + server.logger.debug( + { + tnx: 'append', + cid: session.id + }, + '[%s] Appending message to "%s"', + session.id, + path + ); + + db.users.collection('users').findOne({ + _id: session.user.id + }, (err, user) => { + if (err) { + return callback(err); + } + if (!user) { + return callback(new Error('User not found')); + } + + if (user.quota && user.storageUsed > user.quota) { + return callback(false, 'OVERQUOTA'); + } + + messageHandler.add( + { + user: session.user.id, + path, + meta: { + source: 'IMAP', + to: session.user.username, + time: Date.now() + }, + session, + date, + flags, + raw + }, + (err, status, data) => { + if (err) { + if (err.imapResponse) { + return callback(null, err.imapResponse); + } + return callback(err); + } + callback(null, status, data); + } + ); + }); +}; diff --git a/lib/handlers/on-auth.js b/lib/handlers/on-auth.js new file mode 100644 index 00000000..692fced6 --- /dev/null +++ b/lib/handlers/on-auth.js @@ -0,0 +1,34 @@ +'use strict'; + +module.exports = (server, userHandler) => (login, session, callback) => { + let username = (login.username || '').toString().trim(); + + userHandler.authenticate( + username, + login.password, + { + protocol: 'IMAP', + ip: session.remoteAddress + }, + (err, result) => { + if (err) { + return callback(err); + } + if (!result) { + return callback(); + } + + if (result.scope === 'master' && result.enabled2fa) { + // master password not allowed if 2fa is enabled! + return callback(); + } + + callback(null, { + user: { + id: result.user, + username: result.username + } + }); + } + ); +}; diff --git a/lib/handlers/on-copy.js b/lib/handlers/on-copy.js new file mode 100644 index 00000000..e096fbe7 --- /dev/null +++ b/lib/handlers/on-copy.js @@ -0,0 +1,189 @@ +'use strict'; + +const ObjectID = require('mongodb').ObjectID; +const db = require('../db'); + +// COPY / UID COPY sequence mailbox +module.exports = server => (path, update, session, callback) => { + server.logger.debug( + { + tnx: 'copy', + cid: session.id + }, + '[%s] Copying messages from "%s" to "%s"', + session.id, + path, + update.destination + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path: update.destination + }, (err, target) => { + if (err) { + return callback(err); + } + if (!target) { + return callback(null, 'TRYCREATE'); + } + + let cursor = db.database + .collection('messages') + .find({ + mailbox: mailbox._id, + uid: { + $in: update.messages + } + }) + .sort([['uid', 1]]); // no projection as we need to copy the entire message + + let copiedMessages = 0; + let copiedStorage = 0; + + let updateQuota = next => { + if (!copiedMessages) { + return next(); + } + db.users.collection('users').findOneAndUpdate( + { + _id: mailbox.user + }, + { + $inc: { + storageUsed: copiedStorage + } + }, + next + ); + }; + + let sourceUid = []; + let destinationUid = []; + let processNext = () => { + cursor.next((err, message) => { + if (err) { + return updateQuota(() => callback(err)); + } + if (!message) { + return cursor.close(() => { + updateQuota(() => { + server.notifier.fire(session.user.id, target.path); + return callback(null, true, { + uidValidity: target.uidValidity, + sourceUid, + destinationUid + }); + }); + }); + } + + // Copying is not done in bulk to minimize risk of going out of sync with incremental UIDs + sourceUid.unshift(message.uid); + db.database.collection('mailboxes').findOneAndUpdate({ + _id: target._id + }, { + $inc: { + uidNext: 1 + } + }, { + uidNext: true + }, (err, item) => { + if (err) { + return cursor.close(() => { + updateQuota(() => callback(err)); + }); + } + + if (!item || !item.value) { + // was not able to acquire a lock + return cursor.close(() => { + updateQuota(() => callback(null, 'TRYCREATE')); + }); + } + + let uidNext = item.value.uidNext; + destinationUid.unshift(uidNext); + + message._id = new ObjectID(); + message.mailbox = target._id; + message.uid = uidNext; + + // retention settings + message.exp = !!target.retention; + message.rdate = Date.now() + (target.retention || 0); + + if (!message.meta) { + message.meta = {}; + } + message.meta.source = 'IMAPCOPY'; + + db.database.collection('messages').insertOne(message, err => { + if (err) { + return cursor.close(() => { + updateQuota(() => callback(err)); + }); + } + + copiedMessages++; + copiedStorage += Number(message.size) || 0; + + let attachments = Object.keys(message.map || {}).map(key => message.map[key]); + if (!attachments.length) { + return server.notifier.addEntries( + session.user.id, + target.path, + { + command: 'EXISTS', + uid: message.uid, + message: message._id + }, + processNext + ); + } + + // update attachments + db.gridfs.collection('attachments.files').updateMany({ + _id: { + $in: attachments + } + }, { + $inc: { + 'metadata.c': 1, + 'metadata.m': message.magic + } + }, { + multi: true, + w: 1 + }, err => { + if (err) { + // should we care about this error? + } + server.notifier.addEntries( + session.user.id, + target.path, + { + command: 'EXISTS', + uid: message.uid, + message: message._id + }, + processNext + ); + }); + }); + }); + }); + }; + processNext(); + }); + }); +}; diff --git a/lib/handlers/on-create.js b/lib/handlers/on-create.js new file mode 100644 index 00000000..1626a823 --- /dev/null +++ b/lib/handlers/on-create.js @@ -0,0 +1,57 @@ +'use strict'; + +const db = require('../db'); + +// CREATE "path/to/mailbox" +module.exports = server => (path, session, callback) => { + server.logger.debug( + { + tnx: 'create', + cid: session.id + }, + '[%s] CREATE "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (mailbox) { + return callback(null, 'ALREADYEXISTS'); + } + + db.users.collection('users').findOne({ + _id: session.user.id + }, { + fields: { + retention: true + } + }, (err, user) => { + if (err) { + return callback(err); + } + + mailbox = { + user: session.user.id, + path, + uidValidity: Math.floor(Date.now() / 1000), + uidNext: 1, + modifyIndex: 0, + subscribed: true, + flags: [], + retention: user.retention + }; + + db.database.collection('mailboxes').insertOne(mailbox, err => { + if (err) { + return callback(err); + } + return callback(null, true); + }); + }); + }); +}; diff --git a/lib/handlers/on-delete.js b/lib/handlers/on-delete.js new file mode 100644 index 00000000..e25550a0 --- /dev/null +++ b/lib/handlers/on-delete.js @@ -0,0 +1,109 @@ +'use strict'; + +const db = require('../db'); + +// DELETE "path/to/mailbox" +module.exports = server => (path, session, callback) => { + server.logger.debug( + { + tnx: 'delete', + cid: session.id + }, + '[%s] DELETE "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + if (mailbox.specialUse) { + return callback(null, 'CANNOT'); + } + + db.database.collection('mailboxes').deleteOne({ + _id: mailbox._id + }, err => { + if (err) { + return callback(err); + } + + // calculate mailbox size by aggregating the size's of all messages + db.database + .collection('messages') + .aggregate( + [ + { + $match: { + mailbox: mailbox._id + } + }, + { + $group: { + _id: { + mailbox: '$mailbox' + }, + storageUsed: { + $sum: '$size' + } + } + } + ], + { + cursor: { + batchSize: 1 + } + } + ) + .toArray((err, res) => { + if (err) { + return callback(err); + } + + let storageUsed = (res && res[0] && res[0].storageUsed) || 0; + + db.database.collection('messages').deleteMany({ + mailbox: mailbox._id + }, err => { + if (err) { + return callback(err); + } + + let done = () => { + db.database.collection('journal').deleteMany({ + mailbox: mailbox._id + }, err => { + if (err) { + return callback(err); + } + callback(null, true); + }); + }; + + if (!storageUsed) { + return done(); + } + + // decrement quota counters + db.users.collection('users').findOneAndUpdate( + { + _id: mailbox.user + }, + { + $inc: { + storageUsed: -Number(storageUsed) || 0 + } + }, + done + ); + }); + }); + }); + }); +}; diff --git a/lib/handlers/on-expunge.js b/lib/handlers/on-expunge.js new file mode 100644 index 00000000..b1b5125b --- /dev/null +++ b/lib/handlers/on-expunge.js @@ -0,0 +1,146 @@ +'use strict'; + +const db = require('../db'); + +// EXPUNGE deletes all messages in selected mailbox marked with \Delete +module.exports = server => (path, update, session, callback) => { + server.logger.debug( + { + tnx: 'expunge', + cid: session.id + }, + '[%s] Deleting messages from "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + let cursor = db.database + .collection('messages') + .find({ + user: session.user.id, + mailbox: mailbox._id, + deleted: true + }) + .project({ + _id: true, + uid: true, + size: true, + map: true, + magic: true + }) + .sort([['uid', 1]]); + + let deletedMessages = 0; + let deletedStorage = 0; + + let updateQuota = next => { + if (!deletedMessages) { + return next(); + } + + db.users.collection('users').findOneAndUpdate( + { + _id: mailbox.user + }, + { + $inc: { + storageUsed: -deletedStorage + } + }, + next + ); + }; + + let processNext = () => { + cursor.next((err, message) => { + if (err) { + return updateQuota(() => callback(err)); + } + if (!message) { + return cursor.close(() => { + updateQuota(() => { + server.notifier.fire(session.user.id, path); + return callback(null, true); + }); + }); + } + + if (!update.silent) { + session.writeStream.write(session.formatResponse('EXPUNGE', message.uid)); + } + + db.database.collection('messages').deleteOne({ + _id: message._id, + mailbox: mailbox._id, + uid: message.uid + }, err => { + if (err) { + return updateQuota(() => cursor.close(() => callback(err))); + } + + deletedMessages++; + deletedStorage += Number(message.size) || 0; + + let attachments = Object.keys(message.map || {}).map(key => message.map[key]); + + if (!attachments.length) { + // not stored attachments + return server.notifier.addEntries( + session.user.id, + path, + { + command: 'EXPUNGE', + ignore: session.id, + uid: message.uid, + message: message._id + }, + processNext + ); + } + + // remove references to attachments (if any exist) + db.gridfs.collection('attachments.files').updateMany({ + _id: { + $in: attachments + } + }, { + $inc: { + 'metadata.c': -1, + 'metadata.m': -message.magic + } + }, { + multi: true, + w: 1 + }, err => { + if (err) { + // ignore as we don't really care if we have orphans or not + } + server.notifier.addEntries( + session.user.id, + path, + { + command: 'EXPUNGE', + ignore: session.id, + uid: message.uid, + message: message._id + }, + processNext + ); + }); + }); + }); + }; + + processNext(); + }); +}; diff --git a/lib/handlers/on-fetch.js b/lib/handlers/on-fetch.js new file mode 100644 index 00000000..b15acd2f --- /dev/null +++ b/lib/handlers/on-fetch.js @@ -0,0 +1,207 @@ +'use strict'; + +const IMAPServerModule = require('../../imap-core'); +const imapHandler = IMAPServerModule.imapHandler; +const util = require('util'); +const db = require('../db'); +const consts = require('../consts'); + +module.exports = server => (path, options, session, callback) => { + server.logger.debug( + { + tnx: 'fetch', + cid: session.id + }, + '[%s] Requested FETCH for "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + let projection = { + uid: true, + modseq: true, + idate: true, + flags: true, + envelope: true, + bodystructure: true, + size: true + }; + + if (!options.metadataOnly) { + projection.mimeTree = true; + } + + let query = { + mailbox: mailbox._id + }; + + if (options.changedSince) { + query = { + mailbox: mailbox._id, + modseq: { + $gt: options.changedSince + } + }; + } + + let queryAll = false; + if (options.messages.length !== session.selected.uidList.length) { + // do not use uid selector for 1:* + query.uid = { + $in: options.messages + }; + } else { + // 1:* + queryAll = true; + } + + let isUpdated = false; + let updateEntries = []; + let notifyEntries = []; + + let done = (...args) => { + if (updateEntries.length) { + return db.database.collection('messages').bulkWrite(updateEntries, { + ordered: false, + w: 1 + }, () => { + updateEntries = []; + server.notifier.addEntries(session.user.id, path, notifyEntries, () => { + notifyEntries = []; + server.notifier.fire(session.user.id, path); + return callback(...args); + }); + }); + } + if (isUpdated) { + server.notifier.fire(session.user.id, path); + } + return callback(...args); + }; + + let cursor = db.database.collection('messages').find(query).project(projection).sort([['uid', 1]]); + + let rowCount = 0; + let processNext = () => { + cursor.next((err, message) => { + if (err) { + return done(err); + } + if (!message) { + return cursor.close(() => { + done(null, true); + }); + } + + if (queryAll && !session.selected.uidList.includes(message.uid)) { + // skip processing messages that we do not know about yet + return processNext(); + } + + let markAsSeen = options.markAsSeen && !message.flags.includes('\\Seen'); + if (markAsSeen) { + message.flags.unshift('\\Seen'); + } + + let stream = imapHandler.compileStream( + session.formatResponse('FETCH', message.uid, { + query: options.query, + values: session.getQueryResponse(options.query, message, { + logger: server.logger, + fetchOptions: {}, + database: db.database, + gridfs: db.gridfs, + acceptUTF8Enabled: session.isUTF8Enabled() + }) + }) + ); + + stream.description = util.format('* FETCH #%s uid=%s size=%sB ', ++rowCount, message.uid, message.size); + + stream.on('error', err => { + session.socket.write('* BYE INTERNAL ERROR\n'); + session.socket.destroy(); // ended up in erroneus state, kill the connection to abort + return cursor.close(() => done(err)); + }); + + // send formatted response to socket + session.writeStream.write(stream, () => { + if (!markAsSeen) { + return processNext(); + } + + server.logger.debug( + { + tnx: 'flags', + cid: session.id + }, + '[%s] UPDATE FLAGS for "%s"', + session.id, + message.uid + ); + + isUpdated = true; + + updateEntries.push({ + updateOne: { + filter: { + _id: message._id, + // include sharding key in query + mailbox: mailbox._id, + uid: message.uid + }, + update: { + $addToSet: { + flags: '\\Seen' + }, + $set: { + seen: true + } + } + } + }); + + notifyEntries.push({ + command: 'FETCH', + ignore: session.id, + uid: message.uid, + flags: message.flags, + message: message._id + }); + + if (updateEntries.length >= consts.BULK_BATCH_SIZE) { + return db.database.collection('messages').bulkWrite(updateEntries, { + ordered: false, + w: 1 + }, err => { + updateEntries = []; + if (err) { + return cursor.close(() => done(err)); + } + + server.notifier.addEntries(session.user.id, path, notifyEntries, () => { + notifyEntries = []; + server.notifier.fire(session.user.id, path); + processNext(); + }); + }); + } else { + processNext(); + } + }); + }); + }; + + processNext(); + }); +}; diff --git a/lib/handlers/on-get-quota-root.js b/lib/handlers/on-get-quota-root.js new file mode 100644 index 00000000..e5cb47a7 --- /dev/null +++ b/lib/handlers/on-get-quota-root.js @@ -0,0 +1,44 @@ +'use strict'; + +const db = require('../db'); + +module.exports = server => (path, session, callback) => { + server.logger.debug( + { + tnx: 'quota', + cid: session.id + }, + '[%s] Requested quota root info for "%s"', + session.id, + path + ); + + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + db.users.collection('users').findOne({ + _id: session.user.id + }, (err, user) => { + if (err) { + return callback(err); + } + if (!user) { + return callback(new Error('User data not found')); + } + + return callback(null, { + root: '', + quota: user.quota || server.options.maxStorage || 0, + storageUsed: Math.max(user.storageUsed || 0, 0) + }); + }); + }); +}; diff --git a/lib/handlers/on-get-quota.js b/lib/handlers/on-get-quota.js new file mode 100644 index 00000000..2deb4e17 --- /dev/null +++ b/lib/handlers/on-get-quota.js @@ -0,0 +1,36 @@ +'use strict'; + +const db = require('../db'); + +module.exports = server => (quotaRoot, session, callback) => { + server.logger.debug( + { + tnx: 'quota', + cid: session.id + }, + '[%s] Requested quota info for "%s"', + session.id, + quotaRoot + ); + + if (quotaRoot !== '') { + return callback(null, 'NONEXISTENT'); + } + + db.users.collection('users').findOne({ + _id: session.user.id + }, (err, user) => { + if (err) { + return callback(err); + } + if (!user) { + return callback(new Error('User data not found')); + } + + return callback(null, { + root: '', + quota: user.quota || server.options.maxStorage || 0, + storageUsed: Math.max(user.storageUsed || 0, 0) + }); + }); +}; diff --git a/lib/handlers/on-list.js b/lib/handlers/on-list.js new file mode 100644 index 00000000..229bf169 --- /dev/null +++ b/lib/handlers/on-list.js @@ -0,0 +1,25 @@ +'use strict'; + +const db = require('../db'); + +// LIST "" "*" +// Returns all folders, query is informational +// folders is either an Array or a Map +module.exports = server => + (server.onList = function(query, session, callback) { + server.logger.debug( + { + tnx: 'list', + cid: session.id + }, + '[%s] LIST for "%s"', + session.id, + query + ); + db.database + .collection('mailboxes') + .find({ + user: session.user.id + }) + .toArray(callback); + }); diff --git a/lib/handlers/on-lsub.js b/lib/handlers/on-lsub.js new file mode 100644 index 00000000..1ac15e5b --- /dev/null +++ b/lib/handlers/on-lsub.js @@ -0,0 +1,25 @@ +'use strict'; + +const db = require('../db'); + +// LSUB "" "*" +// Returns all subscribed folders, query is informational +// folders is either an Array or a Map +module.exports = server => (query, session, callback) => { + server.logger.debug( + { + tnx: 'lsub', + cid: session.id + }, + '[%s] LSUB for "%s"', + session.id, + query + ); + db.database + .collection('mailboxes') + .find({ + user: session.user.id, + subscribed: true + }) + .toArray(callback); +}; diff --git a/lib/handlers/on-move.js b/lib/handlers/on-move.js new file mode 100644 index 00000000..84d7e7c4 --- /dev/null +++ b/lib/handlers/on-move.js @@ -0,0 +1,43 @@ +'use strict'; + +// MOVE / UID MOVE sequence mailbox +module.exports = (server, messageHandler) => (path, update, session, callback) => { + server.logger.debug( + { + tnx: 'move', + cid: session.id + }, + '[%s] Moving messages from "%s" to "%s"', + session.id, + path, + update.destination + ); + + messageHandler.move( + { + user: session.user.id, + // folder to move messages from + source: { + user: session.user.id, + path + }, + // folder to move messages to + destination: { + user: session.user.id, + path: update.destination + }, + session, + // list of UIDs to move + messages: update.messages + }, + (...args) => { + if (args[0]) { + if (args[0].imapResponse) { + return callback(null, args[0].imapResponse); + } + return callback(args[0]); + } + callback(...args); + } + ); +}; diff --git a/lib/handlers/on-open.js b/lib/handlers/on-open.js new file mode 100644 index 00000000..f9ef3adc --- /dev/null +++ b/lib/handlers/on-open.js @@ -0,0 +1,44 @@ +'use strict'; + +const db = require('../db'); + +// SELECT/EXAMINE +module.exports = server => (path, session, callback) => { + server.logger.debug( + { + tnx: 'open', + cid: session.id + }, + '[%s] Opening "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + db.database + .collection('messages') + .find({ + mailbox: mailbox._id + }) + .project({ + uid: true + }) + .sort([['uid', 1]]) + .toArray((err, messages) => { + if (err) { + return callback(err); + } + mailbox.uidList = messages.map(message => message.uid); + callback(null, mailbox); + }); + }); +}; diff --git a/lib/handlers/on-rename.js b/lib/handlers/on-rename.js new file mode 100644 index 00000000..2aeab376 --- /dev/null +++ b/lib/handlers/on-rename.js @@ -0,0 +1,49 @@ +'use strict'; + +const db = require('../db'); + +// RENAME "path/to/mailbox" "new/path" +// NB! RENAME affects child and hierarchy mailboxes as well, this example does not do this +module.exports = server => (path, newname, session, callback) => { + server.logger.debug( + { + tnx: 'rename', + cid: session.id + }, + '[%s] RENAME "%s" to "%s"', + session.id, + path, + newname + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path: newname + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (mailbox) { + return callback(null, 'ALREADYEXISTS'); + } + + db.database.collection('mailboxes').findOneAndUpdate({ + user: session.user.id, + path + }, { + $set: { + path: newname + } + }, {}, (err, item) => { + if (err) { + return callback(err); + } + + if (!item || !item.value) { + // was not able to acquire a lock + return callback(null, 'NONEXISTENT'); + } + + callback(null, true); + }); + }); +}; diff --git a/lib/handlers/on-search.js b/lib/handlers/on-search.js new file mode 100644 index 00000000..0fa25e8d --- /dev/null +++ b/lib/handlers/on-search.js @@ -0,0 +1,366 @@ +'use strict'; + +const db = require('../db'); + +/** + * Returns an array of matching UID values + */ +module.exports = server => (path, options, session, callback) => { + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + // prepare query + + let query = { + mailbox: mailbox._id + }; + + let walkQuery = (parent, ne, node) => { + node.forEach(term => { + switch (term.key) { + case 'all': + if (ne) { + parent.push({ + // should not match anything + _id: -1 + }); + } + break; + + case 'not': + walkQuery(parent, !ne, [].concat(term.value || [])); + break; + + case 'or': { + let $or = []; + + [].concat(term.value || []).forEach(entry => { + walkQuery($or, false, [].concat(entry || [])); + }); + + if ($or.length) { + parent.push({ + $or + }); + } + + break; + } + + case 'text': // search over entire email + case 'body': // search over email body + if (term.value && !ne) { + // fulltext can only be in the root of the query, not in $not, $or expressions + // https://docs.mongodb.com/v3.4/tutorial/text-search-in-aggregation/#restrictions + query.user = session.user.id; + query.$text = { + $search: term.value + }; + } else { + // can not search by text + parent.push({ + // should not match anything + _id: -1 + }); + } + break; + + case 'modseq': + parent.push({ + modseq: { + [!ne ? '$gte' : '$lt']: term.value + } + }); + break; + + case 'uid': + if (Array.isArray(term.value)) { + if (!term.value.length) { + // trying to find a message that does not exist + return callback(null, { + uidList: [], + highestModseq: 0 + }); + } + if (term.value.length !== session.selected.uidList.length) { + // not 1:* + parent.push({ + uid: { + [!ne ? '$in' : '$nin']: term.value + } + }); + } else if (ne) { + parent.push({ + // should not match anything + _id: -1 + }); + } + } else { + parent.push({ + uid: { + [!ne ? '$eq' : '$ne']: term.value + } + }); + } + break; + + case 'flag': + { + switch (term.value) { + case '\\Seen': + case '\\Deleted': + case '\\Flagged': + case '\\Draft': + if (term.exists) { + parent.push({ + [term.value.toLowerCase().substr(1)]: !ne + }); + } else { + parent.push({ + [term.value.toLowerCase().substr(1)]: ne + }); + } + break; + default: + if (term.exists) { + parent.push({ + flags: { + [!ne ? '$eq' : '$ne']: term.value + } + }); + } else { + parent.push({ + flags: { + [!ne ? '$ne' : '$eq']: term.value + } + }); + } + } + } + break; + + case 'header': + { + let regex = Buffer.from(term.value, 'binary').toString().replace(/[-/\\^$*+?.()|[\]{}]/g, '\\$&'); + let entry = term.value + ? { + headers: { + $elemMatch: { + key: term.header, + value: !ne + ? { + $regex: regex, + $options: 'i' + } + : { + $not: { + $regex: regex, + $options: 'i' + } + } + } + } + } + : { + 'headers.key': !ne + ? term.header + : { + $ne: term.header + } + }; + parent.push(entry); + } + break; + + case 'internaldate': + { + let op = false; + let value = new Date(term.value + ' GMT'); + switch (term.operator) { + case '<': + op = '$lt'; + break; + case '<=': + op = '$lte'; + break; + case '>': + op = '$gt'; + break; + case '>=': + op = '$gte'; + break; + } + let entry = !op + ? [ + { + $gte: value + }, + { + $lt: new Date(value.getTime() + 24 * 3600 * 1000) + } + ] + : { + [op]: value + }; + + entry = { + idate: !ne + ? entry + : { + $not: entry + } + }; + + parent.push(entry); + } + break; + + case 'headerdate': + { + let op = false; + let value = new Date(term.value + ' GMT'); + switch (term.operator) { + case '<': + op = '$lt'; + break; + case '<=': + op = '$lte'; + break; + case '>': + op = '$gt'; + break; + case '>=': + op = '$gte'; + break; + } + let entry = !op + ? [ + { + $gte: value + }, + { + $lt: new Date(value.getTime() + 24 * 3600 * 1000) + } + ] + : { + [op]: value + }; + + entry = { + hdate: !ne + ? entry + : { + $not: entry + } + }; + + parent.push(entry); + } + break; + + case 'size': + { + let op = '$eq'; + let value = Number(term.value) || 0; + switch (term.operator) { + case '<': + op = '$lt'; + break; + case '<=': + op = '$lte'; + break; + case '>': + op = '$gt'; + break; + case '>=': + op = '$gte'; + break; + } + + let entry = { + [op]: value + }; + + entry = { + size: !ne + ? entry + : { + $not: entry + } + }; + + parent.push(entry); + } + break; + } + }); + }; + + let $and = []; + walkQuery($and, false, options.query); + if ($and.length) { + query.$and = $and; + } + + server.logger.info( + { + tnx: 'search', + cid: session.id + }, + '[%s] SEARCH %s', + session.id, + JSON.stringify(query) + ); + + let cursor = db.database.collection('messages').find(query).project({ + uid: true, + modseq: true + }); + + let highestModseq = 0; + let uidList = []; + + let processNext = () => { + cursor.next((err, message) => { + if (err) { + server.logger.error( + { + tnx: 'search', + cid: session.id + }, + '[%s] SEARCHFAIL %s error="%s"', + session.id, + JSON.stringify(query), + err.message + ); + return callback(new Error('Can not make requested search query')); + } + if (!message) { + return cursor.close(() => + callback(null, { + uidList, + highestModseq + }) + ); + } + + if (highestModseq < message.modseq) { + highestModseq = message.modseq; + } + + uidList.push(message.uid); + processNext(); + }); + }; + + processNext(); + }); +}; diff --git a/lib/handlers/on-status.js b/lib/handlers/on-status.js new file mode 100644 index 00000000..339953e8 --- /dev/null +++ b/lib/handlers/on-status.js @@ -0,0 +1,56 @@ +'use strict'; + +const db = require('../db'); + +// STATUS (X Y X) +module.exports = server => (path, session, callback) => { + server.logger.debug( + { + tnx: 'status', + cid: session.id + }, + '[%s] Requested status for "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + db.database + .collection('messages') + .find({ + mailbox: mailbox._id + }) + .count((err, total) => { + if (err) { + return callback(err); + } + db.database + .collection('messages') + .find({ + mailbox: mailbox._id, + seen: false + }) + .count((err, unseen) => { + if (err) { + return callback(err); + } + + return callback(null, { + messages: total, + uidNext: mailbox.uidNext, + uidValidity: mailbox.uidValidity, + unseen + }); + }); + }); + }); +}; diff --git a/lib/handlers/on-store.js b/lib/handlers/on-store.js new file mode 100644 index 00000000..87e677b6 --- /dev/null +++ b/lib/handlers/on-store.js @@ -0,0 +1,355 @@ +'use strict'; + +const imapTools = require('../../imap-core/lib/imap-tools'); +const db = require('../db'); +const consts = require('../consts'); + +// STORE / UID STORE, updates flags for selected UIDs +module.exports = server => (path, update, session, callback) => { + server.logger.debug( + { + tnx: 'store', + cid: session.id + }, + '[%s] Updating messages in "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOne({ + user: session.user.id, + path + }, (err, mailbox) => { + if (err) { + return callback(err); + } + + if (!mailbox) { + return callback(null, 'NONEXISTENT'); + } + + let query = { + mailbox: mailbox._id + }; + + if (update.unchangedSince) { + query = { + mailbox: mailbox._id, + modseq: { + $lte: update.unchangedSince + } + }; + } + + let queryAll = false; + if (update.messages.length !== session.selected.uidList.length) { + // do not use uid selector for 1:* + query.uid = { + $in: update.messages + }; + } else { + // 1:* + queryAll = true; + } + + let cursor = db.database + .collection('messages') + .find(query) + .project({ + _id: true, + uid: true, + flags: true + }) + .sort([['uid', 1]]); + + let updateEntries = []; + let notifyEntries = []; + + let done = (...args) => { + if (updateEntries.length) { + return db.database.collection('messages').bulkWrite(updateEntries, { + ordered: false, + w: 1 + }, () => { + updateEntries = []; + server.notifier.addEntries(session.user.id, path, notifyEntries, () => { + notifyEntries = []; + server.notifier.fire(session.user.id, path); + if (args[0]) { + // first argument is an error + return callback(...args); + } else { + updateMailboxFlags(mailbox, update, () => callback(...args)); + } + }); + }); + } + server.notifier.fire(session.user.id, path); + if (args[0]) { + // first argument is an error + return callback(...args); + } else { + updateMailboxFlags(mailbox, update, () => callback(...args)); + } + }; + + // We have to process all messages one by one instead of just calling an update + // for all messages as we need to know which messages were exactly modified, + // otherwise we can't send flag update notifications and modify modseq values + let processNext = () => { + cursor.next((err, message) => { + if (err) { + return done(err); + } + if (!message) { + return cursor.close(() => done(null, true)); + } + if (queryAll && !session.selected.uidList.includes(message.uid)) { + // skip processing messages that we do not know about yet + return processNext(); + } + + let flagsupdate = false; // query object for updates + + let updated = false; + let existingFlags = message.flags.map(flag => flag.toLowerCase().trim()); + switch (update.action) { + case 'set': + // check if update set matches current or is different + if ( + // if length does not match + existingFlags.length !== update.value.length || + // or a new flag was found + update.value.filter(flag => !existingFlags.includes(flag.toLowerCase().trim())).length + ) { + updated = true; + } + + message.flags = [].concat(update.value); + + // set flags + if (updated) { + flagsupdate = { + $set: { + flags: message.flags, + seen: message.flags.includes('\\Seen'), + flagged: message.flags.includes('\\Flagged'), + deleted: message.flags.includes('\\Deleted'), + draft: message.flags.includes('\\Draft') + } + }; + } + break; + + case 'add': { + let newFlags = []; + message.flags = message.flags.concat( + update.value.filter(flag => { + if (!existingFlags.includes(flag.toLowerCase().trim())) { + updated = true; + newFlags.push(flag); + return true; + } + return false; + }) + ); + + // add flags + if (updated) { + flagsupdate = { + $addToSet: { + flags: { + $each: newFlags + } + } + }; + + if ( + newFlags.includes('\\Seen') || + newFlags.includes('\\Flagged') || + newFlags.includes('\\Deleted') || + newFlags.includes('\\Draft') + ) { + flagsupdate.$set = {}; + if (newFlags.includes('\\Seen')) { + flagsupdate.$set = { + seen: true + }; + } + if (newFlags.includes('\\Flagged')) { + flagsupdate.$set = { + flagged: true + }; + } + if (newFlags.includes('\\Deleted')) { + flagsupdate.$set = { + deleted: true + }; + } + if (newFlags.includes('\\Draft')) { + flagsupdate.$set = { + draft: true + }; + } + } + } + break; + } + + case 'remove': { + // We need to use the case of existing flags when removing + let oldFlags = []; + let flagsUpdates = update.value.map(flag => flag.toLowerCase().trim()); + message.flags = message.flags.filter(flag => { + if (!flagsUpdates.includes(flag.toLowerCase().trim())) { + return true; + } + oldFlags.push(flag); + updated = true; + return false; + }); + + // remove flags + if (updated) { + flagsupdate = { + $pull: { + flags: { + $in: oldFlags + } + } + }; + if ( + oldFlags.includes('\\Seen') || + oldFlags.includes('\\Flagged') || + oldFlags.includes('\\Deleted') || + oldFlags.includes('\\Draft') + ) { + flagsupdate.$set = {}; + if (oldFlags.includes('\\Seen')) { + flagsupdate.$set = { + seen: false + }; + } + if (oldFlags.includes('\\Flagged')) { + flagsupdate.$set = { + flagged: false + }; + } + if (oldFlags.includes('\\Deleted')) { + flagsupdate.$set = { + deleted: false + }; + } + if (oldFlags.includes('\\Draft')) { + flagsupdate.$set = { + draft: false + }; + } + } + } + break; + } + } + + if (!update.silent) { + // print updated state of the message + session.writeStream.write( + session.formatResponse('FETCH', message.uid, { + uid: update.isUid ? message.uid : false, + flags: message.flags + }) + ); + } + + if (updated) { + updateEntries.push({ + updateOne: { + filter: { + _id: message._id, + // include shard key data as well + mailbox: mailbox._id, + uid: message.uid + }, + update: flagsupdate + } + }); + + notifyEntries.push({ + command: 'FETCH', + ignore: session.id, + uid: message.uid, + flags: message.flags, + message: message._id + }); + + if (updateEntries.length >= consts.BULK_BATCH_SIZE) { + return db.database.collection('messages').bulkWrite(updateEntries, { + ordered: false, + w: 1 + }, err => { + updateEntries = []; + if (err) { + return cursor.close(() => done(err)); + } + + server.notifier.addEntries(session.user.id, path, notifyEntries, () => { + notifyEntries = []; + server.notifier.fire(session.user.id, path); + processNext(); + }); + }); + } else { + processNext(); + } + } else { + processNext(); + } + }); + }; + + processNext(); + }); +}; + +function updateMailboxFlags(mailbox, update, callback) { + if (update.action === 'remove') { + // we didn't add any new flags, so there's nothing to update + return callback(); + } + + let mailboxFlags = imapTools.systemFlags.concat(mailbox.flags || []).map(flag => flag.trim().toLowerCase()); + let newFlags = []; + + // find flags that are not listed with mailbox + update.value.forEach(flag => { + // limit mailbox flags by 100 + if (mailboxFlags.length + newFlags.length >= 100) { + return; + } + // if mailbox does not have such flag, then add it + if (!mailboxFlags.includes(flag.toLowerCase().trim())) { + newFlags.push(flag); + } + }); + + // nothing new found + if (!newFlags.length) { + return callback(); + } + + // found some new flags not yet set for mailbox + // FIXME: Should we send unsolicited FLAGS and PERMANENTFLAGS notifications? Probably not + return db.database.collection('mailboxes').findOneAndUpdate( + { + _id: mailbox._id + }, + { + $addToSet: { + flags: { + $each: newFlags + } + } + }, + {}, + callback + ); +} diff --git a/lib/handlers/on-subscribe.js b/lib/handlers/on-subscribe.js new file mode 100644 index 00000000..e1a17f32 --- /dev/null +++ b/lib/handlers/on-subscribe.js @@ -0,0 +1,35 @@ +'use strict'; + +const db = require('../db'); + +// SUBSCRIBE "path/to/mailbox" +module.exports = server => (path, session, callback) => { + server.logger.debug( + { + tnx: 'subscribe', + cid: session.id + }, + '[%s] SUBSCRIBE to "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOneAndUpdate({ + user: session.user.id, + path + }, { + $set: { + subscribed: true + } + }, {}, (err, item) => { + if (err) { + return callback(err); + } + + if (!item || !item.value) { + // was not able to acquire a lock + return callback(null, 'NONEXISTENT'); + } + + callback(null, true); + }); +}; diff --git a/lib/handlers/on-unsubscribe.js b/lib/handlers/on-unsubscribe.js new file mode 100644 index 00000000..b2e0351b --- /dev/null +++ b/lib/handlers/on-unsubscribe.js @@ -0,0 +1,35 @@ +'use strict'; + +const db = require('../db'); + +// UNSUBSCRIBE "path/to/mailbox" +module.exports = server => (path, session, callback) => { + server.logger.debug( + { + tnx: 'unsubscribe', + cid: session.id + }, + '[%s] UNSUBSCRIBE from "%s"', + session.id, + path + ); + db.database.collection('mailboxes').findOneAndUpdate({ + user: session.user.id, + path + }, { + $set: { + subscribed: false + } + }, {}, (err, item) => { + if (err) { + return callback(err); + } + + if (!item || !item.value) { + // was not able to acquire a lock + return callback(null, 'NONEXISTENT'); + } + + callback(null, true); + }); +}; diff --git a/lib/imap-notifier.js b/lib/imap-notifier.js index 0def551d..3989c874 100644 --- a/lib/imap-notifier.js +++ b/lib/imap-notifier.js @@ -213,7 +213,7 @@ class ImapNotifier extends EventEmitter { _id: { $in: updated }, - user: mailbox.user + mailbox: mailbox._id }, { // only update modseq if the new value is larger than old one $max: { diff --git a/lib/message-handler.js b/lib/message-handler.js index 70577a9c..1b4fe6d3 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -317,7 +317,6 @@ class MessageHandler { checkExistingMessage(user, mailboxId, message, options, callback) { // if a similar message already exists then update existing one this.database.collection('messages').findOne({ - user, mailbox: mailboxId, hdate: message.hdate, msgid: message.msgid @@ -372,7 +371,9 @@ class MessageHandler { this.database.collection('messages').findOneAndUpdate({ _id: existing._id, - user: mailbox.user + // hash key + mailbox: mailbox._id, + uid: existing.uid }, { $set: { uid, @@ -466,7 +467,8 @@ class MessageHandler { this.database.collection('messages').deleteOne({ _id: message._id, - user: mailbox.user + mailbox: mailbox._id, + uid: message.uid }, err => { if (err) { return callback(err); @@ -562,15 +564,11 @@ class MessageHandler { let cursor = this.database .collection('messages') .find({ - user: mailbox, mailbox: mailbox._id, uid: { $in: options.messages || [] } }) - .project({ - uid: 1 - }) .sort([['uid', 1]]); let sourceUid = []; @@ -615,7 +613,10 @@ class MessageHandler { return cursor.close(done); } - sourceUid.unshift(message.uid); + let messageId = message._id; + let messageUid = message.uid; + + sourceUid.unshift(messageUid); this.database.collection('mailboxes').findOneAndUpdate({ _id: target._id }, { @@ -636,66 +637,74 @@ class MessageHandler { let uidNext = item.value.uidNext; destinationUid.unshift(uidNext); - let updateOptions = { - $set: { - mailbox: target._id, - // new mailbox means new UID - uid: uidNext, - // this will be changed later by the notification system - modseq: 0, + // set new mailbox + message.mailbox = target._id; - // retention settings - exp: !!target.retention, - rdate: Date.now() + (target.retention || 0) - } - }; + // new mailbox means new UID + message.uid = uidNext; + + // this will be changed later by the notification system + message.modseq = 0; + + // retention settings + message.exp = !!target.retention; + message.rdate = Date.now() + (target.retention || 0); if (options.markAsSeen) { - updateOptions.$set.seen = true; - updateOptions.$addToSet = { - flags: '\\Seen' - }; + message.seen = true; + if (!message.flags.includes('\\Seen')) { + message.flags.push('\\Seen'); + } } - // update message, change mailbox from old to new one - this.database.collection('messages').findOneAndUpdate({ - _id: message._id, - user: mailbox.user - }, updateOptions, err => { + this.database.collection('messages').insertOne(message, (err, r) => { if (err) { return cursor.close(() => done(err)); } - if (options.session) { - options.session.writeStream.write(options.session.formatResponse('EXPUNGE', message.uid)); - } + let insertId = r.insertedId; - removeEntries.push({ - command: 'EXPUNGE', - ignore: options.session && options.session.id, - uid: message.uid - }); + // delete old message + this.database.collection('messages').deleteOne({ + _id: messageId, + mailbox: mailbox._id, + uid: messageUid + }, err => { + if (err) { + return cursor.close(() => done(err)); + } - existsEntries.push({ - command: 'EXISTS', - uid: uidNext, - message: message._id - }); + if (options.session) { + options.session.writeStream.write(options.session.formatResponse('EXPUNGE', sourceUid)); + } - if (existsEntries.length >= BULK_BATCH_SIZE) { - // mark messages as deleted from old mailbox - return this.notifier.addEntries(mailbox, false, removeEntries, () => { - // mark messages as added to new mailbox - this.notifier.addEntries(target, false, existsEntries, () => { - removeEntries = []; - existsEntries = []; - this.notifier.fire(mailbox.user, mailbox.path); - this.notifier.fire(target.user, target.path); - processNext(); - }); + removeEntries.push({ + command: 'EXPUNGE', + ignore: options.session && options.session.id, + uid: sourceUid }); - } - processNext(); + + existsEntries.push({ + command: 'EXISTS', + uid: uidNext, + message: insertId + }); + + if (existsEntries.length >= BULK_BATCH_SIZE) { + // mark messages as deleted from old mailbox + return this.notifier.addEntries(mailbox, false, removeEntries, () => { + // mark messages as added to new mailbox + this.notifier.addEntries(target, false, existsEntries, () => { + removeEntries = []; + existsEntries = []; + this.notifier.fire(mailbox.user, mailbox.path); + this.notifier.fire(target.user, target.path); + processNext(); + }); + }); + } + processNext(); + }); }); }); }); diff --git a/lib/pop3-connection.js b/lib/pop3-connection.js index 086e72b1..f5c65cea 100644 --- a/lib/pop3-connection.js +++ b/lib/pop3-connection.js @@ -626,7 +626,7 @@ class POP3Connection extends EventEmitter { return next(); } - this._server.onFetchMessage(message.id, this.session, (err, stream) => { + this._server.onFetchMessage(message, this.session, (err, stream) => { if (err) { return next(err); } @@ -676,7 +676,7 @@ class POP3Connection extends EventEmitter { return next(); } - this._server.onFetchMessage(message.id, this.session, (err, stream) => { + this._server.onFetchMessage(message, this.session, (err, stream) => { if (err) { return next(err); } diff --git a/lib/pop3-server.js b/lib/pop3-server.js index 12c6c002..bd9f3c64 100644 --- a/lib/pop3-server.js +++ b/lib/pop3-server.js @@ -171,7 +171,7 @@ class POP3Server extends EventEmitter { } // called when a message body needs to be fetched - onFetchMessage(id, session, callback) { + onFetchMessage(message, session, callback) { // should return a stream object return callback(null, false); } diff --git a/pop3.js b/pop3.js index 4dd75813..81d1785c 100644 --- a/pop3.js +++ b/pop3.js @@ -84,12 +84,12 @@ const serverOptions = { db.database .collection('messages') .find({ - user: session.user.id, mailbox: mailbox._id }) .project({ uid: true, size: true, + mailbox: true, // required to decide if we need to update flags after RETR flags: true, seen: true @@ -109,6 +109,7 @@ const serverOptions = { .map(message => ({ id: message._id.toString(), uid: message.uid, + mailbox: message.mailbox, size: message.size, flags: message.flags, seen: message.seen @@ -120,10 +121,12 @@ const serverOptions = { }); }, - onFetchMessage(id, session, callback) { + onFetchMessage(message, session, callback) { db.database.collection('messages').findOne({ - _id: new ObjectID(id), - user: session.user.id + _id: new ObjectID(message.id), + // shard key + mailbox: message.mailbox, + uid: message.uid }, { mimeTree: true, size: true