wildduck/lib/imap-notifier.js

431 lines
14 KiB
JavaScript
Raw Normal View History

2017-03-06 05:45:50 +08:00
'use strict';
2017-07-16 19:37:33 +08:00
const config = require('wild-config');
2017-03-30 01:06:09 +08:00
const tools = require('./tools');
2017-07-18 16:17:36 +08:00
const consts = require('./consts');
2017-03-06 05:45:50 +08:00
const EventEmitter = require('events').EventEmitter;
2017-10-03 16:18:23 +08:00
const Redis = require('ioredis');
2017-03-30 16:44:18 +08:00
const log = require('npmlog');
2017-07-18 16:17:36 +08:00
const counters = require('./counters');
2021-02-26 20:00:13 +08:00
const { publish, MARKED_SPAM, MARKED_HAM } = require('./events');
2017-03-06 05:45:50 +08:00
/**
 * Journal notifier: stores mailbox change entries in the `journal` collection,
 * publishes per-user events on the Redis `wd_events` channel and relays events
 * received on that channel to locally registered listeners.
 *
 * Note that `addListener` / `removeListener` are overridden with a
 * session-based signature and operate on the internal `_listeners` emitter,
 * not on this instance's own EventEmitter registry.
 */
class ImapNotifier extends EventEmitter {
    /**
     * @param {Object} options
     * @param {Object} options.database Database handle; used as `database.collection(name)`
     * @param {Object} [options.redis] Redis client used for publishing and counters. A new client is created from config when omitted
     * @param {Object} [options.logger] Object with `info`, `debug` and `error` methods. Defaults to npmlog bound to the 'IMAP' prefix
     * @param {Boolean} [options.pushOnly] When truthy, no subscriber connection, listener registry or session map is set up
     */
    constructor(options) {
        super();

        this.database = options.database;
        this.publisher = options.redis || new Redis(tools.redisConfig(config.dbs.redis));
        this.counters = counters(this.publisher);

        this.logger = options.logger || {
            info: log.silly.bind(log, 'IMAP'),
            debug: log.silly.bind(log, 'IMAP'),
            error: log.error.bind(log, 'IMAP')
        };

        if (options.pushOnly) {
            // do not need to set up the following if we do not care about updates
            return;
        }

        this.connectionSessions = new WeakMap();

        // Subscriber needs its own client connection. This is relevant only in the context of IMAP
        this.subscriber = new Redis(tools.redisConfig(config.dbs.redis));

        this._listeners = new EventEmitter();
        this._listeners.setMaxListeners(0);

        // event name -> { ev, count, initial, timeout } for notifications that are being held back
        const publishTimers = new Map();

        // Debounces payload-less notifications: every new notification for the same
        // event restarts a 100ms timer, but an event is never held back for more
        // than about a second since the first pending notification.
        const scheduleDataEvent = ev => {
            let data = publishTimers.get(ev);

            const fire = () => {
                clearTimeout(data.timeout);
                publishTimers.delete(ev);
                this._listeners.emit(ev);
            };

            if (data) {
                clearTimeout(data.timeout);
                data.count++;
                if (data.initial < Date.now() - 1000) {
                    // if the event has been held back already for a second, then fire immediately
                    return fire();
                }
            } else {
                // initialize new event object
                data = {
                    ev,
                    count: 1,
                    initial: Date.now(),
                    timeout: null
                };
                publishTimers.set(ev, data);
            }

            data.timeout = setTimeout(fire, 100);
            // a pending notification must not keep the process alive
            data.timeout.unref();
        };

        this.subscriber.on('message', (channel, message) => {
            if (channel !== 'wd_events') {
                return;
            }

            let data;

            // Fast path: a 32 character message shaped like {"e":"<24 chars>"} carries
            // only the event name and no payload, so JSON parsing can be skipped.
            const isBareEvent = message.length === 32 && message[2] === 'e' && message[5] === '"' && message[6 + 24] === '"';

            if (isBareEvent) {
                data = { e: message.slice(6, 6 + 24) };
            } else {
                try {
                    data = JSON.parse(message);
                } catch (E) {
                    return;
                }
            }

            // JSON.parse may return null or a primitive, also skip messages without an event name
            if (!data || !data.e) {
                return;
            }

            // Do not schedule or emit events nobody listens for. listenerCount() is used
            // instead of inspecting the private `_events` map, where a single listener is
            // stored as a bare function whose `.length` is its arity, not a listener count.
            if (this._listeners.listenerCount(data.e) === 0) {
                return;
            }

            if (!data.p) {
                // events without payload are scheduled, these are notifications about changes in journal
                scheduleDataEvent(data.e);
            } else {
                // events with payload are triggered immediately, these are actions for doing something
                this._listeners.emit(data.e, data.p);
            }
        });

        this.subscriber.subscribe('wd_events');
    }

    /**
     * Registers an event handler for events of the session user
     *
     * @param {Object} session Session object; `session.user.id` (stringified) is the event name, `session.id` and `session.user.username` are used for logging
     * @param {Function} handler Function to run once there are new entries in the journal. Called without arguments for journal notifications and with the payload for payload events
     */
    addListener(session, handler) {
        const userId = session.user.id.toString();
        this._listeners.addListener(userId, handler);
        this.logger.debug('[%s] New journal listener for %s (%s)', session.id, userId, session.user.username);
    }

    /**
     * Unregisters an event handler previously registered with addListener
     *
     * @param {Object} session Session object, see addListener
     * @param {Function} handler The same function reference that was registered
     */
    removeListener(session, handler) {
        const userId = session.user.id.toString();
        this._listeners.removeListener(userId, handler);
        this.logger.debug('[%s] Removed journal listener from %s (%s)', session.id, userId, session.user.username);
    }

    /**
     * Stores multiple journal entries to db
     *
     * The provided entry objects are modified in place: `modseq`, `created` and
     * `mailbox` are filled in when missing and `user` is always set from the
     * mailbox record. If any entry references a message and has no modseq yet,
     * the mailbox `modifyIndex` is incremented and the new value is also written
     * to the referenced messages.
     *
     * @param {String|Object} mailbox Mailbox ID or a mailbox object that has an `_id` property
     * @param {Array|Object} entries An array of entries to be journaled
     * @param {Function} callback Runs once the entry is either stored or an error occurred. On success the second argument is the number of inserted entries, or `false` if there was nothing to store
     */
    addEntries(mailbox, entries, callback) {
        if (entries && !Array.isArray(entries)) {
            entries = [entries];
        } else if (!entries || !entries.length) {
            return callback(null, false);
        }

        // find list of message ids that need to be updated
        const updated = entries.filter(entry => !entry.modseq && entry.message).map(entry => entry.message);

        // resolves the mailbox record, provisioning a new modseq value if needed
        const getMailbox = next => {
            if (!mailbox) {
                return next(null, false);
            }

            // a mailbox object was provided if there is an _id, otherwise treat the value as the id
            const mailboxData = mailbox._id ? mailbox : undefined;
            const mailboxQuery = {
                _id: mailbox._id ? mailbox._id : mailbox
            };

            if (updated.length) {
                // provision new modseq value
                return this.database.collection('mailboxes').findOneAndUpdate(
                    mailboxQuery,
                    {
                        $inc: {
                            modifyIndex: 1
                        }
                    },
                    {
                        returnDocument: 'after'
                    },
                    (err, item) => {
                        if (err) {
                            return next(err);
                        }
                        next(null, item && item.value);
                    }
                );
            }

            if (mailboxData) {
                return next(null, mailboxData);
            }

            this.database.collection('mailboxes').findOne(mailboxQuery, next);
        };

        // final action to push entries to journal
        const pushToJournal = async () => {
            for (const entry of entries) {
                if (entry.command === 'EXISTS' && entry.junk) {
                    await publish(this.publisher, {
                        ev: entry.junk > 0 ? MARKED_SPAM : MARKED_HAM,
                        user: entry.user.toString(),
                        mailbox: entry.mailbox.toString(),
                        message: entry.message.toString(),
                        id: entry.uid
                    });
                }
            }

            const r = await this.database.collection('journal').insertMany(entries, {
                writeConcern: 1,
                ordered: false
            });

            setImmediate(() => this.updateCounters(entries));

            return r.insertedCount;
        };

        // Two-argument then(): if the callback itself throws, the rejection handler
        // must not run and invoke the very same callback for a second time.
        const storeAndReport = () => {
            pushToJournal().then(
                count => callback(null, count),
                err => callback(err)
            );
        };

        getMailbox((err, mailboxData) => {
            if (err) {
                return callback(err);
            }

            if (!mailboxData) {
                const notFound = new Error('Selected mailbox does not exist');
                notFound.responseCode = 404;
                notFound.code = 'NoSuchMailbox';
                // NOTE(review): the error object is passed as the *result* argument, not as
                // the error argument. This looks like swapped arguments but is kept as is
                // because callers may depend on it - confirm before changing.
                return callback(null, notFound);
            }

            const modseq = mailboxData.modifyIndex;
            const created = new Date();

            entries.forEach(entry => {
                entry.modseq = entry.modseq || modseq;
                entry.created = entry.created || created;
                entry.mailbox = entry.mailbox || mailboxData._id;
                entry.user = mailboxData.user;
            });

            if (!updated.length) {
                return storeAndReport();
            }

            this.logger.debug('Updating message collection %s %s entries', mailboxData._id, updated.length);
            this.database.collection('messages').updateMany(
                {
                    _id: {
                        $in: updated
                    },
                    mailbox: mailboxData._id
                },
                {
                    // only update modseq if the new value is larger than old one
                    $max: {
                        modseq
                    }
                },
                err => {
                    if (err) {
                        // a failed modseq update is only logged, journal entries are stored regardless
                        this.logger.error('Error updating modseq for messages. %s', err.message);
                    }
                    storeAndReport();
                }
            );
        });
    }

    /**
     * Sends a notification that there are new updates for the user
     *
     * The message is published asynchronously (on the next setImmediate tick) to
     * the `wd_events` channel as JSON `{ e: <user>, p: <payload> }`.
     *
     * @param {String} user User ID. Nothing is published if the value is falsy
     * @param {*} [payload] Optional payload. Subscribers emit events with a truthy payload immediately and debounce the ones without
     */
    fire(user, payload) {
        if (!user) {
            return;
        }

        setImmediate(() => {
            const data = JSON.stringify({
                e: user.toString(),
                p: payload
            });
            this.publisher.publish('wd_events', data);
        });
    }

    /**
     * Returns all entries from the journal that have higher than provided modification index
     *
     * @param {String|Object} mailbox Mailbox ID or a mailbox object that has an `_id` property
     * @param {Number} modifyIndex Last known modification id. Values that do not convert to a non-zero number are treated as 0
     * @param {Function} callback Returns update entries as an array
     */
    getUpdates(mailbox, modifyIndex, callback) {
        modifyIndex = Number(modifyIndex) || 0;

        const query = {
            mailbox: mailbox._id || mailbox,
            modseq: {
                $gt: modifyIndex
            }
        };

        this.database.collection('journal').find(query).toArray(callback);
    }

    /**
     * Applies the effect of journal entries to the cached per-mailbox counters
     *
     * Changes are first aggregated per mailbox and then applied one mailbox at
     * a time. Nothing is returned and no completion is signalled to the caller.
     *
     * @param {Array|Object} entries Journal entry or an array of entries. Falsy entries and entries without a `mailbox` value are skipped
     */
    updateCounters(entries) {
        if (!entries) {
            return;
        }

        // mailbox id (string) -> accumulated change
        const deltas = new Map();

        for (const entry of [].concat(entries)) {
            if (!entry || !entry.mailbox) {
                // nothing to attribute the change to
                continue;
            }

            const mailboxId = entry.mailbox.toString();
            let delta = deltas.get(mailboxId);
            if (!delta) {
                delta = { total: 0, unseen: 0, unseenChange: false };
                deltas.set(mailboxId, delta);
            }

            switch (entry.command) {
                case 'EXISTS':
                    delta.total += 1;
                    if (entry.unseen) {
                        delta.unseen += 1;
                    }
                    break;

                case 'EXPUNGE':
                    delta.total -= 1;
                    if (entry.unseen) {
                        delta.unseen -= 1;
                    }
                    break;

                case 'FETCH':
                    if (entry.unseen) {
                        // either increase or decrease
                        delta.unseen += typeof entry.unseen === 'number' ? entry.unseen : 1;
                    } else if (entry.unseenChange) {
                        // volatile change, just clear the cache
                        delta.unseenChange = true;
                    }
                    break;
            }
        }

        const rows = Array.from(deltas);
        let pos = 0;

        // processes one mailbox per invocation and re-invokes itself from the counter callbacks
        const updateCounter = () => {
            if (pos >= rows.length) {
                return;
            }

            const row = rows[pos++];
            if (!row || !row.length) {
                return updateCounter();
            }

            const [mailbox, delta] = row;

            this.counters.cachedcounter('total:' + mailbox, delta.total, consts.MAILBOX_COUNTER_TTL, () => {
                if (delta.unseenChange) {
                    // Message info changed in mailbox, so just to be sure, clear the unseen counter as well
                    // Unseen counter is more volatile and also easier to count (usually only a small number on indexed messages)
                    this.publisher.del('unseen:' + mailbox, updateCounter);
                } else if (delta.unseen) {
                    this.counters.cachedcounter('unseen:' + mailbox, delta.unseen, consts.MAILBOX_COUNTER_TTL, updateCounter);
                } else {
                    setImmediate(updateCounter);
                }
            });
        };

        updateCounter();
    }

    /**
     * Reserves a connection slot for a session against the per-service, per-user limit
     *
     * @param {Object} data
     * @param {Object} data.session Session object, used as the key for tracking the reservation
     * @param {String} data.service Service name, used in the limiter key as `lim:<service>`
     * @param {String} data.user User identifier passed to the limiter
     * @param {Number} [data.limit=15] Maximum value passed to the limiter
     * @param {Function} callback Called with (err) on limiter failure, (null, false) if the limiter did not report success, (null, true) otherwise. Also yields (null, true) without counting if there is no session or the session already holds a slot
     */
    allocateConnection(data, callback) {
        if (!data || !data.session || this.connectionSessions.has(data.session)) {
            return callback(null, true);
        }

        const rlkey = 'lim:' + data.service;
        this.counters.limitedcounter(rlkey, data.user, 1, data.limit || 15, (err, res) => {
            if (err) {
                return callback(err);
            }

            if (!res.success) {
                return callback(null, false);
            }

            // remember what was counted, so that releaseConnection can undo it
            this.connectionSessions.set(data.session, { service: data.service, user: data.user });

            return callback(null, true);
        });
    }

    /**
     * Releases the connection slot held by a session, if any
     *
     * @param {Object} data
     * @param {Object} data.session Session object that was used for allocateConnection
     * @param {Function} callback Always called with (null, true); limiter errors are only logged
     */
    releaseConnection(data, callback) {
        // unauthenticated sessions are unknown
        if (!data || !data.session || !this.connectionSessions.has(data.session)) {
            return callback(null, true);
        }

        const entry = this.connectionSessions.get(data.session);
        // forget the session before the counter call, so that a repeated release is a no-op
        this.connectionSessions.delete(data.session);

        const rlkey = 'lim:' + entry.service;
        this.counters.limitedcounter(rlkey, entry.user, -1, 0, err => {
            if (err) {
                this.logger.debug('[%s] Failed to release connection for user %s. %s', data.session.id, entry.user, err.message);
            }
            return callback(null, true);
        });
    }
}
// The ImapNotifier class itself is the module's export
module.exports = ImapNotifier;