added support for cahced counters

This commit is contained in:
Andris Reinman 2017-07-18 11:17:36 +03:00
parent c9debc13ca
commit ad1ac6d280
4 changed files with 288 additions and 8 deletions

208
api.js
View file

@ -6,6 +6,7 @@ const log = require('npmlog');
const Joi = require('joi');
const bcrypt = require('bcryptjs');
const tools = require('./lib/tools');
const consts = require('./lib/consts');
const UserHandler = require('./lib/user-handler');
const db = require('./lib/db');
const certs = require('./lib/certs').get('api');
@ -826,6 +827,213 @@ server.get('/users/:user/addresses/:address', (req, res, next) => {
});
});
server.get('/users/:user/mailboxes', (req, res, next) => {
res.charSet('utf-8');
const schema = Joi.object().keys({
user: Joi.string().hex().lowercase().length(24).required()
});
const result = Joi.validate(req.params, schema, {
abortEarly: false,
convert: true
});
if (result.error) {
res.json({
error: result.error.message
});
return next();
}
let user = new ObjectID(result.value.user);
db.users.collection('users').findOne({
_id: user
}, {
fields: {
address: true
}
}, (err, userData) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message
});
return next();
}
if (!userData) {
res.json({
error: 'This user does not exist'
});
return next();
}
db.database
.collection('mailboxes')
.find({
user
})
.toArray((err, mailboxes) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message
});
return next();
}
if (!mailboxes) {
mailboxes = [];
}
let list = new Map();
mailboxes = mailboxes
.map(mailbox => {
list.set(mailbox.path, mailbox);
return mailbox;
})
.sort((a, b) => {
if (a.path === 'INBOX') {
return -1;
}
if (b.path === 'INBOX') {
return 1;
}
if (a.subscribed !== b.subscribed) {
return (a.subscribed ? 0 : 1) - (b.subscribed ? 0 : 1);
}
return a.path.localeCompare(b.path);
});
res.json({
success: true,
mailboxes: mailboxes.map(mailbox => {
let path = mailbox.path.split('/');
let name = path.pop();
return {
id: mailbox._id,
name,
path: mailbox.path,
specialUse: mailbox.specialUse,
modifyIndex: mailbox.modifyIndex
};
})
});
return next();
});
});
});
server.get('/users/:user/mailboxes/:mailbox', (req, res, next) => {
res.charSet('utf-8');
const schema = Joi.object().keys({
user: Joi.string().hex().lowercase().length(24).required(),
mailbox: Joi.string().hex().lowercase().length(24).required()
});
const result = Joi.validate(req.params, schema, {
abortEarly: false,
convert: true
});
if (result.error) {
res.json({
error: result.error.message
});
return next();
}
let user = new ObjectID(result.value.user);
let mailbox = new ObjectID(result.value.mailbox);
db.users.collection('users').findOne({
_id: user
}, {
fields: {
address: true
}
}, (err, userData) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message
});
return next();
}
if (!userData) {
res.json({
error: 'This user does not exist'
});
return next();
}
db.database.collection('mailboxes').findOne({
_id: mailbox,
user
}, (err, mailboxData) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message
});
return next();
}
if (!mailboxData) {
res.json({
error: 'This mailbox does not exist'
});
return next();
}
let getCounter = (mailbox, done) => {
db.redis.get('sum:' + mailbox.toString(), (err, sum) => {
if (err) {
return done(err);
}
if (sum !== null) {
return done(null, sum);
}
// calculate sum
db.database.collection('messages').count({ mailbox }, (err, sum) => {
if (err) {
return done(err);
}
// cache calculated sum in redis
db.redis.multi().set('sum:' + mailbox.toString(), sum).expire('sum:' + mailbox.toString(), consts.MAILBOX_COUNTER_TTL).exec(() => {
done(null, sum);
});
});
});
};
let path = mailboxData.path.split('/');
let name = path.pop();
getCounter(mailbox, (err, sum) => {
if (err) {
// ignore
}
res.json({
success: true,
id: mailbox,
name,
path: mailboxData.path,
specialUse: mailboxData.specialUse,
modifyIndex: mailboxData.modifyIndex,
messages: sum
});
});
return next();
});
});
});
module.exports = done => {
if (!config.imap.enabled) {
return setImmediate(() => done(null, false));

View file

@ -14,5 +14,7 @@ module.exports = {
MAX_RECIPIENTS: 2000,
MAX_FORWARDS: 2000,
JUNK_RETENTION: 30 * 24 * 3600 * 1000
JUNK_RETENTION: 30 * 24 * 3600 * 1000,
MAILBOX_COUNTER_TTL: 24 * 3600
};

View file

@ -2,32 +2,48 @@
const Scripty = require('node-redis-scripty');
const counterScript = `
const ttlCounterScript = `
local key = KEYS[1];
local increment = tonumber(ARGV[1]) or 0;
local limit = tonumber(ARGV[2]) or 0;
local current = tonumber(redis.call("GET", KEYS[1])) or 0;
local current = tonumber(redis.call("GET", key)) or 0;
if current >= limit then
local ttl = tonumber(redis.call("TTL", KEYS[1])) or 0;
local ttl = tonumber(redis.call("TTL", key)) or 0;
return {0, current, ttl};
end;
local updated = tonumber(redis.call("INCRBY", KEYS[1], increment));
local updated = tonumber(redis.call("INCRBY", key, increment));
if current == 0 then
redis.call("EXPIRE", KEYS[1], 86400);
redis.call("EXPIRE", key, 86400);
end;
local ttl = tonumber(redis.call("TTL", KEYS[1])) or 0;
local ttl = tonumber(redis.call("TTL", key)) or 0;
return {1, updated, ttl};
`;
const cachedCounterScript = `
local key = KEYS[1];
local increment = tonumber(ARGV[1]) or 0;
local ttl = tonumber(ARGV[2]) or 0;
if redis.call("EXISTS", key) == 1 then
redis.call("INCRBY", key, increment);
-- extend the life of this counter by ttl seconds
redis.call("EXPIRE", key, ttl);
return redis.call("GET", key);
else
return nil;
end
`;
module.exports = redis => {
let scripty = new Scripty(redis);
return {
ttlcounter(key, count, max, callback) {
scripty.loadScript('counter', counterScript, (err, script) => {
scripty.loadScript('ttlcounter', ttlCounterScript, (err, script) => {
if (err) {
return callback(err);
}
@ -42,6 +58,15 @@ module.exports = redis => {
});
});
});
},
cachedcounter(key, count, ttl, callback) {
scripty.loadScript('cachedCounter', cachedCounterScript, (err, script) => {
if (err) {
return callback(err);
}
script.run(1, key, count, ttl, callback);
});
}
};
};

View file

@ -2,10 +2,12 @@
const config = require('wild-config');
const tools = require('./tools');
const consts = require('./consts');
const crypto = require('crypto');
const EventEmitter = require('events').EventEmitter;
const redis = require('redis');
const log = require('npmlog');
const counters = require('./counters');
class ImapNotifier extends EventEmitter {
constructor(options) {
@ -13,6 +15,7 @@ class ImapNotifier extends EventEmitter {
this.database = options.database;
this.publisher = options.redis || redis.createClient(tools.redisConfig(config.redis));
this.cachedcounter = counters(this.publisher).cachedcounter;
this.logger = options.logger || {
info: log.silly.bind(log, 'IMAP'),
@ -189,6 +192,9 @@ class ImapNotifier extends EventEmitter {
if (err) {
return callback(err);
}
setImmediate(() => this.updateCounters(entries));
return callback(null, r.insertedCount);
});
};
@ -283,6 +289,45 @@ class ImapNotifier extends EventEmitter {
.toArray(callback);
});
}
updateCounters(entries) {
if (!entries) {
return;
}
let counters = new Map();
(Array.isArray(entries) ? entries : [].concat(entries || [])).forEach(entry => {
let m = entry.mailbox.toString();
if (!counters.has(m)) {
counters.set(m, 0);
}
switch (entry && entry.command) {
case 'EXISTS':
counters.set(m, counters.get(m) + 1);
break;
case 'EXPUNGE':
counters.set(m, counters.get(m) - 1);
break;
}
});
let pos = 0;
let rows = Array.from(counters);
let updateCounter = () => {
if (pos >= rows.length) {
return;
}
let row = rows[pos++];
if (!row || !row.length) {
return updateCounter();
}
let mailbox = row[0];
let delta = row[1];
this.cachedcounter('sum:' + mailbox, delta, consts.MAILBOX_COUNTER_TTL, updateCounter);
};
updateCounter();
}
}
module.exports = ImapNotifier;