Emit counters

This commit is contained in:
Andris Reinman 2017-07-20 16:10:36 +03:00
parent cf20ada049
commit a1936db9e2
10 changed files with 192 additions and 69 deletions

158
api.js
View file

@ -20,7 +20,7 @@ const serverOptions = {
strictRouting: true,
formatters: {
'application/json; q=0.4': (req, res, body) => {
let data = body ? JSON.stringify(body, false, 2) : 'null';
let data = body ? JSON.stringify(body, false, 2) + '\n' : 'null';
res.setHeader('Content-Length', Buffer.byteLength(data));
return data;
}
@ -123,7 +123,7 @@ server.get({ name: 'users', path: '/users' }, (req, res, next) => {
address: true,
storageUsed: true,
quota: true,
setup: true
disabled: true
},
sortAscending: true
};
@ -152,6 +152,7 @@ server.get({ name: 'users', path: '/users' }, (req, res, next) => {
let nextUrl = result.hasNext ? server.router.render('users', {}, { next: result.previous, limit, query: query || '', page: page + 1 }) : false;
let response = {
success: true,
query,
total,
page,
@ -161,12 +162,11 @@ server.get({ name: 'users', path: '/users' }, (req, res, next) => {
id: userData._id.toString(),
username: userData.username,
address: userData.address,
storageUsed: Math.max(userData.storageUsed, 0),
quota: {
allowed: Number(userData.quota) || config.maxStorage * 1024 * 1024,
used: Math.max(Number(userData.storageUsed) || 0, 0)
},
activated: userData.setup
disabled: userData.disabled
}))
};
@ -258,6 +258,7 @@ server.get({ name: 'addresses', path: '/addresses' }, (req, res, next) => {
let nextUrl = result.hasNext ? server.router.render('addresses', {}, { next: result.previous, limit, query: query || '', page: page + 1 }) : false;
let response = {
success: true,
query,
total,
page,
@ -887,6 +888,8 @@ server.get('/users/:user', (req, res, next) => {
username: userData.username,
address: userData.address,
language: userData.language,
retention: userData.retention || false,
@ -907,7 +910,8 @@ server.get('/users/:user', (req, res, next) => {
}
},
address: userData.address
activated: userData.activated,
disabled: userData.disabled
});
return next();
@ -1225,49 +1229,30 @@ server.get('/users/:user/mailboxes/:mailbox', (req, res, next) => {
return next();
}
let getCounter = (mailbox, done) => {
db.redis.get('sum:' + mailbox.toString(), (err, sum) => {
if (err) {
return done(err);
}
if (sum !== null) {
return done(null, sum);
}
// calculate sum
db.database.collection('messages').count({ mailbox }, (err, sum) => {
if (err) {
return done(err);
}
// cache calculated sum in redis
db.redis.multi().set('sum:' + mailbox.toString(), sum).expire('sum:' + mailbox.toString(), consts.MAILBOX_COUNTER_TTL).exec(() => {
done(null, sum);
});
});
});
};
let path = mailboxData.path.split('/');
let name = path.pop();
getCounter(mailbox, (err, sum) => {
getMailboxCounter(mailbox, false, (err, total) => {
if (err) {
// ignore
}
res.json({
success: true,
id: mailbox,
name,
path: mailboxData.path,
specialUse: mailboxData.specialUse,
modifyIndex: mailboxData.modifyIndex,
messages: sum
getMailboxCounter(mailbox, 'unseen', (err, unseen) => {
if (err) {
// ignore
}
res.json({
success: true,
id: mailbox,
name,
path: mailboxData.path,
specialUse: mailboxData.specialUse,
modifyIndex: mailboxData.modifyIndex,
total,
unseen
});
return next();
});
});
return next();
});
});
});
@ -1420,16 +1405,22 @@ function formatJournalData(e) {
let response = [];
response.push('data: ' + JSON.stringify(data, false, 2).split('\n').join('\ndata: '));
response.push('id: ' + e._id.toString());
if (e._id) {
response.push('id: ' + e._id.toString());
}
return response.join('\n') + '\n\n';
}
function loadJournalStream(req, res, user, lastEventId, done) {
console.log('READ');
let query = { user };
if (lastEventId) {
query._id = { $gt: lastEventId };
}
let mailboxes = new Set();
let cursor = db.database.collection('journal').find(query).sort({ _id: 1 });
let processed = 0;
let processNext = () => {
@ -1439,11 +1430,47 @@ function loadJournalStream(req, res, user, lastEventId, done) {
}
if (!e) {
return cursor.close(() => {
// delete all attachments that do not have any active links to message objects
done(null, {
lastEventId,
processed
});
if (!mailboxes.size) {
return done(null, {
lastEventId,
processed
});
}
mailboxes = Array.from(mailboxes);
let mailboxPos = 0;
let emitCounters = () => {
if (mailboxPos >= mailboxes.length) {
return done(null, {
lastEventId,
processed
});
}
let mailbox = mailboxes[mailboxPos++];
getMailboxCounter(mailbox, false, (err, total) => {
if (err) {
// ignore
}
getMailboxCounter(mailbox, 'unseen', (err, unseen) => {
if (err) {
// ignore
}
res.write(
formatJournalData({
command: 'COUNTERS',
_id: lastEventId,
mailbox,
total,
unseen
})
);
setImmediate(emitCounters);
});
});
};
emitCounters();
});
}
@ -1454,6 +1481,16 @@ function loadJournalStream(req, res, user, lastEventId, done) {
return processNext();
}
switch (e.command) {
case 'FETCH':
case 'EXISTS':
case 'EXPUNGE':
if (e.mailbox) {
mailboxes.add(e.mailbox.toString());
}
break;
}
res.write(formatJournalData(e));
processed++;
@ -1464,6 +1501,35 @@ function loadJournalStream(req, res, user, lastEventId, done) {
processNext();
}
function getMailboxCounter(mailbox, type, done) {
let prefix = type ? type : 'sum';
db.redis.get(prefix + ':' + mailbox.toString(), (err, sum) => {
if (err) {
return done(err);
}
if (sum !== null) {
return done(null, Number(sum));
}
// calculate sum
let query = { mailbox };
if (type) {
query[type] = true;
}
db.database.collection('messages').count(query, (err, sum) => {
if (err) {
return done(err);
}
// cache calculated sum in redis
db.redis.multi().set(prefix + ':' + mailbox.toString(), sum).expire(prefix + ':' + mailbox.toString(), consts.MAILBOX_COUNTER_TTL).exec(() => {
done(null, sum);
});
});
});
}
module.exports = done => {
if (!config.imap.enabled) {
return setImmediate(() => done(null, false));

View file

@ -198,7 +198,8 @@ function clearExpiredMessages() {
uid: true,
size: true,
map: true,
magic: true
magic: true,
unseen: true
});
let deleted = 0;

View file

@ -144,7 +144,8 @@ module.exports = server => (path, update, session, callback) => {
{
command: 'EXISTS',
uid: message.uid,
message: message._id
message: message._id,
unseen: message.unseen
},
processNext
);
@ -173,7 +174,8 @@ module.exports = server => (path, update, session, callback) => {
{
command: 'EXISTS',
uid: message.uid,
message: message._id
message: message._id,
unseen: message.unseen
},
processNext
);

View file

@ -36,7 +36,8 @@ module.exports = server => (path, update, session, callback) => {
uid: true,
size: true,
map: true,
magic: true
magic: true,
unseen: true
})
.sort([['uid', 1]]);
@ -102,7 +103,8 @@ module.exports = server => (path, update, session, callback) => {
command: 'EXPUNGE',
ignore: session.id,
uid: message.uid,
message: message._id
message: message._id,
unseen: message.unseen
},
processNext
);
@ -132,7 +134,8 @@ module.exports = server => (path, update, session, callback) => {
command: 'EXPUNGE',
ignore: session.id,
uid: message.uid,
message: message._id
message: message._id,
unseen: message.unseen
},
processNext
);

View file

@ -175,7 +175,8 @@ module.exports = server => (path, options, session, callback) => {
ignore: session.id,
uid: message.uid,
flags: message.flags,
message: message._id
message: message._id,
unseenChange: true
});
if (updateEntries.length >= consts.BULK_BATCH_SIZE) {

View file

@ -28,6 +28,8 @@ module.exports = server => (path, update, session, callback) => {
return callback(null, 'NONEXISTENT');
}
let unseenChange = update.value.includes('\\Seen');
let query = {
mailbox: mailbox._id
};
@ -277,7 +279,8 @@ module.exports = server => (path, update, session, callback) => {
ignore: session.id,
uid: message.uid,
flags: message.flags,
message: message._id
message: message._id,
unseenChange
});
if (updateEntries.length >= consts.BULK_BATCH_SIZE) {

View file

@ -304,14 +304,29 @@ class ImapNotifier extends EventEmitter {
(Array.isArray(entries) ? entries : [].concat(entries || [])).forEach(entry => {
let m = entry.mailbox.toString();
if (!counters.has(m)) {
counters.set(m, 0);
counters.set(m, { total: 0, unseen: 0, unseenChange: false });
}
switch (entry && entry.command) {
case 'EXISTS':
counters.set(m, counters.get(m) + 1);
counters.get(m).total += 1;
if (entry.unseen) {
counters.get(m).unseen += 1;
}
break;
case 'EXPUNGE':
counters.set(m, counters.get(m) - 1);
counters.get(m).total -= 1;
if (entry.unseen) {
counters.get(m).unseen -= 1;
}
break;
case 'FETCH':
if (entry.unseen) {
// either increase or decrese
counters.get(m).unseen += typeof entry.unseen === 'number' ? entry.unseen : 1;
} else if (entry.unseenChange) {
// volatile change, just clear the cache
counters.get(m).unseenChange = true;
}
break;
}
});
@ -329,7 +344,17 @@ class ImapNotifier extends EventEmitter {
let mailbox = row[0];
let delta = row[1];
this.cachedcounter('sum:' + mailbox, delta, consts.MAILBOX_COUNTER_TTL, updateCounter);
this.cachedcounter('total:' + mailbox, delta.total, consts.MAILBOX_COUNTER_TTL, () => {
if (delta.unseenChange) {
// Message info changed in mailbox, so just te be sure, clear the unseen counter as well
// Unseen counter is more volatile and also easier to count (usually only a small number on indexed messages)
this.publisher.del('unseen:' + mailbox, updateCounter);
} else if (delta.unseen) {
this.cachedcounter('unseen:' + mailbox, delta.unseen, consts.MAILBOX_COUNTER_TTL, updateCounter);
} else {
setImmediate(updateCounter);
}
});
};
updateCounter();

View file

@ -293,7 +293,8 @@ class MessageHandler {
uid: message.uid,
ignore: options.session && options.session.id,
message: message._id,
modseq: message.modseq
modseq: message.modseq,
unseen: message.unseen
},
() => {
this.notifier.fire(mailbox.user, mailbox.path);
@ -409,7 +410,8 @@ class MessageHandler {
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: existing.uid,
message: existing._id
message: existing._id,
unseen: existing.unseen
},
() => {
this.notifier.addEntries(
@ -420,7 +422,8 @@ class MessageHandler {
uid: updated.uid,
ignore: options.session && options.session.id,
message: updated._id,
modseq: updated.modseq
modseq: updated.modseq,
unseen: updated.unseen
},
() => {
this.notifier.fire(mailbox.user, mailbox.path);
@ -520,7 +523,8 @@ class MessageHandler {
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: message.uid,
message: message._id
message: message._id,
unseen: message.unseen
},
() => {
this.notifier.fire(mailbox.user, mailbox.path);
@ -652,6 +656,8 @@ class MessageHandler {
message.exp = !!target.retention;
message.rdate = Date.now() + (target.retention || 0);
let unseen = message.unseen;
if (options.markAsSeen) {
message.unseen = false;
if (!message.flags.includes('\\Seen')) {
@ -684,13 +690,15 @@ class MessageHandler {
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: messageUid,
message: messageId
message: messageId,
unseen
});
existsEntries.push({
command: 'EXISTS',
uid: uidNext,
message: insertId
message: insertId,
unseen: message.unseen
});
if (existsEntries.length >= BULK_BATCH_SIZE) {

View file

@ -246,7 +246,8 @@ class UserHandler {
created: new Date(),
// until setup value is not true, this account is not usable
setup: false
activated: false,
disabled: true
}, err => {
if (err) {
log.error('DB', 'CREATEFAIL username=%s error=%s', data.username, err.message);
@ -291,18 +292,29 @@ class UserHandler {
this.database.collection('mailboxes').deleteMany({ user: id }, () => false);
log.error('DB', 'CREATEFAIL username=%s error=%s', data.username, err.message);
return callback(new Error('Database Error, failed to create user'));
let response;
switch (err.code) {
case 11000:
response = 'Selected email address already exists';
break;
default:
response = 'Database Error, failed to create user';
}
return callback(new Error(response));
}
// register this address as the default address for that user
return this.users.collection('users').findOneAndUpdate({
_id: id,
setup: false
activated: false
}, {
$set: {
password: hash,
address,
setup: true
activated: true,
disabled: false
}
}, {}, err => {
if (err) {

View file

@ -284,7 +284,9 @@ function markAsSeen(session, messages, callback) {
uid: message.uid,
flags: message.flags.concat('\\Seen'),
message: new ObjectID(message.id),
modseq: mailboxData.modifyIndex
modseq: mailboxData.modifyIndex,
// Indicate that unseen values are changed. Not sure how much though
unseenChange: true
};
return result;
}),