This commit is contained in:
Andris Reinman 2017-05-02 16:21:56 +03:00
commit c172fe9779
6 changed files with 207 additions and 179 deletions

View file

@ -67,11 +67,12 @@ module.exports = {
host: '0.0.0.0'
},
// push messages to ZoneMTA queue
forwarder: {
// push messages to ZoneMTA queue for delivery
sender: {
// if false, then no messages are sent
enabled: true,
// which ZoneMTA queue to use
// which ZoneMTA queue to use by default
zone: 'default',
// MongoDB connection url. Do not set if you want to use main database

View file

@ -6,7 +6,7 @@ const mongodb = require('mongodb');
const redis = require('redis');
const MongoClient = mongodb.MongoClient;
module.exports.forwarder = false;
module.exports.senderDb = false;
module.exports.database = false;
module.exports.redis = false;
module.exports.redisConfig = false;
@ -20,21 +20,21 @@ module.exports.connect = callback => {
module.exports.redisConfig = tools.redisConfig(config.redis);
module.exports.redis = redis.createClient(module.exports.redisConfig);
if (!config.forwarder.enabled) {
if (!config.sender.enabled) {
return callback(null, database);
}
if (!config.forwarder.mongo) {
module.exports.forwarder = database;
if (!config.sender.mongo) {
module.exports.senderDb = database;
return callback(null, database);
}
MongoClient.connect(config.forwarder.mongo, (err, forwarderDatabase) => {
MongoClient.connect(config.sender.mongo, (err, forwarderDatabase) => {
if (err) {
database.close();
return callback(err);
}
module.exports.forwarder = forwarderDatabase;
module.exports.senderDb = forwarderDatabase;
return callback(null, database);
});
});

View file

@ -1,187 +1,32 @@
'use strict';
const config = require('config');
const db = require('./db');
const SeqIndex = require('seq-index');
const DkimStream = require('./dkim-stream');
const MessageSplitter = require('./message-splitter');
const seqIndex = new SeqIndex();
const GridFs = require('grid-fs');
let gridstore;
const maildrop = require('./maildrop');
module.exports = (options, callback) => {
if (!config.forwarder.enabled) {
if (!config.sender.enabled) {
return callback(null, false);
}
let id = options.id || seqIndex.get();
let seq = 0;
let documents = [];
let envelope = {
id,
let message = maildrop({
from: options.sender,
to: options.forward,
interface: 'forwarder',
transtype: 'API',
time: Date.now(),
dkim: {
hashAlgo: 'sha256'
}
};
let messageSplitter = new MessageSplitter();
let dkimStream = new DkimStream();
messageSplitter.once('headers', headers => {
envelope.headers = headers.getList();
});
dkimStream.on('hash', bodyHash => {
// store relaxed body hash for signing
envelope.dkim.bodyHash = bodyHash;
envelope.bodySize = dkimStream.byteLength;
});
messageSplitter.once('error', err => dkimStream.emit('error', err));
interface: 'forwarder'
}, callback);
setImmediate(() => {
messageSplitter.pipe(dkimStream);
let pos = 0;
let writeNextChunk = () => {
if (pos >= options.chunks.length) {
return messageSplitter.end();
return message.end();
}
let chunk = options.chunks[pos++];
if (!messageSplitter.write(chunk)) {
return messageSplitter.once('drain', writeNextChunk);
if (!message.write(chunk)) {
return message.once('drain', writeNextChunk);
} else {
setImmediate(writeNextChunk);
}
};
setImmediate(writeNextChunk);
});
store(id, dkimStream, err => {
if (err) {
return callback(err);
}
setMeta(id, envelope, err => {
if (err) {
return removeMessage(id, () => callback(err));
}
for (let i = 0, len = envelope.to.length; i < len; i++) {
let recipient = envelope.to[i];
let deliveryZone = config.forwarder.zone || 'default';
let recipientDomain = recipient.substr(recipient.lastIndexOf('@') + 1).replace(/[\[\]]/g, '');
seq++;
let date = new Date();
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
// actual recipient address
recipient,
locked: false,
lockTime: 0,
// earliest time to attempt delivery, defaults to now
queued: date,
// queued date might change but created should not
created: date
};
documents.push(delivery);
}
db.forwarder.collection(config.forwarder.collection).
insertMany(documents, {
w: 1,
ordered: false
}, err => {
if (err) {
return callback(err);
}
callback(null, id);
});
});
});
};
function store(id, stream, callback) {
gridstore = gridstore || new GridFs(db.forwarder, config.forwarder.gfs);
let returned = false;
let store = gridstore.createWriteStream('message ' + id, {
fsync: true,
content_type: 'message/rfc822',
metadata: {
created: new Date()
}
});
stream.once('error', err => {
if (returned) {
return;
}
returned = true;
store.once('close', () => {
removeMessage(id, () => callback(err));
});
store.end();
});
store.once('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
store.on('close', () => {
if (returned) {
return;
}
returned = true;
return callback(null, id);
});
stream.pipe(store);
}
function removeMessage(id, callback) {
gridstore.unlink('message ' + id, callback);
}
function setMeta(id, data, callback) {
db.forwarder.collection(config.forwarder.gfs + '.files').findAndModify({
filename: 'message ' + id
}, false, {
$set: {
'metadata.data': data
}
}, {}, err => {
if (err) {
return callback(err);
}
return callback();
});
}

181
lib/maildrop.js Normal file
View file

@ -0,0 +1,181 @@
'use strict';
const config = require('config');
const db = require('./db');
const SeqIndex = require('seq-index');
const DkimStream = require('./dkim-stream');
const MessageSplitter = require('./message-splitter');
const seqIndex = new SeqIndex();
const GridFs = require('grid-fs');
let gridstore;
module.exports = (options, callback) => {
if (!config.sender.enabled) {
return callback(null, false);
}
let id = options.id || seqIndex.get();
let seq = 0;
let documents = [];
let envelope = {
id,
from: options.from,
to: Array.isArray(options.to) ? options.to : [].concat(options.to || []),
interface: options.interface || 'maildrop',
transtype: 'API',
time: Date.now(),
dkim: {
hashAlgo: 'sha256'
}
};
if (!envelope.to.length) {
return callback(null, false);
}
let messageSplitter = new MessageSplitter();
let dkimStream = new DkimStream();
messageSplitter.once('headers', headers => {
envelope.headers = headers.getList();
});
dkimStream.on('hash', bodyHash => {
// store relaxed body hash for signing
envelope.dkim.bodyHash = bodyHash;
envelope.bodySize = dkimStream.byteLength;
});
messageSplitter.once('error', err => dkimStream.emit('error', err));
store(id, dkimStream, err => {
if (err) {
return callback(err);
}
setMeta(id, envelope, err => {
if (err) {
return removeMessage(id, () => callback(err));
}
let date = new Date();
for (let i = 0, len = envelope.to.length; i < len; i++) {
let recipient = envelope.to[i];
let deliveryZone = options.zone || config.sender.zone || 'default';
let recipientDomain = recipient.substr(recipient.lastIndexOf('@') + 1).replace(/[\[\]]/g, '');
seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,
// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,
// actual recipient address
recipient,
locked: false,
lockTime: 0,
// earliest time to attempt delivery, defaults to now
queued: date,
// queued date might change but created should not
created: date
};
documents.push(delivery);
}
db.senderDb.collection(config.sender.collection).
insertMany(documents, {
w: 1,
ordered: false
}, err => {
if (err) {
return callback(err);
}
callback(null, id);
});
});
});
messageSplitter.pipe(dkimStream);
return messageSplitter;
};
function store(id, stream, callback) {
gridstore = gridstore || new GridFs(db.senderDb, config.sender.gfs);
let returned = false;
let store = gridstore.createWriteStream('message ' + id, {
fsync: true,
content_type: 'message/rfc822',
metadata: {
created: new Date()
}
});
stream.once('error', err => {
if (returned) {
return;
}
returned = true;
store.once('close', () => {
removeMessage(id, () => callback(err));
});
store.end();
});
store.once('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
store.on('close', () => {
if (returned) {
return;
}
returned = true;
return callback(null, id);
});
stream.pipe(store);
}
function removeMessage(id, callback) {
gridstore.unlink('message ' + id, callback);
}
function setMeta(id, data, callback) {
db.senderDb.collection(config.sender.gfs + '.files').findAndModify({
filename: 'message ' + id
}, false, {
$set: {
'metadata.data': data
}
}, {}, err => {
if (err) {
return callback(err);
}
return callback();
});
}

View file

@ -161,7 +161,7 @@ const serverOptions = {
let mailboxQueryValue = 'INBOX';
let filters = (user.filters || []).concat(spamHeader ? {
id: 'wdspam',
id: 'SPAM',
query: {
headers: {
[spamHeader]: 'Yes'
@ -209,7 +209,8 @@ const serverOptions = {
forwardTargets.add(user.forward);
}
if (!forwardTargets.size) {
// never forward messages marked as spam
if (!forwardTargets.size || filterActions.get('spam')) {
return setImmediate(done);
}
@ -235,9 +236,9 @@ const serverOptions = {
forwardMessage((err, id) => {
if (err) {
log.error('LMTP', 'FRWRDFAIL error=%s', err.message);
log.error('LMTP', '%s FRWRDFAIL from=%s to=%s target=%s error=%s', prepared.id.toString(), sender, recipient, Array.from(forwardTargets).join(','), err.message);
} else if (id) {
log.silly('LMTP', 'FRWRDOK id=%s', id);
log.silly('LMTP', '%s FRWRDOK id=%s from=%s to=%s target=%s', prepared.id.toString(), id, sender, recipient, Array.from(forwardTargets).join(','));
}
if (filterActions.get('delete')) {

View file

@ -16,7 +16,7 @@
"grunt-cli": "^1.2.0",
"grunt-eslint": "^19.0.0",
"grunt-mocha-test": "^0.13.2",
"mocha": "^3.2.0"
"mocha": "^3.3.0"
},
"dependencies": {
"bcryptjs": "^2.4.3",
@ -24,14 +24,14 @@
"generate-password": "^1.3.0",
"grid-fs": "^1.0.1",
"html-to-text": "^3.2.0",
"iconv-lite": "^0.4.15",
"iconv-lite": "^0.4.17",
"joi": "^10.4.1",
"libbase64": "^0.1.0",
"libmime": "^3.1.0",
"libqp": "^1.1.0",
"mailsplit": "^4.0.2",
"marked": "^0.3.6",
"mongodb": "^2.2.25",
"mongodb": "^2.2.26",
"node-redis-scripty": "0.0.5",
"nodemailer": "^4.0.1",
"npmlog": "^4.0.2",