started with forwarding support

This commit is contained in:
Andris Reinman 2017-04-24 16:51:50 +03:00
parent ca9f416caf
commit 5cd5c0f685
5 changed files with 124 additions and 4 deletions

View file

@ -62,6 +62,23 @@ module.exports = {
host: '0.0.0.0'
},
// push messages to ZoneMTA queue
forwarder: {
enabled: true,
// which ZoneMTA queue to use
zone: 'default',
// MongoDB connection url. Do not set if you want to use main database
mongodb: 'mongodb://127.0.0.1:27017/zone-mta',
// Collection name for GridFS storage
gfs: 'mail',
// Collection name for the queue
collection: 'zone-queue'
},
// if this header exists and starts with yes then the message is treated as spam
spamHeader: 'X-Rspamd-Spam',

View file

@ -6,6 +6,7 @@ const mongodb = require('mongodb');
const redis = require('redis');
const MongoClient = mongodb.MongoClient;
module.exports.forwarder = false;
module.exports.database = false;
module.exports.redis = false;
@ -16,6 +17,23 @@ module.exports.connect = callback => {
}
module.exports.database = database;
module.exports.redis = redis.createClient(tools.redisConfig(config.redis));
callback(null, database);
if (!config.forwarder.enabled) {
return callback(null, database);
}
if (!config.forwarder.mongodb) {
module.exports.forwarder = database;
return callback(null, database);
}
MongoClient.connect(config.mongo, (err, forwarderDatabase) => {
if (err) {
database.close();
return callback(err);
}
module.exports.forwarder = forwarderDatabase;
return callback(null, database);
});
});
};

61
lib/forward.js Normal file
View file

@ -0,0 +1,61 @@
'use strict';
const config = require('config');
const db = require('db');
const SeqIndex = require('seq-index');
const seqIndex = new SeqIndex();
module.exports = (options, callback) => {
if (!config.forwarder.enabled) {
return callback(null, false);
}
let id = options.id || seqIndex.get();
let seq = 0;
let documents = [];
// TODO: create and store message body + headers + dkim hash
for (let i = 0, len = options.to.length; i < len; i++) {
let recipient = options.to[i];
let deliveryZone = config.forwarder.zone || 'default';
let recipientDomain = recipient.substr(recipient.lastIndexOf('@') + 1).replace(/[\[\]]/g, '');
seq++;
let date = new Date();
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
// actual recipient address
recipient,
locked: false,
lockTime: 0,
// earliest time to attempt delivery, defaults to now
queued: date,
// queued date might change but created should not
created: date
};
documents.push(delivery);
}
db.forwarder.collection(config.forwarder.collection).insertMany(documents, {
w: 1,
ordered: false
}, err => {
if (err) {
return callback(err);
}
callback(null, true);
});
};

29
lmtp.js
View file

@ -8,6 +8,7 @@ const SMTPServer = require('smtp-server').SMTPServer;
const tools = require('./lib/tools');
const MessageHandler = require('./lib/message-handler');
const db = require('./lib/db');
const forward = require('./lib/forward');
const fs = require('fs');
let messageHandler;
@ -115,7 +116,7 @@ const serverOptions = {
stream.once('end', () => {
let spamHeader = config.spamHeader && config.spamHeader.toLowerCase();
let sender = tools.normalizeAddress(session.envelope.mailFrom && session.envelope.mailFrom.address || '');
let responses = [];
let users = session.users;
let stored = 0;
@ -144,8 +145,9 @@ const serverOptions = {
chunks.unshift(header);
chunklen += header.length;
let raw = Buffer.concat(chunks, chunklen);
let prepared = messageHandler.prepareMessage({
raw: Buffer.concat(chunks, chunklen)
raw
});
let maildata = messageHandler.indexer.processContent(prepared.id, prepared.mimeTree);
@ -169,6 +171,12 @@ const serverOptions = {
}
} : []);
let forwardTargets = new Set();
if (user.forward) {
// forward all messages
forwardTargets.add(user.forward);
}
let matchingFilters = [];
let filterActions = new Map();
@ -186,6 +194,10 @@ const serverOptions = {
filterActions = filter.action;
} else {
Object.keys(filter.action).forEach(key => {
if (key === 'forward') {
forwardTargets.add(filter.action[key]);
return;
}
// if a previous filter already has set a value then do not touch it
if (!filterActions.has(key)) {
filterActions.set(key, filter.action[key]);
@ -194,6 +206,17 @@ const serverOptions = {
}
});
if (forwardTargets.size) {
// messages needs to be forwarded, so store it to outbound queue
forward({
user,
sender,
recipient,
forward: Array.from(forwardTargets),
raw
}, () => false);
}
if (filterActions.has('delete') && filterActions.get('delete')) {
// nothing to do with the message, just continue
responses.push({
@ -244,7 +267,7 @@ const serverOptions = {
meta: {
source: 'LMTP',
from: tools.normalizeAddress(session.envelope.mailFrom && session.envelope.mailFrom.address || ''),
from: sender,
to: recipient,
origin: session.remoteAddress,
originhost: session.clientHostname,

View file

@ -37,6 +37,7 @@
"redfour": "^1.0.0",
"redis": "^2.7.1",
"restify": "^4.3.0",
"seq-index": "^1.1.0",
"smtp-server": "^3.0.1",
"speakeasy": "^2.0.0",
"toml": "^2.3.2",