From ad1ac6d2803aec049e8b6f7f761c08698af473a6 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Tue, 18 Jul 2017 11:17:36 +0300 Subject: [PATCH] added support for cahced counters --- api.js | 208 +++++++++++++++++++++++++++++++++++++++++++ lib/consts.js | 4 +- lib/counters.js | 39 ++++++-- lib/imap-notifier.js | 45 ++++++++++ 4 files changed, 288 insertions(+), 8 deletions(-) diff --git a/api.js b/api.js index 174f66b4..62ec2227 100644 --- a/api.js +++ b/api.js @@ -6,6 +6,7 @@ const log = require('npmlog'); const Joi = require('joi'); const bcrypt = require('bcryptjs'); const tools = require('./lib/tools'); +const consts = require('./lib/consts'); const UserHandler = require('./lib/user-handler'); const db = require('./lib/db'); const certs = require('./lib/certs').get('api'); @@ -826,6 +827,213 @@ server.get('/users/:user/addresses/:address', (req, res, next) => { }); }); +server.get('/users/:user/mailboxes', (req, res, next) => { + res.charSet('utf-8'); + + const schema = Joi.object().keys({ + user: Joi.string().hex().lowercase().length(24).required() + }); + + const result = Joi.validate(req.params, schema, { + abortEarly: false, + convert: true + }); + + if (result.error) { + res.json({ + error: result.error.message + }); + return next(); + } + + let user = new ObjectID(result.value.user); + + db.users.collection('users').findOne({ + _id: user + }, { + fields: { + address: true + } + }, (err, userData) => { + if (err) { + res.json({ + error: 'MongoDB Error: ' + err.message + }); + return next(); + } + if (!userData) { + res.json({ + error: 'This user does not exist' + }); + return next(); + } + + db.database + .collection('mailboxes') + .find({ + user + }) + .toArray((err, mailboxes) => { + if (err) { + res.json({ + error: 'MongoDB Error: ' + err.message + }); + return next(); + } + + if (!mailboxes) { + mailboxes = []; + } + + let list = new Map(); + + mailboxes = mailboxes + .map(mailbox => { + list.set(mailbox.path, mailbox); + return mailbox; + }) + .sort((a, b) => { + if (a.path === 'INBOX') { + return -1; + } + if (b.path === 'INBOX') { + return 1; + } + if (a.subscribed !== b.subscribed) { + return (a.subscribed ? 0 : 1) - (b.subscribed ? 0 : 1); + } + return a.path.localeCompare(b.path); + }); + + res.json({ + success: true, + + mailboxes: mailboxes.map(mailbox => { + let path = mailbox.path.split('/'); + let name = path.pop(); + + return { + id: mailbox._id, + name, + path: mailbox.path, + specialUse: mailbox.specialUse, + modifyIndex: mailbox.modifyIndex + }; + }) + }); + + return next(); + }); + }); +}); + +server.get('/users/:user/mailboxes/:mailbox', (req, res, next) => { + res.charSet('utf-8'); + + const schema = Joi.object().keys({ + user: Joi.string().hex().lowercase().length(24).required(), + mailbox: Joi.string().hex().lowercase().length(24).required() + }); + + const result = Joi.validate(req.params, schema, { + abortEarly: false, + convert: true + }); + + if (result.error) { + res.json({ + error: result.error.message + }); + return next(); + } + + let user = new ObjectID(result.value.user); + let mailbox = new ObjectID(result.value.mailbox); + + db.users.collection('users').findOne({ + _id: user + }, { + fields: { + address: true + } + }, (err, userData) => { + if (err) { + res.json({ + error: 'MongoDB Error: ' + err.message + }); + return next(); + } + if (!userData) { + res.json({ + error: 'This user does not exist' + }); + return next(); + } + + db.database.collection('mailboxes').findOne({ + _id: mailbox, + user + }, (err, mailboxData) => { + if (err) { + res.json({ + error: 'MongoDB Error: ' + err.message + }); + return next(); + } + if (!mailboxData) { + res.json({ + error: 'This mailbox does not exist' + }); + return next(); + } + + let getCounter = (mailbox, done) => { + db.redis.get('sum:' + mailbox.toString(), (err, sum) => { + if (err) { + return done(err); + } + + if (sum !== null) { + return done(null, sum); + } + + // calculate sum + db.database.collection('messages').count({ mailbox }, (err, sum) => { + if (err) { + return done(err); + } + + // cache calculated sum in redis + db.redis.multi().set('sum:' + mailbox.toString(), sum).expire('sum:' + mailbox.toString(), consts.MAILBOX_COUNTER_TTL).exec(() => { + done(null, sum); + }); + }); + }); + }; + + let path = mailboxData.path.split('/'); + let name = path.pop(); + + getCounter(mailbox, (err, sum) => { + if (err) { + // ignore + } + res.json({ + success: true, + id: mailbox, + name, + path: mailboxData.path, + specialUse: mailboxData.specialUse, + modifyIndex: mailboxData.modifyIndex, + messages: sum + }); + }); + + return next(); + }); + }); +}); + module.exports = done => { if (!config.imap.enabled) { return setImmediate(() => done(null, false)); diff --git a/lib/consts.js b/lib/consts.js index 0ae99f32..071e4c47 100644 --- a/lib/consts.js +++ b/lib/consts.js @@ -14,5 +14,7 @@ module.exports = { MAX_RECIPIENTS: 2000, MAX_FORWARDS: 2000, - JUNK_RETENTION: 30 * 24 * 3600 * 1000 + JUNK_RETENTION: 30 * 24 * 3600 * 1000, + + MAILBOX_COUNTER_TTL: 24 * 3600 }; diff --git a/lib/counters.js b/lib/counters.js index 517f15ed..d8918e4c 100644 --- a/lib/counters.js +++ b/lib/counters.js @@ -2,32 +2,48 @@ const Scripty = require('node-redis-scripty'); -const counterScript = ` +const ttlCounterScript = ` +local key = KEYS[1]; local increment = tonumber(ARGV[1]) or 0; local limit = tonumber(ARGV[2]) or 0; -local current = tonumber(redis.call("GET", KEYS[1])) or 0; +local current = tonumber(redis.call("GET", key)) or 0; if current >= limit then - local ttl = tonumber(redis.call("TTL", KEYS[1])) or 0; + local ttl = tonumber(redis.call("TTL", key)) or 0; return {0, current, ttl}; end; -local updated = tonumber(redis.call("INCRBY", KEYS[1], increment)); +local updated = tonumber(redis.call("INCRBY", key, increment)); if current == 0 then - redis.call("EXPIRE", KEYS[1], 86400); + redis.call("EXPIRE", key, 86400); end; -local ttl = tonumber(redis.call("TTL", KEYS[1])) or 0; +local ttl = tonumber(redis.call("TTL", key)) or 0; return {1, updated, ttl}; `; +const cachedCounterScript = ` +local key = KEYS[1]; +local increment = tonumber(ARGV[1]) or 0; +local ttl = tonumber(ARGV[2]) or 0; + +if redis.call("EXISTS", key) == 1 then + redis.call("INCRBY", key, increment); + -- extend the life of this counter by ttl seconds + redis.call("EXPIRE", key, ttl); + return redis.call("GET", key); +else + return nil; +end +`; + module.exports = redis => { let scripty = new Scripty(redis); return { ttlcounter(key, count, max, callback) { - scripty.loadScript('counter', counterScript, (err, script) => { + scripty.loadScript('ttlcounter', ttlCounterScript, (err, script) => { if (err) { return callback(err); } @@ -42,6 +58,15 @@ module.exports = redis => { }); }); }); + }, + + cachedcounter(key, count, ttl, callback) { + scripty.loadScript('cachedCounter', cachedCounterScript, (err, script) => { + if (err) { + return callback(err); + } + script.run(1, key, count, ttl, callback); + }); } }; }; diff --git a/lib/imap-notifier.js b/lib/imap-notifier.js index 2f691d95..eb2e2f65 100644 --- a/lib/imap-notifier.js +++ b/lib/imap-notifier.js @@ -2,10 +2,12 @@ const config = require('wild-config'); const tools = require('./tools'); +const consts = require('./consts'); const crypto = require('crypto'); const EventEmitter = require('events').EventEmitter; const redis = require('redis'); const log = require('npmlog'); +const counters = require('./counters'); class ImapNotifier extends EventEmitter { constructor(options) { @@ -13,6 +15,7 @@ class ImapNotifier extends EventEmitter { this.database = options.database; this.publisher = options.redis || redis.createClient(tools.redisConfig(config.redis)); + this.cachedcounter = counters(this.publisher).cachedcounter; this.logger = options.logger || { info: log.silly.bind(log, 'IMAP'), @@ -189,6 +192,9 @@ class ImapNotifier extends EventEmitter { if (err) { return callback(err); } + + setImmediate(() => this.updateCounters(entries)); + return callback(null, r.insertedCount); }); }; @@ -283,6 +289,45 @@ class ImapNotifier extends EventEmitter { .toArray(callback); }); } + + updateCounters(entries) { + if (!entries) { + return; + } + let counters = new Map(); + (Array.isArray(entries) ? entries : [].concat(entries || [])).forEach(entry => { + let m = entry.mailbox.toString(); + if (!counters.has(m)) { + counters.set(m, 0); + } + switch (entry && entry.command) { + case 'EXISTS': + counters.set(m, counters.get(m) + 1); + break; + case 'EXPUNGE': + counters.set(m, counters.get(m) - 1); + break; + } + }); + + let pos = 0; + let rows = Array.from(counters); + let updateCounter = () => { + if (pos >= rows.length) { + return; + } + let row = rows[pos++]; + if (!row || !row.length) { + return updateCounter(); + } + let mailbox = row[0]; + let delta = row[1]; + + this.cachedcounter('sum:' + mailbox, delta, consts.MAILBOX_COUNTER_TTL, updateCounter); + }; + + updateCounter(); + } } module.exports = ImapNotifier;