diff --git a/indexer.js b/indexer.js index f0d49cc9..9b326afb 100644 --- a/indexer.js +++ b/indexer.js @@ -5,9 +5,64 @@ const config = require('wild-config'); const Gelf = require('gelf'); const os = require('os'); const Queue = require('bull'); +const db = require('./lib/db'); +const errors = require('./lib/errors'); +const crypto = require('crypto'); let loggelf; +let FORCE_DISABLE = false; + +const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. Changes are not indexed to ElasticSearch.'; + +const processId = crypto.randomBytes(8).toString('hex'); + +async function getLock() { + let lockSuccess = await db.redis.set('indexer', processId, 'NX', 'EX', 10); + if (!lockSuccess) { + throw new Error('Failed to get lock'); + } +} + +async function monitorChanges() { + if (FORCE_DISABLE) { + log.error('Indexer', FORCE_DISABLED_MESSAGE); + return; + } + + await getLock(); + + const pipeline = [ + { + $match: { + operationType: 'insert' + } + } + ]; + + const collection = db.database.collection('journal'); + const changeStream = collection.watch(pipeline, {}); + + try { + while (await changeStream.hasNext()) { + console.log(await changeStream.next()); + } + } catch (error) { + if (error.code === 40573) { + // not a replica set! + FORCE_DISABLE = true; + log.error('Indexer', FORCE_DISABLED_MESSAGE); + return; + } + + if (changeStream.isClosed()) { + console.log('The change stream is closed. Will not wait on any more changes.'); + } else { + throw error; + } + } +} + module.exports.start = callback => { if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) { return setImmediate(() => callback(null, false)); @@ -52,22 +107,35 @@ module.exports.start = callback => { } }; - const indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis); - - indexingQueue.process(async job => { - try { - if (!job || !job.data || !job.data.ev) { - return false; - } - const data = job.data; - console.log('DATA FOR INDEXING', data); - - loggelf({ _msg: 'hellow world' }); - } catch (err) { - log.error('Indexing', err); - throw err; + db.connect(err => { + if (err) { + log.error('Db', 'Failed to setup database connection'); + errors.notify(err); + return setTimeout(() => process.exit(1), 3000); } - }); - callback(); + monitorChanges().catch(err => { + errors.notify(err); + return setTimeout(() => process.exit(1), 3000); + }); + + const indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis); + + indexingQueue.process(async job => { + try { + if (!job || !job.data || !job.data.ev) { + return false; + } + const data = job.data; + console.log('DATA FOR INDEXING', data); + + loggelf({ _msg: 'hellow world' }); + } catch (err) { + log.error('Indexing', err); + throw err; + } + }); + + callback(); + }); }; diff --git a/lib/imap-notifier.js b/lib/imap-notifier.js index 7c99c9c7..369b2d75 100644 --- a/lib/imap-notifier.js +++ b/lib/imap-notifier.js @@ -196,6 +196,20 @@ class ImapNotifier extends EventEmitter { id: entry.uid }); } + + console.log(entry.command); + + if (entry.command === 'EXISTS') { + console.log('EMAIL ADDED message=%s', entry.message.toString()); + } + + if (entry.command === 'EXPUNGE') { + console.log('EMAIL DELETED %s', entry.message.toString()); + } + + if (entry.command === 'FETCH') { + console.log('EMAIL UPDATED %s %s', entry.message.toString(), JSON.stringify(entry.flags)); + } } let r = await this.database.collection('journal').insertMany(entries, { diff --git a/package.json b/package.json index a3affedb..24fc7660 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "wildduck", - "version": "1.38.2", + "version": "1.38.3", "description": "IMAP/POP3 server built with Node.js and MongoDB", "main": "server.js", "scripts": { @@ -23,20 +23,20 @@ "license": "EUPL-1.2", "homepage": "https://wildduck.email/", "devDependencies": { - "ajv": "8.11.2", + "ajv": "8.12.0", "chai": "4.3.7", "docsify-cli": "4.4.4", - "eslint": "8.29.0", + "eslint": "8.35.0", "eslint-config-nodemailer": "1.2.0", - "eslint-config-prettier": "8.5.0", - "grunt": "1.5.3", + "eslint-config-prettier": "8.7.0", + "grunt": "1.6.1", "grunt-cli": "1.4.3", "grunt-eslint": "24.0.1", "grunt-mocha-test": "0.13.3", "grunt-shell-spawn": "0.4.0", "grunt-wait": "0.3.0", - "imapflow": "1.0.117", - "mailparser": "3.6.2", + "imapflow": "1.0.121", + "mailparser": "3.6.4", "mocha": "10.2.0", "request": "2.88.2", "supertest": "6.3.3" @@ -44,52 +44,52 @@ "dependencies": { "@andris/restify-cors-middleware2": "2.1.2-patch.3", "@fidm/x509": "1.2.1", - "@opensearch-project/opensearch": "^2.1.0", + "@opensearch-project/opensearch": "2.2.0", "@phc/pbkdf2": "1.1.14", "@postalsys/vmc": "1.0.6", "@root/acme": "3.1.0", "@root/csr": "0.8.1", "accesscontrol": "2.2.1", - "axios": "1.2.1", + "axios": "1.3.4", "base32.js": "0.1.0", "bcryptjs": "2.4.3", - "bson": "4.7.0", + "bson": "5.0.1", "bull": "3.29.3", - "fido2-lib": "3.3.4", + "fido2-lib": "3.3.5", "gelf": "2.0.1", "generate-password": "1.7.0", "hash-wasm": "4.9.0", "he": "1.2.0", - "html-to-text": "9.0.3", + "html-to-text": "9.0.4", "humanname": "0.2.2", "iconv-lite": "0.6.3", - "ioredfour": "1.2.0-ioredis-06", - "ioredis": "5.2.4", + "ioredfour": "1.2.0-ioredis-07", + "ioredis": "5.3.1", "ipaddr.js": "2.0.1", "isemail": "3.2.0", - "joi": "17.7.0", + "joi": "17.8.3", "js-yaml": "4.1.0", "key-fingerprint": "1.1.0", "libbase64": "1.2.1", - "libmime": "5.2.0", + "libmime": "5.2.1", "libqp": "2.0.1", - "mailauth": "4.0.2", + "mailauth": "4.3.1", "mailsplit": "5.4.0", "mobileconfig": "2.4.0", - "mongo-cursor-pagination": "8.1.2", + "mongo-cursor-pagination": "8.1.3", "mongodb": "4.12.1", "mongodb-extended-json": "1.11.1", "msgpack5": "6.0.2", "node-forge": "1.3.1", - "node-html-parser": "6.1.4", - "nodemailer": "6.8.0", + "node-html-parser": "6.1.5", + "nodemailer": "6.9.1", "npmlog": "7.0.1", - "openpgp": "5.5.0", + "openpgp": "5.7.0", "pem-jwk": "2.0.0", - "punycode": "2.1.1", + "punycode": "2.3.0", "pwnedpasswords": "1.0.6", "qrcode": "1.5.1", - "restify": "10.0.0", + "restify": "11.1.0", "restify-logger": "2.0.1", "saslprep": "1.0.3", "seq-index": "1.1.0", @@ -99,8 +99,8 @@ "unixcrypt": "1.1.0", "uuid": "9.0.0", "wild-config": "1.7.0", - "yargs": "17.6.2", - "zone-mta": "3.5.1" + "yargs": "17.7.1", + "zone-mta": "3.6.3" }, "repository": { "type": "git",