wildduck/lib/imap-notifier.js

290 lines
8.8 KiB
JavaScript
Raw Normal View History

2017-03-06 05:45:50 +08:00
'use strict';
2017-03-30 01:06:09 +08:00
const config = require('config');
const tools = require('./tools');
2017-03-06 05:45:50 +08:00
const crypto = require('crypto');
const EventEmitter = require('events').EventEmitter;
const redis = require('redis');
2017-03-30 16:44:18 +08:00
const log = require('npmlog');
2017-03-06 05:45:50 +08:00
class ImapNotifier extends EventEmitter {
constructor(options) {
super();
this.database = options.database;
2017-03-30 01:06:09 +08:00
this.publisher = redis.createClient(tools.redisConfig(config.redis));
2017-03-06 05:45:50 +08:00
this.logger = options.logger || {
2017-03-30 16:44:18 +08:00
info: log.silly.bind(log, 'IMAP'),
debug: log.silly.bind(log, 'IMAP'),
error: log.error.bind(log, 'IMAP')
2017-03-06 05:45:50 +08:00
};
if (options.pushOnly) {
// do not need to set up the following if we do not care about updates
return;
}
2017-03-30 01:06:09 +08:00
this.subsriber = redis.createClient(tools.redisConfig(config.redis));
2017-03-06 05:45:50 +08:00
this._listeners = new EventEmitter();
this._listeners.setMaxListeners(0);
let publishTimers = new Map();
let scheduleDataEvent = ev => {
let data;
let fire = () => {
clearTimeout(data.timeout);
publishTimers.delete(ev);
this._listeners.emit(ev);
};
if (publishTimers.has(ev)) {
data = publishTimers.get(ev) || {};
clearTimeout(data.timeout);
data.count++;
if (data.initial < Date.now() - 1000) {
// if the event has been held back already for a second, the fire immediatelly
return fire();
}
} else {
// initialize new event object
data = {
ev,
count: 1,
initial: Date.now(),
timeout: null
};
}
data.timeout = setTimeout(fire, 100);
data.timeout.unref();
if (!publishTimers.has(ev)) {
publishTimers.set(ev, data);
}
};
this.subsriber.on('message', (channel, message) => {
if (channel === 'wd_events') {
try {
let data = JSON.parse(message);
if (data.e && !data.p) {
scheduleDataEvent(data.e);
} else {
this._listeners.emit(data.e, data.p);
}
} catch (E) {
//
}
}
});
this.subsriber.subscribe('wd_events');
2017-03-06 05:45:50 +08:00
}
/**
* Generates hashed event names for mailbox:user pairs
2017-03-06 05:45:50 +08:00
*
* @param {String} path
* @param {String} user
2017-03-06 05:45:50 +08:00
* @returns {String} md5 hex
*/
_eventName(path, user) {
return crypto.createHash('md5').update(user.toString() + ':' + path).digest('hex');
2017-03-06 05:45:50 +08:00
}
/**
* Registers an event handler for path:userid events
2017-03-06 05:45:50 +08:00
*
* @param {String} user
2017-03-06 05:45:50 +08:00
* @param {String} path
* @param {Function} handler Function to run once there are new entries in the journal
*/
addListener(session, path, handler) {
let eventName = this._eventName(session.user.id.toString(), path);
2017-03-06 05:45:50 +08:00
this._listeners.addListener(eventName, handler);
this.logger.debug('New journal listener for %s ("%s:%s")', eventName, session.user.username, path);
}
/**
* Unregisters an event handler for path:user events
2017-03-06 05:45:50 +08:00
*
* @param {String} user
2017-03-06 05:45:50 +08:00
* @param {String} path
* @param {Function} handler Function to run once there are new entries in the journal
*/
removeListener(session, path, handler) {
let eventName = this._eventName(session.user.id.toString(), path);
2017-03-06 05:45:50 +08:00
this._listeners.removeListener(eventName, handler);
this.logger.debug('Removed journal listener from %s ("%s:%s")', eventName, session.user.username, path);
}
/**
* Stores multiple journal entries to db
*
* @param {String} user
2017-03-06 05:45:50 +08:00
* @param {String} path
* @param {Array|Object} entries An array of entries to be journaled
* @param {Function} callback Runs once the entry is either stored or an error occurred
*/
addEntries(user, path, entries, callback) {
2017-03-06 05:45:50 +08:00
if (entries && !Array.isArray(entries)) {
entries = [entries];
} else if (!entries || !entries.length) {
return callback(null, false);
}
let modseqsNeeded = entries.length;
entries.forEach(entry => {
if (entry.modseq) {
modseqsNeeded--;
}
entry.created = new Date();
});
let mailbox;
if (user && typeof user === 'object' && user._id) {
mailbox = user;
user = false;
}
let mailboxQuery = mailbox ? {
_id: mailbox._id
} : {
user,
2017-03-06 05:45:50 +08:00
path
};
let getMailbox = next => {
if (modseqsNeeded) {
return this.database.collection('mailboxes').findOneAndUpdate(mailboxQuery, {
$inc: {
modifyIndex: modseqsNeeded
}
}, {}, (err, item) => {
if (err) {
return callback(err);
}
next(null, item && item.value);
});
}
if (mailbox) {
return next(null, mailbox);
2017-03-06 05:45:50 +08:00
}
this.database.collection('mailboxes').findOne(mailboxQuery, next);
};
getMailbox((err, mailbox) => {
2017-03-06 05:45:50 +08:00
if (err) {
return callback(err);
}
if (!mailbox) {
2017-03-06 05:45:50 +08:00
return callback(null, new Error('Selected mailbox does not exist'));
}
let startIndex = mailbox.modifyIndex;
let updated = 0;
let updateNext = () => {
if (updated >= entries.length) {
return this.database.collection('journal').insertMany(entries, {
w: 1,
ordered: false
}, (err, r) => {
if (err) {
return callback(err);
}
return callback(null, r.insertedCount);
});
}
let entry = entries[updated++];
2017-03-30 02:22:26 +08:00
let setModseq = !entry.modseq;
2017-03-06 05:45:50 +08:00
entry.mailbox = mailbox._id;
2017-03-30 02:22:26 +08:00
if (setModseq) {
entry.modseq = ++startIndex;
}
2017-03-06 05:45:50 +08:00
if (entry.message && setModseq) {
2017-03-06 05:45:50 +08:00
this.database.collection('messages').findOneAndUpdate({
_id: entry.message,
modseq: {
$lt: entry.modseq
}
}, {
$set: {
modseq: entry.modseq
}
}, {}, err => {
if (err) {
this.logger.error('Error updating modseq for message %s. %s', entry.message, err.message);
}
updateNext();
});
} else {
updateNext();
}
};
updateNext();
});
}
/**
* Sends a notification that there are new updates in the selected mailbox
*
* @param {String} user
2017-03-06 05:45:50 +08:00
* @param {String} path
*/
fire(user, path, payload) {
let eventName = this._eventName(user, path);
2017-03-06 05:45:50 +08:00
setImmediate(() => {
let data = JSON.stringify({
e: eventName,
p: payload
});
this.publisher.publish('wd_events', data);
2017-03-06 05:45:50 +08:00
});
}
/**
* Returns all entries from the journal that have higher than provided modification index
*
* @param {String} session
* @param {String} path
* @param {Number} modifyIndex Last known modification id
* @param {Function} callback Returns update entries as an array
*/
getUpdates(session, path, modifyIndex, callback) {
modifyIndex = Number(modifyIndex) || 0;
let user = session.user.id;
2017-03-06 05:45:50 +08:00
this.database.collection('mailboxes').findOne({
user,
2017-03-06 05:45:50 +08:00
path
}, (err, mailbox) => {
if (err) {
return callback(err);
}
if (!mailbox) {
return callback(null, 'NONEXISTENT');
}
this.database.collection('journal').find({
mailbox: mailbox._id,
modseq: {
$gt: modifyIndex
}
2017-04-04 18:30:38 +08:00
}).sort([
['modseq', 1]
]).toArray(callback);
2017-03-06 05:45:50 +08:00
});
}
}
module.exports = ImapNotifier;