2017-03-21 06:07:23 +08:00
'use strict' ;
const config = require ( 'config' ) ;
const uuidV1 = require ( 'uuid/v1' ) ;
const ObjectID = require ( 'mongodb' ) . ObjectID ;
const RedFour = require ( 'redfour' ) ;
const Indexer = require ( '../imap-core/lib/indexer/indexer' ) ;
const ImapNotifier = require ( './imap-notifier' ) ;
2017-03-30 01:06:09 +08:00
const tools = require ( './tools' ) ;
2017-04-02 00:22:47 +08:00
const libmime = require ( 'libmime' ) ;
2017-03-21 06:07:23 +08:00
2017-04-09 17:33:10 +08:00
// How many queued journal entries `move` accumulates before flushing
// them to the notifier
const BULK_BATCH_SIZE = 150;
2017-03-21 06:07:23 +08:00
/**
 * Stores, deletes and moves messages in MongoDB and pushes the matching
 * EXISTS/EXPUNGE journal entries to the IMAP notifier.
 *
 * UID/MODSEQ allocation for a mailbox is serialized with a Redis based lock
 * that is keyed by the mailbox id.
 */
class MessageHandler {
    /**
     * @param {Object} database MongoDB database handle, shared with the indexer and the notifier
     */
    constructor(database) {
        this.database = database;

        this.indexer = new Indexer({
            database
        });

        this.notifier = new ImapNotifier({
            database,
            pushOnly: true
        });

        this.redlock = new RedFour({
            redis: tools.redisConfig(config.redis),
            namespace: 'wildduck'
        });
    }

    /**
     * Resolves a mailbox document.
     *
     * If `options.mailbox` is an object that already carries an `_id`, it is
     * returned as is (asynchronously). Otherwise `options.mailbox` is used as the
     * `_id` to look up. Without `options.mailbox` the lookup is done by
     * `options.user` together with `options.specialUse` or `options.path`.
     *
     * @param {Object} options Lookup options
     * @param {Function} callback Called with `(err, mailbox)`. A missing mailbox yields an
     *   error whose `imapResponse` is 'TRYCREATE'
     */
    getMailbox(options, callback) {
        let query = {};

        if (options.mailbox) {
            if (typeof options.mailbox === 'object' && options.mailbox._id) {
                // already a resolved mailbox document
                return setImmediate(() => callback(null, options.mailbox));
            }
            query._id = options.mailbox;
        } else {
            query.user = options.user;
            if (options.specialUse) {
                query.specialUse = options.specialUse;
            } else {
                query.path = options.path;
            }
        }

        this.database.collection('mailboxes').findOne(query, (err, mailbox) => {
            if (err) {
                return callback(err);
            }

            if (!mailbox) {
                let err = new Error('Mailbox is missing');
                err.imapResponse = 'TRYCREATE';
                return callback(err);
            }

            callback(null, mailbox);
        });
    }

    /**
     * Adds a message to a mailbox.
     *
     * If a message with the same `hdate` and `msgid` already exists in the mailbox
     * then either nothing is stored (`options.skipExisting`) or the existing entry
     * gets a fresh UID, MODSEQ and the new flags. Otherwise a new message document
     * is inserted.
     *
     * @param {Object} options Message options; mailbox selection keys are the same as for `getMailbox`
     * @param {Object} [options.prepared] Result of `prepareMessage`; computed from `options` if missing
     * @param {Boolean} [options.skipExisting] Do not touch an already existing duplicate
     * @param {Object} [options.meta] Stored as `meta` of a newly inserted message
     * @param {Object} [options.session] IMAP session; gets untagged responses written to it
     *   when its selected mailbox matches the target mailbox path
     * @param {Function} callback Called with `(err)`, `(null, false, {id})` for a skipped
     *   duplicate or `(null, true, {uidValidity, uid, id})` for a stored message
     */
    add(options, callback) {
        let prepared = options.prepared || this.prepareMessage(options);

        this.getMailbox(options, (err, mailbox) => {
            if (err) {
                return callback(err);
            }

            let insertNew = () => this._insertNew(options, mailbox, prepared, callback);

            // if a similar message already exists then update existing one
            this.database.collection('messages').findOne({
                mailbox: mailbox._id,
                hdate: prepared.hdate,
                msgid: prepared.msgid
            }, (err, existing) => {
                if (err) {
                    return callback(err);
                }

                if (!existing) {
                    // nothing to do here, continue adding message
                    return insertNew();
                }

                if (options.skipExisting) {
                    // message already exists, just skip it
                    return callback(null, false, {
                        id: existing._id
                    });
                }

                this._replaceExisting(options, mailbox, existing, prepared.flags, insertNew, callback);
            });
        });
    }

    /**
     * Duplicate handling for `add`: assigns a new UID, MODSEQ and flags to an
     * already stored message and journals it as EXPUNGE (old UID) + EXISTS (new UID).
     *
     * @param {Object} options Options passed to `add`
     * @param {Object} mailbox Resolved mailbox document
     * @param {Object} existing Already stored message document
     * @param {Array} flags Flags to store for the message
     * @param {Function} fallback Called without waiting for a result if the stored message
     *   could not be updated (it vanished in the meantime)
     * @param {Function} callback Final callback of `add`
     */
    _replaceExisting(options, mailbox, existing, flags, fallback, callback) {
        // Ensure sequential UID by locking mailbox
        this.redlock.waitAcquireLock(mailbox._id.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
            if (err) {
                return callback(err);
            }

            if (!lock || !lock.success) {
                // did not get the lock in time
                return callback(new Error('Failed to acquire lock'));
            }

            let fail = err => this.redlock.releaseLock(lock, () => callback(err));

            // acquire new UID+MODSEQ
            this.database.collection('mailboxes').findOneAndUpdate({
                _id: mailbox._id
            }, {
                $inc: {
                    // allocate both UID and MODSEQ values so when journal is later sorted by
                    // modseq then UIDs are always in ascending order
                    uidNext: 1,
                    modifyIndex: 1
                }
            }, {
                returnOriginal: true
            }, (err, item) => {
                if (err) {
                    return fail(err);
                }

                if (!item || !item.value) {
                    // mailbox was not found, nothing to allocate the UID from
                    let err = new Error('Mailbox is missing');
                    err.imapResponse = 'TRYCREATE';
                    return fail(err);
                }

                // pre-increment state of the mailbox, so uidNext is the UID to use
                let current = item.value;
                let uid = current.uidNext;
                let modseq = current.modifyIndex + 1;

                this.database.collection('messages').findOneAndUpdate({
                    _id: existing._id
                }, {
                    $set: {
                        uid,
                        modseq,
                        flags,
                        // keep the boolean shortcuts in sync with the replaced flags
                        seen: flags.includes('\\Seen'),
                        flagged: flags.includes('\\Flagged'),
                        deleted: flags.includes('\\Deleted'),
                        draft: flags.includes('\\Draft')
                    }
                }, {
                    returnOriginal: false
                }, (err, item) => {
                    if (err) {
                        return fail(err);
                    }

                    if (!item || !item.value) {
                        // message was not found for whatever reason
                        return this.redlock.releaseLock(lock, fallback);
                    }

                    let updated = item.value;
                    let session = options.session;

                    if (session && session.selected && session.selected.mailbox === current.path) {
                        session.writeStream.write(session.formatResponse('EXPUNGE', existing.uid));
                        session.writeStream.write(session.formatResponse('EXISTS', updated.uid));
                    }

                    this.notifier.addEntries(current, false, {
                        command: 'EXPUNGE',
                        ignore: session && session.id,
                        uid: existing.uid,
                        message: existing._id
                    }, () => {
                        this.notifier.addEntries(current, false, {
                            command: 'EXISTS',
                            uid: updated.uid,
                            ignore: session && session.id,
                            message: updated._id,
                            modseq: updated.modseq
                        }, () => {
                            this.notifier.fire(current.user, current.path);
                            return this.redlock.releaseLock(lock, () => callback(null, true, {
                                uidValidity: current.uidValidity,
                                uid,
                                id: existing._id
                            }));
                        });
                    });
                });
            });
        });
    }

    /**
     * Insert path for `add`: indexes the content, accounts the used storage for
     * the user, allocates UID+MODSEQ under the mailbox lock and stores the message.
     *
     * @param {Object} options Options passed to `add`
     * @param {Object} mailbox Resolved mailbox document
     * @param {Object} prepared Result of `prepareMessage`
     * @param {Function} callback Final callback of `add`
     */
    _insertNew(options, mailbox, prepared, callback) {
        this.indexer.processContent(prepared.id, prepared.mimeTree, (err, maildata) => {
            if (err) {
                return callback(err);
            }

            let message = this._buildMessage(options, prepared, maildata);
            let size = prepared.size;

            // Another server might be waiting for the lock
            this.redlock.waitAcquireLock(mailbox._id.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
                if (err) {
                    return callback(err);
                }

                if (!lock || !lock.success) {
                    // did not get the insert lock in time
                    return callback(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time'));
                }

                this.database.collection('users').findOneAndUpdate({
                    _id: mailbox.user
                }, {
                    $inc: {
                        storageUsed: size
                    }
                }, err => {
                    if (err) {
                        // nothing was counted, so there is nothing to roll back
                        this.redlock.releaseLock(lock, () => false);
                        return callback(err);
                    }

                    // reverts the storage counter, releases the lock and reports the error
                    let rollback = err => {
                        this.database.collection('users').findOneAndUpdate({
                            _id: mailbox.user
                        }, {
                            $inc: {
                                storageUsed: -size
                            }
                        }, () => {
                            this.redlock.releaseLock(lock, () => callback(err));
                        });
                    };

                    // acquire new UID+MODSEQ
                    this.database.collection('mailboxes').findOneAndUpdate({
                        _id: mailbox._id
                    }, {
                        $inc: {
                            // allocate both UID and MODSEQ values so when journal is later sorted by
                            // modseq then UIDs are always in ascending order
                            uidNext: 1,
                            modifyIndex: 1
                        }
                    }, (err, item) => {
                        if (err) {
                            return rollback(err);
                        }

                        if (!item || !item.value) {
                            // mailbox was not found, nothing to allocate the UID from
                            let err = new Error('Mailbox is missing');
                            err.imapResponse = 'TRYCREATE';
                            return rollback(err);
                        }

                        let current = item.value;

                        // update message object by setting mailbox specific values
                        message.mailbox = current._id;
                        message.user = current.user;
                        message.uid = current.uidNext;
                        message.modseq = current.modifyIndex + 1;

                        this.database.collection('messages').insertOne(message, err => {
                            if (err) {
                                return rollback(err);
                            }

                            let uidValidity = current.uidValidity;
                            let uid = message.uid;
                            let session = options.session;

                            if (session && session.selected && session.selected.mailbox === current.path) {
                                session.writeStream.write(session.formatResponse('EXISTS', message.uid));
                            }

                            this.notifier.addEntries(current, false, {
                                command: 'EXISTS',
                                uid: message.uid,
                                ignore: session && session.id,
                                message: message._id,
                                modseq: message.modseq
                            }, () => {
                                this.redlock.releaseLock(lock, () => {
                                    this.notifier.fire(current.user, current.path);
                                    return callback(null, true, {
                                        uidValidity,
                                        uid,
                                        id: message._id
                                    });
                                });
                            });
                        });
                    });
                });
            });
        });
    }

    /**
     * Builds the message document for `add` from the prepared values and the
     * content extracted by the indexer. Mailbox specific values (mailbox, user,
     * uid, modseq) are not set here.
     *
     * @param {Object} options Options passed to `add`
     * @param {Object} prepared Result of `prepareMessage`
     * @param {Object} maildata Content returned by the indexer (`attachments`, `text`, `html`)
     * @returns {Object} Message document
     */
    _buildMessage(options, prepared, maildata) {
        let flags = prepared.flags;
        let maxTextLength = 300 * 1024;

        let message = {
            _id: prepared.id,
            idate: prepared.idate,
            hdate: prepared.hdate,
            flags,
            size: prepared.size,
            meta: options.meta || {},
            headers: prepared.headers,
            mimeTree: prepared.mimeTree,
            envelope: prepared.envelope,
            bodystructure: prepared.bodystructure,
            msgid: prepared.msgid,

            // use boolean for more common flags
            seen: flags.includes('\\Seen'),
            flagged: flags.includes('\\Flagged'),
            deleted: flags.includes('\\Deleted'),
            draft: flags.includes('\\Draft')
        };

        if (maildata.attachments && maildata.attachments.length) {
            message.attachments = maildata.attachments;
            message.hasAttachments = true;
        } else {
            message.hasAttachments = false;
        }

        if (maildata.text) {
            let text = maildata.text.replace(/\r\n/g, '\n').trim();
            message.text = text.length <= maxTextLength ? text : text.substr(0, maxTextLength);

            // short single line preview of the text
            message.intro = message.text.replace(/\s+/g, ' ').trim();
            if (message.intro.length > 128) {
                message.intro = message.intro.substr(0, 128) + '…';
            }
        }

        if (maildata.html && maildata.html.length) {
            let htmlSize = 0;

            message.html = maildata.html.map(html => {
                if (htmlSize >= maxTextLength || !html) {
                    return '';
                }

                let bytes = Buffer.byteLength(html);
                if (htmlSize + bytes <= maxTextLength) {
                    htmlSize += bytes;
                    return html;
                }

                // Only keep what still fits into the budget. The cut is made by
                // characters, so multi-byte content might overshoot the byte limit
                // a bit; no further parts are added after that.
                html = html.substr(0, maxTextLength - htmlSize);
                htmlSize += Buffer.byteLength(html);
                return html;
            }).filter(html => html);
        }

        return message;
    }

    /**
     * Increments the `storageUsed` counter of the mailbox and of its user.
     *
     * The outcome of the mailbox update is not checked; only the result of the
     * user update is passed on.
     *
     * @param {Object} mailbox Mailbox document (`_id` and `user` are used)
     * @param {Object} [inc] Increment values
     * @param {Number} [inc.storageUsed] Amount to add, non-numeric values count as 0
     * @param {Function} callback Passed to the update of the user document
     */
    updateQuota(mailbox, inc, callback) {
        inc = inc || {};
        let storageUsed = Number(inc.storageUsed) || 0;

        this.database.collection('mailboxes').findOneAndUpdate({
            _id: mailbox._id
        }, {
            $inc: {
                storageUsed
            }
        }, () => {
            this.database.collection('users').findOneAndUpdate({
                _id: mailbox.user
            }, {
                $inc: {
                    storageUsed
                }
            }, callback);
        });
    }

    /**
     * Deletes a single message, updates the storage counters, unlinks the message
     * from attachments and journals an EXPUNGE entry.
     *
     * @param {Object} options Delete options
     * @param {Object} options.query Query used to find the message to delete
     * @param {Object} [options.mailbox] Mailbox document or id; defaults to the mailbox of the message
     * @param {Object} [options.session] IMAP session; gets an untagged EXPUNGE written to it
     *   when its selected mailbox matches the mailbox path
     * @param {Function} callback Called with `(err)` or `(null, true)`
     */
    del(options, callback) {
        this.database.collection('messages').findOne(options.query, (err, message) => {
            if (err) {
                return callback(err);
            }

            if (!message) {
                return callback(new Error('Message does not exist'));
            }

            this.getMailbox({
                mailbox: options.mailbox || message.mailbox
            }, (err, mailbox) => {
                if (err) {
                    return callback(err);
                }

                this.database.collection('messages').deleteOne({
                    _id: message._id
                }, err => {
                    if (err) {
                        return callback(err);
                    }

                    // counter update is best effort, its result is not checked
                    this.updateQuota(mailbox, {
                        storageUsed: -message.size
                    }, () => {
                        // remove link to message from attachments (if any exist)
                        this.database.collection('attachments.files').updateMany({
                            'metadata.messages': message._id
                        }, {
                            $pull: {
                                'metadata.messages': message._id
                            }
                        }, {
                            multi: true,
                            w: 1
                        }, () => {
                            // errors are ignored as we don't really care if we have orphans or not

                            let session = options.session;
                            if (session && session.selected && session.selected.mailbox === mailbox.path) {
                                session.writeStream.write(session.formatResponse('EXPUNGE', message.uid));
                            }

                            this.notifier.addEntries(mailbox, false, {
                                command: 'EXPUNGE',
                                ignore: session && session.id,
                                uid: message.uid,
                                message: message._id
                            }, () => {
                                this.notifier.fire(mailbox.user, mailbox.path);

                                // delete all attachments that do not have any active links to message objects
                                this.database.collection('attachments.files').deleteMany({
                                    'metadata.messages': {
                                        $size: 0
                                    }
                                }, () => {
                                    // errors are ignored as we don't really care if we have orphans or not
                                    return callback(null, true);
                                });
                            });
                        });
                    });
                });
            });
        });
    }

    /**
     * Moves messages from one mailbox to another. Every moved message gets a new
     * UID from the destination mailbox; journal entries are flushed to the
     * notifier in batches of BULK_BATCH_SIZE.
     *
     * @param {Object} options Move options
     * @param {Object} options.source Passed to `getMailbox` to resolve the source mailbox
     * @param {Object} options.destination Passed to `getMailbox` to resolve the destination mailbox
     * @param {Array} [options.messages] UIDs of the messages to move
     * @param {Boolean} [options.markAsSeen] Also add the \Seen flag to the moved messages
     * @param {Object} [options.session] IMAP session; gets an untagged EXPUNGE for every moved message
     * @param {Function} callback Called with `(err)` or
     *   `(null, true, {uidValidity, sourceUid, destinationUid})`; the UID lists hold the
     *   most recently moved message first
     */
    move(options, callback) {
        this.getMailbox(options.source, (err, mailbox) => {
            if (err) {
                return callback(err);
            }

            this.getMailbox(options.destination, (err, target) => {
                if (err) {
                    return callback(err);
                }

                this.database.collection('mailboxes').findOneAndUpdate({
                    _id: mailbox._id
                }, {
                    $inc: {
                        // increase the mailbox modification index
                        // to indicate that something happened
                        modifyIndex: 1
                    }
                }, {
                    uidNext: true
                }, () => {
                    let cursor = this.database.collection('messages').find({
                        mailbox: mailbox._id,
                        uid: {
                            $in: options.messages || []
                        }
                    }).project({
                        uid: 1
                    }).sort([
                        ['uid', 1]
                    ]);

                    let sourceUid = [];
                    let destinationUid = [];

                    let removeEntries = [];
                    let existsEntries = [];

                    // writes the queued journal entries and resets the queues
                    let flushEntries = next => {
                        // mark messages as deleted from old mailbox
                        this.notifier.addEntries(mailbox, false, removeEntries, () => {
                            // mark messages as added to new mailbox
                            this.notifier.addEntries(target, false, existsEntries, () => {
                                removeEntries = [];
                                existsEntries = [];
                                this.notifier.fire(mailbox.user, mailbox.path);
                                this.notifier.fire(target.user, target.path);
                                next();
                            });
                        });
                    };

                    let done = err => {
                        let finish = () => {
                            if (err) {
                                return callback(err);
                            }
                            return callback(null, true, {
                                uidValidity: target.uidValidity,
                                sourceUid,
                                destinationUid
                            });
                        };

                        if (existsEntries.length) {
                            // already moved messages need to be journaled even on failure
                            return flushEntries(finish);
                        }

                        finish();
                    };

                    // do not leave the cursor open when processing is aborted
                    let fail = err => cursor.close(() => done(err));

                    let processNext = () => {
                        cursor.next((err, message) => {
                            if (err) {
                                return fail(err);
                            }

                            if (!message) {
                                // no more messages
                                return cursor.close(done);
                            }

                            sourceUid.unshift(message.uid);

                            this.database.collection('mailboxes').findOneAndUpdate({
                                _id: target._id
                            }, {
                                $inc: {
                                    uidNext: 1
                                }
                            }, {
                                uidNext: true
                            }, (err, item) => {
                                if (err) {
                                    return fail(err);
                                }

                                if (!item || !item.value) {
                                    return fail(new Error('Mailbox disappeared'));
                                }

                                let uidNext = item.value.uidNext;
                                destinationUid.unshift(uidNext);

                                let updateOptions = {
                                    $set: {
                                        mailbox: target._id,
                                        // new mailbox means new UID
                                        uid: uidNext,
                                        // this will be changed later by the notification system
                                        modseq: 0
                                    }
                                };

                                if (options.markAsSeen) {
                                    updateOptions.$set.seen = true;
                                    updateOptions.$addToSet = {
                                        flags: '\\Seen'
                                    };
                                }

                                // update message, change mailbox from old to new one
                                this.database.collection('messages').findOneAndUpdate({
                                    _id: message._id
                                }, updateOptions, err => {
                                    if (err) {
                                        return fail(err);
                                    }

                                    if (options.session) {
                                        options.session.writeStream.write(options.session.formatResponse('EXPUNGE', message.uid));
                                    }

                                    removeEntries.push({
                                        command: 'EXPUNGE',
                                        ignore: options.session && options.session.id,
                                        uid: message.uid
                                    });

                                    existsEntries.push({
                                        command: 'EXISTS',
                                        uid: uidNext,
                                        message: message._id
                                    });

                                    if (existsEntries.length >= BULK_BATCH_SIZE) {
                                        return flushEntries(processNext);
                                    }

                                    processNext();
                                });
                            });
                        });
                    };

                    processNext();
                });
            });
        });
    }

    /**
     * Converts raw header lines into lowercase `{key, value}` pairs that are short
     * enough to be stored in an indexed MongoDB field.
     *
     * @param {Array} [headersArray] Header lines as binary strings
     * @returns {Array} List of `{key, value}` objects, one for every header line
     */
    generateIndexedHeaders(headersArray) {
        return (headersArray || []).map(line => {
            line = Buffer.from(line, 'binary').toString();

            let separator = line.indexOf(':');
            let key = line.substr(0, separator).trim().toLowerCase();
            // unfold the value
            let value = line.substr(separator + 1).trim().replace(/\s*\r?\n\s*/g, ' ');

            // Decode encoded-words before lowercasing, as the base64 payload of
            // an encoded-word is case sensitive
            try {
                value = libmime.decodeWords(value);
            } catch (E) {
                // ignore, keep the value as it was
            }
            value = value.toLowerCase();

            // trim long values as mongodb indexed fields can not be too long
            if (Buffer.byteLength(key, 'utf-8') >= 255) {
                key = Buffer.from(key).slice(0, 255).toString();
                // remove last 4 chars to be sure we do not have any incomplete unicode sequences
                key = key.substr(0, key.length - 4);
            }

            if (Buffer.byteLength(value, 'utf-8') >= 880) {
                // value exceeds MongoDB max indexed value length
                value = Buffer.from(value).slice(0, 880).toString();
                // remove last 4 chars to be sure we do not have any incomplete unicode sequences
                value = value.substr(0, value.length - 4);
            }

            return {
                key,
                value
            };
        });
    }

    /**
     * Parses a raw message and derives the values `add` needs for storing it.
     *
     * @param {Object} options Message options
     * @param {*} options.raw Raw message source, handed to the indexer for parsing
     * @param {*} [options.date] Internal date; current time is used if missing or not a valid date
     * @param {Array|String} [options.flags] Message flags
     * @returns {Object} `{id, mimeTree, size, bodystructure, envelope, idate, hdate, flags, msgid, headers}`
     */
    prepareMessage(options) {
        let id = new ObjectID();

        let mimeTree = this.indexer.parseMimeTree(options.raw);

        let size = this.indexer.getSize(mimeTree);
        let bodystructure = this.indexer.getBodyStructure(mimeTree);
        let envelope = this.indexer.getEnvelope(mimeTree);

        let idate = options.date && new Date(options.date) || new Date();
        if (isNaN(idate.getTime())) {
            // unparseable date value, do not store an invalid date
            idate = new Date();
        }

        let hdate = mimeTree.parsedHeader.date && new Date(mimeTree.parsedHeader.date) || false;
        if (!hdate || hdate.toString() === 'Invalid Date') {
            // no usable Date header, fall back to the internal date
            hdate = idate;
        }

        let flags = [].concat(options.flags || []);

        // index 9 of the envelope holds the Message-ID, generate one if it is missing
        let msgid = envelope[9] || ('<' + uuidV1() + '@wildduck.email>');

        let headers = this.generateIndexedHeaders(mimeTree.header);

        return {
            id,
            mimeTree,
            size,
            bodystructure,
            envelope,
            idate,
            hdate,
            flags,
            msgid,
            headers
        };
    }
}
module . exports = MessageHandler ;