From 43b8757a5ee4fde84bc909688eff4e778165e9d3 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Thu, 28 Sep 2017 16:17:45 +0300 Subject: [PATCH] publish messages to db before sending to clients --- lib/irc/connection.js | 374 ++++++++++++++++++++++++++++++++++++++---- lib/irc/server.js | 74 ++++++--- 2 files changed, 393 insertions(+), 55 deletions(-) diff --git a/lib/irc/connection.js b/lib/irc/connection.js index 4c9781fb..fd9556dc 100644 --- a/lib/irc/connection.js +++ b/lib/irc/connection.js @@ -6,8 +6,9 @@ const EventEmitter = require('events'); const os = require('os'); const codes = require('./codes'); const db = require('../db'); +const ObjectID = require('mongodb').ObjectID; -const PING_TIMEOUT = 120 * 1000; +const PING_TIMEOUT = 10 * 120 * 1000; const SOCKET_TIMEOUT = 5 * 60 * 1000; class IRCConnection extends EventEmitter { @@ -21,6 +22,8 @@ class IRCConnection extends EventEmitter { this._authenticating = false; + this.subscriptions = new Set(); + this.remoteAddress = this._socket.remoteAddress; this.id = crypto .randomBytes(8) @@ -41,6 +44,56 @@ class IRCConnection extends EventEmitter { this.capStarted = false; this.capEnded = false; this.capEnabled = new Set(); + + this.accumulateTimer = false; + this.accumulateStart = false; + this.fetching = false; + this.dofetch = false; + this.lastFetchedItem = new ObjectID(); + + this.nickChanges = new Set(); + + this.subscriber = data => { + switch (data.action) { + case 'message': { + clearTimeout(this.accumulateTimer); + let time = Date.now(); + if (this.accumulateStart && this.accumulateStart < time - 1000) { + this.accumulateStart = false; + return this.fetchMessages(); + } + if (!this.accumulateStart) { + this.accumulateStart = time; + } + this.accumulateTimer = setTimeout(() => this.fetchMessages(), 80); + this.accumulateTimer.unref(); + break; + } + + case 'nick': { + if (this.nickChanges.has(data.changeId)) { + break; + } + this.nickChanges.add(data.changeId); + + if (data.user === this.session.auth.id.toString() && data.nick !== this.session.nick) { + let currentSource = this.getFormattedName(); + this.session.nick = data.nick; + this.send({ source: currentSource, verb: 'NICK', target: false, params: this.session.nick }); + } else if (data.currentSource) { + this.send({ source: data.currentSource, verb: 'NICK', target: false, params: data.nick }); + } + + // FIXME: nick change with additional login does not have currentSource set yet! + // FIXME: subscribing to a channel is not propagated to existing sessions for same user + + //console.log('NICK CHANGE'); + //console.log(data); + + break; + } + } + }; } init() { @@ -100,6 +153,63 @@ class IRCConnection extends EventEmitter { this._socket.write(payload); } + fetchMessages(force) { + if (!force && this.fetching) { + this.dofetch = true; + return false; + } + this.fetching = true; + this.dofetch = false; + + let query = { + _id: { $gt: this.lastFetchedItem }, + rcpt: this.session.auth.id + }; + + let cursor = db.database.collection('chat').find(query); + + let clear = () => + cursor.close(() => { + if (this.dofetch) { + return setImmediate(() => this.fetchMessages(true)); + } else { + this.fetching = false; + } + }); + + let processNext = () => { + cursor.next((err, message) => { + if (err) { + this.server.logger.error( + { + err, + tnx: 'chat', + cid: this.id + }, + 'Failed iterating db cursor. %s', + err.message + ); + return; + } + if (!message) { + return clear(); + } + this.lastFetchedItem = message._id; + + if (message.channel && message.session.toString() === this.session.id.toString()) { + // ignore messages from self + return setImmediate(processNext); + } + + this.send({ source: message.nick, verb: 'PRIVMSG', message: message.message }); + + setImmediate(processNext); + }); + }; + + processNext(); + } + send(payload) { if (!this._socket || !this._socket.writable) { return; @@ -170,6 +280,11 @@ class IRCConnection extends EventEmitter { * @event */ _onClose(/* hadError */) { + this.subscriptions.forEach(subscriptionKey => { + this.unsubscribe(subscriptionKey); + }); + this.subscriptions.clear(); + if (this._closed) { return; } @@ -230,6 +345,7 @@ class IRCConnection extends EventEmitter { _resetSession() { this.session = { + id: new ObjectID(), state: 'AUTHORIZATION', remoteAddress: this.remoteAddress }; @@ -443,10 +559,41 @@ class IRCConnection extends EventEmitter { this.session.nick = this.session.nick || userData.username; this.session.ns = ns || 'root'; - next(null, { - id: userData._id, - username: userData.username - }); + db.database + .collection('channels') + .find({ + members: userData._id + }) + .project({ + _id: true, + channel: true + }) + .toArray((err, channels) => { + if (err) { + return next(err); + } + if (Array.isArray(channels)) { + channels.forEach(channelData => { + this.server.logger.info( + { + tnx: 'setup', + cid: this.id, + channel: channelData._id + }, + 'Joining %s to channel %s', + userData._id, + channelData._id + ); + this.subscribe([this.session.ns, '#', channelData._id].join('/')); + }); + } + this.subscribe([this.session.ns, '%', userData._id].join('/')); + + next(null, { + id: userData._id, + username: userData.username + }); + }); }); } ); @@ -459,6 +606,7 @@ class IRCConnection extends EventEmitter { let verifyUser = done => { if (nickview === this.session.auth.username.replace(/\./g, '')) { + // allow the same nick as username return done(); } db.users.collection('users').findOne({ @@ -512,7 +660,7 @@ class IRCConnection extends EventEmitter { } if (nickData.user.toString() === user.toString()) { - return next(null, false); + return next(null, nickData._id); } err = new Error('Requested nick is already in use'); @@ -528,17 +676,17 @@ class IRCConnection extends EventEmitter { } // try to remove old nicks - db.database.collection('nicks').deleteOne({ + db.database.collection('nicks').deleteMany({ ns, nickview: { $ne: nickview }, user - }, () => next(null, true)); + }, () => next(null, insertId)); }); }); } verifyNickChange(currentSource, next) { - this.getNick(this.session.nick, (err, changed) => { + this.getNick(this.session.nick, (err, nickId) => { if (err) { currentSource = currentSource || this.getFormattedName(); this.send({ verb: err.verb || 'ERR_UNAVAILRESOURCE', params: this.session.nick, message: err.message }); @@ -549,19 +697,42 @@ class IRCConnection extends EventEmitter { this.send({ source: currentSource, verb: 'NICK', target: false, params: this.session.nick }); } - if (changed) { - /* - this.notifyAll({ source: currentSource, verb: 'NICK', target: false, params: this.session.nick }, ()=>{ + let updatedSource = this.getFormattedName(); + if (nickId && currentSource !== updatedSource) { + let eventData = { + action: 'nick', + changeId: new ObjectID().toString(), + nickId: nickId.toString(), + user: this.session.auth.id.toString(), + old: currentSource, + new: updatedSource, + nick: this.session.nick + }; + this.subscriptions.forEach(subscriptionKey => { + this.publish(subscriptionKey, eventData); }); - */ } - this.server.nick(this); + //this.server.nick(this); return next(); }); } + subscribe(subscriptionKey) { + this.subscriptions.add(subscriptionKey); + this.server.subscribe(this, subscriptionKey, this.subscriber); + } + + unsubscribe(subscriptionKey) { + this.subscriptions.delete(subscriptionKey); + this.server.unsubscribe(this, subscriptionKey); + } + + publish(subscriptionKey, data) { + this.server.publish(this, subscriptionKey, data); + } + checkAuth() { if (!this.session.auth) { this.send({ verb: 'ERR_NOTREGISTERED', params: 'PRIVMSG', message: 'Authentication required to chat in this server' }); @@ -669,16 +840,15 @@ class IRCConnection extends EventEmitter { if (this.connectionPass) { this.authenticate(this.session.user, this.connectionPass, (err, auth) => { - if (err) { - this.server.quit(this, 'User registration failed. ' + err.message); + if (err || !auth) { + let message = err ? err.message : 'Authentication failed'; + this.send('ERROR :Closing link: (' + this.getFormattedName(true) + ') [' + message + ']'); + this.server.quit(this, 'User registration failed. ' + message); return this.close(); } - if (auth) { - this.session.auth = auth; - this.checkSessionStart(); - return this.verifyNickChange(false, next); - } - return next(); + this.session.auth = auth; + this.checkSessionStart(); + return this.verifyNickChange(false, next); }); } else { this.checkSessionStart(); @@ -711,8 +881,8 @@ class IRCConnection extends EventEmitter { let tryCount = 0; let tryGetChannel = () => { db.database.collection('channels').findOne({ - channelview, - ns: this.session.ns + ns: this.session.ns, + channelview }, (err, channelData) => { if (err) { this.send({ verb: 'ERR_FILEERROR', params: channel, message: err.message }); @@ -720,16 +890,39 @@ class IRCConnection extends EventEmitter { } if (channelData) { - this.server.join(channel, this); - return next(); + return db.database.collection('channels').findOneAndUpdate({ + _id: channelData._id + }, { + $addToSet: { + members: this.session.auth.id + } + }, { + returnOriginal: false + }, (err, result) => { + if (err) { + this.send({ verb: 'ERR_FILEERROR', params: channel, message: err.message }); + return next(); + } + + if (!result || !result.value) { + this.send({ verb: 'ERR_NOSUCHCHANNEL', params: channel, message: 'Could not open channel' }); + return next(); + } + + this.subscribe([this.session.ns, '#', channelData._id].join('/')); + + this.server.join(channel, this); + return next(); + }); } db.database.collection('channels').insertOne({ channel, channelview, ns: this.session.ns, - user: this.session.auth.id, - mode: [] + mode: [], + owner: this.session.auth.id, + members: [this.session.auth.id] }, err => { if (err) { if (err.code === 11000 && tryCount++ < 5) { @@ -767,9 +960,29 @@ class IRCConnection extends EventEmitter { return next(); } - this.server.leave(channel, this, params.slice(1).join(' ')); + db.database.collection('channels').findOneAndUpdate({ + ns: this.session.ns, + channelview: channel.toLowerCase().replace(/\./g, '') + }, { + $pull: { + members: this.session.auth.id + } + }, { + returnOriginal: false + }, (err, result) => { + if (err) { + this.send({ verb: 'ERR_FILEERROR', params: channel, message: err.message }); + return next(); + } - return next(); + if (!result || !result.value) { + this.send({ verb: 'ERR_NOSUCHCHANNEL', params: channel, message: 'Could not find channel' }); + return next(); + } + + this.server.leave(channel, this, params.slice(1).join(' ')); + return next(); + }); } command_PRIVMSG(params, next) { @@ -804,9 +1017,104 @@ class IRCConnection extends EventEmitter { return next(); } - this.server.send(target, this, params.slice(1).join(' ')); + let resolveTarget = done => { + if (/^[#&\s]/.test(target)) { + // channel + db.database.collection('channels').findOne({ + ns: this.session.ns, + channelview: target.toLowerCase().replace(/\./g, '') + }, (err, channelData) => { + if (err) { + this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message }); + return next(); + } - return next(); + if (!channelData) { + this.send({ verb: 'ERR_NOSUCHCHANNEL', params: target, message: 'No such channel' }); + return next(); + } + + done(false, { + type: 'channel', + channel: channelData, + targets: channelData.members || [] + }); + }); + } else { + // nick + // channel + db.database.collection('nicks').findOne({ + ns: this.session.ns, + nickview: target.toLowerCase().replace(/\./g, '') + }, (err, nickData) => { + if (err) { + this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message }); + return next(); + } + + if (!nickData) { + this.send({ verb: 'ERR_NOSUCHNICK', params: target, message: 'No such nick/channel' }); + return next(); + } + + done(false, { + type: 'nick', + nick: nickData, + targets: [nickData.user].concat(nickData.user.toString() !== this.session.auth.id.toString() ? this.session.auth.id : []) + }); + }); + } + }; + + resolveTarget((err, targetData) => { + if (err) { + this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message }); + return next(); + } + + let msgId = new ObjectID(); + let time = new Date(); + let message = params.slice(1).join(' '); + let channel = (targetData.type === 'channel' && { id: targetData.channel._id, name: targetData.channel.channel }) || false; + let inserts = targetData.targets.map(user => { + let entry = { + insertOne: { + msgId, + channel, + session: this.session.id, + ns: this.session.ns, + from: this.session.auth.id, + nick: this.getFormattedName(), + rcpt: user, + time, + message + } + }; + return entry; + }); + + db.database.collection('chat').bulkWrite(inserts, { ordered: false }, err => { + if (err) { + this.send({ verb: 'ERR_FILEERROR', params: target, message: err.message }); + return next(); + } + + if (channel) { + this.publish([this.session.ns, '#', channel.id].join('/'), { + action: 'message', + msgId: msgId.toString() + }); + } else { + targetData.targets.map(user => { + this.publish([this.session.ns, '%', user].join('/'), { + action: 'message', + msgId: msgId.toString() + }); + }); + } + return next(); + }); + }); } command_CAP(params, next) { @@ -907,6 +1215,7 @@ class IRCConnection extends EventEmitter { case 'IDENTIFY': { return this.authenticate(this.session.user, params.join(' '), (err, auth) => { if (err) { + this.send('ERROR :Closing link: (' + this.getFormattedName(true) + ') [' + err.message + ']'); this.server.quit(this, 'User registration failed. ' + err.message); return this.close(); } @@ -982,6 +1291,7 @@ class IRCConnection extends EventEmitter { this.authenticate(user, password, (err, auth) => { this._authenticating = false; if (err) { + this.send('ERROR :Closing link: (' + this.getFormattedName(true) + ') [' + err.message + ']'); this.server.quit(this, 'User registration failed. ' + err.message); return this.close(); } diff --git a/lib/irc/server.js b/lib/irc/server.js index f8588fc9..39eb7ecc 100644 --- a/lib/irc/server.js +++ b/lib/irc/server.js @@ -1,5 +1,6 @@ 'use strict'; +const config = require('wild-config'); const EventEmitter = require('events'); const net = require('net'); const tls = require('tls'); @@ -7,6 +8,8 @@ const packageData = require('../../package.json'); const tlsOptions = require('../../imap-core/lib/tls-options'); const shared = require('nodemailer/lib/shared'); const IRCConnection = require('./connection'); +const tools = require('../tools'); +const redis = require('redis'); const CLOSE_TIMEOUT = 1 * 1000; // how much to wait until pending connections are terminated @@ -68,9 +71,57 @@ class IRCServer extends EventEmitter { this.server = (this.options.secure ? tls : net).createServer(this.options, socket => this._onConnect(socket)); + this.publisher = redis.createClient(tools.redisConfig(config.dbs.redis)); + this.subsriber = redis.createClient(tools.redisConfig(config.dbs.redis)); + + this.subscribers = new Map(); + this._listeners = new EventEmitter(); + this._listeners.setMaxListeners(0); + + this.subsriber.on('message', (channel, message) => { + if (this.subscribers.has(channel)) { + let data; + try { + data = JSON.parse(message); + } catch (E) { + return; + } + this._listeners.emit(channel, data); + } + }); + this._setListeners(); } + subscribe(session, channel, handler) { + if (!this.subscribers.has(channel)) { + this.subscribers.set(channel, new Map([[session, handler]])); + this.subsriber.subscribe(channel); + } else if (!this.subscribers.get(channel).has(session)) { + this.subscribers.get(channel).set(session, handler); + } else { + return; + } + this._listeners.addListener(channel, handler); + } + + unsubscribe(session, channel) { + if (!this.subscribers.has(channel) || !this.subscribers.get(channel).has(session)) { + return; + } + let handler = this.subscribers.get(channel).get(session); + this._listeners.removeListener(channel, handler); + this.subscribers.get(channel).delete(session); + if (!this.subscribers.get(channel).size) { + this.subscribers.delete(channel); + this.subsriber.unsubscribe(channel); + } + } + + publish(session, channel, data) { + this.publisher.publish(channel, JSON.stringify(data)); + } + _setListeners() { this.server.on('listening', () => this._onListening()); this.server.on('close', () => this._onClose()); @@ -275,29 +326,6 @@ class IRCServer extends EventEmitter { entry.nick = client.session.nick; this._nicks.set(client.session.nick.toLowerCase(), client); } - - send(target, client, message) { - let nameLC = target.toLowerCase(); - let clientName = client.getFormattedName(); - if (/^[#&]/.test(target)) { - if (!this._channels.has(nameLC)) { - client.send({ verb: 'ERR_NOSUCHNICK', params: target, message: 'No such nick/channel' }); - return; - } - let channel = this._channels.get(nameLC); - channel.clients.forEach(c => { - if (c !== client) { - c.send({ source: clientName, target, verb: 'PRIVMSG', message }); - } - }); - } else if (this._nicks.has(nameLC)) { - let client = this._nicks.get(nameLC); - client.send({ source: clientName, verb: 'PRIVMSG', message }); - } else { - client.send({ verb: 'ERR_NOSUCHNICK', params: target, message: 'No such nick/channel' }); - return; - } - } } module.exports = IRCServer;