// Mirror of https://github.com/nodemailer/wildduck.git
// Synced 2024-11-10 09:32:28 +08:00
// 646 lines, 23 KiB, JavaScript
'use strict';

const log = require('npmlog');
const config = require('wild-config');
const Gelf = require('gelf');
const os = require('os');
const { Queue, Worker } = require('bullmq');
const db = require('./lib/db');
const errors = require('./lib/errors');
const crypto = require('crypto');
const counters = require('./lib/counters');
const { ObjectId } = require('mongodb');
const libmime = require('libmime');
const punycode = require('punycode/');
const { getClient } = require('./lib/elasticsearch');

// GELF logging function; assigned in module.exports.start()
let loggelf;
// Lock helper; assigned from counters(db.redis).processlock once the database is connected
let processlock;
// BullMQ Worker instances created in module.exports.start()
let queueWorkers = {};

// TTL passed to processlock() when taking or renewing the lock
// (presumably seconds; confirm against lib/counters)
const LOCK_EXPIRE_TTL = 5;
// Delay in seconds between lock renewal attempts (multiplied by 1000 for setTimeout)
const LOCK_RENEW_TTL = 2;

// Set to true once the change stream fails with error code 40573 (not a replica set)
let FORCE_DISABLE = false;
// Random identifier of this process, used as the lock owner value
const processId = crypto.randomBytes(8).toString('hex');
// Result of the latest lock attempt: true if this process holds the lock
let isCurrentWorker = false;
// BullMQ queue that receives live indexing jobs; created in module.exports.start()
let liveIndexingQueue;

const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. Changes are not indexed to ElasticSearch.';

/**
 * Watches the `journal` collection through a MongoDB change stream and turns
 * inserted journal entries into jobs on the live indexing queue.
 */
class Indexer {
    constructor() {
        // true while change stream monitoring is active
        this.running = false;
    }

    /**
     * Starts change stream monitoring in the background unless it is already
     * running. Monitoring is not awaited: failures are logged and the running
     * flag is cleared when monitoring ends, so a later start() can retry.
     */
    async start() {
        if (this.running) {
            return;
        }
        this.running = true;
        log.info('Indexer', 'Starting indexer');

        this.monitorChanges()
            .catch(err => {
                log.error('Indexer', 'Indexing failed error=%s', err.message);
            })
            .finally(() => {
                this.running = false;
            });
    }

    /**
     * Stops monitoring: clears the running flag and closes the change stream
     * if one is open. Errors raised while closing are ignored.
     */
    async stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        log.info('Indexer', 'Stopping indexer');
        try {
            if (this.changeStream && !this.changeStream.isClosed()) {
                await this.changeStream.close();
            }
        } catch (err) {
            // ignore
        }
    }

    /**
     * Converts a journal entry into an indexing job and adds it to the live
     * indexing queue. Entries without a user, and commands other than
     * EXISTS / EXPUNGE / FETCH, are skipped. Jobs are only queued when the
     * indexer feature flag is enabled in config or the user is a member of
     * the `feature:indexing` Redis set.
     *
     * @param {Object} entry Journal document (the change event's fullDocument)
     */
    async processJobEntry(entry) {
        let payload;

        if (!entry.user) {
            // nothing to do here
            return;
        }

        switch (entry.command) {
            case 'EXISTS':
                payload = {
                    action: 'new',
                    message: entry.message.toString(),
                    mailbox: entry.mailbox.toString(),
                    uid: entry.uid,
                    modseq: entry.modseq,
                    user: entry.user.toString()
                };
                break;
            case 'EXPUNGE':
                payload = {
                    action: 'delete',
                    message: entry.message.toString(),
                    mailbox: entry.mailbox.toString(),
                    uid: entry.uid,
                    modseq: entry.modseq,
                    user: entry.user.toString()
                };
                break;
            case 'FETCH':
                payload = {
                    action: 'update',
                    message: entry.message.toString(),
                    mailbox: entry.mailbox.toString(),
                    uid: entry.uid,
                    flags: entry.flags,
                    modseq: entry.modseq,
                    user: entry.user.toString()
                };
                break;
        }

        if (payload) {
            let hasFeatureFlag =
                (config.enabledFeatureFlags && config.enabledFeatureFlags.indexer) || (await db.redis.sismember(`feature:indexing`, entry.user.toString()));

            if (!hasFeatureFlag) {
                log.silly('Indexer', `Feature flag not set, skipping user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
                return;
            } else {
                log.verbose('Indexer', `Feature flag set, processing user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
            }

            await liveIndexingQueue.add('journal', payload, {
                removeOnComplete: 100,
                removeOnFail: 100,
                attempts: 5,
                backoff: {
                    type: 'exponential',
                    delay: 2000
                }
            });
        }
    }

    /**
     * Opens a change stream on the `journal` collection (insert operations
     * only) and processes events until the indexer is stopped or an error
     * occurs. The resume token of each handled event is stored in the Redis
     * key `indexer:last` and used to resume the stream on the next start.
     *
     * Error code 40573 is treated as "not a replica set" and disables
     * monitoring for the lifetime of the process. Other errors stop the
     * indexer and are rethrown, unless the stream is already closed.
     */
    async monitorChanges() {
        if (FORCE_DISABLE) {
            log.error('Indexer', FORCE_DISABLED_MESSAGE);
            return;
        }

        const pipeline = [
            {
                $match: {
                    operationType: 'insert'
                }
            }
        ];

        const collection = db.database.collection('journal');
        let opts = {
            allowDiskUse: true
        };

        let lastId = await db.redis.get('indexer:last');
        if (lastId) {
            opts.resumeAfter = {
                _data: lastId
            };
        }

        this.changeStream = collection.watch(pipeline, opts);

        try {
            while (await this.changeStream.hasNext()) {
                if (!this.running) {
                    return;
                }

                let job = await this.changeStream.next();

                try {
                    if (job.fullDocument && job.fullDocument.command) {
                        await this.processJobEntry(job.fullDocument);
                    }

                    // remember the resume token only after the entry was handled
                    await db.redis.set('indexer:last', job._id._data);
                } catch (error) {
                    try {
                        await this.stop();
                    } catch (err) {
                        // ignore
                    }
                    throw error;
                }
            }
        } catch (error) {
            if (error.code === 40573) {
                // not a replica set!
                FORCE_DISABLE = true;
                log.error('Indexer', FORCE_DISABLED_MESSAGE);
                return;
            }

            if (this.changeStream.isClosed()) {
                log.info('Indexer', 'The change stream is closed. Will not wait on any more changes.');
                return;
            } else {
                try {
                    await this.stop();
                } catch (err) {
                    // ignore
                }
                throw error;
            }
        }
    }
}

// Module-wide Indexer instance
let indexer = new Indexer();

/**
 * Tries to take (or renew) the `indexer:lock` lock for this process and
 * starts or stops the indexer according to the outcome. A failed lock
 * attempt is logged and treated as not holding the lock.
 */
async function renewLock() {
    try {
        let lockSuccess = await processlock('indexer:lock', processId, LOCK_EXPIRE_TTL);
        isCurrentWorker = !!lockSuccess;
    } catch (err) {
        log.error('Indexer', 'Failed to get lock process=%s err=%s', processId, err.message);
        isCurrentWorker = false;
    }

    if (!isCurrentWorker) {
        await indexer.stop();
    } else {
        await indexer.start();
    }
}

/**
 * Starts the lock renewal loop: runs renewLock() immediately and then
 * schedules another run LOCK_RENEW_TTL seconds after each attempt settles.
 * The returned promise resolves right away and does not wait for any attempt.
 */
async function getLock() {
    let renewTimer;
    let keepLock = () => {
        // make sure only one renewal timer is pending at a time
        clearTimeout(renewTimer);
        renewTimer = setTimeout(() => {
            renewLock().finally(keepLock);
        }, LOCK_RENEW_TTL * 1000);
    };

    renewLock().finally(keepLock);
}

/**
 * Deletes, in place, every own enumerable property whose value is strictly
 * `null`. Other falsy values (undefined, 0, '', false) are kept.
 *
 * @param {Object} obj Object to clean up (mutated)
 * @returns {Object} The same object instance
 */
function removeEmptyKeys(obj) {
    for (let key of Object.keys(obj)) {
        if (obj[key] === null) {
            delete obj[key];
        }
    }
    return obj;
}

/**
 * Flattens a parsed address header into a list of `{ name, address }` pairs.
 * Group entries are expanded recursively into their members. Names are passed
 * through libmime.decodeWords(); if decoding throws, the raw name is kept.
 * When the address matches `@xn--`, the part after the last `@` is converted
 * with punycode.toUnicode().
 *
 * @param {Object|Object[]} [addresses] Single address entry, a list of entries, or nothing
 * @returns {Object[]} List of `{ name, address }` objects (empty if there is no input)
 */
function formatAddresses(addresses) {
    let result = [];
    for (let address of [].concat(addresses || [])) {
        if (address.group) {
            result = result.concat(formatAddresses(address.group));
        } else {
            let name = address.name || '';
            let addr = address.address || '';
            try {
                name = libmime.decodeWords(name);
            } catch (err) {
                // keep the undecoded name
            }

            if (/@xn--/.test(addr)) {
                // the regex matched, so an "@" exists and domainStart is >= 1
                const domainStart = addr.lastIndexOf('@') + 1;
                addr = addr.slice(0, domainStart) + punycode.toUnicode(addr.slice(domainStart));
            }

            result.push({ name, address: addr });
        }
    }
    return result;
}

/**
 * Builds the job processor used by the indexing workers.
 *
 * The returned async function handles a job whose `data.action` is:
 *  - `new`: loads the message from the `messages` collection and indexes it,
 *    unless a tombstone shows that it was already deleted
 *  - `delete`: removes the document from the index; if the document is not
 *    found, a tombstone is stored so that a late `new` job is skipped
 *  - `update`: updates the flag-related fields; when a numeric modseq is
 *    given, the update is skipped if the stored modseq is not older
 *
 * "Document not found" errors from ElasticSearch are logged and swallowed.
 * Any other error is logged, reported through loggelf and rethrown.
 *
 * @param {Object} esclient ElasticSearch client exposing index(), delete() and update()
 * @returns {Function} Async job processor; resolves to false for a job without data
 */
function indexingJob(esclient) {
    // Header keys that are not indexed (authentication and transport headers).
    // No "g" flag: the pattern is only used with .test().
    const skippedHeaderKeys = /^x|^received|^arc|^dkim|^authentication/i;

    return async job => {
        try {
            if (!job || !job.data) {
                return false;
            }
            const data = job.data;

            // tombstones are kept in per-day sets; check today and yesterday
            const dateKeyTdy = new Date().toISOString().substring(0, 10).replace(/-/g, '');
            const dateKeyYdy = new Date(Date.now() - 24 * 3600 * 1000).toISOString().substring(0, 10).replace(/-/g, '');
            const tombstoneTdy = `indexer:tomb:${dateKeyTdy}`;
            const tombstoneYdy = `indexer:tomb:${dateKeyYdy}`;

            switch (data.action) {
                case 'new': {
                    // check tombstone for race conditions (might be already deleted)
                    let [[err1, isDeleted1], [err2, isDeleted2]] = await db.redis
                        .multi()
                        .sismember(tombstoneTdy, data.message)
                        .sismember(tombstoneYdy, data.message)
                        .exec();

                    if (err1) {
                        log.verbose('Indexing', 'Failed checking tombstone key=%s erro=%s', tombstoneTdy, err1.message);
                    }

                    if (err2) {
                        log.verbose('Indexing', 'Failed checking tombstone key=%s erro=%s', tombstoneYdy, err2.message);
                    }

                    if (isDeleted1 || isDeleted2) {
                        log.info('Indexing', 'Document tombstone found, skip index message=%s', data.message);
                        break;
                    }

                    // fetch message from DB
                    let messageData = await db.database.collection('messages').findOne(
                        {
                            _id: new ObjectId(data.message),
                            // shard key
                            mailbox: new ObjectId(data.mailbox),
                            uid: data.uid
                        },
                        {
                            projection: {
                                bodystructure: false,
                                envelope: false,
                                'mimeTree.childNodes': false,
                                'mimeTree.header': false
                            }
                        }
                    );

                    if (!messageData) {
                        log.info('Indexing', 'Message not found from DB, skip index message=%s', data.message);
                        break;
                    }

                    // creation time encoded in the message ObjectId
                    const now = messageData._id.getTimestamp();

                    // address lists come from the parsed MIME header, if present
                    const parsedHeader = (messageData.mimeTree && messageData.mimeTree.parsedHeader) || {};

                    const messageObj = removeEmptyKeys({
                        user: messageData.user.toString(),
                        mailbox: messageData.mailbox.toString(),

                        thread: messageData.thread ? messageData.thread.toString() : null,
                        uid: messageData.uid,
                        answered: messageData.flags ? messageData.flags.includes('\\Answered') : null,

                        attachments:
                            (messageData.attachments &&
                                messageData.attachments.map(attachment =>
                                    removeEmptyKeys({
                                        cid: attachment.cid || null,
                                        contentType: attachment.contentType || null,
                                        size: attachment.size,
                                        filename: attachment.filename,
                                        id: attachment.id,
                                        disposition: attachment.disposition
                                    })
                                )) ||
                            null,

                        bcc: formatAddresses(parsedHeader.bcc),
                        cc: formatAddresses(parsedHeader.cc),

                        // Time when stored
                        created: now.toISOString(),

                        // Internal Date
                        idate: (messageData.idate && messageData.idate.toISOString()) || now.toISOString(),

                        // Header Date
                        hdate: (messageData.hdate && messageData.hdate.toISOString()) || now.toISOString(),

                        draft: messageData.flags ? messageData.flags.includes('\\Draft') : null,

                        flagged: messageData.flags ? messageData.flags.includes('\\Flagged') : null,

                        flags: messageData.flags || [],

                        from: formatAddresses(parsedHeader.from),

                        // do not index authentication and transport headers
                        headers: messageData.headers ? messageData.headers.filter(header => !skippedHeaderKeys.test(header.key)) : null,

                        inReplyTo: messageData.inReplyTo || null,

                        msgid: messageData.msgid || null,

                        replyTo: formatAddresses(parsedHeader['reply-to']),

                        size: messageData.size || null,

                        subject: messageData.subject || '',

                        to: formatAddresses(parsedHeader.to),

                        unseen: messageData.flags ? !messageData.flags.includes('\\Seen') : null,

                        html: (messageData.html && messageData.html.join('\n')) || null,

                        text: messageData.text || null,

                        modseq: data.modseq
                    });

                    let indexResponse = await esclient.index({
                        id: messageData._id.toString(),
                        index: config.elasticsearch.index,
                        body: messageObj,
                        refresh: false
                    });

                    log.verbose(
                        'Indexing',
                        'Document index result=%s message=%s',
                        indexResponse.body && indexResponse.body.result,
                        indexResponse.body && indexResponse.body._id
                    );

                    loggelf({
                        short_message: '[INDEXER]',
                        _mail_action: `indexer_${data.action}`,
                        _user: data.user,
                        _mailbox: data.mailbox,
                        _uid: data.uid,
                        _modseq: data.modseq,
                        _indexer_result: indexResponse.body && indexResponse.body.result,
                        _indexer_message: indexResponse.body && indexResponse.body._id
                    });

                    break;
                }

                case 'delete': {
                    let deleteResponse;
                    try {
                        deleteResponse = await esclient.delete({
                            id: data.message,
                            index: config.elasticsearch.index,
                            refresh: false
                        });
                    } catch (err) {
                        if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
                            // set tombstone to prevent indexing this message in case of race conditions
                            await db.redis
                                .multi()
                                .sadd(tombstoneTdy, data.message)
                                .expire(tombstoneTdy, 24 * 3600)
                                .exec();
                        }
                        // the outer handler decides whether the error is ignorable
                        throw err;
                    }

                    log.verbose(
                        'Indexing',
                        'Document delete result=%s message=%s',
                        deleteResponse.body && deleteResponse.body.result,
                        deleteResponse.body && deleteResponse.body._id
                    );

                    loggelf({
                        short_message: '[INDEXER]',
                        _mail_action: `indexer_${data.action}`,
                        _user: data.user,
                        _mailbox: data.mailbox,
                        _uid: data.uid,
                        _modseq: data.modseq,
                        _indexer_result: deleteResponse.body && deleteResponse.body.result,
                        _indexer_message: deleteResponse.body && deleteResponse.body._id
                    });
                    break;
                }

                case 'update': {
                    let updateRequest = {
                        id: data.message,
                        index: config.elasticsearch.index,
                        refresh: false
                    };

                    if (data.modseq && typeof data.modseq === 'number') {
                        // A job without flags is treated as having an empty flag list,
                        // instead of failing with a TypeError on `.includes`
                        const flags = data.flags || [];

                        updateRequest.body = {
                            script: {
                                lang: 'painless',
                                // skip the update if the indexed document is already at this modseq or newer
                                source: `
                                    if( ctx._source.modseq >= params.modseq) {
                                        ctx.op = 'none';
                                    } else {
                                        ctx._source.draft = params.draft;
                                        ctx._source.flagged = params.flagged;
                                        ctx._source.flags = params.flags;
                                        ctx._source.unseen = params.unseen;
                                        ctx._source.modseq = params.modseq;
                                    }
                                `,
                                params: {
                                    modseq: data.modseq,
                                    draft: flags.includes('\\Draft'),
                                    flagged: flags.includes('\\Flagged'),
                                    flags,
                                    unseen: !flags.includes('\\Seen')
                                }
                            }
                        };
                    } else {
                        updateRequest.body = {
                            doc: removeEmptyKeys({
                                draft: data.flags ? data.flags.includes('\\Draft') : null,
                                flagged: data.flags ? data.flags.includes('\\Flagged') : null,
                                flags: data.flags || [],
                                unseen: data.flags ? !data.flags.includes('\\Seen') : null
                            })
                        };
                    }

                    let updateResponse = await esclient.update(updateRequest);

                    log.verbose(
                        'Indexing',
                        'Document update result=%s message=%s',
                        updateResponse.body && updateResponse.body.result,
                        updateResponse.body && updateResponse.body._id
                    );

                    loggelf({
                        short_message: '[INDEXER]',
                        _mail_action: `indexer_${data.action}`,
                        _user: data.user,
                        _mailbox: data.mailbox,
                        _uid: data.uid,

                        _modseq: data.modseq,
                        _flags: data.flags && data.flags.join(', '),
                        _indexer_result: updateResponse.body && updateResponse.body.result,
                        _indexer_message: updateResponse.body && updateResponse.body._id
                    });
                }
            }
        } catch (err) {
            if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
                // missing document, ignore
                log.error('Indexing', 'Failed to process indexing request, document not found message=%s', err.meta.body._id);
                return;
            }

            log.error('Indexing', err);

            const data = job.data;
            loggelf({
                short_message: '[INDEXER]',
                _mail_action: `indexer_${data.action}`,
                _user: data.user,
                _mailbox: data.mailbox,
                _uid: data.uid,
                _modseq: data.modseq,
                _indexer_message: err.meta && err.meta.body && err.meta.body._id,
                _error: err.message,
                _err_code: err.meta && err.meta.body && err.meta.body.result
            });

            // rethrow so that the caller sees the failure
            throw err;
        }
    };
}

/**
 * Starts the indexer service.
 *
 * If the ElasticSearch indexer is not enabled in config, nothing is set up
 * and `callback(null, false)` is invoked asynchronously. Otherwise the GELF
 * logger is prepared, the database connection is opened, the live indexing
 * queue and the lock renewal loop are created, workers are attached to the
 * `live_indexing` and `backlog_indexing` queues, and `callback()` is invoked.
 * A failed database connection is reported and the process exits after 3 s.
 *
 * @param {Function} callback Invoked once startup is done or was skipped
 */
module.exports.start = callback => {
    if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) {
        return setImmediate(() => callback(null, false));
    }

    // Resolve the GELF section once, so a missing `log.gelf` config falls back
    // to defaults instead of throwing on property access
    const gelfConfig = (config.log && config.log.gelf) || {};
    const component = gelfConfig.component || 'wildduck';
    const hostname = gelfConfig.hostname || os.hostname();
    const gelf = gelfConfig.enabled
        ? new Gelf(gelfConfig.options)
        : {
              // placeholder
              emit: (key, message) => log.info('Gelf', JSON.stringify(message))
          };

    loggelf = message => {
        if (typeof message === 'string') {
            message = {
                short_message: message
            };
        }

        message = message || {};

        // prefix short_message with the upper-cased component name unless it is already there
        if (!message.short_message || message.short_message.indexOf(component.toUpperCase()) !== 0) {
            message.short_message = component.toUpperCase() + ' ' + (message.short_message || '');
        }

        message.facility = component; // facility is deprecated but set by the driver if not provided
        message.host = hostname;
        message.timestamp = Date.now() / 1000;
        message._component = component;
        // drop every falsy field before emitting
        Object.keys(message).forEach(key => {
            if (!message[key]) {
                delete message[key];
            }
        });
        try {
            gelf.emit('gelf.log', message);
        } catch (err) {
            log.error('Gelf', err);
        }
    };

    db.connect(err => {
        if (err) {
            log.error('Db', 'Failed to setup database connection');
            errors.notify(err);
            return setTimeout(() => process.exit(1), 3000);
        }

        liveIndexingQueue = new Queue('live_indexing', db.queueConf);

        processlock = counters(db.redis).processlock;

        getLock().catch(err => {
            errors.notify(err);
            return setTimeout(() => process.exit(1), 3000);
        });

        const esclient = getClient();

        queueWorkers.liveIndexing = new Worker(
            'live_indexing',
            indexingJob(esclient),
            Object.assign(
                {
                    concurrency: 1
                },
                db.queueConf
            )
        );

        queueWorkers.backlogIndexing = new Worker(
            'backlog_indexing',
            indexingJob(esclient),
            Object.assign(
                {
                    concurrency: 1
                },
                db.queueConf
            )
        );

        callback();
    });
};