publish messages to db before sending to clients

This commit is contained in:
Andris Reinman 2017-09-28 16:17:45 +03:00
parent ddbf1fb2da
commit 43b8757a5e
2 changed files with 393 additions and 55 deletions

View file

@ -6,8 +6,9 @@ const EventEmitter = require('events');
const os = require('os');
const codes = require('./codes');
const db = require('../db');
const ObjectID = require('mongodb').ObjectID;
const PING_TIMEOUT = 120 * 1000;
const PING_TIMEOUT = 10 * 120 * 1000;
const SOCKET_TIMEOUT = 5 * 60 * 1000;
class IRCConnection extends EventEmitter {
@ -21,6 +22,8 @@ class IRCConnection extends EventEmitter {
this._authenticating = false;
this.subscriptions = new Set();
this.remoteAddress = this._socket.remoteAddress;
this.id = crypto
.randomBytes(8)
@ -41,6 +44,56 @@ class IRCConnection extends EventEmitter {
this.capStarted = false;
this.capEnded = false;
this.capEnabled = new Set();
this.accumulateTimer = false;
this.accumulateStart = false;
this.fetching = false;
this.dofetch = false;
this.lastFetchedItem = new ObjectID();
this.nickChanges = new Set();
this.subscriber = data => {
switch (data.action) {
case 'message': {
clearTimeout(this.accumulateTimer);
let time = Date.now();
if (this.accumulateStart && this.accumulateStart < time - 1000) {
this.accumulateStart = false;
return this.fetchMessages();
}
if (!this.accumulateStart) {
this.accumulateStart = time;
}
this.accumulateTimer = setTimeout(() => this.fetchMessages(), 80);
this.accumulateTimer.unref();
break;
}
case 'nick': {
if (this.nickChanges.has(data.changeId)) {
break;
}
this.nickChanges.add(data.changeId);
if (data.user === this.session.auth.id.toString() && data.nick !== this.session.nick) {
let currentSource = this.getFormattedName();
this.session.nick = data.nick;
this.send({ source: currentSource, verb: 'NICK', target: false, params: this.session.nick });
} else if (data.currentSource) {
this.send({ source: data.currentSource, verb: 'NICK', target: false, params: data.nick });
}
// FIXME: nick change with additional login does not have currentSource set yet!
// FIXME: subscribing to a channel is not propagated to existing sessions for same user
//console.log('NICK CHANGE');
//console.log(data);
break;
}
}
};
}
init() {
@ -100,6 +153,63 @@ class IRCConnection extends EventEmitter {
this._socket.write(payload);
}
fetchMessages(force) {
if (!force && this.fetching) {
this.dofetch = true;
return false;
}
this.fetching = true;
this.dofetch = false;
let query = {
_id: { $gt: this.lastFetchedItem },
rcpt: this.session.auth.id
};
let cursor = db.database.collection('chat').find(query);
let clear = () =>
cursor.close(() => {
if (this.dofetch) {
return setImmediate(() => this.fetchMessages(true));
} else {
this.fetching = false;
}
});
let processNext = () => {
cursor.next((err, message) => {
if (err) {
this.server.logger.error(
{
err,
tnx: 'chat',
cid: this.id
},
'Failed iterating db cursor. %s',
err.message
);
return;
}
if (!message) {
return clear();
}
this.lastFetchedItem = message._id;
if (message.channel && message.session.toString() === this.session.id.toString()) {
// ignore messages from self
return setImmediate(processNext);
}
this.send({ source: message.nick, verb: 'PRIVMSG', message: message.message });
setImmediate(processNext);
});
};
processNext();
}
send(payload) {
if (!this._socket || !this._socket.writable) {
return;
@ -170,6 +280,11 @@ class IRCConnection extends EventEmitter {
* @event
*/
_onClose(/* hadError */) {
this.subscriptions.forEach(subscriptionKey => {
this.unsubscribe(subscriptionKey);
});
this.subscriptions.clear();
if (this._closed) {
return;
}
@ -230,6 +345,7 @@ class IRCConnection extends EventEmitter {
_resetSession() {
this.session = {
id: new ObjectID(),
state: 'AUTHORIZATION',
remoteAddress: this.remoteAddress
};
@ -443,10 +559,41 @@ class IRCConnection extends EventEmitter {
this.session.nick = this.session.nick || userData.username;
this.session.ns = ns || 'root';
next(null, {
id: userData._id,
username: userData.username
});
db.database
.collection('channels')
.find({
members: userData._id
})
.project({
_id: true,
channel: true
})
.toArray((err, channels) => {
if (err) {
return next(err);
}
if (Array.isArray(channels)) {
channels.forEach(channelData => {
this.server.logger.info(
{
tnx: 'setup',
cid: this.id,
channel: channelData._id
},
'Joining %s to channel %s',
userData._id,
channelData._id
);
this.subscribe([this.session.ns, '#', channelData._id].join('/'));
});
}
this.subscribe([this.session.ns, '%', userData._id].join('/'));
next(null, {
id: userData._id,
username: userData.username
});
});
});
}
);
@ -459,6 +606,7 @@ class IRCConnection extends EventEmitter {
let verifyUser = done => {
if (nickview === this.session.auth.username.replace(/\./g, '')) {
// allow the same nick as username
return done();
}
db.users.collection('users').findOne({
@ -512,7 +660,7 @@ class IRCConnection extends EventEmitter {
}
if (nickData.user.toString() === user.toString()) {
return next(null, false);
return next(null, nickData._id);
}
err = new Error('Requested nick is already in use');
@ -528,17 +676,17 @@ class IRCConnection extends EventEmitter {
}
// try to remove old nicks
db.database.collection('nicks').deleteOne({
db.database.collection('nicks').deleteMany({
ns,
nickview: { $ne: nickview },
user
}, () => next(null, true));
}, () => next(null, insertId));
});
});
}
verifyNickChange(currentSource, next) {
this.getNick(this.session.nick, (err, changed) => {
this.getNick(this.session.nick, (err, nickId) => {
if (err) {
currentSource = currentSource || this.getFormattedName();
this.send({ verb: err.verb || 'ERR_UNAVAILRESOURCE', params: this.session.nick, message: err.message });
@ -549,19 +697,42 @@ class IRCConnection extends EventEmitter {
this.send({ source: currentSource, verb: 'NICK', target: false, params: this.session.nick });
}
if (changed) {
/*
this.notifyAll({ source: currentSource, verb: 'NICK', target: false, params: this.session.nick }, ()=>{
let updatedSource = this.getFormattedName();
if (nickId && currentSource !== updatedSource) {
let eventData = {
action: 'nick',
changeId: new ObjectID().toString(),
nickId: nickId.toString(),
user: this.session.auth.id.toString(),
old: currentSource,
new: updatedSource,
nick: this.session.nick
};
this.subscriptions.forEach(subscriptionKey => {
this.publish(subscriptionKey, eventData);
});
*/
}
this.server.nick(this);
//this.server.nick(this);
return next();
});
}
subscribe(subscriptionKey) {
this.subscriptions.add(subscriptionKey);
this.server.subscribe(this, subscriptionKey, this.subscriber);
}
unsubscribe(subscriptionKey) {
this.subscriptions.delete(subscriptionKey);
this.server.unsubscribe(this, subscriptionKey);
}
publish(subscriptionKey, data) {
this.server.publish(this, subscriptionKey, data);
}
checkAuth() {
if (!this.session.auth) {
this.send({ verb: 'ERR_NOTREGISTERED', params: 'PRIVMSG', message: 'Authentication required to chat in this server' });
@ -669,16 +840,15 @@ class IRCConnection extends EventEmitter {
if (this.connectionPass) {
this.authenticate(this.session.user, this.connectionPass, (err, auth) => {
if (err) {
this.server.quit(this, 'User registration failed. ' + err.message);
if (err || !auth) {
let message = err ? err.message : 'Authentication failed';
this.send('ERROR :Closing link: (' + this.getFormattedName(true) + ') [' + message + ']');
this.server.quit(this, 'User registration failed. ' + message);
return this.close();
}
if (auth) {
this.session.auth = auth;
this.checkSessionStart();
return this.verifyNickChange(false, next);
}
return next();
this.session.auth = auth;
this.checkSessionStart();
return this.verifyNickChange(false, next);
});
} else {
this.checkSessionStart();
@ -711,8 +881,8 @@ class IRCConnection extends EventEmitter {
let tryCount = 0;
let tryGetChannel = () => {
db.database.collection('channels').findOne({
channelview,
ns: this.session.ns
ns: this.session.ns,
channelview
}, (err, channelData) => {
if (err) {
this.send({ verb: 'ERR_FILEERROR', params: channel, message: err.message });
@ -720,16 +890,39 @@ class IRCConnection extends EventEmitter {
}
if (channelData) {
this.server.join(channel, this);
return next();
return db.database.collection('channels').findOneAndUpdate({
_id: channelData._id
}, {
$addToSet: {
members: this.session.auth.id
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
this.send({ verb: 'ERR_FILEERROR', params: channel, message: err.message });
return next();
}
if (!result || !result.value) {
this.send({ verb: 'ERR_NOSUCHCHANNEL', params: channel, message: 'Could not open channel' });
return next();
}
this.subscribe([this.session.ns, '#', channelData._id].join('/'));
this.server.join(channel, this);
return next();
});
}
db.database.collection('channels').insertOne({
channel,
channelview,
ns: this.session.ns,
user: this.session.auth.id,
mode: []
mode: [],
owner: this.session.auth.id,
members: [this.session.auth.id]
}, err => {
if (err) {
if (err.code === 11000 && tryCount++ < 5) {
@ -767,9 +960,29 @@ class IRCConnection extends EventEmitter {
return next();
}
this.server.leave(channel, this, params.slice(1).join(' '));
db.database.collection('channels').findOneAndUpdate({
ns: this.session.ns,
channelview: channel.toLowerCase().replace(/\./g, '')
}, {
$pull: {
members: this.session.auth.id
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
this.send({ verb: 'ERR_FILEERROR', params: channel, message: err.message });
return next();
}
return next();
if (!result || !result.value) {
this.send({ verb: 'ERR_NOSUCHCHANNEL', params: channel, message: 'Could not find channel' });
return next();
}
this.server.leave(channel, this, params.slice(1).join(' '));
return next();
});
}
command_PRIVMSG(params, next) {
@ -804,9 +1017,104 @@ class IRCConnection extends EventEmitter {
return next();
}
this.server.send(target, this, params.slice(1).join(' '));
let resolveTarget = done => {
if (/^[#&\s]/.test(target)) {
// channel
db.database.collection('channels').findOne({
ns: this.session.ns,
channelview: target.toLowerCase().replace(/\./g, '')
}, (err, channelData) => {
if (err) {
this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message });
return next();
}
return next();
if (!channelData) {
this.send({ verb: 'ERR_NOSUCHCHANNEL', params: target, message: 'No such channel' });
return next();
}
done(false, {
type: 'channel',
channel: channelData,
targets: channelData.members || []
});
});
} else {
// nick
// channel
db.database.collection('nicks').findOne({
ns: this.session.ns,
nickview: target.toLowerCase().replace(/\./g, '')
}, (err, nickData) => {
if (err) {
this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message });
return next();
}
if (!nickData) {
this.send({ verb: 'ERR_NOSUCHNICK', params: target, message: 'No such nick/channel' });
return next();
}
done(false, {
type: 'nick',
nick: nickData,
targets: [nickData.user].concat(nickData.user.toString() !== this.session.auth.id.toString() ? this.session.auth.id : [])
});
});
}
};
resolveTarget((err, targetData) => {
if (err) {
this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message });
return next();
}
let msgId = new ObjectID();
let time = new Date();
let message = params.slice(1).join(' ');
let channel = (targetData.type === 'channel' && { id: targetData.channel._id, name: targetData.channel.channel }) || false;
let inserts = targetData.targets.map(user => {
let entry = {
insertOne: {
msgId,
channel,
session: this.session.id,
ns: this.session.ns,
from: this.session.auth.id,
nick: this.getFormattedName(),
rcpt: user,
time,
message
}
};
return entry;
});
db.database.collection('chat').bulkWrite(inserts, { ordered: false }, err => {
if (err) {
this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message });
return next();
}
if (channel) {
this.publish([this.session.ns, '#', channel.id].join('/'), {
action: 'message',
msgId: msgId.toString()
});
} else {
targetData.targets.map(user => {
this.publish([this.session.ns, '%', user].join('/'), {
action: 'message',
msgId: msgId.toString()
});
});
}
return next();
});
});
}
command_CAP(params, next) {
@ -907,6 +1215,7 @@ class IRCConnection extends EventEmitter {
case 'IDENTIFY': {
return this.authenticate(this.session.user, params.join(' '), (err, auth) => {
if (err) {
this.send('ERROR :Closing link: (' + this.getFormattedName(true) + ') [' + err.message + ']');
this.server.quit(this, 'User registration failed. ' + err.message);
return this.close();
}
@ -982,6 +1291,7 @@ class IRCConnection extends EventEmitter {
this.authenticate(user, password, (err, auth) => {
this._authenticating = false;
if (err) {
this.send('ERROR :Closing link: (' + this.getFormattedName(true) + ') [' + err.message + ']');
this.server.quit(this, 'User registration failed. ' + err.message);
return this.close();
}

View file

@ -1,5 +1,6 @@
'use strict';
const config = require('wild-config');
const EventEmitter = require('events');
const net = require('net');
const tls = require('tls');
@ -7,6 +8,8 @@ const packageData = require('../../package.json');
const tlsOptions = require('../../imap-core/lib/tls-options');
const shared = require('nodemailer/lib/shared');
const IRCConnection = require('./connection');
const tools = require('../tools');
const redis = require('redis');
const CLOSE_TIMEOUT = 1 * 1000; // how much to wait until pending connections are terminated
@ -68,9 +71,57 @@ class IRCServer extends EventEmitter {
this.server = (this.options.secure ? tls : net).createServer(this.options, socket => this._onConnect(socket));
this.publisher = redis.createClient(tools.redisConfig(config.dbs.redis));
this.subsriber = redis.createClient(tools.redisConfig(config.dbs.redis));
this.subscribers = new Map();
this._listeners = new EventEmitter();
this._listeners.setMaxListeners(0);
this.subsriber.on('message', (channel, message) => {
if (this.subscribers.has(channel)) {
let data;
try {
data = JSON.parse(message);
} catch (E) {
return;
}
this._listeners.emit(channel, data);
}
});
this._setListeners();
}
subscribe(session, channel, handler) {
if (!this.subscribers.has(channel)) {
this.subscribers.set(channel, new Map([[session, handler]]));
this.subsriber.subscribe(channel);
} else if (!this.subscribers.get(channel).has(session)) {
this.subscribers.get(channel).set(session, handler);
} else {
return;
}
this._listeners.addListener(channel, handler);
}
unsubscribe(session, channel) {
if (!this.subscribers.has(channel) || !this.subscribers.get(channel).has(session)) {
return;
}
let handler = this.subscribers.get(channel).get(session);
this._listeners.removeListener(channel, handler);
this.subscribers.get(channel).delete(session);
if (!this.subscribers.get(channel).size) {
this.subscribers.delete(channel);
this.subsriber.unsubscribe(channel);
}
}
publish(session, channel, data) {
this.publisher.publish(channel, JSON.stringify(data));
}
_setListeners() {
this.server.on('listening', () => this._onListening());
this.server.on('close', () => this._onClose());
@ -275,29 +326,6 @@ class IRCServer extends EventEmitter {
entry.nick = client.session.nick;
this._nicks.set(client.session.nick.toLowerCase(), client);
}
send(target, client, message) {
let nameLC = target.toLowerCase();
let clientName = client.getFormattedName();
if (/^[#&]/.test(target)) {
if (!this._channels.has(nameLC)) {
client.send({ verb: 'ERR_NOSUCHNICK', params: target, message: 'No such nick/channel' });
return;
}
let channel = this._channels.get(nameLC);
channel.clients.forEach(c => {
if (c !== client) {
c.send({ source: clientName, target, verb: 'PRIVMSG', message });
}
});
} else if (this._nicks.has(nameLC)) {
let client = this._nicks.get(nameLC);
client.send({ source: clientName, verb: 'PRIVMSG', message });
} else {
client.send({ verb: 'ERR_NOSUCHNICK', params: target, message: 'No such nick/channel' });
return;
}
}
}
module.exports = IRCServer;