diff --git a/api.js b/api.js index e8b62ee7..d93e14ca 100644 --- a/api.js +++ b/api.js @@ -4,7 +4,7 @@ const config = require('wild-config'); const restify = require('restify'); const log = require('npmlog'); const logger = require('restify-logger'); -const corsMiddleware = require('@andris/restify-cors-middleware2'); +const corsMiddleware = require('restify-cors-middleware2'); const UserHandler = require('./lib/user-handler'); const MailboxHandler = require('./lib/mailbox-handler'); const MessageHandler = require('./lib/message-handler'); diff --git a/indexer.js b/indexer.js index 9b326afb..bd1892a9 100644 --- a/indexer.js +++ b/indexer.js @@ -8,61 +8,258 @@ const Queue = require('bull'); const db = require('./lib/db'); const errors = require('./lib/errors'); const crypto = require('crypto'); +const counters = require('./lib/counters'); +const { ObjectId } = require('mongodb'); +const libmime = require('libmime'); +const punycode = require('punycode/'); +const { getClient } = require('./lib/elasticsearch'); let loggelf; +let processlock; + +const LOCK_EXPIRE_TTL = 5; +const LOCK_RENEW_TTL = 2; let FORCE_DISABLE = false; +const processId = crypto.randomBytes(8).toString('hex'); +let isCurrentWorker = false; +let indexingQueue; const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. 
Changes are not indexed to ElasticSearch.'; -const processId = crypto.randomBytes(8).toString('hex'); - -async function getLock() { - let lockSuccess = await db.redis.set('indexer', processId, 'NX', 'EX', 10); - if (!lockSuccess) { - throw new Error('Failed to get lock'); - } -} - -async function monitorChanges() { - if (FORCE_DISABLE) { - log.error('Indexer', FORCE_DISABLED_MESSAGE); - return; +class Indexer { + constructor() { + this.running = false; } - await getLock(); + async start() { + if (this.running) { + return; + } + this.running = true; + log.info('Indexer', 'Starting indexer'); - const pipeline = [ - { - $match: { - operationType: 'insert' + this.monitorChanges() + .then() + .catch(err => { + log.error('Indexer', 'Indexing failed error=%s', err.message); + }) + .finally(() => { + this.running = false; + }); + } + async stop() { + if (!this.running) { + return; + } + this.running = false; + log.info('Indexer', 'Stopping indexer'); + try { + if (this.changeStream && !this.changeStream.isClosed()) { + await this.changeStream.close(); } + } catch (err) { + // ignore } - ]; + } - const collection = db.database.collection('journal'); - const changeStream = collection.watch(pipeline, {}); + async processJobEntry(entry) { + let payload; - try { - while (await changeStream.hasNext()) { - console.log(await changeStream.next()); + if (!entry.user) { + // nothing to do here + return; } - } catch (error) { - if (error.code === 40573) { - // not a replica set! 
- FORCE_DISABLE = true; + + let hasFeatureFlag = await db.redis.sismember(`feature:indexing`, entry.user.toString()); + if (!hasFeatureFlag) { + log.silly('Indexer', `Feature flag not set, skipping user=%s command=%s message=%s`, entry.user, entry.command, entry.message); + return; + } else { + log.verbose('Indexer', `Feature flag set, processing user=%s command=%s message=%s`, entry.user, entry.command, entry.message); + } + + switch (entry.command) { + case 'EXISTS': + payload = { + action: 'new', + message: entry.message.toString(), + mailbox: entry.mailbox.toString(), + uid: entry.uid, + modseq: entry.modseq + }; + break; + case 'EXPUNGE': + payload = { + action: 'delete', + message: entry.message.toString(), + mailbox: entry.mailbox.toString(), + uid: entry.uid, + modseq: entry.modseq + }; + break; + case 'FETCH': + payload = { + action: 'update', + message: entry.message.toString(), + mailbox: entry.mailbox.toString(), + uid: entry.uid, + flags: entry.flags, + modseq: entry.modseq + }; + break; + } + + if (payload) { + await indexingQueue.add(payload, { + removeOnComplete: 100, + removeOnFail: 100, + attempts: 5, + backoff: { + type: 'exponential', + delay: 2000 + } + }); + } + } + + async monitorChanges() { + if (FORCE_DISABLE) { log.error('Indexer', FORCE_DISABLED_MESSAGE); return; } - if (changeStream.isClosed()) { - console.log('The change stream is closed. 
Will not wait on any more changes.'); - } else { - throw error; + const pipeline = [ + { + $match: { + operationType: 'insert' + } + } + ]; + + const collection = db.database.collection('journal'); + let opts = { + allowDiskUse: true + }; + + let lastId = await db.redis.get('indexer:last'); + if (lastId) { + opts.resumeAfter = { + _data: lastId + }; + } + + this.changeStream = collection.watch(pipeline, opts); + + try { + while (await this.changeStream.hasNext()) { + if (!this.running) { + return; + } + + let job = await this.changeStream.next(); + + try { + if (job.fullDocument && job.fullDocument.command) { + await this.processJobEntry(job.fullDocument); + } + + await db.redis.set('indexer:last', job._id._data); + } catch (error) { + try { + await this.stop(); + } catch (err) { + // ignore + } + throw error; + } + } + } catch (error) { + if (error.code === 40573) { + // not a replica set! + FORCE_DISABLE = true; + log.error('Indexer', FORCE_DISABLED_MESSAGE); + return; + } + + if (this.changeStream.isClosed()) { + log.info('Indexer', 'The change stream is closed. 
Will not wait on any more changes.'); + return; + } else { + try { + await this.stop(); + } catch (err) { + // ignore + } + throw error; + } } } } +let indexer = new Indexer(); + +async function renewLock() { + try { + let lockSuccess = await processlock('indexer:lock', processId, LOCK_EXPIRE_TTL); + isCurrentWorker = !!lockSuccess; + } catch (err) { + log.error('Indexer', 'Failed to get lock process=%s err=%s', processId, err.message); + isCurrentWorker = false; + } + + if (!isCurrentWorker) { + await indexer.stop(); + } else { + await indexer.start(); + } +} + +async function getLock() { + let renewTimer; + let keepLock = () => { + clearTimeout(renewTimer); + renewTimer = setTimeout(() => { + renewLock().finally(keepLock); + }, LOCK_RENEW_TTL * 1000); + }; + + renewLock().finally(keepLock); +} + +function removeEmptyKeys(obj) { + for (let key of Object.keys(obj)) { + if (obj[key] === null) { + delete obj[key]; + } + } + return obj; +} + +function formatAddresses(addresses) { + let result = []; + for (let address of [].concat(addresses || [])) { + if (address.group) { + result = result.concat(formatAddresses(address.group)); + } else { + let name = address.name || ''; + let addr = address.address || ''; + try { + name = libmime.decodeWords(name); + } catch (err) { + // ignore? + } + + if (/@xn--/.test(addr)) { + addr = addr.substr(0, addr.lastIndexOf('@') + 1) + punycode.toUnicode(addr.substr(addr.lastIndexOf('@') + 1)); + } + + result.push({ name, address: addr }); + } + } + return result; +} + module.exports.start = callback => { if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) { return setImmediate(() => callback(null, false)); @@ -114,23 +311,248 @@ module.exports.start = callback => { return setTimeout(() => process.exit(1), 3000); } - monitorChanges().catch(err => { + indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? 
{ redis: config.dbs.redis } : config.dbs.redis); + + processlock = counters(db.redis).processlock; + + getLock().catch(err => { errors.notify(err); return setTimeout(() => process.exit(1), 3000); }); - const indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis); - + const esclient = getClient(); indexingQueue.process(async job => { try { - if (!job || !job.data || !job.data.ev) { + if (!job || !job.data) { return false; } const data = job.data; - console.log('DATA FOR INDEXING', data); - loggelf({ _msg: 'hellow world' }); + const dateKeyTdy = new Date().toISOString().substring(0, 10).replace(/-/g, ''); + const dateKeyYdy = new Date(Date.now() - 24 * 3600 * 1000).toISOString().substring(0, 10).replace(/-/g, ''); + const tombstoneTdy = `indexer:tomb:${dateKeyTdy}`; + const tombstoneYdy = `indexer:tomb:${dateKeyYdy}`; + + switch (data.action) { + case 'new': { + // check tombstone for race conditions (might be already deleted) + + let [[err1, isDeleted1], [err2, isDeleted2]] = await db.redis + .multi() + .sismember(tombstoneTdy, data.message) + .sismember(tombstoneYdy, data.message) + .exec(); + + if (err1) { + log.verbose('Indexing', 'Failed checking tombstone key=%s error=%s', tombstoneTdy, err1.message); + } + + if (err2) { + log.verbose('Indexing', 'Failed checking tombstone key=%s error=%s', tombstoneYdy, err2.message); + } + + if (isDeleted1 || isDeleted2) { + log.info('Indexing', 'Document tombstone found, skip index message=%s', data.message); + break; + } + + // fetch message from DB + let messageData = await db.database.collection('messages').findOne( + { + _id: new ObjectId(data.message), + // shard key + mailbox: new ObjectId(data.mailbox), + uid: data.uid + }, + { + projection: { + bodystructure: false, + envelope: false, + 'mimeTree.childNodes': false, + 'mimeTree.header': false + } + } + ); + + const now = new Date(); + + let messageObj = removeEmptyKeys({ + user: 
messageData.user.toString(), + mailbox: messageData.mailbox.toString(), + + thread: messageData.thread ? messageData.thread.toString() : null, + uid: messageData.uid, + answered: messageData.flags ? messageData.flags.includes('\\Answered') : null, + + attachments: + (messageData.attachments && + messageData.attachments.map(attachment => + removeEmptyKeys({ + cid: attachment.cid || null, + contentType: attachment.contentType || null, + size: attachment.size, + filename: attachment.filename, + id: attachment.id, + disposition: attachment.disposition + }) + )) || + null, + + bcc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.bcc), + cc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.cc), + + // Time when stored + created: now.toISOString(), + + // Internal Date + idate: (messageData.idate && messageData.idate.toISOString()) || now.toISOString(), + + // Header Date + hdate: (messageData.hdate && messageData.hdate.toISOString()) || now.toISOString(), + + draft: messageData.flags ? messageData.flags.includes('\\Draft') : null, + + flagged: messageData.flags ? messageData.flags.includes('\\Flagged') : null, + + flags: messageData.flags || [], + + from: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.from), + + // do not index authentication and transport headers + headers: messageData.headers + ? 
messageData.headers.filter(header => !/^x|^received|^arc|^dkim|^authentication/gi.test(header.key)) + : null, + + inReplyTo: messageData.inReplyTo || null, + + msgid: messageData.msgid || null, + + replyTo: formatAddresses( + messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader['reply-to'] + ), + + size: messageData.size || null, + + subject: messageData.subject || '', + + to: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.to), + + unseen: messageData.flags ? !messageData.flags.includes('\\Seen') : null, + + html: (messageData.html && messageData.html.join('\n')) || null, + + text: messageData.text || null, + + modseq: data.modseq + }); + + let indexResponse = await esclient.index({ + id: messageData._id.toString(), + index: config.elasticsearch.index, + body: messageObj, + refresh: false + }); + + log.verbose( + 'Indexing', + 'Document index result=%s message=%s', + indexResponse.body && indexResponse.body.result, + indexResponse.body && indexResponse.body._id + ); + + break; + } + + case 'delete': { + let deleteResponse; + try { + deleteResponse = await esclient.delete({ + id: data.message, + index: config.elasticsearch.index, + refresh: false + }); + } catch (err) { + if (err.meta && err.meta.body && err.meta.body.result === 'not_found') { + // set tombstone to prevent indexing this message in case of race conditions + await db.redis + .multi() + .sadd(tombstoneTdy, data.message) + .expire(tombstoneTdy, 24 * 3600) + .exec(); + } + throw err; + } + + log.verbose( + 'Indexing', + 'Document delete result=%s message=%s', + deleteResponse.body && deleteResponse.body.result, + deleteResponse.body && deleteResponse.body._id + ); + break; + } + + case 'update': { + let updateRequest = { + id: data.message, + index: config.elasticsearch.index, + refresh: false + }; + + if (data.modseq && typeof data.modseq === 'number') { + updateRequest.body = { + script: { + lang: 
'painless', + source: ` + if( ctx._source.modseq >= params.modseq) { + ctx.op = 'none'; + } else { + ctx._source.draft = params.draft; + ctx._source.flagged = params.flagged; + ctx._source.flags = params.flags; + ctx._source.unseen = params.unseen; + ctx._source.modseq = params.modseq; + } + `, + params: { + modseq: data.modseq, + draft: data.flags.includes('\\Draft'), + flagged: data.flags.includes('\\Flagged'), + flags: data.flags || [], + unseen: !data.flags.includes('\\Seen') + } + } + }; + } else { + updateRequest.body = { + doc: removeEmptyKeys({ + draft: data.flags ? data.flags.includes('\\Draft') : null, + flagged: data.flags ? data.flags.includes('\\Flagged') : null, + flags: data.flags || [], + unseen: data.flags ? !data.flags.includes('\\Seen') : null + }) + }; + } + + let updateResponse = await esclient.update(updateRequest); + + log.verbose( + 'Indexing', + 'Document update result=%s message=%s', + updateResponse.body && updateResponse.body.result, + updateResponse.body && updateResponse.body._id + ); + } + } + + // loggelf({ _msg: 'hello world' }); } catch (err) { + if (err.meta && err.meta.body && err.meta.body.result === 'not_found') { + // missing document, ignore + log.error('Indexing', 'Failed to process indexing request, document not found message=%s', err.meta.body._id); + return; + } + log.error('Indexing', err); throw err; } diff --git a/lib/api/users.js b/lib/api/users.js index 97c99b7b..a245ef5f 100644 --- a/lib/api/users.js +++ b/lib/api/users.js @@ -18,6 +18,8 @@ const TaskHandler = require('../task-handler'); const { publish, FORWARD_ADDED } = require('../events'); const { ExportStream, ImportStream } = require('../export'); +const FEATURE_FLAGS = ['indexing']; + module.exports = (db, server, userHandler, settingsHandler) => { const taskHandler = new TaskHandler({ database: db.database }); @@ -338,6 +340,8 @@ module.exports = (db, server, userHandler, settingsHandler) => { encryptMessages: booleanSchema.default(false), encryptForwarded: 
booleanSchema.default(false), + featureFlags: Joi.object(Object.fromEntries(FEATURE_FLAGS.map(flag => [flag, booleanSchema.default(false)]))), + sess: sessSchema, ip: sessIPSchema }); @@ -923,6 +927,8 @@ module.exports = (db, server, userHandler, settingsHandler) => { disabled: booleanSchema, + featureFlags: Joi.object(Object.fromEntries(FEATURE_FLAGS.map(flag => [flag, booleanSchema.default(false)]))), + suspended: booleanSchema, sess: sessSchema, diff --git a/lib/counters.js b/lib/counters.js index f4f53ea5..11bbe0a1 100644 --- a/lib/counters.js +++ b/lib/counters.js @@ -4,6 +4,7 @@ const fs = require('fs'); const ttlCounterScript = fs.readFileSync(__dirname + '/lua/ttlcounter.lua', 'utf-8'); const cachedCounterScript = fs.readFileSync(__dirname + '/lua/cachedcounter.lua', 'utf-8'); const limitedCounterScript = fs.readFileSync(__dirname + '/lua/limitedcounter.lua', 'utf-8'); +const processLockScript = fs.readFileSync(__dirname + '/lua/process-lock.lua', 'utf-8'); const clientVersion = Date.now(); @@ -23,6 +24,11 @@ module.exports = redis => { lua: limitedCounterScript }); + redis.defineCommand('processlock', { + numberOfKeys: 1, + lua: processLockScript + }); + let asyncTTLCounter = async (key, count, max, windowSize) => { if (!max || isNaN(max)) { return { @@ -67,6 +73,10 @@ module.exports = redis => { value: (res && res[1]) || 0 }); }); + }, + + async processlock(key, identifier, ttl) { + return await redis.processlock(key, identifier, ttl); } }; }; diff --git a/lib/elasticsearch.js b/lib/elasticsearch.js index 25dc226a..d2e98f45 100644 --- a/lib/elasticsearch.js +++ b/lib/elasticsearch.js @@ -48,4 +48,4 @@ const init = async () => { return true; }; -module.exports = { init }; +module.exports = { init, getClient }; diff --git a/lib/ensure-es-index.js b/lib/ensure-es-index.js index f095b3c8..b96a377d 100644 --- a/lib/ensure-es-index.js +++ b/lib/ensure-es-index.js @@ -42,12 +42,6 @@ const mappings = { ignore_above: 24 }, - // message ID / ObjectId - id: { - 
type: 'keyword', - ignore_above: 24 - }, - // mailbox folder ID / ObjectId mailbox: { type: 'keyword', @@ -72,7 +66,7 @@ const mappings = { attachments: { type: 'nested', properties: { - contentId: { + cid: { type: 'keyword', ignore_above: 128 }, @@ -80,10 +74,7 @@ const mappings = { type: 'keyword', ignore_above: 128 }, - embedded: { - type: 'boolean' - }, - encodedSize: { + size: { type: 'long' }, filename: { @@ -95,8 +86,9 @@ const mappings = { type: 'keyword', ignore_above: 128 }, - inline: { - type: 'boolean' + disposition: { + type: 'keyword', + ignore_above: 128 } } }, @@ -183,7 +175,7 @@ const mappings = { ignore_above: 998 }, - messageId: { + msgid: { type: 'keyword', ignore_above: 998 }, @@ -200,18 +192,6 @@ const mappings = { } }, - sender: { - properties: { - address: { - type: 'keyword', - ignore_above: 256 - }, - name: { - type: 'text' - } - } - }, - size: { type: 'long' }, @@ -220,10 +200,6 @@ const mappings = { type: 'text' }, - preview: { - type: 'text' - }, - to: { properties: { name: { @@ -240,22 +216,22 @@ const mappings = { type: 'boolean' }, - seen: { - type: 'boolean' - }, - html: { type: 'text', analyzer: 'htmlStripAnalyzer' }, - plain: { + text: { type: 'text' }, type: { type: 'constant_keyword', value: 'email' + }, + + modseq: { + type: 'long' } }; diff --git a/lib/imap-notifier.js b/lib/imap-notifier.js index 369b2d75..afac8162 100644 --- a/lib/imap-notifier.js +++ b/lib/imap-notifier.js @@ -196,20 +196,6 @@ class ImapNotifier extends EventEmitter { id: entry.uid }); } - - console.log(entry.command); - - if (entry.command === 'EXISTS') { - console.log('EMAIL ADDED message=%s', entry.message.toString()); - } - - if (entry.command === 'EXPUNGE') { - console.log('EMAIL DELETED %s', entry.message.toString()); - } - - if (entry.command === 'FETCH') { - console.log('EMAIL UPDATED %s %s', entry.message.toString(), JSON.stringify(entry.flags)); - } } let r = await this.database.collection('journal').insertMany(entries, { @@ -324,6 +310,7 @@ class 
ImapNotifier extends EventEmitter { if (!counters.has(m)) { counters.set(m, { total: 0, unseen: 0, unseenChange: false }); } + switch (entry && entry.command) { case 'EXISTS': counters.get(m).total += 1; diff --git a/lib/lua/process-lock.lua b/lib/lua/process-lock.lua new file mode 100644 index 00000000..1975e128 --- /dev/null +++ b/lib/lua/process-lock.lua @@ -0,0 +1,24 @@ +local key = KEYS[1]; + +local identifier = ARGV[1]; +local ttl = tonumber(ARGV[2]) or 0; + +if redis.call("EXISTS", key) == 1 then + + local existing = redis.call("GET", key); + if existing == identifier then + redis.call("EXPIRE", key, ttl); + return 1; + else + return nil; + end + +else + local result = redis.call("SET", key, identifier); + if result then + redis.call("EXPIRE", key, ttl); + return 2; + else + return nil; + end +end \ No newline at end of file diff --git a/lib/user-handler.js b/lib/user-handler.js index fa66f6ea..b3004bc0 100644 --- a/lib/user-handler.js +++ b/lib/user-handler.js @@ -1515,6 +1515,8 @@ class UserHandler { disabled: true, suspended: false, + featureFlags: data.featureFlags || {}, + created: new Date() }; @@ -1699,6 +1701,20 @@ class UserHandler { } } + if (data.featureFlags && Object.keys(data.featureFlags).length) { + let req = this.redis.multi(); + for (let featureFlag of Object.keys(data.featureFlags)) { + if (data.featureFlags[featureFlag]) { + req = req.sadd(`feature:${featureFlag}`, user.toString()); + } + } + try { + await req.exec(); + } catch (err) { + log.error('Redis', 'FEATUREFAIL failed to set feature flags id=%s error=%s', user, err.message); + } + } + try { await this.logAuthEvent(user, { action: 'account created', @@ -3192,6 +3208,22 @@ class UserHandler { maxTimeMS: consts.DB_MAX_TIME_USERS } ); + + if ($set.featureFlags && Object.keys($set.featureFlags).length) { + let req = this.redis.multi(); + for (let featureFlag of Object.keys($set.featureFlags)) { + if ($set.featureFlags[featureFlag]) { + req = req.sadd(`feature:${featureFlag}`, 
user.toString()); + } else { + req = req.srem(`feature:${featureFlag}`, user.toString()); + } + } + try { + await req.exec(); + } catch (err) { + log.error('Redis', 'FEATUREFAIL failed to update feature flags id=%s error=%s', user, err.message); + } + } } catch (err) { log.error('DB', 'UPDATEFAIL id=%s error=%s', user, err.message); err.message = 'Database Error, failed to update user'; @@ -3428,6 +3460,21 @@ class UserHandler { if (r.insertedId) { await this.users.collection('users').deleteOne({ _id: user }); } + + // remove feature flag entries + if (existingAccount.featureFlags && Object.keys(existingAccount.featureFlags).length) { + let req = this.redis.multi(); + for (let featureFlag of Object.keys(existingAccount.featureFlags)) { + if (existingAccount.featureFlags[featureFlag]) { + req = req.srem(`feature:${featureFlag}`, user.toString()); + } + } + try { + await req.exec(); + } catch (err) { + log.error('Redis', 'FEATUREFAIL failed to update feature flags id=%s error=%s', user, err.message); + } + } } try { diff --git a/package.json b/package.json index 24fc7660..68106658 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ "supertest": "6.3.3" }, "dependencies": { - "@andris/restify-cors-middleware2": "2.1.2-patch.3", + "restify-cors-middleware2": "2.2.1", "@fidm/x509": "1.2.1", "@opensearch-project/opensearch": "2.2.0", "@phc/pbkdf2": "1.1.14", @@ -100,7 +100,7 @@ "uuid": "9.0.0", "wild-config": "1.7.0", "yargs": "17.7.1", - "zone-mta": "3.6.3" + "zone-mta": "3.6.5" }, "repository": { "type": "git",