mirror of
https://github.com/nodemailer/wildduck.git
synced 2025-01-08 00:48:52 +08:00
407 lines
13 KiB
JavaScript
407 lines
13 KiB
JavaScript
'use strict';
|
|
|
|
const config = require('wild-config');
|
|
const tools = require('./tools');
|
|
const consts = require('./consts');
|
|
const crypto = require('crypto');
|
|
const EventEmitter = require('events').EventEmitter;
|
|
const Redis = require('ioredis');
|
|
const log = require('npmlog');
|
|
const counters = require('./counters');
|
|
|
|
class ImapNotifier extends EventEmitter {
|
|
constructor(options) {
|
|
super();
|
|
|
|
this.database = options.database;
|
|
this.publisher = options.redis || new Redis(tools.redisConfig(config.dbs.redis));
|
|
this.counters = counters(this.publisher);
|
|
|
|
this.logger = options.logger || {
|
|
info: log.silly.bind(log, 'IMAP'),
|
|
debug: log.silly.bind(log, 'IMAP'),
|
|
error: log.error.bind(log, 'IMAP')
|
|
};
|
|
|
|
if (options.pushOnly) {
|
|
// do not need to set up the following if we do not care about updates
|
|
return;
|
|
}
|
|
|
|
this.connectionSessions = new WeakMap();
|
|
|
|
// Subscriber needs its own client connection. This is relevant only in the context of IMAP
|
|
this.subsriber = new Redis(tools.redisConfig(config.dbs.redis));
|
|
this._listeners = new EventEmitter();
|
|
this._listeners.setMaxListeners(0);
|
|
|
|
let publishTimers = new Map();
|
|
let scheduleDataEvent = ev => {
|
|
let data;
|
|
|
|
let fire = () => {
|
|
clearTimeout(data.timeout);
|
|
publishTimers.delete(ev);
|
|
this._listeners.emit(ev);
|
|
this._listeners.emit(ev.split(':').shift() + ':*');
|
|
};
|
|
|
|
if (publishTimers.has(ev)) {
|
|
data = publishTimers.get(ev) || {};
|
|
clearTimeout(data.timeout);
|
|
data.count++;
|
|
|
|
if (data.initial < Date.now() - 1000) {
|
|
// if the event has been held back already for a second, then fire immediatelly
|
|
return fire();
|
|
}
|
|
} else {
|
|
// initialize new event object
|
|
data = {
|
|
ev,
|
|
count: 1,
|
|
initial: Date.now(),
|
|
timeout: null
|
|
};
|
|
}
|
|
|
|
data.timeout = setTimeout(fire, 100);
|
|
data.timeout.unref();
|
|
|
|
if (!publishTimers.has(ev)) {
|
|
publishTimers.set(ev, data);
|
|
}
|
|
};
|
|
|
|
this.subsriber.on('message', (channel, message) => {
|
|
if (channel === 'wd_events') {
|
|
try {
|
|
let data = JSON.parse(message);
|
|
if (data.e && !data.p) {
|
|
scheduleDataEvent(data.e);
|
|
} else if (data.e) {
|
|
this._listeners.emit(data.e, data.p);
|
|
this._listeners.emit(data.e.split(':').shift() + ':*', data.p);
|
|
}
|
|
} catch (E) {
|
|
//
|
|
}
|
|
}
|
|
});
|
|
this.subsriber.subscribe('wd_events');
|
|
}
|
|
|
|
/**
|
|
* Generates hashed event names for mailbox:user pairs
|
|
*
|
|
* @param {String} path
|
|
* @param {String} user
|
|
* @returns {String} md5 hex
|
|
*/
|
|
_eventName(user, path) {
|
|
if (path.length >= 32) {
|
|
path = crypto
|
|
.createHash('md5')
|
|
.update(path)
|
|
.digest('hex');
|
|
}
|
|
return user + ':' + path;
|
|
}
|
|
|
|
/**
|
|
* Registers an event handler for path:userid events
|
|
*
|
|
* @param {String} user
|
|
* @param {String} path
|
|
* @param {Function} handler Function to run once there are new entries in the journal
|
|
*/
|
|
addListener(session, path, handler) {
|
|
let eventName = this._eventName(session.user.id.toString(), path);
|
|
this._listeners.addListener(eventName, handler);
|
|
}
|
|
|
|
/**
|
|
* Unregisters an event handler for path:user events
|
|
*
|
|
* @param {String} user
|
|
* @param {String} path
|
|
* @param {Function} handler Function to run once there are new entries in the journal
|
|
*/
|
|
removeListener(session, path, handler) {
|
|
let eventName = this._eventName(session.user.id.toString(), path);
|
|
this._listeners.removeListener(eventName, handler);
|
|
|
|
this.logger.debug('[%s] Removed journal listener from %s ("%s:%s")', session.id, eventName, session.user.username, path);
|
|
}
|
|
|
|
/**
|
|
* Stores multiple journal entries to db
|
|
*
|
|
* @param {String} user
|
|
* @param {String} path
|
|
* @param {Array|Object} entries An array of entries to be journaled
|
|
* @param {Function} callback Runs once the entry is either stored or an error occurred
|
|
*/
|
|
addEntries(user, path, entries, callback) {
|
|
if (entries && !Array.isArray(entries)) {
|
|
entries = [entries];
|
|
} else if (!entries || !entries.length) {
|
|
return callback(null, false);
|
|
}
|
|
|
|
// find list of message ids that need to be updated
|
|
let updated = entries.filter(entry => !entry.modseq && entry.message).map(entry => entry.message);
|
|
|
|
let getMailbox = next => {
|
|
let mailbox;
|
|
|
|
if (user && typeof user === 'object' && user._id) {
|
|
mailbox = user;
|
|
user = false;
|
|
}
|
|
|
|
let mailboxQuery = mailbox
|
|
? {
|
|
_id: mailbox._id
|
|
}
|
|
: {
|
|
user,
|
|
path
|
|
};
|
|
|
|
if (updated.length) {
|
|
// provision new modseq value
|
|
return this.database.collection('mailboxes').findOneAndUpdate(mailboxQuery, {
|
|
$inc: {
|
|
modifyIndex: 1
|
|
}
|
|
}, {
|
|
returnOriginal: false
|
|
}, (err, item) => {
|
|
if (err) {
|
|
return callback(err);
|
|
}
|
|
|
|
next(null, item && item.value);
|
|
});
|
|
}
|
|
if (mailbox) {
|
|
return next(null, mailbox);
|
|
}
|
|
this.database.collection('mailboxes').findOne(mailboxQuery, next);
|
|
};
|
|
|
|
// final action to push entries to journal
|
|
let pushToJournal = () => {
|
|
this.database.collection('journal').insertMany(entries, {
|
|
w: 1,
|
|
ordered: false
|
|
}, (err, r) => {
|
|
if (err) {
|
|
return callback(err);
|
|
}
|
|
|
|
setImmediate(() => this.updateCounters(entries));
|
|
|
|
return callback(null, r.insertedCount);
|
|
});
|
|
};
|
|
|
|
getMailbox((err, mailboxData) => {
|
|
if (err) {
|
|
return callback(err);
|
|
}
|
|
if (!mailboxData) {
|
|
return callback(null, new Error('Selected mailbox does not exist'));
|
|
}
|
|
|
|
let modseq = mailboxData.modifyIndex;
|
|
let created = new Date();
|
|
entries.forEach(entry => {
|
|
entry.modseq = entry.modseq || modseq;
|
|
entry.created = entry.created || created;
|
|
entry.mailbox = entry.mailbox || mailboxData._id;
|
|
entry.user = mailboxData.user;
|
|
});
|
|
|
|
if (updated.length) {
|
|
this.logger.debug('Updating message collection %s %s entries', mailboxData._id, updated.length);
|
|
this.database.collection('messages').updateMany({
|
|
_id: {
|
|
$in: updated
|
|
},
|
|
mailbox: mailboxData._id
|
|
}, {
|
|
// only update modseq if the new value is larger than old one
|
|
$max: {
|
|
modseq
|
|
}
|
|
}, err => {
|
|
if (err) {
|
|
this.logger.error('Error updating modseq for messages. %s', err.message);
|
|
}
|
|
pushToJournal();
|
|
});
|
|
} else {
|
|
pushToJournal();
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Sends a notification that there are new updates in the selected mailbox
|
|
*
|
|
* @param {String} user
|
|
* @param {String} path
|
|
*/
|
|
fire(user, path, payload) {
|
|
let eventName = this._eventName(user, path);
|
|
setImmediate(() => {
|
|
let data = JSON.stringify({
|
|
e: eventName,
|
|
p: payload
|
|
});
|
|
this.publisher.publish('wd_events', data);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Returns all entries from the journal that have higher than provided modification index
|
|
*
|
|
* @param {String} session
|
|
* @param {String} path
|
|
* @param {Number} modifyIndex Last known modification id
|
|
* @param {Function} callback Returns update entries as an array
|
|
*/
|
|
getUpdates(session, path, modifyIndex, callback) {
|
|
modifyIndex = Number(modifyIndex) || 0;
|
|
let user = session.user.id;
|
|
|
|
this.database.collection('mailboxes').findOne({
|
|
user,
|
|
path
|
|
}, (err, mailbox) => {
|
|
if (err) {
|
|
return callback(err);
|
|
}
|
|
if (!mailbox) {
|
|
return callback(null, 'NONEXISTENT');
|
|
}
|
|
this.database
|
|
.collection('journal')
|
|
.find({
|
|
mailbox: mailbox._id,
|
|
modseq: {
|
|
$gt: modifyIndex
|
|
}
|
|
})
|
|
.sort([['modseq', 1]])
|
|
.toArray(callback);
|
|
});
|
|
}
|
|
|
|
updateCounters(entries) {
|
|
if (!entries) {
|
|
return;
|
|
}
|
|
let counters = new Map();
|
|
(Array.isArray(entries) ? entries : [].concat(entries || [])).forEach(entry => {
|
|
let m = entry.mailbox.toString();
|
|
if (!counters.has(m)) {
|
|
counters.set(m, { total: 0, unseen: 0, unseenChange: false });
|
|
}
|
|
switch (entry && entry.command) {
|
|
case 'EXISTS':
|
|
counters.get(m).total += 1;
|
|
if (entry.unseen) {
|
|
counters.get(m).unseen += 1;
|
|
}
|
|
break;
|
|
case 'EXPUNGE':
|
|
counters.get(m).total -= 1;
|
|
if (entry.unseen) {
|
|
counters.get(m).unseen -= 1;
|
|
}
|
|
break;
|
|
case 'FETCH':
|
|
if (entry.unseen) {
|
|
// either increase or decrese
|
|
counters.get(m).unseen += typeof entry.unseen === 'number' ? entry.unseen : 1;
|
|
} else if (entry.unseenChange) {
|
|
// volatile change, just clear the cache
|
|
counters.get(m).unseenChange = true;
|
|
}
|
|
break;
|
|
}
|
|
});
|
|
|
|
let pos = 0;
|
|
let rows = Array.from(counters);
|
|
let updateCounter = () => {
|
|
if (pos >= rows.length) {
|
|
return;
|
|
}
|
|
let row = rows[pos++];
|
|
if (!row || !row.length) {
|
|
return updateCounter();
|
|
}
|
|
let mailbox = row[0];
|
|
let delta = row[1];
|
|
|
|
this.counters.cachedcounter('total:' + mailbox, delta.total, consts.MAILBOX_COUNTER_TTL, () => {
|
|
if (delta.unseenChange) {
|
|
// Message info changed in mailbox, so just te be sure, clear the unseen counter as well
|
|
// Unseen counter is more volatile and also easier to count (usually only a small number on indexed messages)
|
|
this.publisher.del('unseen:' + mailbox, updateCounter);
|
|
} else if (delta.unseen) {
|
|
this.counters.cachedcounter('unseen:' + mailbox, delta.unseen, consts.MAILBOX_COUNTER_TTL, updateCounter);
|
|
} else {
|
|
setImmediate(updateCounter);
|
|
}
|
|
});
|
|
};
|
|
|
|
updateCounter();
|
|
}
|
|
|
|
allocateConnection(data, callback) {
|
|
if (!data || !data.session || this.connectionSessions.has(data.session)) {
|
|
return callback(null, true);
|
|
}
|
|
|
|
let rlkey = 'lim:' + data.service;
|
|
this.counters.limitedcounter(rlkey, data.user, 1, data.limit || 15, (err, res) => {
|
|
if (err) {
|
|
return callback(err);
|
|
}
|
|
|
|
if (!res.success) {
|
|
return callback(null, false);
|
|
}
|
|
|
|
this.connectionSessions.set(data.session, { service: data.service, user: data.user });
|
|
|
|
return callback(null, true);
|
|
});
|
|
}
|
|
|
|
releaseConnection(data, callback) {
|
|
// unauthenticated sessions are unknown
|
|
if (!data || !data.session || !this.connectionSessions.has(data.session)) {
|
|
return callback(null, true);
|
|
}
|
|
|
|
let entry = this.connectionSessions.get(data.session);
|
|
this.connectionSessions.delete(data.session);
|
|
|
|
let rlkey = 'lim:' + entry.service;
|
|
this.counters.limitedcounter(rlkey, entry.user, -1, 0, err => {
|
|
if (err) {
|
|
this.logger.debug('[%s] Failed to release connection for user %s. %s', data.session.id, entry.user, err.message);
|
|
}
|
|
return callback(null, true);
|
|
});
|
|
}
|
|
}
|
|
|
|
module.exports = ImapNotifier;
|