'use strict'; const crypto = require('crypto'); const EventEmitter = require('events').EventEmitter; const redis = require('redis'); class ImapNotifier extends EventEmitter { constructor(options) { super(); this.database = options.database; this.subsriber = redis.createClient(); this.publisher = redis.createClient(); let logfunc = (...args) => { let level = args.shift() || 'DEBUG'; let message = args.shift() || ''; console.log([level].concat(message || '').join(' '), ...args); // eslint-disable-line no-console }; this.logger = options.logger || { info: logfunc.bind(null, 'INFO'), debug: logfunc.bind(null, 'DEBUG'), error: logfunc.bind(null, 'ERROR') }; this._listeners = new EventEmitter(); this._listeners.setMaxListeners(0); this.publishTimer = false; this.subsriber.on('message', (channel, message) => { if (channel === 'wd_events') { try { let data = JSON.parse(message); this._listeners.emit(data.e, data.p); } catch (E) { // } } }); this.subsriber.subscribe('wd_events'); } /** * Generates hashed event names for mailbox:username pairs * * @param {String} path * @param {String} username * @returns {String} md5 hex */ _eventName(path, username) { return crypto.createHash('md5').update(username + ':' + path).digest('hex'); } /** * Registers an event handler for path:username events * * @param {String} username * @param {String} path * @param {Function} handler Function to run once there are new entries in the journal */ addListener(session, path, handler) { let eventName = this._eventName(session.user.username, path); this._listeners.addListener(eventName, handler); this.logger.debug('New journal listener for %s ("%s:%s")', eventName, session.user.username, path); } /** * Unregisters an event handler for path:username events * * @param {String} username * @param {String} path * @param {Function} handler Function to run once there are new entries in the journal */ removeListener(session, path, handler) { let eventName = this._eventName(session.user.username, path); this._listeners.removeListener(eventName, handler); this.logger.debug('Removed journal listener from %s ("%s:%s")', eventName, session.user.username, path); } /** * Stores multiple journal entries to db * * @param {String} username * @param {String} path * @param {Array|Object} entries An array of entries to be journaled * @param {Function} callback Runs once the entry is either stored or an error occurred */ addEntries(username, path, entries, callback) { if (entries && !Array.isArray(entries)) { entries = [entries]; } else if (!entries || !entries.length) { return callback(null, false); } entries.forEach(entry => { entry.created = new Date(); }); this.database.collection('mailboxes').findOneAndUpdate({ username, path }, { $inc: { modifyIndex: entries.length } }, {}, (err, item) => { if (err) { return callback(err); } if (!item || !item.value) { // was not able to acquire a lock return callback(null, new Error('Selected mailbox does not exist')); } let mailbox = item.value; let startIndex = mailbox.modifyIndex; let updated = 0; let updateNext = () => { if (updated >= entries.length) { return this.database.collection('journal').insertMany(entries, { w: 1, ordered: false }, (err, r) => { if (err) { return callback(err); } return callback(null, r.insertedCount); }); } let entry = entries[updated++]; entry.mailbox = mailbox._id; entry.modseq = ++startIndex; if (entry.message) { this.database.collection('messages').findOneAndUpdate({ _id: entry.message, modseq: { $lt: entry.modseq } }, { $set: { modseq: entry.modseq } }, {}, err => { if (err) { this.logger.error('Error updating modseq for message %s. %s', entry.message, err.message); } updateNext(); }); } else { updateNext(); } }; updateNext(); }); } /** * Sends a notification that there are new updates in the selected mailbox * * @param {String} username * @param {String} path */ fire(username, path, payload) { let eventName = this._eventName(username, path); setImmediate(() => { let data = JSON.stringify({ e: eventName, p: payload }); this.publisher.publish('wd_events', data); }); } /** * Returns all entries from the journal that have higher than provided modification index * * @param {String} session * @param {String} path * @param {Number} modifyIndex Last known modification id * @param {Function} callback Returns update entries as an array */ getUpdates(session, path, modifyIndex, callback) { modifyIndex = Number(modifyIndex) || 0; let username = session.user.username; this.database.collection('mailboxes').findOne({ username, path }, (err, mailbox) => { if (err) { return callback(err); } if (!mailbox) { return callback(null, 'NONEXISTENT'); } this.database.collection('journal').find({ mailbox: mailbox._id, modseq: { $gt: modifyIndex } }).toArray(callback); }); } } module.exports = ImapNotifier;