wildduck/indexer.js

655 lines
23 KiB
JavaScript

'use strict';
const log = require('npmlog');
const config = require('wild-config');
const Gelf = require('gelf');
const os = require('os');
const { Queue, Worker } = require('bullmq');
const db = require('./lib/db');
const errors = require('./lib/errors');
const crypto = require('crypto');
const counters = require('./lib/counters');
const { ObjectId } = require('mongodb');
const libmime = require('libmime');
const punycode = require('punycode.js');
const { getClient } = require('./lib/elasticsearch');
let loggelf;
let processlock;
let queueWorkers = {};
const LOCK_EXPIRE_TTL = 5;
const LOCK_RENEW_TTL = 2;
let FORCE_DISABLE = false;
const processId = crypto.randomBytes(8).toString('hex');
let isCurrentWorker = false;
let liveIndexingQueue;
const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. Changes are not indexed to ElasticSearch.';
class Indexer {
constuctor() {
this.running = false;
}
async start() {
if (this.running) {
return;
}
this.running = true;
log.info('Indexer', 'Starting indexer');
this.monitorChanges()
.then()
.catch(err => {
log.error('Indexer', 'Indexing failed error=%s', err.message);
})
.finally(() => {
this.running = false;
});
}
async stop() {
if (!this.running) {
return;
}
this.running = false;
log.info('Indexer', 'Stopping indexer');
try {
if (this.changeStream && !this.changeStream.closed) {
await this.changeStream.close();
}
} catch (err) {
// ignore
}
}
async processJobEntry(entry) {
let payload;
if (!entry.user) {
// nothing to do here
return;
}
switch (entry.command) {
case 'EXISTS':
payload = {
action: 'new',
message: entry.message.toString(),
mailbox: entry.mailbox.toString(),
uid: entry.uid,
modseq: entry.modseq,
user: entry.user.toString()
};
break;
case 'EXPUNGE':
payload = {
action: 'delete',
message: entry.message.toString(),
mailbox: entry.mailbox.toString(),
uid: entry.uid,
modseq: entry.modseq,
user: entry.user.toString()
};
break;
case 'FETCH':
payload = {
action: 'update',
message: entry.message.toString(),
mailbox: entry.mailbox.toString(),
uid: entry.uid,
flags: entry.flags,
modseq: entry.modseq,
user: entry.user.toString()
};
break;
}
if (payload) {
let hasFeatureFlag =
(config.enabledFeatureFlags && config.enabledFeatureFlags.indexer) || (await db.redis.sismember(`feature:indexing`, entry.user.toString()));
if (!hasFeatureFlag) {
log.silly('Indexer', `Feature flag not set, skipping user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
return;
} else {
log.verbose('Indexer', `Feature flag set, processing user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
}
await liveIndexingQueue.add('journal', payload, {
removeOnComplete: 100,
removeOnFail: 100,
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000
}
});
}
}
async monitorChanges() {
if (FORCE_DISABLE) {
log.error('Indexer', FORCE_DISABLED_MESSAGE);
return;
}
const pipeline = [
{
$match: {
operationType: 'insert'
}
}
];
const collection = db.database.collection('journal');
let opts = {
allowDiskUse: true
};
let lastId = await db.redis.get('indexer:last');
if (lastId) {
opts.resumeAfter = {
_data: lastId
};
}
this.changeStream = collection.watch(pipeline, opts);
try {
while (await this.changeStream.hasNext()) {
if (!this.running) {
return;
}
let job = await this.changeStream.next();
try {
if (job.fullDocument && job.fullDocument.command) {
await this.processJobEntry(job.fullDocument);
}
await db.redis.set('indexer:last', job._id._data);
} catch (error) {
try {
await this.stop();
} catch (err) {
// ignore
}
throw error;
}
}
} catch (error) {
if (error.code === 40573) {
// not a replica set!
FORCE_DISABLE = true;
log.error('Indexer', FORCE_DISABLED_MESSAGE);
return;
}
if (error.errorLabels && error.errorLabels.includes('NonResumableChangeStreamError')) {
// can't resume previous cursor
await db.redis.del('indexer:last');
log.info('Indexer', 'Can not resume existing cursor');
return;
}
if (this.changeStream && this.changeStream.closed) {
log.info('Indexer', 'The change stream is closed. Will not wait on any more changes.');
return;
} else {
try {
await this.stop();
} catch (err) {
// ignore
}
throw error;
}
}
}
}
let indexer = new Indexer();
async function renewLock() {
try {
let lockSuccess = await processlock('indexer:lock', processId, LOCK_EXPIRE_TTL);
isCurrentWorker = !!lockSuccess;
} catch (err) {
log.error('Indexer', 'Failed to get lock process=%s err=%s', processId, err.message);
isCurrentWorker = false;
}
if (!isCurrentWorker) {
await indexer.stop();
} else {
await indexer.start();
}
}
async function getLock() {
let renewTimer;
let keepLock = () => {
clearTimeout(renewTimer);
renewTimer = setTimeout(() => {
renewLock().finally(keepLock);
}, LOCK_RENEW_TTL * 1000);
};
renewLock().finally(keepLock);
}
function removeEmptyKeys(obj) {
for (let key of Object.keys(obj)) {
if (obj[key] === null) {
delete obj[key];
}
}
return obj;
}
function formatAddresses(addresses) {
let result = [];
for (let address of [].concat(addresses || [])) {
if (address.group) {
result = result.concat(formatAddresses(address.group));
} else {
let name = address.name || '';
let addr = address.address || '';
try {
name = libmime.decodeWords(name);
} catch (err) {
// ignore?
}
if (/@xn--/.test(addr)) {
addr = addr.substr(0, addr.lastIndexOf('@') + 1) + punycode.toUnicode(addr.substr(addr.lastIndexOf('@') + 1));
}
result.push({ name, address: addr });
}
}
return result;
}
function indexingJob(esclient) {
return async job => {
try {
if (!job || !job.data) {
return false;
}
const data = job.data;
const dateKeyTdy = new Date().toISOString().substring(0, 10).replace(/-/g, '');
const dateKeyYdy = new Date(Date.now() - 24 * 3600 * 1000).toISOString().substring(0, 10).replace(/-/g, '');
const tombstoneTdy = `indexer:tomb:${dateKeyTdy}`;
const tombstoneYdy = `indexer:tomb:${dateKeyYdy}`;
switch (data.action) {
case 'new': {
// check tombstone for race conditions (might be already deleted)
let [[err1, isDeleted1], [err2, isDeleted2]] = await db.redis
.multi()
.sismember(tombstoneTdy, data.message)
.sismember(tombstoneYdy, data.message)
.exec();
if (err1) {
log.verbose('Indexing', 'Failed checking tombstone key=%s erro=%s', tombstoneTdy, err1.message);
}
if (err2) {
log.verbose('Indexing', 'Failed checking tombstone key=%s erro=%s', tombstoneYdy, err2.message);
}
if (isDeleted1 || isDeleted2) {
log.info('Indexing', 'Document tombstone found, skip index message=%s', data.message);
break;
}
// fetch message from DB
let messageData = await db.database.collection('messages').findOne(
{
_id: new ObjectId(data.message),
// shard key
mailbox: new ObjectId(data.mailbox),
uid: data.uid
},
{
projection: {
bodystructure: false,
envelope: false,
'mimeTree.childNodes': false,
'mimeTree.header': false
}
}
);
if (!messageData) {
log.info('Indexing', 'Message not found from DB, skip index message=%s', data.message);
break;
}
const now = messageData._id.getTimestamp();
const messageObj = removeEmptyKeys({
user: messageData.user.toString(),
mailbox: messageData.mailbox.toString(),
thread: messageData.thread ? messageData.thread.toString() : null,
uid: messageData.uid,
answered: messageData.flags ? messageData.flags.includes('\\Answered') : null,
ha: (messageData.attachments && messageData.attachments.length > 0) || false,
attachments:
(messageData.attachments &&
messageData.attachments.map(attachment =>
removeEmptyKeys({
cid: attachment.cid || null,
contentType: attachment.contentType || null,
size: attachment.size,
filename: attachment.filename,
id: attachment.id,
disposition: attachment.disposition
})
)) ||
null,
bcc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.bcc),
cc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.cc),
// Time when stored
created: now.toISOString(),
// Internal Date
idate: (messageData.idate && messageData.idate.toISOString()) || now.toISOString(),
// Header Date
hdate: (messageData.hdate && messageData.hdate.toISOString()) || now.toISOString(),
draft: messageData.flags ? messageData.flags.includes('\\Draft') : null,
flagged: messageData.flags ? messageData.flags.includes('\\Flagged') : null,
flags: messageData.flags || [],
from: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.from),
// do not index authentication and transport headers
headers: messageData.headers
? messageData.headers.filter(header => !/^x|^received|^arc|^dkim|^authentication/gi.test(header.key))
: null,
inReplyTo: messageData.inReplyTo || null,
msgid: messageData.msgid || null,
replyTo: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader['reply-to']),
size: messageData.size || null,
subject: messageData.subject || '',
to: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.to),
unseen: messageData.flags ? !messageData.flags.includes('\\Seen') : null,
html: (messageData.html && messageData.html.join('\n')) || null,
text: messageData.text || null,
modseq: data.modseq
});
let indexResponse = await esclient.index({
id: messageData._id.toString(),
index: config.elasticsearch.index,
body: messageObj,
refresh: false
});
log.verbose(
'Indexing',
'Document index result=%s message=%s',
indexResponse.body && indexResponse.body.result,
indexResponse.body && indexResponse.body._id
);
loggelf({
short_message: '[INDEXER]',
_mail_action: `indexer_${data.action}`,
_user: data.user,
_mailbox: data.mailbox,
_uid: data.uid,
_modseq: data.modseq,
_indexer_result: indexResponse.body && indexResponse.body.result,
_indexer_message: indexResponse.body && indexResponse.body._id
});
break;
}
case 'delete': {
let deleteResponse;
try {
deleteResponse = await esclient.delete({
id: data.message,
index: config.elasticsearch.index,
refresh: false
});
} catch (err) {
if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
// set tombstone to prevent indexing this message in case of race conditions
await db.redis
.multi()
.sadd(tombstoneTdy, data.message)
.expire(tombstoneTdy, 24 * 3600)
.exec();
}
throw err;
}
log.verbose(
'Indexing',
'Document delete result=%s message=%s',
deleteResponse.body && deleteResponse.body.result,
deleteResponse.body && deleteResponse.body._id
);
loggelf({
short_message: '[INDEXER]',
_mail_action: `indexer_${data.action}`,
_user: data.user,
_mailbox: data.mailbox,
_uid: data.uid,
_modseq: data.modseq,
_indexer_result: deleteResponse.body && deleteResponse.body.result,
_indexer_message: deleteResponse.body && deleteResponse.body._id
});
break;
}
case 'update': {
let updateRequest = {
id: data.message,
index: config.elasticsearch.index,
refresh: false
};
if (data.modseq && typeof data.modseq === 'number') {
updateRequest.body = {
script: {
lang: 'painless',
source: `
if( ctx._source.modseq >= params.modseq) {
ctx.op = 'none';
} else {
ctx._source.draft = params.draft;
ctx._source.flagged = params.flagged;
ctx._source.flags = params.flags;
ctx._source.unseen = params.unseen;
ctx._source.modseq = params.modseq;
}
`,
params: {
modseq: data.modseq,
draft: data.flags.includes('\\Draft'),
flagged: data.flags.includes('\\Flagged'),
flags: data.flags || [],
unseen: !data.flags.includes('\\Seen')
}
}
};
} else {
updateRequest.body = {
doc: removeEmptyKeys({
draft: data.flags ? data.flags.includes('\\Draft') : null,
flagged: data.flags ? data.flags.includes('\\Flagged') : null,
flags: data.flags || [],
unseen: data.flags ? !data.flags.includes('\\Seen') : null
})
};
}
let updateResponse = await esclient.update(updateRequest);
log.verbose(
'Indexing',
'Document update result=%s message=%s',
updateResponse.body && updateResponse.body.result,
updateResponse.body && updateResponse.body._id
);
loggelf({
short_message: '[INDEXER]',
_mail_action: `indexer_${data.action}`,
_user: data.user,
_mailbox: data.mailbox,
_uid: data.uid,
_modseq: data.modseq,
_flags: data.flags && data.flags.join(', '),
_indexer_result: updateResponse.body && updateResponse.body.result,
_indexer_message: updateResponse.body && updateResponse.body._id
});
}
}
// loggelf({ _msg: 'hello world' });
} catch (err) {
if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
// missing document, ignore
log.error('Indexing', 'Failed to process indexing request, document not found message=%s', err.meta.body._id);
return;
}
log.error('Indexing', err);
const data = job.data;
loggelf({
short_message: '[INDEXER]',
_mail_action: `indexer_${data.action}`,
_user: data.user,
_mailbox: data.mailbox,
_uid: data.uid,
_modseq: data.modseq,
_indexer_message: err.meta && err.meta.body && err.meta.body._id,
_error: err.message,
_err_code: err.meta && err.meta.body && err.meta.body.result
});
throw err;
}
};
}
module.exports.start = callback => {
if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) {
return setImmediate(() => callback(null, false));
}
const component = config.log.gelf.component || 'wildduck';
const hostname = config.log.gelf.hostname || os.hostname();
const gelf =
config.log.gelf && config.log.gelf.enabled
? new Gelf(config.log.gelf.options)
: {
// placeholder
emit: (key, message) => log.info('Gelf', JSON.stringify(message))
};
loggelf = message => {
if (typeof message === 'string') {
message = {
short_message: message
};
}
message = message || {};
if (!message.short_message || message.short_message.indexOf(component.toUpperCase()) !== 0) {
message.short_message = component.toUpperCase() + ' ' + (message.short_message || '');
}
message.facility = component; // facility is deprecated but set by the driver if not provided
message.host = hostname;
message.timestamp = Date.now() / 1000;
message._component = component;
Object.keys(message).forEach(key => {
if (!message[key]) {
delete message[key];
}
});
try {
gelf.emit('gelf.log', message);
} catch (err) {
log.error('Gelf', err);
}
};
db.connect(err => {
if (err) {
log.error('Db', 'Failed to setup database connection');
errors.notify(err);
return setTimeout(() => process.exit(1), 3000);
}
liveIndexingQueue = new Queue('live_indexing', db.queueConf);
processlock = counters(db.redis).processlock;
getLock().catch(err => {
errors.notify(err);
return setTimeout(() => process.exit(1), 3000);
});
const esclient = getClient();
queueWorkers.liveIndexing = new Worker(
'live_indexing',
indexingJob(esclient),
Object.assign(
{
concurrency: 1
},
db.queueConf
)
);
queueWorkers.backlogIndexing = new Worker(
'backlog_indexing',
indexingJob(esclient),
Object.assign(
{
concurrency: 1
},
db.queueConf
)
);
callback();
});
};