diff --git a/config/default.js b/config/default.js index e1b25d7c..8e898d96 100644 --- a/config/default.js +++ b/config/default.js @@ -67,11 +67,12 @@ module.exports = { host: '0.0.0.0' }, - // push messages to ZoneMTA queue - forwarder: { + // push messages to ZoneMTA queue for delivery + sender: { + // if false, then no messages are sent enabled: true, - // which ZoneMTA queue to use + // which ZoneMTA queue to use by default zone: 'default', // MongoDB connection url. Do not set if you want to use main database diff --git a/lib/db.js b/lib/db.js index 50635da3..9ae762d1 100644 --- a/lib/db.js +++ b/lib/db.js @@ -6,7 +6,7 @@ const mongodb = require('mongodb'); const redis = require('redis'); const MongoClient = mongodb.MongoClient; -module.exports.forwarder = false; +module.exports.senderDb = false; module.exports.database = false; module.exports.redis = false; module.exports.redisConfig = false; @@ -20,21 +20,21 @@ module.exports.connect = callback => { module.exports.redisConfig = tools.redisConfig(config.redis); module.exports.redis = redis.createClient(module.exports.redisConfig); - if (!config.forwarder.enabled) { + if (!config.sender.enabled) { return callback(null, database); } - if (!config.forwarder.mongo) { - module.exports.forwarder = database; + if (!config.sender.mongo) { + module.exports.senderDb = database; return callback(null, database); } - MongoClient.connect(config.forwarder.mongo, (err, forwarderDatabase) => { + MongoClient.connect(config.sender.mongo, (err, forwarderDatabase) => { if (err) { database.close(); return callback(err); } - module.exports.forwarder = forwarderDatabase; + module.exports.senderDb = forwarderDatabase; return callback(null, database); }); }); diff --git a/lib/forward.js b/lib/forward.js index 90980226..23c6238e 100644 --- a/lib/forward.js +++ b/lib/forward.js @@ -1,187 +1,32 @@ 'use strict'; const config = require('config'); -const db = require('./db'); -const SeqIndex = require('seq-index'); -const DkimStream = require('./dkim-stream'); -const MessageSplitter = require('./message-splitter'); -const seqIndex = new SeqIndex(); -const GridFs = require('grid-fs'); - -let gridstore; +const maildrop = require('./maildrop'); module.exports = (options, callback) => { - if (!config.forwarder.enabled) { + if (!config.sender.enabled) { return callback(null, false); } - let id = options.id || seqIndex.get(); - let seq = 0; - let documents = []; - - let envelope = { - id, + let message = maildrop({ from: options.sender, to: options.forward, - interface: 'forwarder', - transtype: 'API', - time: Date.now(), - dkim: { - hashAlgo: 'sha256' - } - }; - - let messageSplitter = new MessageSplitter(); - let dkimStream = new DkimStream(); - - messageSplitter.once('headers', headers => { - envelope.headers = headers.getList(); - }); - - dkimStream.on('hash', bodyHash => { - // store relaxed body hash for signing - envelope.dkim.bodyHash = bodyHash; - envelope.bodySize = dkimStream.byteLength; - }); - - messageSplitter.once('error', err => dkimStream.emit('error', err)); + interface: 'forwarder' + }, callback); setImmediate(() => { - messageSplitter.pipe(dkimStream); let pos = 0; let writeNextChunk = () => { if (pos >= options.chunks.length) { - return messageSplitter.end(); + return message.end(); } let chunk = options.chunks[pos++]; - if (!messageSplitter.write(chunk)) { - return messageSplitter.once('drain', writeNextChunk); + if (!message.write(chunk)) { + return message.once('drain', writeNextChunk); } else { setImmediate(writeNextChunk); } }; setImmediate(writeNextChunk); }); - - store(id, dkimStream, err => { - - if (err) { - return callback(err); - } - - setMeta(id, envelope, err => { - if (err) { - return removeMessage(id, () => callback(err)); - } - - for (let i = 0, len = envelope.to.length; i < len; i++) { - - let recipient = envelope.to[i]; - let deliveryZone = config.forwarder.zone || 'default'; - let recipientDomain = recipient.substr(recipient.lastIndexOf('@') + 1).replace(/[\[\]]/g, ''); - - seq++; - let date = new Date(); - let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); - let delivery = { - id, - seq: deliverySeq, - - // Actual delivery data - domain: recipientDomain, - sendingZone: deliveryZone, - - // actual recipient address - recipient, - - locked: false, - lockTime: 0, - - // earliest time to attempt delivery, defaults to now - queued: date, - - // queued date might change but created should not - created: date - }; - - documents.push(delivery); - } - - db.forwarder.collection(config.forwarder.collection). - insertMany(documents, { - w: 1, - ordered: false - }, err => { - if (err) { - return callback(err); - } - - callback(null, id); - }); - }); - }); }; - -function store(id, stream, callback) { - gridstore = gridstore || new GridFs(db.forwarder, config.forwarder.gfs); - - let returned = false; - let store = gridstore.createWriteStream('message ' + id, { - fsync: true, - content_type: 'message/rfc822', - metadata: { - created: new Date() - } - }); - - stream.once('error', err => { - if (returned) { - return; - } - returned = true; - - store.once('close', () => { - removeMessage(id, () => callback(err)); - }); - - store.end(); - }); - - store.once('error', err => { - if (returned) { - return; - } - returned = true; - callback(err); - }); - - store.on('close', () => { - if (returned) { - return; - } - returned = true; - - return callback(null, id); - }); - - stream.pipe(store); -} - -function removeMessage(id, callback) { - gridstore.unlink('message ' + id, callback); -} - -function setMeta(id, data, callback) { - db.forwarder.collection(config.forwarder.gfs + '.files').findAndModify({ - filename: 'message ' + id - }, false, { - $set: { - 'metadata.data': data - } - }, {}, err => { - if (err) { - return callback(err); - } - return callback(); - }); -} diff --git a/lib/maildrop.js b/lib/maildrop.js new file mode 100644 index 00000000..2e107720 --- /dev/null +++ b/lib/maildrop.js @@ -0,0 +1,181 @@ +'use strict'; + +const config = require('config'); +const db = require('./db'); +const SeqIndex = require('seq-index'); +const DkimStream = require('./dkim-stream'); +const MessageSplitter = require('./message-splitter'); +const seqIndex = new SeqIndex(); +const GridFs = require('grid-fs'); + +let gridstore; + +module.exports = (options, callback) => { + if (!config.sender.enabled) { + return callback(null, false); + } + + let id = options.id || seqIndex.get(); + let seq = 0; + let documents = []; + + let envelope = { + id, + + from: options.from, + to: Array.isArray(options.to) ? options.to : [].concat(options.to || []), + + interface: options.interface || 'maildrop', + transtype: 'API', + time: Date.now(), + + dkim: { + hashAlgo: 'sha256' + } + }; + + if (!envelope.to.length) { + return callback(null, false); + } + + let messageSplitter = new MessageSplitter(); + let dkimStream = new DkimStream(); + + messageSplitter.once('headers', headers => { + envelope.headers = headers.getList(); + }); + + dkimStream.on('hash', bodyHash => { + // store relaxed body hash for signing + envelope.dkim.bodyHash = bodyHash; + envelope.bodySize = dkimStream.byteLength; + }); + + messageSplitter.once('error', err => dkimStream.emit('error', err)); + + store(id, dkimStream, err => { + + if (err) { + return callback(err); + } + + setMeta(id, envelope, err => { + if (err) { + return removeMessage(id, () => callback(err)); + } + + let date = new Date(); + + for (let i = 0, len = envelope.to.length; i < len; i++) { + + let recipient = envelope.to[i]; + let deliveryZone = options.zone || config.sender.zone || 'default'; + let recipientDomain = recipient.substr(recipient.lastIndexOf('@') + 1).replace(/[\[\]]/g, ''); + + seq++; + let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); + let delivery = { + id, + seq: deliverySeq, + + // Actual delivery data + domain: recipientDomain, + sendingZone: deliveryZone, + + // actual recipient address + recipient, + + locked: false, + lockTime: 0, + + // earliest time to attempt delivery, defaults to now + queued: date, + + // queued date might change but created should not + created: date + }; + + documents.push(delivery); + } + + db.senderDb.collection(config.sender.collection). + insertMany(documents, { + w: 1, + ordered: false + }, err => { + if (err) { + return callback(err); + } + + callback(null, id); + }); + }); + }); + + messageSplitter.pipe(dkimStream); + return messageSplitter; +}; + +function store(id, stream, callback) { + gridstore = gridstore || new GridFs(db.senderDb, config.sender.gfs); + + let returned = false; + let store = gridstore.createWriteStream('message ' + id, { + fsync: true, + content_type: 'message/rfc822', + metadata: { + created: new Date() + } + }); + + stream.once('error', err => { + if (returned) { + return; + } + returned = true; + + store.once('close', () => { + removeMessage(id, () => callback(err)); + }); + + store.end(); + }); + + store.once('error', err => { + if (returned) { + return; + } + returned = true; + callback(err); + }); + + store.on('close', () => { + if (returned) { + return; + } + returned = true; + + return callback(null, id); + }); + + stream.pipe(store); +} + +function removeMessage(id, callback) { + gridstore.unlink('message ' + id, callback); +} + +function setMeta(id, data, callback) { + db.senderDb.collection(config.sender.gfs + '.files').findAndModify({ + filename: 'message ' + id + }, false, { + $set: { + 'metadata.data': data + } + }, {}, err => { + if (err) { + return callback(err); + } + return callback(); + }); +} diff --git a/lmtp.js b/lmtp.js index 4848f549..0eaae91d 100644 --- a/lmtp.js +++ b/lmtp.js @@ -161,7 +161,7 @@ const serverOptions = { let mailboxQueryValue = 'INBOX'; let filters = (user.filters || []).concat(spamHeader ? { - id: 'wdspam', + id: 'SPAM', query: { headers: { [spamHeader]: 'Yes' @@ -209,7 +209,8 @@ const serverOptions = { forwardTargets.add(user.forward); } - if (!forwardTargets.size) { + // never forward messages marked as spam + if (!forwardTargets.size || filterActions.get('spam')) { return setImmediate(done); } @@ -235,9 +236,9 @@ const serverOptions = { forwardMessage((err, id) => { if (err) { - log.error('LMTP', 'FRWRDFAIL error=%s', err.message); + log.error('LMTP', '%s FRWRDFAIL from=%s to=%s target=%s error=%s', prepared.id.toString(), sender, recipient, Array.from(forwardTargets).join(','), err.message); } else if (id) { - log.silly('LMTP', 'FRWRDOK id=%s', id); + log.silly('LMTP', '%s FRWRDOK id=%s from=%s to=%s target=%s', prepared.id.toString(), id, sender, recipient, Array.from(forwardTargets).join(',')); } if (filterActions.get('delete')) { diff --git a/package.json b/package.json index a73d8845..da86ef38 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "grunt-cli": "^1.2.0", "grunt-eslint": "^19.0.0", "grunt-mocha-test": "^0.13.2", - "mocha": "^3.2.0" + "mocha": "^3.3.0" }, "dependencies": { "bcryptjs": "^2.4.3", @@ -24,14 +24,14 @@ "generate-password": "^1.3.0", "grid-fs": "^1.0.1", "html-to-text": "^3.2.0", - "iconv-lite": "^0.4.15", + "iconv-lite": "^0.4.17", "joi": "^10.4.1", "libbase64": "^0.1.0", "libmime": "^3.1.0", "libqp": "^1.1.0", "mailsplit": "^4.0.2", "marked": "^0.3.6", - "mongodb": "^2.2.25", + "mongodb": "^2.2.26", "node-redis-scripty": "0.0.5", "nodemailer": "^4.0.1", "npmlog": "^4.0.2",