Merge pull request #452 from nodemailer/elasticsearch-indexing

Elasticsearch indexing
Andris Reinman 2023-03-10 11:22:05 +02:00 committed by GitHub
commit 4370a79192
10 changed files with 563 additions and 91 deletions

api.js
View file

@@ -4,7 +4,7 @@ const config = require('wild-config');
const restify = require('restify');
const log = require('npmlog');
const logger = require('restify-logger');
const corsMiddleware = require('@andris/restify-cors-middleware2');
const corsMiddleware = require('restify-cors-middleware2');
const UserHandler = require('./lib/user-handler');
const MailboxHandler = require('./lib/mailbox-handler');
const MessageHandler = require('./lib/message-handler');

View file

@@ -8,61 +8,258 @@ const Queue = require('bull');
const db = require('./lib/db');
const errors = require('./lib/errors');
const crypto = require('crypto');
const counters = require('./lib/counters');
const { ObjectId } = require('mongodb');
const libmime = require('libmime');
const punycode = require('punycode/');
const { getClient } = require('./lib/elasticsearch');
let loggelf;
let processlock;
const LOCK_EXPIRE_TTL = 5;
const LOCK_RENEW_TTL = 2;
let FORCE_DISABLE = false;
const processId = crypto.randomBytes(8).toString('hex');
let isCurrentWorker = false;
let indexingQueue;
const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. Changes are not indexed to ElasticSearch.';
class Indexer {
constructor() {
this.running = false;
}
async start() {
if (this.running) {
return;
}
this.running = true;
log.info('Indexer', 'Starting indexer');
this.monitorChanges()
.then()
.catch(err => {
log.error('Indexer', 'Indexing failed error=%s', err.message);
})
.finally(() => {
this.running = false;
});
}
async stop() {
if (!this.running) {
return;
}
this.running = false;
log.info('Indexer', 'Stopping indexer');
try {
if (this.changeStream && !this.changeStream.isClosed()) {
await this.changeStream.close();
}
} catch (err) {
// ignore
}
}
async processJobEntry(entry) {
let payload;
if (!entry.user) {
// nothing to do here
return;
}
let hasFeatureFlag = await db.redis.sismember(`feature:indexing`, entry.user.toString());
if (!hasFeatureFlag) {
log.silly('Indexer', `Feature flag not set, skipping user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
return;
} else {
log.verbose('Indexer', `Feature flag set, processing user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
}
switch (entry.command) {
case 'EXISTS':
payload = {
action: 'new',
message: entry.message.toString(),
mailbox: entry.mailbox.toString(),
uid: entry.uid,
modseq: entry.modseq
};
break;
case 'EXPUNGE':
payload = {
action: 'delete',
message: entry.message.toString(),
mailbox: entry.mailbox.toString(),
uid: entry.uid,
modseq: entry.modseq
};
break;
case 'FETCH':
payload = {
action: 'update',
message: entry.message.toString(),
mailbox: entry.mailbox.toString(),
uid: entry.uid,
flags: entry.flags,
modseq: entry.modseq
};
break;
}
if (payload) {
await indexingQueue.add(payload, {
removeOnComplete: 100,
removeOnFail: 100,
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000
}
});
}
}
async monitorChanges() {
if (FORCE_DISABLE) {
log.error('Indexer', FORCE_DISABLED_MESSAGE);
return;
}
const pipeline = [
{
$match: {
operationType: 'insert'
}
}
];
const collection = db.database.collection('journal');
let opts = {
allowDiskUse: true
};
let lastId = await db.redis.get('indexer:last');
if (lastId) {
opts.resumeAfter = {
_data: lastId
};
}
this.changeStream = collection.watch(pipeline, opts);
try {
while (await this.changeStream.hasNext()) {
if (!this.running) {
return;
}
let job = await this.changeStream.next();
try {
if (job.fullDocument && job.fullDocument.command) {
await this.processJobEntry(job.fullDocument);
}
await db.redis.set('indexer:last', job._id._data);
} catch (error) {
try {
await this.stop();
} catch (err) {
// ignore
}
throw error;
}
}
} catch (error) {
if (error.code === 40573) {
// not a replica set!
FORCE_DISABLE = true;
log.error('Indexer', FORCE_DISABLED_MESSAGE);
return;
}
if (this.changeStream.isClosed()) {
log.info('Indexer', 'The change stream is closed. Will not wait on any more changes.');
return;
} else {
try {
await this.stop();
} catch (err) {
// ignore
}
throw error;
}
}
}
}
let indexer = new Indexer();
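// Leader election: every LOCK_RENEW_TTL seconds each worker process tries to
// acquire or renew the shared Redis lock. Only the current lock holder runs the
// indexer; if renewal fails (e.g. the holder died), the lock expires after
// LOCK_EXPIRE_TTL seconds and another process picks up the change stream.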
async function renewLock() {
try {
let lockSuccess = await processlock('indexer:lock', processId, LOCK_EXPIRE_TTL);
isCurrentWorker = !!lockSuccess;
} catch (err) {
log.error('Indexer', 'Failed to get lock process=%s err=%s', processId, err.message);
isCurrentWorker = false;
}
if (!isCurrentWorker) {
await indexer.stop();
} else {
await indexer.start();
}
}
async function getLock() {
let renewTimer;
let keepLock = () => {
clearTimeout(renewTimer);
renewTimer = setTimeout(() => {
renewLock().finally(keepLock);
}, LOCK_RENEW_TTL * 1000);
};
renewLock().finally(keepLock);
}
function removeEmptyKeys(obj) {
for (let key of Object.keys(obj)) {
if (obj[key] === null) {
delete obj[key];
}
}
return obj;
}
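// Flattens parsed address objects into { name, address } pairs for indexing:
// RFC 5322 address groups are expanded recursively, MIME encoded-words in
// display names are decoded, and punycoded (xn--) domains are converted back
// to Unicode.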
function formatAddresses(addresses) {
let result = [];
for (let address of [].concat(addresses || [])) {
if (address.group) {
result = result.concat(formatAddresses(address.group));
} else {
let name = address.name || '';
let addr = address.address || '';
try {
name = libmime.decodeWords(name);
} catch (err) {
// ignore?
}
if (/@xn--/.test(addr)) {
addr = addr.substr(0, addr.lastIndexOf('@') + 1) + punycode.toUnicode(addr.substr(addr.lastIndexOf('@') + 1));
}
result.push({ name, address: addr });
}
}
return result;
}
module.exports.start = callback => {
if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) {
return setImmediate(() => callback(null, false));
@@ -114,23 +311,248 @@ module.exports.start = callback => {
return setTimeout(() => process.exit(1), 3000);
}
indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
processlock = counters(db.redis).processlock;
getLock().catch(err => {
errors.notify(err);
return setTimeout(() => process.exit(1), 3000);
});
const esclient = getClient();
indexingQueue.process(async job => {
try {
if (!job || !job.data) {
return false;
}
const data = job.data;
const dateKeyTdy = new Date().toISOString().substring(0, 10).replace(/-/g, '');
const dateKeyYdy = new Date(Date.now() - 24 * 3600 * 1000).toISOString().substring(0, 10).replace(/-/g, '');
const tombstoneTdy = `indexer:tomb:${dateKeyTdy}`;
const tombstoneYdy = `indexer:tomb:${dateKeyYdy}`;
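// Tombstones guard against a race where the 'delete' job for a message is
// processed before its 'new' job: a deleted message ID is remembered for ~24h
// so a late-arriving index request can be skipped. Two day-keyed sets are
// checked to cover jobs that span the midnight rollover.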
switch (data.action) {
case 'new': {
// check tombstone for race conditions (might be already deleted)
let [[err1, isDeleted1], [err2, isDeleted2]] = await db.redis
.multi()
.sismember(tombstoneTdy, data.message)
.sismember(tombstoneYdy, data.message)
.exec();
if (err1) {
log.verbose('Indexing', 'Failed checking tombstone key=%s error=%s', tombstoneTdy, err1.message);
}
if (err2) {
log.verbose('Indexing', 'Failed checking tombstone key=%s error=%s', tombstoneYdy, err2.message);
}
if (isDeleted1 || isDeleted2) {
log.info('Indexing', 'Document tombstone found, skip index message=%s', data.message);
break;
}
// fetch message from DB
let messageData = await db.database.collection('messages').findOne(
{
_id: new ObjectId(data.message),
// shard key
mailbox: new ObjectId(data.mailbox),
uid: data.uid
},
{
projection: {
bodystructure: false,
envelope: false,
'mimeTree.childNodes': false,
'mimeTree.header': false
}
}
);
const now = new Date();
let messageObj = removeEmptyKeys({
user: messageData.user.toString(),
mailbox: messageData.mailbox.toString(),
thread: messageData.thread ? messageData.thread.toString() : null,
uid: messageData.uid,
answered: messageData.flags ? messageData.flags.includes('\\Answered') : null,
attachments:
(messageData.attachments &&
messageData.attachments.map(attachment =>
removeEmptyKeys({
cid: attachment.cid || null,
contentType: attachment.contentType || null,
size: attachment.size,
filename: attachment.filename,
id: attachment.id,
disposition: attachment.disposition
})
)) ||
null,
bcc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.bcc),
cc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.cc),
// Time when stored
created: now.toISOString(),
// Internal Date
idate: (messageData.idate && messageData.idate.toISOString()) || now.toISOString(),
// Header Date
hdate: (messageData.hdate && messageData.hdate.toISOString()) || now.toISOString(),
draft: messageData.flags ? messageData.flags.includes('\\Draft') : null,
flagged: messageData.flags ? messageData.flags.includes('\\Flagged') : null,
flags: messageData.flags || [],
from: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.from),
// do not index authentication and transport headers
headers: messageData.headers
? messageData.headers.filter(header => !/^x|^received|^arc|^dkim|^authentication/gi.test(header.key))
: null,
inReplyTo: messageData.inReplyTo || null,
msgid: messageData.msgid || null,
replyTo: formatAddresses(
messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader['reply-to']
),
size: messageData.size || null,
subject: messageData.subject || '',
to: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.to),
unseen: messageData.flags ? !messageData.flags.includes('\\Seen') : null,
html: (messageData.html && messageData.html.join('\n')) || null,
text: messageData.text || null,
modseq: data.modseq
});
let indexResponse = await esclient.index({
id: messageData._id.toString(),
index: config.elasticsearch.index,
body: messageObj,
refresh: false
});
log.verbose(
'Indexing',
'Document index result=%s message=%s',
indexResponse.body && indexResponse.body.result,
indexResponse.body && indexResponse.body._id
);
break;
}
case 'delete': {
let deleteResponse;
try {
deleteResponse = await esclient.delete({
id: data.message,
index: config.elasticsearch.index,
refresh: false
});
} catch (err) {
if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
// set tombstone to prevent indexing this message in case of race conditions
await db.redis
.multi()
.sadd(tombstoneTdy, data.message)
.expire(tombstoneTdy, 24 * 3600)
.exec();
}
throw err;
}
log.verbose(
'Indexing',
'Document delete result=%s message=%s',
deleteResponse.body && deleteResponse.body.result,
deleteResponse.body && deleteResponse.body._id
);
break;
}
case 'update': {
let updateRequest = {
id: data.message,
index: config.elasticsearch.index,
refresh: false
};
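// Queue jobs may be retried or processed out of order. When the journal entry
// carries a modseq, a scripted update makes the write idempotent: the document
// is only touched if the incoming modseq is newer than the stored one.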
if (data.modseq && typeof data.modseq === 'number') {
updateRequest.body = {
script: {
lang: 'painless',
source: `
if (ctx._source.modseq >= params.modseq) {
ctx.op = 'none';
} else {
ctx._source.draft = params.draft;
ctx._source.flagged = params.flagged;
ctx._source.flags = params.flags;
ctx._source.unseen = params.unseen;
ctx._source.modseq = params.modseq;
}
`,
params: {
modseq: data.modseq,
draft: data.flags.includes('\\Draft'),
flagged: data.flags.includes('\\Flagged'),
flags: data.flags || [],
unseen: !data.flags.includes('\\Seen')
}
}
};
} else {
updateRequest.body = {
doc: removeEmptyKeys({
draft: data.flags ? data.flags.includes('\\Draft') : null,
flagged: data.flags ? data.flags.includes('\\Flagged') : null,
flags: data.flags || [],
unseen: data.flags ? !data.flags.includes('\\Seen') : null
})
};
}
let updateResponse = await esclient.update(updateRequest);
log.verbose(
'Indexing',
'Document update result=%s message=%s',
updateResponse.body && updateResponse.body.result,
updateResponse.body && updateResponse.body._id
);
}
}
// loggelf({ _msg: 'hello world' });
} catch (err) {
if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
// missing document, ignore
log.error('Indexing', 'Failed to process indexing request, document not found message=%s', err.meta.body._id);
return;
}
log.error('Indexing', err);
throw err;
}
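For context, documents written by this worker can be queried back with the same client. A minimal search sketch (not part of this commit; it assumes the field names from messageObj above and whatever index name config.elasticsearch.index resolves to):

const config = require('wild-config');
const { getClient } = require('./lib/elasticsearch');

async function searchMessages(user, phrase) {
    const esclient = getClient();
    // filter to one account, match against the indexed subject and body fields
    let response = await esclient.search({
        index: config.elasticsearch.index,
        body: {
            query: {
                bool: {
                    filter: [{ term: { user } }],
                    must: [{ multi_match: { query: phrase, fields: ['subject', 'text', 'html'] } }]
                }
            }
        }
    });
    return response.body.hits.hits;
}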

View file

@@ -18,6 +18,8 @@ const TaskHandler = require('../task-handler');
const { publish, FORWARD_ADDED } = require('../events');
const { ExportStream, ImportStream } = require('../export');
const FEATURE_FLAGS = ['indexing'];
module.exports = (db, server, userHandler, settingsHandler) => {
const taskHandler = new TaskHandler({ database: db.database });
@@ -338,6 +340,8 @@ module.exports = (db, server, userHandler, settingsHandler) => {
encryptMessages: booleanSchema.default(false),
encryptForwarded: booleanSchema.default(false),
featureFlags: Joi.object(Object.fromEntries(FEATURE_FLAGS.map(flag => [flag, booleanSchema.default(false)]))),
sess: sessSchema,
ip: sessIPSchema
});
@@ -923,6 +927,8 @@ module.exports = (db, server, userHandler, settingsHandler) => {
disabled: booleanSchema,
featureFlags: Joi.object(Object.fromEntries(FEATURE_FLAGS.map(flag => [flag, booleanSchema.default(false)]))),
suspended: booleanSchema,
sess: sessSchema,

View file

@@ -4,6 +4,7 @@ const fs = require('fs');
const ttlCounterScript = fs.readFileSync(__dirname + '/lua/ttlcounter.lua', 'utf-8');
const cachedCounterScript = fs.readFileSync(__dirname + '/lua/cachedcounter.lua', 'utf-8');
const limitedCounterScript = fs.readFileSync(__dirname + '/lua/limitedcounter.lua', 'utf-8');
const processLockScript = fs.readFileSync(__dirname + '/lua/process-lock.lua', 'utf-8');
const clientVersion = Date.now();
@@ -23,6 +24,11 @@ module.exports = redis => {
lua: limitedCounterScript
});
redis.defineCommand('processlock', {
numberOfKeys: 1,
lua: processLockScript
});
let asyncTTLCounter = async (key, count, max, windowSize) => {
if (!max || isNaN(max)) {
return {
@@ -67,6 +73,10 @@
value: (res && res[1]) || 0
});
});
},
async processlock(key, identifier, ttl) {
return await redis.processlock(key, identifier, ttl);
}
};
};

View file

@@ -48,4 +48,4 @@ const init = async () => {
return true;
};
module.exports = { init };
module.exports = { init, getClient };

View file

@@ -42,12 +42,6 @@ const mappings = {
ignore_above: 24
},
// message ID / ObjectId
id: {
type: 'keyword',
ignore_above: 24
},
// mailbox folder ID / ObjectId
mailbox: {
type: 'keyword',
@@ -72,7 +66,7 @@ const mappings = {
attachments: {
type: 'nested',
properties: {
contentId: {
cid: {
type: 'keyword',
ignore_above: 128
},
@@ -80,10 +74,7 @@ const mappings = {
type: 'keyword',
ignore_above: 128
},
embedded: {
type: 'boolean'
},
encodedSize: {
size: {
type: 'long'
},
filename: {
@@ -95,8 +86,9 @@ const mappings = {
type: 'keyword',
ignore_above: 128
},
inline: {
type: 'boolean'
disposition: {
type: 'keyword',
ignore_above: 128
}
}
},
@@ -183,7 +175,7 @@ const mappings = {
ignore_above: 998
},
messageId: {
msgid: {
type: 'keyword',
ignore_above: 998
},
@@ -200,18 +192,6 @@ const mappings = {
}
},
sender: {
properties: {
address: {
type: 'keyword',
ignore_above: 256
},
name: {
type: 'text'
}
}
},
size: {
type: 'long'
},
@@ -220,10 +200,6 @@ const mappings = {
type: 'text'
},
preview: {
type: 'text'
},
to: {
properties: {
name: {
@@ -240,22 +216,22 @@ const mappings = {
type: 'boolean'
},
seen: {
type: 'boolean'
},
html: {
type: 'text',
analyzer: 'htmlStripAnalyzer'
},
plain: {
text: {
type: 'text'
},
type: {
type: 'constant_keyword',
value: 'email'
},
modseq: {
type: 'long'
}
};
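With the renamed attachment and message fields above (cid, size, disposition, msgid, text), a document stored by the indexer would look roughly like the following (a sketch; all values are illustrative only):

{
    user: '63f4a1b2c3d4e5f6a7b8c9d0',
    mailbox: '63f4a1b2c3d4e5f6a7b8c9d1',
    uid: 1234,
    modseq: 5,
    subject: 'Hello',
    text: 'Hello world',
    html: '<p>Hello world</p>',
    flags: ['\\Seen'],
    unseen: false,
    draft: false,
    flagged: false,
    answered: false,
    from: [{ name: 'Sender', address: 'sender@example.com' }],
    to: [{ name: '', address: 'recipient@example.com' }],
    attachments: [{ cid: 'image001', contentType: 'image/png', size: 10240, filename: 'logo.png', id: 'ATT00001', disposition: 'inline' }],
    msgid: '<abc123@example.com>',
    idate: '2023-03-10T09:22:05.000Z',
    hdate: '2023-03-10T09:21:58.000Z',
    created: '2023-03-10T09:22:06.000Z'
}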

View file

@@ -196,20 +196,6 @@ class ImapNotifier extends EventEmitter {
id: entry.uid
});
}
console.log(entry.command);
if (entry.command === 'EXISTS') {
console.log('EMAIL ADDED message=%s', entry.message.toString());
}
if (entry.command === 'EXPUNGE') {
console.log('EMAIL DELETED %s', entry.message.toString());
}
if (entry.command === 'FETCH') {
console.log('EMAIL UPDATED %s %s', entry.message.toString(), JSON.stringify(entry.flags));
}
}
let r = await this.database.collection('journal').insertMany(entries, {
@@ -324,6 +310,7 @@ class ImapNotifier extends EventEmitter {
if (!counters.has(m)) {
counters.set(m, { total: 0, unseen: 0, unseenChange: false });
}
switch (entry && entry.command) {
case 'EXISTS':
counters.get(m).total += 1;

lib/lua/process-lock.lua Normal file
View file

@@ -0,0 +1,24 @@
local key = KEYS[1];
local identifier = ARGV[1];
local ttl = tonumber(ARGV[2]) or 0;
if redis.call("EXISTS", key) == 1 then
local existing = redis.call("GET", key);
if existing == identifier then
redis.call("EXPIRE", key, ttl);
return 1;
else
return nil;
end
else
local result = redis.call("SET", key, identifier);
if result then
redis.call("EXPIRE", key, ttl);
return 2;
else
return nil;
end
end
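The script is an atomic acquire-or-renew: it returns 2 when the lock was newly acquired, 1 when the caller already held it and the TTL was refreshed, and nil when a different process holds it. A minimal usage sketch of the processlock wrapper defined in lib/counters.js above (tryBecomeIndexer is a hypothetical helper, not part of this commit):

const db = require('./lib/db');
const counters = require('./lib/counters');

async function tryBecomeIndexer(processId) {
    let processlock = counters(db.redis).processlock;
    // 1 or 2 means this process owns 'indexer:lock' for the next 5 seconds
    let lockSuccess = await processlock('indexer:lock', processId, 5);
    return !!lockSuccess;
}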

View file

@@ -1515,6 +1515,8 @@ class UserHandler {
disabled: true,
suspended: false,
featureFlags: data.featureFlags || {},
created: new Date()
};
@@ -1699,6 +1701,20 @@ class UserHandler {
}
}
if (data.featureFlags && Object.keys(data.featureFlags).length) {
let req = this.redis.multi();
for (let featureFlag of Object.keys(data.featureFlags)) {
if (data.featureFlags[featureFlag]) {
req = req.sadd(`feature:${featureFlag}`, user.toString());
}
}
try {
await req.exec();
} catch (err) {
log.error('Redis', 'FEATUREFAIL failed to set feature flags id=%s error=%s', user, err.message);
}
}
try {
await this.logAuthEvent(user, {
action: 'account created',
@@ -3192,6 +3208,22 @@ class UserHandler {
maxTimeMS: consts.DB_MAX_TIME_USERS
}
);
if ($set.featureFlags && Object.keys($set.featureFlags).length) {
let req = this.redis.multi();
for (let featureFlag of Object.keys($set.featureFlags)) {
if ($set.featureFlags[featureFlag]) {
req = req.sadd(`feature:${featureFlag}`, user.toString());
} else {
req = req.srem(`feature:${featureFlag}`, user.toString());
}
}
try {
await req.exec();
} catch (err) {
log.error('Redis', 'FEATUREFAIL failed to update feature flags id=%s error=%s', user, err.message);
}
}
} catch (err) {
log.error('DB', 'UPDATEFAIL id=%s error=%s', user, err.message);
err.message = 'Database Error, failed to update user';
@@ -3428,6 +3460,21 @@ class UserHandler {
if (r.insertedId) {
await this.users.collection('users').deleteOne({ _id: user });
}
// remove feature flag entries
if (existingAccount.featureFlags && Object.keys(existingAccount.featureFlags).length) {
let req = this.redis.multi();
for (let featureFlag of Object.keys(existingAccount.featureFlags)) {
if (existingAccount.featureFlags[featureFlag]) {
req = req.srem(`feature:${featureFlag}`, user.toString());
}
}
try {
await req.exec();
} catch (err) {
log.error('Redis', 'FEATUREFAIL failed to update feature flags id=%s error=%s', user, err.message);
}
}
}
try {

View file

@@ -42,7 +42,7 @@
"supertest": "6.3.3"
},
"dependencies": {
"@andris/restify-cors-middleware2": "2.1.2-patch.3",
"restify-cors-middleware2": "2.2.1",
"@fidm/x509": "1.2.1",
"@opensearch-project/opensearch": "2.2.0",
"@phc/pbkdf2": "1.1.14",
@@ -100,7 +100,7 @@
"uuid": "9.0.0",
"wild-config": "1.7.0",
"yargs": "17.7.1",
"zone-mta": "3.6.3"
"zone-mta": "3.6.5"
},
"repository": {
"type": "git",