do not use locking when adding messages to mailbox

This commit is contained in:
Andris Reinman 2017-05-24 21:56:12 +03:00
parent 6ab1aba26a
commit 60300f82a1
3 changed files with 127 additions and 161 deletions

View file

@ -1764,9 +1764,6 @@ function clearExpiredMessages() {
gcTimeout.unref();
return;
} else if (!lock.success) {
server.logger.debug({
tnx: 'gc'
}, 'Failed to acquire lock error=%s', 'Lock already exists');
gcTimeout = setTimeout(clearExpiredMessages, GC_INTERVAL);
gcTimeout.unref();
return;
@ -1779,10 +1776,6 @@ function clearExpiredMessages() {
tnx: 'gc',
err
}, 'Failed to release lock error=%s', err.message);
} else {
server.logger.debug({
tnx: 'gc'
}, 'Released lock');
}
gcTimeout = setTimeout(clearExpiredMessages, GC_INTERVAL);
gcTimeout.unref();

View file

@ -4,7 +4,6 @@ const config = require('config');
const redis = require('redis');
const uuidV1 = require('uuid/v1');
const ObjectID = require('mongodb').ObjectID;
const RedFour = require('redfour');
const Indexer = require('../imap-core/lib/indexer/indexer');
const ImapNotifier = require('./imap-notifier');
const tools = require('./tools');
@ -44,10 +43,6 @@ class MessageHandler {
pushOnly: true
});
this.counters = counters(redis.createClient(this.redis));
this.redlock = new RedFour({
redis: this.redis,
namespace: 'wildduck'
});
}
getMailbox(options, callback) {
@ -195,7 +190,12 @@ class MessageHandler {
message.text = message.text.length <= maxTextLength ? message.text : message.text.substr(0, maxTextLength);
message.intro = message.text.replace(/\s+/g, ' ').trim();
if (message.intro.length > 128) {
message.intro = message.intro.substr(0, 128) + '…';
let intro = message.intro.substr(0, 128);
let lastSp = intro.lastIndexOf(' ');
if (lastSp > 0) {
intro = intro.substr(0, lastSp);
}
message.intro = intro + '…';
}
}
@ -217,99 +217,84 @@ class MessageHandler {
}).filter(html => html);
}
// Another server might be waiting for the lock
this.redlock.waitAcquireLock(mailbox._id.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: size
}
}, err => {
if (err) {
return cleanup(err);
}
if (!lock || !lock.success) {
// did not get a insert lock in 10 seconds
return cleanup(new Error('The user you are trying to contact is receiving mail at a rate that prevents additional messages from being delivered. Please resend your message at a later time'));
}
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: size
}
}, err => {
if (err) {
this.redlock.releaseLock(lock, () => false);
return cleanup(err);
}
let rollback = err => {
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
storageUsed: -size
}
}, () => {
this.redlock.releaseLock(lock, () => cleanup(err));
});
};
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
let rollback = err => {
this.database.collection('users').findOneAndUpdate({
_id: mailbox.user
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
storageUsed: -size
}
}, (err, item) => {
}, () => {
cleanup(err);
});
};
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailbox._id
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
}, (err, item) => {
if (err) {
return rollback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return rollback(err);
}
let mailbox = item.value;
// updated message object by setting mailbox specific values
message.mailbox = mailbox._id;
message.user = mailbox.user;
message.uid = mailbox.uidNext;
message.modseq = mailbox.modifyIndex + 1;
this.database.collection('messages').insertOne(message, err => {
if (err) {
return rollback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return rollback(err);
let uidValidity = mailbox.uidValidity;
let uid = message.uid;
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', message.uid));
}
let mailbox = item.value;
// updated message object by setting mailbox specific values
message.mailbox = mailbox._id;
message.user = mailbox.user;
message.uid = mailbox.uidNext;
message.modseq = mailbox.modifyIndex + 1;
this.database.collection('messages').insertOne(message, err => {
if (err) {
return rollback(err);
}
let uidValidity = mailbox.uidValidity;
let uid = message.uid;
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', message.uid));
}
this.notifier.addEntries(mailbox, false, {
command: 'EXISTS',
uid: message.uid,
ignore: options.session && options.session.id,
message: message._id,
modseq: message.modseq
}, () => {
this.redlock.releaseLock(lock, () => {
this.notifier.fire(mailbox.user, mailbox.path);
return cleanup(null, true, {
uidValidity,
uid,
id: message._id
});
});
this.notifier.addEntries(mailbox, false, {
command: 'EXISTS',
uid: message.uid,
ignore: options.session && options.session.id,
message: message._id,
modseq: message.modseq
}, () => {
this.notifier.fire(mailbox.user, mailbox.path);
return cleanup(null, true, {
uidValidity,
uid,
id: message._id
});
});
});
@ -345,94 +330,82 @@ class MessageHandler {
// As duplicate message was found, update UID, MODSEQ and FLAGS
// Ensure sequential UID by locking mailbox
this.redlock.waitAcquireLock(mailboxId.toString(), 30 * 1000, 10 * 1000, (err, lock) => {
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailboxId
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
}
}, {
returnOriginal: true
}, (err, item) => {
if (err) {
return callback(err);
}
if (!lock || !lock.success) {
// did not get a insert lock in 10 seconds
return callback(new Error('Failed to acquire lock'));
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return callback(err);
}
// acquire new UID+MODSEQ
this.database.collection('mailboxes').findOneAndUpdate({
_id: mailboxId
let mailbox = item.value;
let uid = mailbox.uidNext;
let modseq = mailbox.modifyIndex + 1;
this.database.collection('messages').findOneAndUpdate({
_id: existing._id
}, {
$inc: {
// allocate bot UID and MODSEQ values so when journal is later sorted by
// modseq then UIDs are always in ascending order
uidNext: 1,
modifyIndex: 1
$set: {
uid,
modseq,
flags: message.flags
}
}, {
returnOriginal: true
returnOriginal: false
}, (err, item) => {
if (err) {
return this.redlock.releaseLock(lock, () => callback(err));
return callback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return this.redlock.releaseLock(lock, () => callback(err));
// message was not found for whatever reason
return callback();
}
let mailbox = item.value;
let uid = mailbox.uidNext;
let modseq = mailbox.modifyIndex + 1;
let updated = item.value;
this.database.collection('messages').findOneAndUpdate({
_id: existing._id
}, {
$set: {
uid,
modseq,
flags: message.flags
}
}, {
returnOriginal: false
}, (err, item) => {
if (err) {
return this.redlock.releaseLock(lock, () => callback(err));
}
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', existing.uid));
}
if (!item || !item.value) {
// message was not found for whatever reason
return this.redlock.releaseLock(lock, callback);
}
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', updated.uid));
}
this.notifier.addEntries(mailbox, false, {
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: existing.uid,
message: existing._id
}, () => {
let updated = item.value;
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXPUNGE', existing.uid));
}
if (options.session && options.session.selected && options.session.selected.mailbox === mailbox.path) {
options.session.writeStream.write(options.session.formatResponse('EXISTS', updated.uid));
}
this.notifier.addEntries(mailbox, false, {
command: 'EXPUNGE',
command: 'EXISTS',
uid: updated.uid,
ignore: options.session && options.session.id,
uid: existing.uid,
message: existing._id
message: updated._id,
modseq: updated.modseq
}, () => {
this.notifier.addEntries(mailbox, false, {
command: 'EXISTS',
uid: updated.uid,
ignore: options.session && options.session.id,
message: updated._id,
modseq: updated.modseq
}, () => {
this.notifier.fire(mailbox.user, mailbox.path);
return this.redlock.releaseLock(lock, () => callback(null, true, {
uidValidity: mailbox.uidValidity,
uid,
id: existing._id
}));
this.notifier.fire(mailbox.user, mailbox.path);
return callback(null, true, {
uidValidity: mailbox.uidValidity,
uid,
id: existing._id
});
});
});

View file

@ -16,7 +16,7 @@
"grunt-cli": "^1.2.0",
"grunt-eslint": "^19.0.0",
"grunt-mocha-test": "^0.13.2",
"mocha": "^3.4.1"
"mocha": "^3.4.2"
},
"dependencies": {
"addressparser": "^1.0.1",
@ -34,7 +34,7 @@
"node-redis-scripty": "0.0.5",
"nodemailer": "^4.0.1",
"npmlog": "^4.1.0",
"qrcode": "^0.8.1",
"qrcode": "^0.8.2",
"redfour": "^1.0.0",
"redis": "^2.7.1",
"restify": "^4.3.0",