From 60daa09f6736df0ff1f507c7b2aa2ac59b97766a Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Thu, 26 Oct 2017 14:57:19 +0300 Subject: [PATCH] initial plugin system --- config/plugins/example.toml | 4 +- lib/filter-handler.js | 492 ++++++++++++++++++------------------ lib/forward.js | 53 ++-- lib/maildrop.js | 151 ++++++----- lib/plugins.js | 140 +++++++++- plugins/example.js | 15 ++ worker.js | 7 +- 7 files changed, 519 insertions(+), 343 deletions(-) create mode 100644 plugins/example.js diff --git a/config/plugins/example.toml b/config/plugins/example.toml index 11718a46..040e0343 100644 --- a/config/plugins/example.toml +++ b/config/plugins/example.toml @@ -1,10 +1,10 @@ [example] -name="Example Plugin" +enabled = true # $WD: path of wildduck module root # $CONFIG: path of config root -path="$WD/plugins/example.js" +path = "$WD/plugins/example.js" # Additional config options value1 = "Example config option" diff --git a/lib/filter-handler.js b/lib/filter-handler.js index a2e36fb9..3c8ef544 100644 --- a/lib/filter-handler.js +++ b/lib/filter-handler.js @@ -5,6 +5,7 @@ const ObjectID = require('mongodb').ObjectID; const db = require('./db'); const forward = require('./forward'); const autoreply = require('./autoreply'); +const plugins = require('./plugins'); const defaultSpamHeaderKeys = [ { @@ -206,265 +207,272 @@ class FilterHandler { })) ); - let forwardTargets = new Set(); - let forwardTargetUrls = new Set(); - let matchingFilters = []; - let filterActions = new Map(); + let forwardTargets = new Map(); - filters - // apply all filters to the message - .map(filter => checkFilter(filter, prepared, maildata)) - // remove all unmatched filters - .filter(filter => filter) - // apply filter actions - .forEach(filter => { - matchingFilters.push(filter.id); + plugins.runHooks('filter', filters, forwardTargets, () => { + let matchingFilters = []; + let filterActions = new Map(); - // apply matching filter - if (!filterActions) { - filterActions = filter.action; - } else { - Object.keys(filter.action).forEach(key => { - if (key === 'forward') { - forwardTargets.add(filter.action[key]); - return; - } + filters + // apply all filters to the message + .map(filter => checkFilter(filter, prepared, maildata)) + // remove all unmatched filters + .filter(filter => filter) + // apply filter actions + .forEach(filter => { + matchingFilters.push(filter.id); - if (key === 'targetUrl') { - forwardTargetUrls.add(filter.action[key]); - return; - } - - // if a previous filter already has set a value then do not touch it - if (!filterActions.has(key)) { - filterActions.set(key, filter.action[key]); - } - }); - } - }); - - let forwardMessage = done => { - if (userData.forward && !filterActions.get('delete')) { - // forward to default recipient only if the message is not deleted - forwardTargets.add(userData.forward); - } - - if (userData.targetUrl && !filterActions.get('delete')) { - // forward to default URL only if the message is not deleted - forwardTargetUrls.add(userData.targetUrl); - } - - // never forward messages marked as spam - if ((!forwardTargets.size && !forwardTargetUrls.size) || filterActions.get('spam')) { - return setImmediate(done); - } - - // check limiting counters - this.messageHandler.counters.ttlcounter( - 'wdf:' + userData._id.toString(), - forwardTargets.size + forwardTargetUrls.size, - userData.forwards, - false, - (err, result) => { - if (err) { - // failed checks - log.error('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), err.message); - } else if (!result.success) { - log.silly('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), 'Precondition failed'); - return done(); - } - - forward( - { - parentId: prepared.id, - userData, - sender, - recipient, - - forward: forwardTargets.size ? Array.from(forwardTargets) : false, - targetUrl: forwardTargetUrls.size ? Array.from(forwardTargetUrls) : false, - - chunks, - chunklen - }, - done - ); - } - ); - }; - - let sendAutoreply = done => { - // never reply to messages marked as spam - if (!sender || !userData.autoreply || filterActions.get('spam')) { - return setImmediate(done); - } - - autoreply( - { - parentId: prepared.id, - userData, - sender, - recipient, - chunks, - chunklen, - messageHandler: this.messageHandler - }, - done - ); - }; - - let outbound = []; - - forwardMessage((err, id) => { - if (err) { - log.error( - 'LMTP', - '%s FRWRDFAIL from=%s to=%s target=%s error=%s', - prepared.id.toString(), - sender, - recipient, - Array.from(forwardTargets) - .concat(forwardTargetUrls) - .join(','), - err.message - ); - } else if (id) { - outbound.push(id); - log.silly( - 'LMTP', - '%s FRWRDOK id=%s from=%s to=%s target=%s', - prepared.id.toString(), - id, - sender, - recipient, - Array.from(forwardTargets) - .concat(Array.from(forwardTargetUrls)) - .join(',') - ); - } - - sendAutoreply((err, id) => { - if (err) { - log.error('LMTP', '%s AUTOREPLYFAIL from=%s to=%s error=%s', prepared.id.toString(), '<>', sender, err.message); - } else if (id) { - outbound.push(id); - log.silly('LMTP', '%s AUTOREPLYOK id=%s from=%s to=%s', prepared.id.toString(), id, '<>', sender); - } - - if (filterActions.get('delete')) { - // nothing to do with the message, just continue - return callback(null, { - userData, - response: 'Message dropped by policy as ' + prepared.id.toString() - }); - } - - // apply filter results to the message - filterActions.forEach((value, key) => { - switch (key) { - case 'spam': - if (value > 0) { - // positive value is spam - mailboxQueryKey = 'specialUse'; - mailboxQueryValue = '\\Junk'; + // apply matching filter + if (!filterActions) { + filterActions = filter.action; + } else { + Object.keys(filter.action).forEach(key => { + if (key === 'forward') { + [].concat(filter.action[key] || []).forEach(address => { + forwardTargets.set(address, { type: 'mail', value: address }); + }); + return; } - break; - case 'seen': - if (value) { - flags.push('\\Seen'); + + if (key === 'targetUrl') { + [].concat(filter.action[key] || []).forEach(address => { + forwardTargets.set(address, { type: 'http', value: address }); + }); + return; } - break; - case 'flag': - if (value) { - flags.push('\\Flagged'); + + // if a previous filter already has set a value then do not touch it + if (!filterActions.has(key)) { + filterActions.set(key, filter.action[key]); } - break; - case 'mailbox': - if (value) { - // positive value is spam - mailboxQueryKey = 'mailbox'; - mailboxQueryValue = value; - } - break; + }); } }); - let messageOpts = { - user: userData._id, - [mailboxQueryKey]: mailboxQueryValue, - - prepared, - maildata, - - meta: options.meta, - - filters: matchingFilters, - - date: false, - flags, - - // if similar message exists, then skip - skipExisting: true - }; - - let received = [].concat((prepared.mimeTree.parsedHeader && prepared.mimeTree.parsedHeader.received) || []); - if (received.length) { - let receivedData = parseReceived(received[0]); - - if (!receivedData.has('id') && received.length > 1) { - receivedData = parseReceived(received[1]); - } - - if (receivedData.has('with')) { - messageOpts.meta.transtype = receivedData.get('with'); - } - - if (receivedData.has('id')) { - messageOpts.meta.queueId = receivedData.get('id'); - } - - if (receivedData.has('from')) { - messageOpts.meta.origin = receivedData.get('from'); - } + let forwardMessage = done => { + if (userData.forward && !filterActions.get('delete')) { + // forward to default recipient only if the message is not deleted + forwardTargets.set(userData.forward, { type: 'mail', value: userData.forward }); } - if (outbound && outbound.length) { - messageOpts.outbound = [].concat(outbound || []); + if (userData.targetUrl && !filterActions.get('delete')) { + // forward to default URL only if the message is not deleted + forwardTargets.set(userData.targetUrl, { type: 'http', value: userData.targetUrl }); } - this.messageHandler.encryptMessage( - userData.encryptMessages ? userData.pubKey : false, - { - chunks, - chunklen - }, - (err, encrypted) => { - if (!err && encrypted) { - messageOpts.prepared = this.messageHandler.prepareMessage({ - raw: Buffer.concat([extraHeader, encrypted]), - indexedHeaders: this.spamHeaderKeys - }); - messageOpts.maildata = this.messageHandler.indexer.getMaildata(messageOpts.prepared.mimeTree); + // never forward messages marked as spam + if (!forwardTargets.size || filterActions.get('spam')) { + return setImmediate(done); + } + + // check limiting counters + this.messageHandler.counters.ttlcounter( + 'wdf:' + userData._id.toString(), + forwardTargets.size, + userData.forwards, + false, + (err, result) => { + if (err) { + // failed checks + log.error('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), err.message); + } else if (!result.success) { + log.silly('LMTP', 'FRWRDFAIL key=%s error=%s', 'wdf:' + userData._id.toString(), 'Precondition failed'); + return done(); } - this.messageHandler.add(messageOpts, (err, inserted, info) => { - // push to response list - callback( - null, - { - userData, - response: err ? err : 'Message stored as ' + info.id.toString() - }, - !encrypted - ? { - mimeTree: messageOpts.prepared.mimeTree, - maildata: messageOpts.maildata - } - : false - ); - }); + forward( + { + parentId: prepared.id, + userData, + sender, + recipient, + + targets: forwardTargets.size + ? Array.from(forwardTargets).map(row => ({ type: row[1].type, value: row[1].value })) + : false, + + chunks, + chunklen + }, + done + ); } ); + }; + + let sendAutoreply = done => { + // never reply to messages marked as spam + if (!sender || !userData.autoreply || filterActions.get('spam')) { + return setImmediate(done); + } + + autoreply( + { + parentId: prepared.id, + userData, + sender, + recipient, + chunks, + chunklen, + messageHandler: this.messageHandler + }, + done + ); + }; + + let outbound = []; + + forwardMessage((err, id) => { + if (err) { + log.error( + 'LMTP', + '%s FRWRDFAIL from=%s to=%s target=%s error=%s', + prepared.id.toString(), + sender, + recipient, + Array.from(forwardTargets) + .map(row => row[0]) + .join(','), + err.message + ); + } else if (id) { + outbound.push(id); + log.silly( + 'LMTP', + '%s FRWRDOK id=%s from=%s to=%s target=%s', + prepared.id.toString(), + id, + sender, + recipient, + Array.from(forwardTargets) + .map(row => row[0]) + .join(',') + ); + } + + sendAutoreply((err, id) => { + if (err) { + log.error('LMTP', '%s AUTOREPLYFAIL from=%s to=%s error=%s', prepared.id.toString(), '<>', sender, err.message); + } else if (id) { + outbound.push(id); + log.silly('LMTP', '%s AUTOREPLYOK id=%s from=%s to=%s', prepared.id.toString(), id, '<>', sender); + } + + if (filterActions.get('delete')) { + // nothing to do with the message, just continue + return callback(null, { + userData, + response: 'Message dropped by policy as ' + prepared.id.toString() + }); + } + + // apply filter results to the message + filterActions.forEach((value, key) => { + switch (key) { + case 'spam': + if (value > 0) { + // positive value is spam + mailboxQueryKey = 'specialUse'; + mailboxQueryValue = '\\Junk'; + } + break; + case 'seen': + if (value) { + flags.push('\\Seen'); + } + break; + case 'flag': + if (value) { + flags.push('\\Flagged'); + } + break; + case 'mailbox': + if (value) { + // positive value is spam + mailboxQueryKey = 'mailbox'; + mailboxQueryValue = value; + } + break; + } + }); + + let messageOpts = { + user: userData._id, + [mailboxQueryKey]: mailboxQueryValue, + + prepared, + maildata, + + meta: options.meta, + + filters: matchingFilters, + + date: false, + flags, + + // if similar message exists, then skip + skipExisting: true + }; + + let received = [].concat((prepared.mimeTree.parsedHeader && prepared.mimeTree.parsedHeader.received) || []); + if (received.length) { + let receivedData = parseReceived(received[0]); + + if (!receivedData.has('id') && received.length > 1) { + receivedData = parseReceived(received[1]); + } + + if (receivedData.has('with')) { + messageOpts.meta.transtype = receivedData.get('with'); + } + + if (receivedData.has('id')) { + messageOpts.meta.queueId = receivedData.get('id'); + } + + if (receivedData.has('from')) { + messageOpts.meta.origin = receivedData.get('from'); + } + } + + if (outbound && outbound.length) { + messageOpts.outbound = [].concat(outbound || []); + } + + this.messageHandler.encryptMessage( + userData.encryptMessages ? userData.pubKey : false, + { + chunks, + chunklen + }, + (err, encrypted) => { + if (!err && encrypted) { + messageOpts.prepared = this.messageHandler.prepareMessage({ + raw: Buffer.concat([extraHeader, encrypted]), + indexedHeaders: this.spamHeaderKeys + }); + messageOpts.maildata = this.messageHandler.indexer.getMaildata(messageOpts.prepared.mimeTree); + } + + this.messageHandler.add(messageOpts, (err, inserted, info) => { + // push to response list + callback( + null, + { + userData, + response: err ? err : 'Message stored as ' + info.id.toString() + }, + !encrypted + ? { + mimeTree: messageOpts.prepared.mimeTree, + maildata: messageOpts.maildata + } + : false + ); + }); + } + ); + }); }); }); }); diff --git a/lib/forward.js b/lib/forward.js index 061ccf99..0352aaaa 100644 --- a/lib/forward.js +++ b/lib/forward.js @@ -9,38 +9,33 @@ module.exports = (options, callback) => { return callback(null, false); } - let message = maildrop( - { - parentId: options.parentId, - reason: 'forward', + let mail = { + parentId: options.parentId, + reason: 'forward', + from: options.sender, + to: options.recipient, + + targets: options.targets, + + interface: 'forwarder' + }; + + let message = maildrop(mail, (err, ...args) => { + if (err || !args[0]) { + return callback(err, ...args); + } + db.database.collection('messagelog').insertOne({ + id: args[0].id, + messageId: args[0].messageId, + action: 'FORWARD', + parentId: options.parentId, from: options.sender, to: options.recipient, - - forward: options.forward, - http: !!options.targetUrl, - targeUrl: options.targetUrl, - - interface: 'forwarder' - }, - (err, ...args) => { - if (err || !args[0]) { - return callback(err, ...args); - } - db.database.collection('messagelog').insertOne({ - id: args[0].id, - messageId: args[0].messageId, - action: 'FORWARD', - parentId: options.parentId, - from: options.sender, - to: options.recipient, - forward: options.forward, - http: !!options.targetUrl, - targeUrl: options.targetUrl, - created: new Date() - }, () => callback(err, args && args[0] && args[0].id)); - } - ); + targets: options.targets, + created: new Date() + }, () => callback(err, args && args[0] && args[0].id)); + }); setImmediate(() => { let pos = 0; diff --git a/lib/maildrop.js b/lib/maildrop.js index 1d496146..58d59560 100644 --- a/lib/maildrop.js +++ b/lib/maildrop.js @@ -12,6 +12,7 @@ const os = require('os'); const hostname = os.hostname().toLowerCase(); const addressparser = require('addressparser'); const punycode = require('punycode'); +const plugins = require('./plugins'); let gridstore; @@ -157,20 +158,32 @@ module.exports = (options, callback) => { let deliveries = []; - if (options.targeUrl) { - let targetUrls = [].concat(options.targeUrl || []).map(targetUrl => ({ - to: options.to, - http: true, - targetUrl - })); - deliveries = deliveries.concat(targetUrls); - } - - if (options.forward) { - let forwards = [].concat(options.forward || []).map(forward => ({ - to: forward - })); - deliveries = deliveries.concat(forwards); + if (options.targets) { + options.targets.forEach(target => { + switch (target.type) { + case 'mail': + deliveries.push({ + to: target.value + }); + break; + case 'relay': + deliveries.push({ + to: options.to, + mx: target.value.mx, + mxPort: target.value.mxPort, + mxAuth: target.value.mxAuth, + mxSecure: target.value.mxSecure + }); + break; + case 'http': + deliveries.push({ + to: options.to, + http: true, + targetUrl: target.value + }); + break; + } + }); } if (!deliveries.length) { @@ -204,62 +217,72 @@ module.exports = (options, callback) => { return callback(err); } - envelope.headers = envelope.headers.getList(); - setMeta(id, envelope, err => { - if (err) { - return removeMessage(id, () => callback(err)); - } - - let date = new Date(); - - for (let i = 0, len = deliveries.length; i < len; i++) { - let recipient = deliveries[i]; - let deliveryZone = options.zone || config.sender.zone || 'default'; - let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, ''); - - seq++; - let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); - let delivery = { - id, - seq: deliverySeq, - - // Actual delivery data - domain: recipientDomain, - sendingZone: deliveryZone, - - assigned: 'no', - - // actual recipient address - recipient: recipient.to, - http: recipient.http, - targetUrl: recipient.targetUrl, - - locked: false, - lockTime: 0, - - // earliest time to attempt delivery, defaults to now - queued: date, - - // queued date might change but created should not - created: date - }; - - documents.push(delivery); - } - - db.senderDb.collection(config.sender.collection).insertMany(documents, { - w: 1, - ordered: false - }, err => { + plugins.runHooks('maildrop', options, envelope, deliveries, () => { + envelope.headers = envelope.headers.getList(); + setMeta(id, envelope, err => { if (err) { - return callback(err); + return removeMessage(id, () => callback(err)); } - callback(null, envelope); + let date = new Date(); + + for (let i = 0, len = deliveries.length; i < len; i++) { + let recipient = deliveries[i]; + let deliveryZone = options.zone || config.sender.zone || 'default'; + let recipientDomain = recipient.to.substr(recipient.to.lastIndexOf('@') + 1).replace(/[[\]]/g, ''); + + seq++; + let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); + let delivery = { + id, + seq: deliverySeq, + + // Actual delivery data + domain: recipientDomain, + sendingZone: deliveryZone, + + assigned: 'no', + + // actual recipient address + recipient: recipient.to, + + locked: false, + lockTime: 0, + + // earliest time to attempt delivery, defaults to now + queued: date, + + // queued date might change but created should not + created: date + }; + + if (recipient.http) { + delivery.http = recipient.http; + delivery.targetUrl = recipient.targetUrl; + } + + ['mx', 'mxPort', 'mxAuth', 'mxSecure'].forEach(key => { + if (recipient[key]) { + delivery[key] = recipient[key]; + } + }); + + documents.push(delivery); + } + + db.senderDb.collection(config.sender.collection).insertMany(documents, { + w: 1, + ordered: false + }, err => { + if (err) { + return callback(err); + } + + callback(null, envelope); + }); }); }); }); - messageSplitter.pipe(dkimStream); return messageSplitter; }; diff --git a/lib/plugins.js b/lib/plugins.js index 2fc5102f..48d01ad0 100644 --- a/lib/plugins.js +++ b/lib/plugins.js @@ -2,11 +2,143 @@ const config = require('wild-config'); const pathlib = require('path'); +const log = require('npmlog'); +const db = require('./db'); const WD_PATH = pathlib.join(__dirname, '..'); -const CONFIG_PATH = pathlib.join(__dirname, '..'); +const CONFIG_PATH = config.configDirectory || WD_PATH; -module.exports = next => { - console.log(config); - setImmediate(next); +const hooks = new Map(); + +class PluginInstance { + constructor(key, config) { + this.db = db; + + this.key = key; + this.config = config || {}; + + this.logger = {}; + ['silly', 'verbose', 'info', 'http', 'warn', 'error', 'debug', 'err'].forEach(level => { + this.logger[level] = (...args) => { + switch (level) { + case 'debug': + level = 'verbose'; + break; + case 'err': + level = 'error'; + break; + } + log[level]('[' + key + ']', ...args); + }; + }); + } + + addHook(hook, handler) { + hook = (hook || '') + .toString() + .replace(/\s+/g, '') + .toLowerCase(); + if (!hook) { + return; + } + if (!hooks.has(hook)) { + hooks.set(hook, []); + } + hooks.get(hook).push({ plugin: this, handler }); + } + + init(done) { + if (!this.config.path) { + this.logger.debug('Plugin path not provided, skipping'); + return setImmediate(done); + } + try { + let pluginPath = this.config.path.replace(/\$WD/g, WD_PATH).replace(/\$CONFIG/g, CONFIG_PATH); + this.module = require(pluginPath); //eslint-disable-line global-require + } catch (E) { + this.logger.error('Failed to load plugin. %s', E.message); + return setImmediate(done); + } + + if (typeof this.module.init !== 'function') { + this.logger.debug('Init method not found'); + return setImmediate(done); + } + + try { + return this.module.init(this, err => { + if (err) { + this.logger.error('Initialization resulted with an error. %s', err.message); + } else { + this.logger.debug('Plugin "%s" initialized', this.module.title || this.key); + } + return setImmediate(done); + }); + } catch (E) { + this.logger.error('Failed executing init method. %s', E.message); + return setImmediate(done); + } + } +} + +module.exports.init = next => { + let keys = Object.keys(config.plugins || {}); + + let pos = 0; + let loadNextPlugin = () => { + if (pos >= keys.length) { + return setImmediate(next); + } + let key = keys[pos++]; + if (!config.plugins[key].enabled) { + return setImmediate(loadNextPlugin); + } + let plugin = new PluginInstance(key, config.plugins[key]); + plugin.init(loadNextPlugin); + }; + setImmediate(loadNextPlugin); +}; + +module.exports.runHooks = (hook, ...args) => { + let next = args.pop(); + + hook = (hook || '') + .toString() + .replace(/\s+/g, '') + .toLowerCase(); + + if (!hook || !hooks.has(hook)) { + return setImmediate(next); + } + + let handlers = hooks.get(hook); + let pos = 0; + let processHandler = () => { + if (pos >= handlers.length) { + return setImmediate(next); + } + let entry = handlers[pos++]; + let returned = false; + try { + entry.handler(...args, err => { + if (returned) { + return; + } + returned = true; + + if (err) { + entry.plugin.logger.error('Failed processing hook %s. %s', hook, err.message); + } + setImmediate(processHandler); + }); + } catch (E) { + if (returned) { + return; + } + returned = true; + entry.plugin.logger.error('Failed processing hook %s. %s', hook, E.message); + setImmediate(processHandler); + } + }; + setImmediate(processHandler); }; diff --git a/plugins/example.js b/plugins/example.js new file mode 100644 index 00000000..734d2fcf --- /dev/null +++ b/plugins/example.js @@ -0,0 +1,15 @@ +'use strict'; + +module.exports.title = 'Example Plugin'; + +module.exports.init = (app, done) => { + // do your initialization stuff here + + // init hook is called immediatelly after server is started + app.addHook('init', next => { + app.logger.info('Example plugin initialized. Value1=%s', app.config.value1); + next(); + }); + + setImmediate(done); +}; diff --git a/worker.js b/worker.js index 1a1640ce..d0d4ae2a 100644 --- a/worker.js +++ b/worker.js @@ -81,13 +81,16 @@ db.connect(err => { } } - plugins(err => { + plugins.init(err => { if (err) { log.error('App', 'Failed to start plugins'); errors.notify(err); return setTimeout(() => process.exit(1), 3000); } - log.info('App', 'All servers started, ready to process some mail'); + + plugins.runHooks('init', () => { + log.info('App', 'All servers started, ready to process some mail'); + }); }); }); });