From 5cd5c0f68512c98eb3125fbe84d39804f30e30d3 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Mon, 24 Apr 2017 16:51:50 +0300 Subject: [PATCH] started with forwarding support --- config/default.js | 17 +++++++++++++ lib/db.js | 20 +++++++++++++++- lib/forward.js | 61 +++++++++++++++++++++++++++++++++++++++++++++++ lmtp.js | 29 +++++++++++++++++++--- package.json | 1 + 5 files changed, 124 insertions(+), 4 deletions(-) create mode 100644 lib/forward.js diff --git a/config/default.js b/config/default.js index af1723c8..1045d3ce 100644 --- a/config/default.js +++ b/config/default.js @@ -62,6 +62,23 @@ module.exports = { host: '0.0.0.0' }, + // push messages to ZoneMTA queue + forwarder: { + enabled: true, + + // which ZoneMTA queue to use + zone: 'default', + + // MongoDB connection url. Do not set if you want to use main database + mongodb: 'mongodb://127.0.0.1:27017/zone-mta', + + // Collection name for GridFS storage + gfs: 'mail', + + // Collection name for the queue + collection: 'zone-queue' + }, + // if this header exists and starts with yes then the message is treated as spam spamHeader: 'X-Rspamd-Spam', diff --git a/lib/db.js b/lib/db.js index 10c05396..cf7652c2 100644 --- a/lib/db.js +++ b/lib/db.js @@ -6,6 +6,7 @@ const mongodb = require('mongodb'); const redis = require('redis'); const MongoClient = mongodb.MongoClient; +module.exports.forwarder = false; module.exports.database = false; module.exports.redis = false; @@ -16,6 +17,23 @@ module.exports.connect = callback => { } module.exports.database = database; module.exports.redis = redis.createClient(tools.redisConfig(config.redis)); - callback(null, database); + + if (!config.forwarder.enabled) { + return callback(null, database); + } + + if (!config.forwarder.mongodb) { + module.exports.forwarder = database; + return callback(null, database); + } + + MongoClient.connect(config.mongo, (err, forwarderDatabase) => { + if (err) { + database.close(); + return callback(err); + } + module.exports.forwarder = forwarderDatabase; + return callback(null, database); + }); }); }; diff --git a/lib/forward.js b/lib/forward.js new file mode 100644 index 00000000..6eda5447 --- /dev/null +++ b/lib/forward.js @@ -0,0 +1,61 @@ +'use strict'; + +const config = require('config'); +const db = require('db'); +const SeqIndex = require('seq-index'); +const seqIndex = new SeqIndex(); + +module.exports = (options, callback) => { + if (!config.forwarder.enabled) { + return callback(null, false); + } + + let id = options.id || seqIndex.get(); + let seq = 0; + let documents = []; + + // TODO: create and store message body + headers + dkim hash + + for (let i = 0, len = options.to.length; i < len; i++) { + + let recipient = options.to[i]; + let deliveryZone = config.forwarder.zone || 'default'; + let recipientDomain = recipient.substr(recipient.lastIndexOf('@') + 1).replace(/[\[\]]/g, ''); + + seq++; + let date = new Date(); + let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); + let delivery = { + id, + seq: deliverySeq, + + // Actual delivery data + domain: recipientDomain, + sendingZone: deliveryZone, + + // actual recipient address + recipient, + + locked: false, + lockTime: 0, + + // earliest time to attempt delivery, defaults to now + queued: date, + + // queued date might change but created should not + created: date + }; + + documents.push(delivery); + } + + db.forwarder.collection(config.forwarder.collection).insertMany(documents, { + w: 1, + ordered: false + }, err => { + if (err) { + return callback(err); + } + callback(null, true); + }); +}; diff --git a/lmtp.js b/lmtp.js index dbc628a8..1e90b1b1 100644 --- a/lmtp.js +++ b/lmtp.js @@ -8,6 +8,7 @@ const SMTPServer = require('smtp-server').SMTPServer; const tools = require('./lib/tools'); const MessageHandler = require('./lib/message-handler'); const db = require('./lib/db'); +const forward = require('./lib/forward'); const fs = require('fs'); let messageHandler; @@ -115,7 +116,7 @@ const serverOptions = { stream.once('end', () => { let spamHeader = config.spamHeader && config.spamHeader.toLowerCase(); - + let sender = tools.normalizeAddress(session.envelope.mailFrom && session.envelope.mailFrom.address || ''); let responses = []; let users = session.users; let stored = 0; @@ -144,8 +145,9 @@ const serverOptions = { chunks.unshift(header); chunklen += header.length; + let raw = Buffer.concat(chunks, chunklen); let prepared = messageHandler.prepareMessage({ - raw: Buffer.concat(chunks, chunklen) + raw }); let maildata = messageHandler.indexer.processContent(prepared.id, prepared.mimeTree); @@ -169,6 +171,12 @@ const serverOptions = { } } : []); + let forwardTargets = new Set(); + if (user.forward) { + // forward all messages + forwardTargets.add(user.forward); + } + let matchingFilters = []; let filterActions = new Map(); @@ -186,6 +194,10 @@ const serverOptions = { filterActions = filter.action; } else { Object.keys(filter.action).forEach(key => { + if (key === 'forward') { + forwardTargets.add(filter.action[key]); + return; + } // if a previous filter already has set a value then do not touch it if (!filterActions.has(key)) { filterActions.set(key, filter.action[key]); @@ -194,6 +206,17 @@ const serverOptions = { } }); + if (forwardTargets.size) { + // messages needs to be forwarded, so store it to outbound queue + forward({ + user, + sender, + recipient, + forward: Array.from(forwardTargets), + raw + }, () => false); + } + if (filterActions.has('delete') && filterActions.get('delete')) { // nothing to do with the message, just continue responses.push({ @@ -244,7 +267,7 @@ const serverOptions = { meta: { source: 'LMTP', - from: tools.normalizeAddress(session.envelope.mailFrom && session.envelope.mailFrom.address || ''), + from: sender, to: recipient, origin: session.remoteAddress, originhost: session.clientHostname, diff --git a/package.json b/package.json index f20ef32b..4bd813c9 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "redfour": "^1.0.0", "redis": "^2.7.1", "restify": "^4.3.0", + "seq-index": "^1.1.0", "smtp-server": "^3.0.1", "speakeasy": "^2.0.0", "toml": "^2.3.2",