Refactored journal updates

This commit is contained in:
Andris Reinman 2017-04-12 21:59:30 +03:00
parent 36f3d7eb8d
commit 963656ad2d
2 changed files with 62 additions and 67 deletions

View file

@ -138,33 +138,33 @@ class ImapNotifier extends EventEmitter {
return callback(null, false);
}
let modseqsNeeded = entries.length;
entries.forEach(entry => {
if (entry.modseq) {
modseqsNeeded--;
}
entry.created = new Date();
});
let mailbox;
if (user && typeof user === 'object' && user._id) {
mailbox = user;
user = false;
}
let mailboxQuery = mailbox ? {
_id: mailbox._id
} : {
user,
path
};
// find list of message ids that need to be updated
let updated = entries.filter(entry => !entry.modseq && entry.message).map(entry => entry.message);
let getMailbox = next => {
if (modseqsNeeded) {
let mailbox;
if (user && typeof user === 'object' && user._id) {
mailbox = user;
user = false;
}
let mailboxQuery = mailbox ? {
_id: mailbox._id
} : {
user,
path
};
if (updated.length) {
// provision new modseq value
return this.database.collection('mailboxes').findOneAndUpdate(mailboxQuery, {
$inc: {
modifyIndex: modseqsNeeded
modifyIndex: 1
}
}, {}, (err, item) => {
}, {
returnOriginal: false
}, (err, item) => {
if (err) {
return callback(err);
}
@ -178,6 +178,19 @@ class ImapNotifier extends EventEmitter {
this.database.collection('mailboxes').findOne(mailboxQuery, next);
};
// final action to push entries to journal
let pushToJournal = () => {
this.database.collection('journal').insertMany(entries, {
w: 1,
ordered: false
}, (err, r) => {
if (err) {
return callback(err);
}
return callback(null, r.insertedCount);
});
};
getMailbox((err, mailbox) => {
if (err) {
return callback(err);
@ -186,52 +199,32 @@ class ImapNotifier extends EventEmitter {
return callback(null, new Error('Selected mailbox does not exist'));
}
let startIndex = mailbox.modifyIndex;
let modseq = mailbox.modifyIndex;
let created = new Date();
entries.forEach(entry => {
entry.modseq = entry.modseq || modseq;
entry.created = entry.created || created;
});
let updated = 0;
let updateNext = () => {
if (updated >= entries.length) {
return this.database.collection('journal').insertMany(entries, {
w: 1,
ordered: false
}, (err, r) => {
if (err) {
return callback(err);
}
return callback(null, r.insertedCount);
});
}
let entry = entries[updated++];
let setModseq = !entry.modseq;
entry.mailbox = mailbox._id;
if (setModseq) {
entry.modseq = ++startIndex;
}
if (entry.message && setModseq) {
this.database.collection('messages').findOneAndUpdate({
_id: entry.message,
modseq: {
$lt: entry.modseq
}
}, {
$set: {
modseq: entry.modseq
}
}, {}, err => {
if (err) {
this.logger.error('Error updating modseq for message %s. %s', entry.message, err.message);
}
updateNext();
});
} else {
updateNext();
}
};
updateNext();
if (updated.length) {
this.database.collection('messages').updateMany({
_id: {
$in: updated
}
}, {
// only update modseq if the new value is larger than old one
$max: {
modseq
}
}, err => {
if (err) {
this.logger.error('Error updating modseq for messages. %s', err.message);
}
pushToJournal();
});
} else {
pushToJournal();
}
});
}

View file

@ -1,5 +1,7 @@
'use strict';
// Simple LMTP server that accepts all messages for valid recipients
const config = require('config');
const log = require('npmlog');
const SMTPServer = require('smtp-server').SMTPServer;