Added partial support for running Zone-mta plugins for message queueing

This commit is contained in:
Andris Reinman 2022-06-21 10:30:26 +03:00
parent 5da63a290d
commit 73e01827cb
No known key found for this signature in database
GPG key ID: DC6C83F4D584D364
12 changed files with 195 additions and 231 deletions

View file

@ -86,7 +86,7 @@ processes=1
#cipher="aes192" # only for decrypting legacy values (if there are any)
[plugins]
# @include "plugins/*.toml"
# @include "plugins.toml"
[tasks]
# if enabled then process jobs like deleting expired messages etc

6
config/plugins.toml Normal file
View file

@ -0,0 +1,6 @@
pluginsPath = "./plugins"
[conf]
# @include "plugins/*.toml"

View file

@ -0,0 +1,9 @@
["core/rspamd"]
enabled = false # ["receiver"]
url = "http://maildev.zone.wtf:11333/check"
interfaces = ["maildrop"]
ignoreOrigins = []
maxSize = 5242880
dropSpam = false
rewriteSubject = false
ip = true

View file

@ -2990,10 +2990,14 @@ module.exports = (db, server, messageHandler, userHandler, storageHandler, setti
reason: 'submit',
from: envelope.from,
to: envelope.to,
sendTime
sendTime,
runPlugins: true
},
(err, ...args) => {
if (err || !args[0]) {
if (err && !err.code && err.name === 'SMTPReject') {
err.code = 'MessageRejected';
}
if (err) {
err.code = err.code || 'ERRCOMPOSE';
} else {

View file

@ -469,11 +469,16 @@ module.exports = (db, server, messageHandler, userHandler, settingsHandler) => {
reason: 'submit',
from: compiledEnvelope.from,
to: compiledEnvelope.to,
sendTime
sendTime,
runPlugins: true
},
(err, ...args) => {
if (err || !args[0]) {
if (err) {
if (!err.code && err.name === 'SMTPReject') {
err.code = 'MessageRejected';
}
err.code = err.code || 'ERRCOMPOSE';
}
err.responseCode = 500;

View file

@ -10,11 +10,15 @@ module.exports = (options, callback) => {
targets: options.targets,
interface: 'forwarder'
interface: 'forwarder',
runPlugins: true
};
let message = options.maildrop.push(mail, (err, ...args) => {
if (err || !args[0]) {
if (err && !err.code && err.name === 'SMTPReject') {
err.code = 'MessageRejected';
}
if (err) {
err.code = err.code || 'ERRCOMPOSE';
}

View file

@ -12,6 +12,9 @@ const addressparser = require('nodemailer/lib/addressparser');
const punycode = require('punycode/');
const crypto = require('crypto');
const tools = require('./tools');
const plugins = require('./plugins');
const PassThrough = require('stream').PassThrough;
const util = require('util');
class Maildropper {
constructor(options) {
@ -105,6 +108,33 @@ class Maildropper {
envelope.reason = options.reason;
}
let messageInfo = {
'message-id': '<>',
from: envelope.from || '<>',
to: [].concat(envelope.to || []).join(',') || '<>',
src: envelope.origin,
format() {
let values = [];
Object.keys(this).forEach(key => {
if (typeof this[key] === 'function' || typeof this[key] === 'undefined') {
return;
}
values.push(util.format('%s=%s', key, !/^"/.test(this[key]) && /\s/.test(this[key]) ? JSON.stringify(this[key]) : this[key]));
});
return values.join(' ');
},
keys() {
let data = {};
Object.keys(this).forEach(key => {
if (typeof this[key] === 'function' || typeof this[key] === 'undefined') {
return;
}
data[key] = this[key];
});
return data;
}
};
let deliveries = [];
if (options.targets) {
@ -186,94 +216,126 @@ class Maildropper {
messageSplitter.once('error', err => dkimStream.emit('error', err));
this.store(id, dkimStream, err => {
plugins.handler.runHooks('message:store', [envelope, dkimStream], err => {
if (err) {
return callback(err);
if (dkimStream.readable) {
dkimStream.resume(); // let the original stream to end normally before displaying the error message
}
return setImmediate(() => callback(err));
}
if (this.checkLoop(envelope, deliveries)) {
// looped message
let err = new Error('Message loop detected');
err.responseCode = 500;
err.code = 'ELOOP';
return this.removeMessage(id, () => callback(err));
}
envelope.headers = envelope.headers.getList();
this.setMeta(id, envelope, err => {
this.store(id, dkimStream, err => {
if (err) {
return callback(err);
}
if (this.checkLoop(envelope, deliveries)) {
// looped message
let err = new Error('Message loop detected');
err.responseCode = 500;
err.code = 'ELOOP';
return this.removeMessage(id, () => callback(err));
}
let date = new Date();
for (let i = 0, len = deliveries.length; i < len; i++) {
let recipient = deliveries[i];
let deliveryZone = options.zone || this.zone || 'default';
let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, '');
seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
assigned: 'no',
// actual recipient address
recipient: recipient.to,
locked: false,
lockTime: 0,
// earliest time to attempt delivery, defaults to now
queued: options.sendTime || date,
// queued date might change but created should not
created: date
};
if (recipient.http) {
delivery.http = recipient.http;
delivery.targetUrl = recipient.targetUrl;
plugins.handler.runHooks('message:queue', [envelope, messageInfo], err => {
if (err) {
return setImmediate(() => this.removeMessage(id, () => callback(err)));
}
['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => {
if (recipient[key]) {
delivery[key] = recipient[key];
}
});
if (recipient.skipSRS) {
delivery.skipSRS = true;
}
documents.push(delivery);
}
this.db.senderDb.collection(this.collection).insertMany(
documents,
{
writeConcern: 1,
ordered: false
},
err => {
envelope.headers = envelope.headers.getList();
this.setMeta(id, envelope, err => {
if (err) {
return callback(err);
return this.removeMessage(id, () => callback(err));
}
callback(null, envelope);
}
);
let date = new Date();
for (let i = 0, len = deliveries.length; i < len; i++) {
let recipient = deliveries[i];
let deliveryZone = options.zone || this.zone || 'default';
let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, '');
seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
assigned: 'no',
// actual recipient address
recipient: recipient.to,
locked: false,
lockTime: 0,
// earliest time to attempt delivery, defaults to now
queued: options.sendTime || date,
// queued date might change but created should not
created: date
};
if (recipient.http) {
delivery.http = recipient.http;
delivery.targetUrl = recipient.targetUrl;
}
['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => {
if (recipient[key]) {
delivery[key] = recipient[key];
}
});
if (recipient.skipSRS) {
delivery.skipSRS = true;
}
documents.push(delivery);
}
this.db.senderDb.collection(this.collection).insertMany(
documents,
{
writeConcern: 1,
ordered: false
},
err => {
if (err) {
return callback(err);
}
callback(null, envelope);
}
);
});
});
});
});
messageSplitter.pipe(dkimStream);
return messageSplitter;
if (options.runPlugins) {
// message submissions
let source = new PassThrough();
let raw = new PassThrough();
plugins.handler.runAnalyzerHooks(envelope, source, raw);
raw.pipe(messageSplitter);
messageSplitter.pipe(dkimStream);
source.on('error', err => raw.emit('error', err));
raw.on('error', err => messageSplitter.emit('error', err));
return source;
} else {
// default, no plugins (autoreplies etc.)
messageSplitter.pipe(dkimStream);
return messageSplitter;
}
}
convertAddresses(addresses, withNames, addressList) {

View file

@ -1,144 +1,19 @@
'use strict';
const config = require('wild-config');
const pathlib = require('path');
const log = require('npmlog');
const PluginHandler = require('zone-mta/lib/plugin-handler');
const db = require('./db');
const WD_PATH = pathlib.join(__dirname, '..');
const CONFIG_PATH = config.configDirectory || WD_PATH;
module.exports.handler = false;
const hooks = new Map();
class PluginInstance {
constructor(key, config) {
this.db = db;
this.key = key;
this.config = config || {};
this.logger = {};
['silly', 'verbose', 'info', 'http', 'warn', 'error', 'debug', 'err'].forEach(level => {
this.logger[level] = (...args) => {
switch (level) {
case 'debug':
level = 'verbose';
break;
case 'err':
level = 'error';
break;
}
log[level]('[' + key + ']', ...args);
};
});
}
addHook(hook, handler) {
hook = (hook || '')
.toString()
.replace(/\s+/g, '')
.toLowerCase();
if (!hook) {
return;
}
if (!hooks.has(hook)) {
hooks.set(hook, []);
}
hooks.get(hook).push({ plugin: this, handler });
}
init(done) {
if (!this.config.path) {
this.logger.debug('Plugin path not provided, skipping');
return setImmediate(done);
}
try {
let pluginPath = this.config.path.replace(/\$WD/g, WD_PATH).replace(/\$CONFIG/g, CONFIG_PATH);
this.module = require(pluginPath); //eslint-disable-line global-require
} catch (E) {
this.logger.error('Failed to load plugin. %s', E.message);
return setImmediate(done);
}
if (typeof this.module.init !== 'function') {
this.logger.debug('Init method not found');
return setImmediate(done);
}
try {
return this.module.init(this, err => {
if (err) {
this.logger.error('Initialization resulted with an error. %s', err.message);
} else {
this.logger.debug('Plugin "%s" initialized', this.module.title || this.key);
}
return setImmediate(done);
});
} catch (E) {
this.logger.error('Failed executing init method. %s', E.message);
return setImmediate(done);
}
}
}
module.exports.init = next => {
let keys = Object.keys(config.plugins || {});
let pos = 0;
let loadNextPlugin = () => {
if (pos >= keys.length) {
return setImmediate(next);
}
let key = keys[pos++];
if (!config.plugins[key].enabled) {
return setImmediate(loadNextPlugin);
}
let plugin = new PluginInstance(key, config.plugins[key]);
plugin.init(loadNextPlugin);
};
setImmediate(loadNextPlugin);
};
module.exports.runHooks = (hook, ...args) => {
let next = args.pop();
hook = (hook || '')
.toString()
.replace(/\s+/g, '')
.toLowerCase();
if (!hook || !hooks.has(hook)) {
return setImmediate(next);
}
let handlers = hooks.get(hook);
let pos = 0;
let processHandler = () => {
if (pos >= handlers.length) {
return setImmediate(next);
}
let entry = handlers[pos++];
let returned = false;
try {
entry.handler(...args, err => {
if (returned) {
return;
}
returned = true;
if (err) {
entry.plugin.logger.error('Failed processing hook %s. %s', hook, err.message);
}
setImmediate(processHandler);
});
} catch (E) {
if (returned) {
return;
}
returned = true;
entry.plugin.logger.error('Failed processing hook %s. %s', hook, E.message);
setImmediate(processHandler);
}
};
setImmediate(processHandler);
module.exports.init = context => {
module.exports.handler = new PluginHandler({
logger: log,
pluginsPath: config.plugins.pluginsPath,
plugins: config.plugins.conf,
context,
log: config.log,
db
});
};

View file

@ -22,7 +22,7 @@
"ajv": "8.11.0",
"chai": "4.3.6",
"docsify-cli": "4.4.4",
"eslint": "8.17.0",
"eslint": "8.18.0",
"eslint-config-nodemailer": "1.2.0",
"eslint-config-prettier": "8.5.0",
"grunt": "1.5.3",
@ -31,7 +31,7 @@
"grunt-mocha-test": "0.13.3",
"grunt-shell-spawn": "0.4.0",
"grunt-wait": "0.3.0",
"imapflow": "1.0.99",
"imapflow": "1.0.100",
"mailparser": "3.5.0",
"mocha": "10.0.0",
"request": "2.88.2",
@ -48,7 +48,7 @@
"base32.js": "0.1.0",
"bcryptjs": "2.4.3",
"bull": "3.29.3",
"fido2-lib": "3.2.0",
"fido2-lib": "3.2.4",
"gelf": "2.0.1",
"generate-password": "1.7.0",
"he": "1.2.0",
@ -74,7 +74,7 @@
"node-html-parser": "5.3.3",
"nodemailer": "6.7.5",
"npmlog": "6.0.2",
"openpgp": "5.2.1",
"openpgp": "5.3.0",
"pem-jwk": "2.0.0",
"punycode": "2.1.1",
"pwnedpasswords": "1.0.6",
@ -89,8 +89,9 @@
"unix-crypt-td-js": "1.1.4",
"unixcrypt": "1.1.0",
"uuid": "8.3.2",
"wild-config": "1.6.0",
"yargs": "17.5.1"
"wild-config": "1.6.1",
"yargs": "17.5.1",
"zone-mta": "3.4.0"
},
"repository": {
"type": "git",

3
plugins/core/rspamd.js Normal file
View file

@ -0,0 +1,3 @@
'use strict';
module.exports = require('zone-mta/plugins/core/rspamd');

View file

@ -6,9 +6,8 @@ module.exports.init = (app, done) => {
// do your initialization stuff here
// init hook is called immediatelly after server is started
app.addHook('init', next => {
app.logger.info('Example plugin initialized. Value1=%s', app.config.value1);
next();
app.addHook('init', async () => {
app.logger.info('Example plugin initialized. Value1=%s', JSON.stringify(app.config));
});
setImmediate(done);

View file

@ -98,14 +98,10 @@ db.connect(err => {
}
}
plugins.init(err => {
if (err) {
log.error('App', 'Failed to start plugins');
errors.notify(err);
return setTimeout(() => process.exit(1), 3000);
}
plugins.runHooks('init', () => {
plugins.init('receiver');
plugins.handler.load(() => {
log.verbose('Plugins', 'Plugins loaded');
plugins.handler.runHooks('init', [], () => {
log.info('App', 'All servers started, ready to process some mail');
});
});