This commit is contained in:
Andris Reinman 2023-03-09 12:36:28 +02:00
parent ddcd8ee8bf
commit eb496b509c
No known key found for this signature in database
GPG key ID: DC6C83F4D584D364
3 changed files with 123 additions and 41 deletions

View file

@ -5,9 +5,64 @@ const config = require('wild-config');
const Gelf = require('gelf');
const os = require('os');
const Queue = require('bull');
const db = require('./lib/db');
const errors = require('./lib/errors');
const crypto = require('crypto');
let loggelf;
let FORCE_DISABLE = false;
const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. Changes are not indexed to ElasticSearch.';
const processId = crypto.randomBytes(8).toString('hex');
async function getLock() {
let lockSuccess = await db.redis.set('indexer', processId, 'NX', 'EX', 10);
if (!lockSuccess) {
throw new Error('Failed to get lock');
}
}
async function monitorChanges() {
if (FORCE_DISABLE) {
log.error('Indexer', FORCE_DISABLED_MESSAGE);
return;
}
await getLock();
const pipeline = [
{
$match: {
operationType: 'insert'
}
}
];
const collection = db.database.collection('journal');
const changeStream = collection.watch(pipeline, {});
try {
while (await changeStream.hasNext()) {
console.log(await changeStream.next());
}
} catch (error) {
if (error.code === 40573) {
// not a replica set!
FORCE_DISABLE = true;
log.error('Indexer', FORCE_DISABLED_MESSAGE);
return;
}
if (changeStream.isClosed()) {
console.log('The change stream is closed. Will not wait on any more changes.');
} else {
throw error;
}
}
}
module.exports.start = callback => {
if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) {
return setImmediate(() => callback(null, false));
@ -52,22 +107,35 @@ module.exports.start = callback => {
}
};
const indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
indexingQueue.process(async job => {
try {
if (!job || !job.data || !job.data.ev) {
return false;
}
const data = job.data;
console.log('DATA FOR INDEXING', data);
loggelf({ _msg: 'hellow world' });
} catch (err) {
log.error('Indexing', err);
throw err;
db.connect(err => {
if (err) {
log.error('Db', 'Failed to setup database connection');
errors.notify(err);
return setTimeout(() => process.exit(1), 3000);
}
});
callback();
monitorChanges().catch(err => {
errors.notify(err);
return setTimeout(() => process.exit(1), 3000);
});
const indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
indexingQueue.process(async job => {
try {
if (!job || !job.data || !job.data.ev) {
return false;
}
const data = job.data;
console.log('DATA FOR INDEXING', data);
loggelf({ _msg: 'hellow world' });
} catch (err) {
log.error('Indexing', err);
throw err;
}
});
callback();
});
};

View file

@ -196,6 +196,20 @@ class ImapNotifier extends EventEmitter {
id: entry.uid
});
}
console.log(entry.command);
if (entry.command === 'EXISTS') {
console.log('EMAIL ADDED message=%s', entry.message.toString());
}
if (entry.command === 'EXPUNGE') {
console.log('EMAIL DELETED %s', entry.message.toString());
}
if (entry.command === 'FETCH') {
console.log('EMAIL UPDATED %s %s', entry.message.toString(), JSON.stringify(entry.flags));
}
}
let r = await this.database.collection('journal').insertMany(entries, {

View file

@ -1,6 +1,6 @@
{
"name": "wildduck",
"version": "1.38.2",
"version": "1.38.3",
"description": "IMAP/POP3 server built with Node.js and MongoDB",
"main": "server.js",
"scripts": {
@ -23,20 +23,20 @@
"license": "EUPL-1.2",
"homepage": "https://wildduck.email/",
"devDependencies": {
"ajv": "8.11.2",
"ajv": "8.12.0",
"chai": "4.3.7",
"docsify-cli": "4.4.4",
"eslint": "8.29.0",
"eslint": "8.35.0",
"eslint-config-nodemailer": "1.2.0",
"eslint-config-prettier": "8.5.0",
"grunt": "1.5.3",
"eslint-config-prettier": "8.7.0",
"grunt": "1.6.1",
"grunt-cli": "1.4.3",
"grunt-eslint": "24.0.1",
"grunt-mocha-test": "0.13.3",
"grunt-shell-spawn": "0.4.0",
"grunt-wait": "0.3.0",
"imapflow": "1.0.117",
"mailparser": "3.6.2",
"imapflow": "1.0.121",
"mailparser": "3.6.4",
"mocha": "10.2.0",
"request": "2.88.2",
"supertest": "6.3.3"
@ -44,52 +44,52 @@
"dependencies": {
"@andris/restify-cors-middleware2": "2.1.2-patch.3",
"@fidm/x509": "1.2.1",
"@opensearch-project/opensearch": "^2.1.0",
"@opensearch-project/opensearch": "2.2.0",
"@phc/pbkdf2": "1.1.14",
"@postalsys/vmc": "1.0.6",
"@root/acme": "3.1.0",
"@root/csr": "0.8.1",
"accesscontrol": "2.2.1",
"axios": "1.2.1",
"axios": "1.3.4",
"base32.js": "0.1.0",
"bcryptjs": "2.4.3",
"bson": "4.7.0",
"bson": "5.0.1",
"bull": "3.29.3",
"fido2-lib": "3.3.4",
"fido2-lib": "3.3.5",
"gelf": "2.0.1",
"generate-password": "1.7.0",
"hash-wasm": "4.9.0",
"he": "1.2.0",
"html-to-text": "9.0.3",
"html-to-text": "9.0.4",
"humanname": "0.2.2",
"iconv-lite": "0.6.3",
"ioredfour": "1.2.0-ioredis-06",
"ioredis": "5.2.4",
"ioredfour": "1.2.0-ioredis-07",
"ioredis": "5.3.1",
"ipaddr.js": "2.0.1",
"isemail": "3.2.0",
"joi": "17.7.0",
"joi": "17.8.3",
"js-yaml": "4.1.0",
"key-fingerprint": "1.1.0",
"libbase64": "1.2.1",
"libmime": "5.2.0",
"libmime": "5.2.1",
"libqp": "2.0.1",
"mailauth": "4.0.2",
"mailauth": "4.3.1",
"mailsplit": "5.4.0",
"mobileconfig": "2.4.0",
"mongo-cursor-pagination": "8.1.2",
"mongo-cursor-pagination": "8.1.3",
"mongodb": "4.12.1",
"mongodb-extended-json": "1.11.1",
"msgpack5": "6.0.2",
"node-forge": "1.3.1",
"node-html-parser": "6.1.4",
"nodemailer": "6.8.0",
"node-html-parser": "6.1.5",
"nodemailer": "6.9.1",
"npmlog": "7.0.1",
"openpgp": "5.5.0",
"openpgp": "5.7.0",
"pem-jwk": "2.0.0",
"punycode": "2.1.1",
"punycode": "2.3.0",
"pwnedpasswords": "1.0.6",
"qrcode": "1.5.1",
"restify": "10.0.0",
"restify": "11.1.0",
"restify-logger": "2.0.1",
"saslprep": "1.0.3",
"seq-index": "1.1.0",
@ -99,8 +99,8 @@
"unixcrypt": "1.1.0",
"uuid": "9.0.0",
"wild-config": "1.7.0",
"yargs": "17.6.2",
"zone-mta": "3.5.1"
"yargs": "17.7.1",
"zone-mta": "3.6.3"
},
"repository": {
"type": "git",