Added imap.maxConnections option to limit parallel connections for an user (defaults to 15)

This commit is contained in:
Andris Reinman 2017-11-10 17:27:15 +02:00
parent 247cb2073a
commit b284477a45
5 changed files with 154 additions and 9 deletions

View file

@ -257,6 +257,17 @@ class IMAPConnection extends EventEmitter {
this._server.connections.delete(this);
if (typeof this._server.notifier.releaseConnection === 'function') {
this._server.notifier.releaseConnection(
{
service: 'imap',
session: this.session,
user: this.user
},
() => false
);
}
if (this._closed) {
return;
}

View file

@ -3,6 +3,9 @@
const fs = require('fs');
const ttlCounterScript = fs.readFileSync(__dirname + '/lua/ttlcounter.lua', 'utf-8');
const cachedCounterScript = fs.readFileSync(__dirname + '/lua/cachedcounter.lua', 'utf-8');
const limitedCounterScript = fs.readFileSync(__dirname + '/lua/limitedcounter.lua', 'utf-8');
const clientVersion = Date.now();
module.exports = redis => {
redis.defineCommand('ttlcounter', {
@ -15,6 +18,11 @@ module.exports = redis => {
lua: cachedCounterScript
});
redis.defineCommand('limitedcounter', {
numberOfKeys: 1,
lua: limitedCounterScript
});
return {
ttlcounter(key, count, max, windowSize, callback) {
redis.ttlcounter(key, count, max, windowSize || 86400, (err, res) => {
@ -36,6 +44,18 @@ module.exports = redis => {
}
callback(null, res);
});
},
limitedcounter(key, entry, count, limit, callback) {
redis.limitedcounter(key, entry, count, limit, clientVersion, (err, res) => {
if (err) {
return callback(err);
}
return callback(null, {
success: !!((res && res[0]) || 0),
value: (res && res[1]) || 0
});
});
}
};
};

View file

@ -1,5 +1,7 @@
'use strict';
const config = require('wild-config');
module.exports = (server, userHandler) => (login, session, callback) => {
let username = (login.username || '').toString().trim();
@ -25,11 +27,39 @@ module.exports = (server, userHandler) => (login, session, callback) => {
return callback();
}
callback(null, {
user: {
id: result.user,
username: result.username
let checkConnectionLimits = next => {
if (typeof server.notifier.allocateConnection === 'function') {
return server.notifier.allocateConnection(
{
service: 'imap',
session,
user: result.user,
limit: config.imap.maxConnections || 15
},
next
);
}
return next(null, true);
};
checkConnectionLimits((err, success) => {
if (err) {
return callback(err);
}
if (!success) {
err = new Error('[ALERT] Too many simultaneous connections.');
err.response = 'NO';
return callback(err);
}
callback(null, {
user: {
id: result.user,
username: result.username
}
});
});
}
);

View file

@ -15,7 +15,7 @@ class ImapNotifier extends EventEmitter {
this.database = options.database;
this.publisher = options.redis || new Redis(tools.redisConfig(config.dbs.redis));
this.cachedcounter = counters(this.publisher).cachedcounter;
this.counters = counters(this.publisher);
this.logger = options.logger || {
info: log.silly.bind(log, 'IMAP'),
@ -28,6 +28,8 @@ class ImapNotifier extends EventEmitter {
return;
}
this.connectionSessions = new WeakMap();
// Subscriber needs its own client connection. This is relevant only in the context of IMAP
this.subsriber = new Redis(tools.redisConfig(config.dbs.redis));
this._listeners = new EventEmitter();
@ -116,8 +118,6 @@ class ImapNotifier extends EventEmitter {
addListener(session, path, handler) {
let eventName = this._eventName(session.user.id.toString(), path);
this._listeners.addListener(eventName, handler);
this.logger.debug('[%s] New journal listener for %s ("%s:%s")', session.id, eventName, session.user.username, path);
}
/**
@ -348,13 +348,13 @@ class ImapNotifier extends EventEmitter {
let mailbox = row[0];
let delta = row[1];
this.cachedcounter('total:' + mailbox, delta.total, consts.MAILBOX_COUNTER_TTL, () => {
this.counters.cachedcounter('total:' + mailbox, delta.total, consts.MAILBOX_COUNTER_TTL, () => {
if (delta.unseenChange) {
// Message info changed in mailbox, so just te be sure, clear the unseen counter as well
// Unseen counter is more volatile and also easier to count (usually only a small number on indexed messages)
this.publisher.del('unseen:' + mailbox, updateCounter);
} else if (delta.unseen) {
this.cachedcounter('unseen:' + mailbox, delta.unseen, consts.MAILBOX_COUNTER_TTL, updateCounter);
this.counters.cachedcounter('unseen:' + mailbox, delta.unseen, consts.MAILBOX_COUNTER_TTL, updateCounter);
} else {
setImmediate(updateCounter);
}
@ -363,6 +363,45 @@ class ImapNotifier extends EventEmitter {
updateCounter();
}
allocateConnection(data, callback) {
if (!data || !data.session || this.connectionSessions.has(data.session)) {
return callback(null, true);
}
let rlkey = 'lim:' + data.service;
this.counters.limitedcounter(rlkey, data.user, 1, data.limit || 15, (err, res) => {
if (err) {
return callback(err);
}
if (!res.success) {
return callback(null, false);
}
this.connectionSessions.set(data.session, { service: data.service, user: data.user });
return callback(null, true);
});
}
releaseConnection(data, callback) {
// unauthenticated sessions are unknown
if (!data || !data.session || !this.connectionSessions.has(data.session)) {
return callback(null, true);
}
let entry = this.connectionSessions.get(data.session);
this.connectionSessions.delete(data.session);
let rlkey = 'lim:' + entry.service;
this.counters.limitedcounter(rlkey, entry.user, -1, 0, err => {
if (err) {
this.logger.debug('[%s] Failed to release connection for user %s. %s', data.session.id, entry.user, err.message);
}
return callback(null, true);
});
}
}
module.exports = ImapNotifier;

View file

@ -0,0 +1,45 @@
local key = KEYS[1];
local entry = ARGV[1];
local increment = tonumber(ARGV[2]) or 0;
local limit = tonumber(ARGV[3]) or 0;
local clientVersion = tonumber(ARGV[4]) or 0;
local existingVersion = tonumber(redis.call("HGET", key, "_version")) or 0;
-- Limited counter is not exact. Every client should use timestampt or incrementing value
-- as client ID. Whenever a new client is introduced, existing counter cache is wiped.
-- This should ensure that normally counters are limited but on a server failure when a client
-- restarts then old values to not collide with new ones.
if clientVersion > existingVersion then
redis.call("DEL", key);
redis.call("HSET", key, "_version", clientVersion);
end;
local current = tonumber(redis.call("HGET", key, entry)) or 0;
if increment == 0 then
return {1, current};
end;
if increment < 0 then
-- Remove entry
if current < 1 then
-- nothing to do here
return {1, 0};
end;
current = tonumber(redis.call("HINCRBY", key, entry, increment)) or 0;
return {1, current};
end;
-- Add entry
if current >= limit then
-- over capacity
return {0, current};
end;
current = tonumber(redis.call("HINCRBY", key, entry, increment)) or 0;
return {1, current};