Lock journaling when pushing new messages to minimize risks of races

when mushing multiple messages at once
This commit is contained in:
Andris Reinman 2017-03-19 15:57:53 +02:00
parent 1b0b0eab50
commit a0851a93c0
6 changed files with 189 additions and 75 deletions

View file

@ -1,3 +1,5 @@
/* eslint no-console: 0 */
'use strict';
const recipient = process.argv[2];
@ -13,7 +15,7 @@ const nodemailer = require('nodemailer');
const transporter = nodemailer.createTransport({
host: 'localhost',
port: config.smtp.port,
logger: true
logger: false
});
transporter.sendMail({
@ -30,4 +32,13 @@ transporter.sendMail({
path: __dirname + '/swan.jpg',
filename: 'swän.jpg'
}]
}, (err, info) => {
if (err && err.response) {
console.log('Message failed: %s', err.response);
} else if (err) {
console.log(err);
} else {
console.log(info);
}
});
}

View file

@ -30,12 +30,52 @@ class ImapNotifier extends EventEmitter {
this._listeners = new EventEmitter();
this._listeners.setMaxListeners(0);
this.publishTimer = false;
let publishTimers = new Map();
let scheduleDataEvent = ev => {
let data;
let fire = () => {
clearTimeout(data.timeout);
publishTimers.delete(ev);
this._listeners.emit(ev);
};
if (publishTimers.has(ev)) {
data = publishTimers.get(ev) || {};
clearTimeout(data.timeout);
data.count++;
if (data.initial < Date.now() - 1000) {
// if the event has been held back already for a second, the fire immediatelly
return fire();
}
} else {
// initialize new event object
data = {
ev,
count: 1,
initial: Date.now(),
timeout: null
};
}
data.timeout = setTimeout(fire, 100);
data.timeout.unref();
if (!publishTimers.has(ev)) {
publishTimers.set(ev, data);
}
};
this.subsriber.on('message', (channel, message) => {
if (channel === 'wd_events') {
try {
let data = JSON.parse(message);
this._listeners.emit(data.e, data.p);
if (data.e && !data.p) {
scheduleDataEvent(data.e);
} else {
this._listeners.emit(data.e, data.p);
}
} catch (E) {
//
}
@ -98,28 +138,54 @@ class ImapNotifier extends EventEmitter {
return callback(null, false);
}
let modseqsNeeded = entries.length;
entries.forEach(entry => {
if (entry.modseq) {
modseqsNeeded--;
}
entry.created = new Date();
});
this.database.collection('mailboxes').findOneAndUpdate({
let mailbox;
if (username && typeof username === 'object' && username._id) {
mailbox = username;
username = false;
}
let mailboxQuery = mailbox ? {
_id: mailbox._id
} : {
username,
path
}, {
$inc: {
modifyIndex: entries.length
};
let getMailbox = next => {
if (modseqsNeeded) {
return this.database.collection('mailboxes').findOneAndUpdate(mailboxQuery, {
$inc: {
modifyIndex: modseqsNeeded
}
}, {}, (err, item) => {
if (err) {
return callback(err);
}
next(null, item && item.value);
});
}
}, {}, (err, item) => {
if (mailbox) {
return next(null, mailbox);
}
this.database.collection('mailboxes').findOne(mailboxQuery, next);
};
getMailbox((err, mailbox) => {
if (err) {
return callback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
if (!mailbox) {
return callback(null, new Error('Selected mailbox does not exist'));
}
let mailbox = item.value;
let startIndex = mailbox.modifyIndex;
let updated = 0;
@ -137,11 +203,14 @@ class ImapNotifier extends EventEmitter {
}
let entry = entries[updated++];
let setModseq = !!entry.modseq;
entry.mailbox = mailbox._id;
entry.modseq = ++startIndex;
if (!setModseq) {
entry.modseq = ++startIndex;
}
if (entry.message) {
if (entry.message && setModseq) {
this.database.collection('messages').findOneAndUpdate({
_id: entry.message,
modseq: {
@ -210,6 +279,8 @@ class ImapNotifier extends EventEmitter {
modseq: {
$gt: modifyIndex
}
}).sort({
modseq: 1
}).toArray(callback);
});
}

138
imap.js
View file

@ -13,9 +13,10 @@ const bcrypt = require('bcryptjs');
const ObjectID = require('mongodb').ObjectID;
const Indexer = require('./imap-core/lib/indexer/indexer');
const fs = require('fs');
const RedFour = require('redfour');
// Setup server
let serverOptions = {
const serverOptions = {
secure: config.imap.secure,
id: {
name: 'test'
@ -36,11 +37,15 @@ if (config.imap.cert) {
serverOptions.cert = fs.readFileSync(config.imap.cert);
}
let server = new IMAPServer(serverOptions);
const server = new IMAPServer(serverOptions);
const redlock = new RedFour({
redis: config.redis,
namespace: 'wildduck'
});
let database;
server.onAuth = function (login, session, callback) {
let username = (login.username || '').toString().trim();
@ -1171,77 +1176,98 @@ server.addToMailbox = (username, path, meta, date, flags, raw, callback) => {
return callback(err);
}
// acquire new UID
database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
uidNext: 1
}
}, {}, (err, item) => {
// calculate size before removing large attachments from mime tree
let size = server.indexer.getSize(mimeTree);
// move large attachments to GridStore
server.indexer.storeAttachments(id, mimeTree, 50 * 1024, err => {
if (err) {
return callback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return callback(err);
}
let mailbox = item.value;
// calculate size before removing large attachments from mime tree
let size = server.indexer.getSize(mimeTree);
// move large attachments to GridStore
server.indexer.storeAttachments(id, mimeTree, 50 * 1024, err => {
// Another server might be waiting for the lock like this.
redlock.waitAcquireLock(mailbox._id.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
if (err) {
return callback(err);
}
let internaldate = date && new Date(date) || new Date();
let headerdate = mimeTree.parsedHeader.date && new Date(mimeTree.parsedHeader.date) || false;
if (!headerdate || headerdate.toString() === 'Invalid Date') {
headerdate = internaldate;
if (!lock || !lock.success) {
// did not get a insert lock in 10 seconds
return callback(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time'));
}
let message = {
_id: id,
mailbox: mailbox._id,
uid: mailbox.uidNext,
internaldate,
headerdate,
flags,
size,
meta,
modseq: 0,
mimeTree,
envelope,
bodystructure,
messageId
};
database.collection('messages').insertOne(message, err => {
// acquire new UID+MODSEQ
database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
}, (err, item) => {
if (err) {
redlock.releaseLock(lock, () => false);
return callback(err);
}
server.notifier.addEntries(username, path, {
command: 'EXISTS',
uid: message.uid,
message: message._id
}, () => {
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
redlock.releaseLock(lock, () => false);
return callback(err);
}
let mailbox = item.value;
let internaldate = date && new Date(date) || new Date();
let headerdate = mimeTree.parsedHeader.date && new Date(mimeTree.parsedHeader.date) || false;
if (!headerdate || headerdate.toString() === 'Invalid Date') {
headerdate = internaldate;
}
let message = {
_id: id,
mailbox: mailbox._id,
uid: mailbox.uidNext,
internaldate,
headerdate,
flags,
size,
meta,
modseq: mailbox.modifyIndex + 1,
mimeTree,
envelope,
bodystructure,
messageId
};
database.collection('messages').insertOne(message, err => {
if (err) {
redlock.releaseLock(lock, () => false);
return callback(err);
}
let uidValidity = mailbox.uidValidity;
let uid = message.uid;
server.notifier.fire(username, path);
server.notifier.addEntries(mailbox, false, {
command: 'EXISTS',
uid: message.uid,
message: message._id,
modseq: message.modseq
}, () => {
return callback(null, true, {
uidValidity,
uid
redlock.releaseLock(lock, () => {
server.notifier.fire(username, path);
return callback(null, true, {
uidValidity,
uid
});
});
});
});
});

View file

@ -88,6 +88,11 @@ db.journal.createIndex({
modseq: 1
});
db.journal.createIndex({
mailbox: 1,
modseq: -1
});
db.journal.createIndex({
created: 1
}, {

View file

@ -18,7 +18,7 @@
"grunt-eslint": "^19.0.0",
"grunt-mocha-test": "^0.13.2",
"mocha": "^3.2.0",
"nodemailer": "^3.1.5"
"nodemailer": "^3.1.7"
},
"dependencies": {
"addressparser": "^1.0.1",
@ -30,12 +30,13 @@
"libbase64": "^0.1.0",
"libmime": "^3.1.0",
"mailparser": "^2.0.2",
"mongodb": "^2.2.24",
"mongodb": "^2.2.25",
"nodemailer-fetch": "^2.1.0",
"npmlog": "^4.0.2",
"redis": "^2.6.5",
"redfour": "^1.0.0",
"redis": "^2.7.1",
"restify": "^4.3.0",
"smtp-server": "^2.0.2",
"smtp-server": "^2.0.3",
"toml": "^2.3.2",
"utf7": "^1.0.2",
"uuid": "^3.0.1"

View file

@ -126,7 +126,7 @@ const server = new SMTPServer({
chunklen -= header.length;
if (err) {
log.error('SMTP', err);
return callback(err);
}
storeNext();