initial plugin system

This commit is contained in:
Andris Reinman 2017-10-26 14:57:19 +03:00
parent 2c6a87eb7c
commit 60daa09f67
7 changed files with 519 additions and 343 deletions

View file

@ -1,10 +1,10 @@
[example]
name="Example Plugin"
enabled = true
# $WD: path of wildduck module root
# $CONFIG: path of config root
path="$WD/plugins/example.js"
path = "$WD/plugins/example.js"
# Additional config options
value1 = "Example config option"

View file

@ -5,6 +5,7 @@ const ObjectID = require('mongodb').ObjectID;
const db = require('./db');
const forward = require('./forward');
const autoreply = require('./autoreply');
const plugins = require('./plugins');
const defaultSpamHeaderKeys = [
{
@ -206,265 +207,272 @@ class FilterHandler {
}))
);
let forwardTargets = new Set();
let forwardTargetUrls = new Set();
let matchingFilters = [];
let filterActions = new Map();
let forwardTargets = new Map();
filters
// apply all filters to the message
.map(filter => checkFilter(filter, prepared, maildata))
// remove all unmatched filters
.filter(filter => filter)
// apply filter actions
.forEach(filter => {
matchingFilters.push(filter.id);
plugins.runHooks('filter', filters, forwardTargets, () => {
let matchingFilters = [];
let filterActions = new Map();
// apply matching filter
if (!filterActions) {
filterActions = filter.action;
} else {
Object.keys(filter.action).forEach(key => {
if (key === 'forward') {
forwardTargets.add(filter.action[key]);
return;
}
filters
// apply all filters to the message
.map(filter => checkFilter(filter, prepared, maildata))
// remove all unmatched filters
.filter(filter => filter)
// apply filter actions
.forEach(filter => {
matchingFilters.push(filter.id);
if (key === 'targetUrl') {
forwardTargetUrls.add(filter.action[key]);
return;
}
// if a previous filter already has set a value then do not touch it
if (!filterActions.has(key)) {
filterActions.set(key, filter.action[key]);
}
});
}
});
let forwardMessage = done => {
if (userData.forward && !filterActions.get('delete')) {
// forward to default recipient only if the message is not deleted
forwardTargets.add(userData.forward);
}
if (userData.targetUrl && !filterActions.get('delete')) {
// forward to default URL only if the message is not deleted
forwardTargetUrls.add(userData.targetUrl);
}
// never forward messages marked as spam
if ((!forwardTargets.size && !forwardTargetUrls.size) || filterActions.get('spam')) {
return setImmediate(done);
}
// check limiting counters
this.messageHandler.counters.ttlcounter(
'wdf:' + userData._id.toString(),
forwardTargets.size + forwardTargetUrls.size,
userData.forwards,
false,
(err, result) => {
if (err) {
// failed checks
log.error('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), err.message);
} else if (!result.success) {
log.silly('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), 'Precondition failed');
return done();
}
forward(
{
parentId: prepared.id,
userData,
sender,
recipient,
forward: forwardTargets.size ? Array.from(forwardTargets) : false,
targetUrl: forwardTargetUrls.size ? Array.from(forwardTargetUrls) : false,
chunks,
chunklen
},
done
);
}
);
};
let sendAutoreply = done => {
// never reply to messages marked as spam
if (!sender || !userData.autoreply || filterActions.get('spam')) {
return setImmediate(done);
}
autoreply(
{
parentId: prepared.id,
userData,
sender,
recipient,
chunks,
chunklen,
messageHandler: this.messageHandler
},
done
);
};
let outbound = [];
forwardMessage((err, id) => {
if (err) {
log.error(
'LMTP',
'%s FRWRDFAIL from=%s to=%s target=%s error=%s',
prepared.id.toString(),
sender,
recipient,
Array.from(forwardTargets)
.concat(forwardTargetUrls)
.join(','),
err.message
);
} else if (id) {
outbound.push(id);
log.silly(
'LMTP',
'%s FRWRDOK id=%s from=%s to=%s target=%s',
prepared.id.toString(),
id,
sender,
recipient,
Array.from(forwardTargets)
.concat(Array.from(forwardTargetUrls))
.join(',')
);
}
sendAutoreply((err, id) => {
if (err) {
log.error('LMTP', '%s AUTOREPLYFAIL from=%s to=%s error=%s', prepared.id.toString(), '<>', sender, err.message);
} else if (id) {
outbound.push(id);
log.silly('LMTP', '%s AUTOREPLYOK id=%s from=%s to=%s', prepared.id.toString(), id, '<>', sender);
}
if (filterActions.get('delete')) {
// nothing to do with the message, just continue
return callback(null, {
userData,
response: 'Message dropped by policy as ' + prepared.id.toString()
});
}
// apply filter results to the message
filterActions.forEach((value, key) => {
switch (key) {
case 'spam':
if (value > 0) {
// positive value is spam
mailboxQueryKey = 'specialUse';
mailboxQueryValue = '\\Junk';
// apply matching filter
if (!filterActions) {
filterActions = filter.action;
} else {
Object.keys(filter.action).forEach(key => {
if (key === 'forward') {
[].concat(filter.action[key] || []).forEach(address => {
forwardTargets.set(address, { type: 'mail', value: address });
});
return;
}
break;
case 'seen':
if (value) {
flags.push('\\Seen');
if (key === 'targetUrl') {
[].concat(filter.action[key] || []).forEach(address => {
forwardTargets.set(address, { type: 'http', value: address });
});
return;
}
break;
case 'flag':
if (value) {
flags.push('\\Flagged');
// if a previous filter already has set a value then do not touch it
if (!filterActions.has(key)) {
filterActions.set(key, filter.action[key]);
}
break;
case 'mailbox':
if (value) {
// positive value is spam
mailboxQueryKey = 'mailbox';
mailboxQueryValue = value;
}
break;
});
}
});
let messageOpts = {
user: userData._id,
[mailboxQueryKey]: mailboxQueryValue,
prepared,
maildata,
meta: options.meta,
filters: matchingFilters,
date: false,
flags,
// if similar message exists, then skip
skipExisting: true
};
let received = [].concat((prepared.mimeTree.parsedHeader && prepared.mimeTree.parsedHeader.received) || []);
if (received.length) {
let receivedData = parseReceived(received[0]);
if (!receivedData.has('id') && received.length > 1) {
receivedData = parseReceived(received[1]);
}
if (receivedData.has('with')) {
messageOpts.meta.transtype = receivedData.get('with');
}
if (receivedData.has('id')) {
messageOpts.meta.queueId = receivedData.get('id');
}
if (receivedData.has('from')) {
messageOpts.meta.origin = receivedData.get('from');
}
let forwardMessage = done => {
if (userData.forward && !filterActions.get('delete')) {
// forward to default recipient only if the message is not deleted
forwardTargets.set(userData.forward, { type: 'mail', value: userData.forward });
}
if (outbound && outbound.length) {
messageOpts.outbound = [].concat(outbound || []);
if (userData.targetUrl && !filterActions.get('delete')) {
// forward to default URL only if the message is not deleted
forwardTargets.set(userData.targetUrl, { type: 'http', value: userData.targetUrl });
}
this.messageHandler.encryptMessage(
userData.encryptMessages ? userData.pubKey : false,
{
chunks,
chunklen
},
(err, encrypted) => {
if (!err && encrypted) {
messageOpts.prepared = this.messageHandler.prepareMessage({
raw: Buffer.concat([extraHeader, encrypted]),
indexedHeaders: this.spamHeaderKeys
});
messageOpts.maildata = this.messageHandler.indexer.getMaildata(messageOpts.prepared.mimeTree);
// never forward messages marked as spam
if (!forwardTargets.size || filterActions.get('spam')) {
return setImmediate(done);
}
// check limiting counters
this.messageHandler.counters.ttlcounter(
'wdf:' + userData._id.toString(),
forwardTargets.size,
userData.forwards,
false,
(err, result) => {
if (err) {
// failed checks
log.error('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), err.message);
} else if (!result.success) {
log.silly('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), 'Precondition failed');
return done();
}
this.messageHandler.add(messageOpts, (err, inserted, info) => {
// push to response list
callback(
null,
{
userData,
response: err ? err : 'Message stored as ' + info.id.toString()
},
!encrypted
? {
mimeTree: messageOpts.prepared.mimeTree,
maildata: messageOpts.maildata
}
: false
);
});
forward(
{
parentId: prepared.id,
userData,
sender,
recipient,
targets: forwardTargets.size
? Array.from(forwardTargets).map(row => ({ type: row[1].type, value: row[1].value }))
: false,
chunks,
chunklen
},
done
);
}
);
};
let sendAutoreply = done => {
// never reply to messages marked as spam
if (!sender || !userData.autoreply || filterActions.get('spam')) {
return setImmediate(done);
}
autoreply(
{
parentId: prepared.id,
userData,
sender,
recipient,
chunks,
chunklen,
messageHandler: this.messageHandler
},
done
);
};
let outbound = [];
forwardMessage((err, id) => {
if (err) {
log.error(
'LMTP',
'%s FRWRDFAIL from=%s to=%s target=%s error=%s',
prepared.id.toString(),
sender,
recipient,
Array.from(forwardTargets)
.map(row => row[0])
.join(','),
err.message
);
} else if (id) {
outbound.push(id);
log.silly(
'LMTP',
'%s FRWRDOK id=%s from=%s to=%s target=%s',
prepared.id.toString(),
id,
sender,
recipient,
Array.from(forwardTargets)
.map(row => row[0])
.join(',')
);
}
sendAutoreply((err, id) => {
if (err) {
log.error('LMTP', '%s AUTOREPLYFAIL from=%s to=%s error=%s', prepared.id.toString(), '<>', sender, err.message);
} else if (id) {
outbound.push(id);
log.silly('LMTP', '%s AUTOREPLYOK id=%s from=%s to=%s', prepared.id.toString(), id, '<>', sender);
}
if (filterActions.get('delete')) {
// nothing to do with the message, just continue
return callback(null, {
userData,
response: 'Message dropped by policy as ' + prepared.id.toString()
});
}
// apply filter results to the message
filterActions.forEach((value, key) => {
switch (key) {
case 'spam':
if (value > 0) {
// positive value is spam
mailboxQueryKey = 'specialUse';
mailboxQueryValue = '\\Junk';
}
break;
case 'seen':
if (value) {
flags.push('\\Seen');
}
break;
case 'flag':
if (value) {
flags.push('\\Flagged');
}
break;
case 'mailbox':
if (value) {
// positive value is spam
mailboxQueryKey = 'mailbox';
mailboxQueryValue = value;
}
break;
}
});
let messageOpts = {
user: userData._id,
[mailboxQueryKey]: mailboxQueryValue,
prepared,
maildata,
meta: options.meta,
filters: matchingFilters,
date: false,
flags,
// if similar message exists, then skip
skipExisting: true
};
let received = [].concat((prepared.mimeTree.parsedHeader && prepared.mimeTree.parsedHeader.received) || []);
if (received.length) {
let receivedData = parseReceived(received[0]);
if (!receivedData.has('id') && received.length > 1) {
receivedData = parseReceived(received[1]);
}
if (receivedData.has('with')) {
messageOpts.meta.transtype = receivedData.get('with');
}
if (receivedData.has('id')) {
messageOpts.meta.queueId = receivedData.get('id');
}
if (receivedData.has('from')) {
messageOpts.meta.origin = receivedData.get('from');
}
}
if (outbound && outbound.length) {
messageOpts.outbound = [].concat(outbound || []);
}
this.messageHandler.encryptMessage(
userData.encryptMessages ? userData.pubKey : false,
{
chunks,
chunklen
},
(err, encrypted) => {
if (!err && encrypted) {
messageOpts.prepared = this.messageHandler.prepareMessage({
raw: Buffer.concat([extraHeader, encrypted]),
indexedHeaders: this.spamHeaderKeys
});
messageOpts.maildata = this.messageHandler.indexer.getMaildata(messageOpts.prepared.mimeTree);
}
this.messageHandler.add(messageOpts, (err, inserted, info) => {
// push to response list
callback(
null,
{
userData,
response: err ? err : 'Message stored as ' + info.id.toString()
},
!encrypted
? {
mimeTree: messageOpts.prepared.mimeTree,
maildata: messageOpts.maildata
}
: false
);
});
}
);
});
});
});
});

View file

@ -9,38 +9,33 @@ module.exports = (options, callback) => {
return callback(null, false);
}
let message = maildrop(
{
parentId: options.parentId,
reason: 'forward',
let mail = {
parentId: options.parentId,
reason: 'forward',
from: options.sender,
to: options.recipient,
targets: options.targets,
interface: 'forwarder'
};
let message = maildrop(mail, (err, ...args) => {
if (err || !args[0]) {
return callback(err, ...args);
}
db.database.collection('messagelog').insertOne({
id: args[0].id,
messageId: args[0].messageId,
action: 'FORWARD',
parentId: options.parentId,
from: options.sender,
to: options.recipient,
forward: options.forward,
http: !!options.targetUrl,
targeUrl: options.targetUrl,
interface: 'forwarder'
},
(err, ...args) => {
if (err || !args[0]) {
return callback(err, ...args);
}
db.database.collection('messagelog').insertOne({
id: args[0].id,
messageId: args[0].messageId,
action: 'FORWARD',
parentId: options.parentId,
from: options.sender,
to: options.recipient,
forward: options.forward,
http: !!options.targetUrl,
targeUrl: options.targetUrl,
created: new Date()
}, () => callback(err, args && args[0] && args[0].id));
}
);
targets: options.targets,
created: new Date()
}, () => callback(err, args && args[0] && args[0].id));
});
setImmediate(() => {
let pos = 0;

View file

@ -12,6 +12,7 @@ const os = require('os');
const hostname = os.hostname().toLowerCase();
const addressparser = require('addressparser');
const punycode = require('punycode');
const plugins = require('./plugins');
let gridstore;
@ -157,20 +158,32 @@ module.exports = (options, callback) => {
let deliveries = [];
if (options.targeUrl) {
let targetUrls = [].concat(options.targeUrl || []).map(targetUrl => ({
to: options.to,
http: true,
targetUrl
}));
deliveries = deliveries.concat(targetUrls);
}
if (options.forward) {
let forwards = [].concat(options.forward || []).map(forward => ({
to: forward
}));
deliveries = deliveries.concat(forwards);
if (options.targets) {
options.targets.forEach(target => {
switch (target.type) {
case 'mail':
deliveries.push({
to: target.value
});
break;
case 'relay':
deliveries.push({
to: options.to,
mx: target.value.mx,
mxPort: target.value.mxPort,
mxAuth: target.value.mxAuth,
mxSecure: target.value.mxSecure
});
break;
case 'http':
deliveries.push({
to: options.to,
http: true,
targetUrl: target.value
});
break;
}
});
}
if (!deliveries.length) {
@ -204,62 +217,72 @@ module.exports = (options, callback) => {
return callback(err);
}
envelope.headers = envelope.headers.getList();
setMeta(id, envelope, err => {
if (err) {
return removeMessage(id, () => callback(err));
}
let date = new Date();
for (let i = 0, len = deliveries.length; i < len; i++) {
let recipient = deliveries[i];
let deliveryZone = options.zone || config.sender.zone || 'default';
let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, '');
seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
assigned: 'no',
// actual recipient address
recipient: recipient.to,
http: recipient.http,
targetUrl: recipient.targetUrl,
locked: false,
lockTime: 0,
// earliest time to attempt delivery, defaults to now
queued: date,
// queued date might change but created should not
created: date
};
documents.push(delivery);
}
db.senderDb.collection(config.sender.collection).insertMany(documents, {
w: 1,
ordered: false
}, err => {
plugins.runHooks('maildrop', options, envelope, deliveries, () => {
envelope.headers = envelope.headers.getList();
setMeta(id, envelope, err => {
if (err) {
return callback(err);
return removeMessage(id, () => callback(err));
}
callback(null, envelope);
let date = new Date();
for (let i = 0, len = deliveries.length; i < len; i++) {
let recipient = deliveries[i];
let deliveryZone = options.zone || config.sender.zone || 'default';
let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, '');
seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
assigned: 'no',
// actual recipient address
recipient: recipient.to,
locked: false,
lockTime: 0,
// earliest time to attempt delivery, defaults to now
queued: date,
// queued date might change but created should not
created: date
};
if (recipient.http) {
delivery.http = recipient.http;
delivery.targetUrl = recipient.targetUrl;
}
['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => {
if (recipient[key]) {
delivery[key] = recipient[key];
}
});
documents.push(delivery);
}
db.senderDb.collection(config.sender.collection).insertMany(documents, {
w: 1,
ordered: false
}, err => {
if (err) {
return callback(err);
}
callback(null, envelope);
});
});
});
});
messageSplitter.pipe(dkimStream);
return messageSplitter;
};

View file

@ -2,11 +2,143 @@
const config = require('wild-config');
const pathlib = require('path');
const log = require('npmlog');
const db = require('./db');
const WD_PATH = pathlib.join(__dirname, '..');
const CONFIG_PATH = pathlib.join(__dirname, '..');
const CONFIG_PATH = config.configDirectory || WD_PATH;
module.exports = next => {
console.log(config);
setImmediate(next);
const hooks = new Map();
class PluginInstance {
constructor(key, config) {
this.db = db;
this.key = key;
this.config = config || {};
this.logger = {};
['silly', 'verbose', 'info', 'http', 'warn', 'error', 'debug', 'err'].forEach(level => {
this.logger[level] = (...args) => {
switch (level) {
case 'debug':
level = 'verbose';
break;
case 'err':
level = 'error';
break;
}
log[level]('[' + key + ']', ...args);
};
});
}
addHook(hook, handler) {
hook = (hook || '')
.toString()
.replace(/\s+/g, '')
.toLowerCase();
if (!hook) {
return;
}
if (!hooks.has(hook)) {
hooks.set(hook, []);
}
hooks.get(hook).push({ plugin: this, handler });
}
init(done) {
if (!this.config.path) {
this.logger.debug('Plugin path not provided, skipping');
return setImmediate(done);
}
try {
let pluginPath = this.config.path.replace(/\$WD/g, WD_PATH).replace(/\$CONFIG/g, CONFIG_PATH);
this.module = require(pluginPath); //eslint-disable-line global-require
} catch (E) {
this.logger.error('Failed to load plugin. %s', E.message);
return setImmediate(done);
}
if (typeof this.module.init !== 'function') {
this.logger.debug('Init method not found');
return setImmediate(done);
}
try {
return this.module.init(this, err => {
if (err) {
this.logger.error('Initialization resulted with an error. %s', err.message);
} else {
this.logger.debug('Plugin "%s" initialized', this.module.title || this.key);
}
return setImmediate(done);
});
} catch (E) {
this.logger.error('Failed executing init method. %s', E.message);
return setImmediate(done);
}
}
}
module.exports.init = next => {
let keys = Object.keys(config.plugins || {});
let pos = 0;
let loadNextPlugin = () => {
if (pos >= keys.length) {
return setImmediate(next);
}
let key = keys[pos++];
if (!config.plugins[key].enabled) {
return setImmediate(loadNextPlugin);
}
let plugin = new PluginInstance(key, config.plugins[key]);
plugin.init(loadNextPlugin);
};
setImmediate(loadNextPlugin);
};
module.exports.runHooks = (hook, ...args) => {
let next = args.pop();
hook = (hook || '')
.toString()
.replace(/\s+/g, '')
.toLowerCase();
if (!hook || !hooks.has(hook)) {
return setImmediate(next);
}
let handlers = hooks.get(hook);
let pos = 0;
let processHandler = () => {
if (pos >= handlers.length) {
return setImmediate(next);
}
let entry = handlers[pos++];
let returned = false;
try {
entry.handler(...args, err => {
if (returned) {
return;
}
returned = true;
if (err) {
entry.plugin.logger.error('Failed processing hook %s. %s', hook, err.message);
}
setImmediate(processHandler);
});
} catch (E) {
if (returned) {
return;
}
returned = true;
entry.plugin.logger.error('Failed processing hook %s. %s', hook, E.message);
setImmediate(processHandler);
}
};
setImmediate(processHandler);
};

15
plugins/example.js Normal file
View file

@ -0,0 +1,15 @@
'use strict';
module.exports.title = 'Example Plugin';
module.exports.init = (app, done) => {
// do your initialization stuff here
// init hook is called immediatelly after server is started
app.addHook('init', next => {
app.logger.info('Example plugin initialized. Value1=%s', app.config.value1);
next();
});
setImmediate(done);
};

View file

@ -81,13 +81,16 @@ db.connect(err => {
}
}
plugins(err => {
plugins.init(err => {
if (err) {
log.error('App', 'Failed to start plugins');
errors.notify(err);
return setTimeout(() => process.exit(1), 3000);
}
log.info('App', 'All servers started, ready to process some mail');
plugins.runHooks('init', () => {
log.info('App', 'All servers started, ready to process some mail');
});
});
});
});