mirror of
https://github.com/nodemailer/wildduck.git
synced 2025-11-11 00:41:37 +08:00
Index emails for accounts with indexing feature flag
This commit is contained in:
parent
eb496b509c
commit
b8efbfce4f
8 changed files with 560 additions and 88 deletions
498
indexer.js
498
indexer.js
|
|
@ -8,61 +8,258 @@ const Queue = require('bull');
|
||||||
const db = require('./lib/db');
|
const db = require('./lib/db');
|
||||||
const errors = require('./lib/errors');
|
const errors = require('./lib/errors');
|
||||||
const crypto = require('crypto');
|
const crypto = require('crypto');
|
||||||
|
const counters = require('./lib/counters');
|
||||||
|
const { ObjectId } = require('mongodb');
|
||||||
|
const libmime = require('libmime');
|
||||||
|
const punycode = require('punycode/');
|
||||||
|
const { getClient } = require('./lib/elasticsearch');
|
||||||
|
|
||||||
let loggelf;
|
let loggelf;
|
||||||
|
let processlock;
|
||||||
|
|
||||||
|
const LOCK_EXPIRE_TTL = 5;
|
||||||
|
const LOCK_RENEW_TTL = 2;
|
||||||
|
|
||||||
let FORCE_DISABLE = false;
|
let FORCE_DISABLE = false;
|
||||||
|
const processId = crypto.randomBytes(8).toString('hex');
|
||||||
|
let isCurrentWorker = false;
|
||||||
|
let indexingQueue;
|
||||||
|
|
||||||
const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. Changes are not indexed to ElasticSearch.';
|
const FORCE_DISABLED_MESSAGE = 'Can not set up change streams. Not a replica set. Changes are not indexed to ElasticSearch.';
|
||||||
|
|
||||||
const processId = crypto.randomBytes(8).toString('hex');
|
class Indexer {
|
||||||
|
constuctor() {
|
||||||
async function getLock() {
|
this.running = false;
|
||||||
let lockSuccess = await db.redis.set('indexer', processId, 'NX', 'EX', 10);
|
|
||||||
if (!lockSuccess) {
|
|
||||||
throw new Error('Failed to get lock');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function monitorChanges() {
|
|
||||||
if (FORCE_DISABLE) {
|
|
||||||
log.error('Indexer', FORCE_DISABLED_MESSAGE);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await getLock();
|
async start() {
|
||||||
|
if (this.running) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.running = true;
|
||||||
|
log.info('Indexer', 'Starting indexer');
|
||||||
|
|
||||||
const pipeline = [
|
this.monitorChanges()
|
||||||
{
|
.then()
|
||||||
$match: {
|
.catch(err => {
|
||||||
operationType: 'insert'
|
log.error('Indexer', 'Indexing failed error=%s', err.message);
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
this.running = false;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
async stop() {
|
||||||
|
if (!this.running) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.running = false;
|
||||||
|
log.info('Indexer', 'Stopping indexer');
|
||||||
|
try {
|
||||||
|
if (this.changeStream && !this.changeStream.isClosed()) {
|
||||||
|
await this.changeStream.close();
|
||||||
}
|
}
|
||||||
|
} catch (err) {
|
||||||
|
// ignore
|
||||||
}
|
}
|
||||||
];
|
}
|
||||||
|
|
||||||
const collection = db.database.collection('journal');
|
async processJobEntry(entry) {
|
||||||
const changeStream = collection.watch(pipeline, {});
|
let payload;
|
||||||
|
|
||||||
try {
|
if (!entry.user) {
|
||||||
while (await changeStream.hasNext()) {
|
// nothing to do here
|
||||||
console.log(await changeStream.next());
|
return;
|
||||||
}
|
}
|
||||||
} catch (error) {
|
|
||||||
if (error.code === 40573) {
|
let hasFeatureFlag = await db.redis.sismember(`feature:indexing`, entry.user.toString());
|
||||||
// not a replica set!
|
if (!hasFeatureFlag) {
|
||||||
FORCE_DISABLE = true;
|
log.silly('Indexer', `Feature flag not set, skipping user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
log.verbose('Indexer', `Feature flag set, processing user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (entry.command) {
|
||||||
|
case 'EXISTS':
|
||||||
|
payload = {
|
||||||
|
action: 'new',
|
||||||
|
message: entry.message.toString(),
|
||||||
|
mailbox: entry.mailbox.toString(),
|
||||||
|
uid: entry.uid,
|
||||||
|
modseq: entry.modseq
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
case 'EXPUNGE':
|
||||||
|
payload = {
|
||||||
|
action: 'delete',
|
||||||
|
message: entry.message.toString(),
|
||||||
|
mailbox: entry.mailbox.toString(),
|
||||||
|
uid: entry.uid,
|
||||||
|
modseq: entry.modseq
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
case 'FETCH':
|
||||||
|
payload = {
|
||||||
|
action: 'update',
|
||||||
|
message: entry.message.toString(),
|
||||||
|
mailbox: entry.mailbox.toString(),
|
||||||
|
uid: entry.uid,
|
||||||
|
flags: entry.flags,
|
||||||
|
modseq: entry.modseq
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload) {
|
||||||
|
await indexingQueue.add(payload, {
|
||||||
|
removeOnComplete: 100,
|
||||||
|
removeOnFail: 100,
|
||||||
|
attempts: 5,
|
||||||
|
backoff: {
|
||||||
|
type: 'exponential',
|
||||||
|
delay: 2000
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async monitorChanges() {
|
||||||
|
if (FORCE_DISABLE) {
|
||||||
log.error('Indexer', FORCE_DISABLED_MESSAGE);
|
log.error('Indexer', FORCE_DISABLED_MESSAGE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (changeStream.isClosed()) {
|
const pipeline = [
|
||||||
console.log('The change stream is closed. Will not wait on any more changes.');
|
{
|
||||||
} else {
|
$match: {
|
||||||
throw error;
|
operationType: 'insert'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
];
|
||||||
|
|
||||||
|
const collection = db.database.collection('journal');
|
||||||
|
let opts = {
|
||||||
|
allowDiskUse: true
|
||||||
|
};
|
||||||
|
|
||||||
|
let lastId = await db.redis.get('indexer:last');
|
||||||
|
if (lastId) {
|
||||||
|
opts.resumeAfter = {
|
||||||
|
_data: lastId
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
this.changeStream = collection.watch(pipeline, opts);
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (await this.changeStream.hasNext()) {
|
||||||
|
if (!this.running) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let job = await this.changeStream.next();
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (job.fullDocument && job.fullDocument.command) {
|
||||||
|
await this.processJobEntry(job.fullDocument);
|
||||||
|
}
|
||||||
|
|
||||||
|
await db.redis.set('indexer:last', job._id._data);
|
||||||
|
} catch (error) {
|
||||||
|
try {
|
||||||
|
await this.stop();
|
||||||
|
} catch (err) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === 40573) {
|
||||||
|
// not a replica set!
|
||||||
|
FORCE_DISABLE = true;
|
||||||
|
log.error('Indexer', FORCE_DISABLED_MESSAGE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.changeStream.isClosed()) {
|
||||||
|
log.info('Indexer', 'The change stream is closed. Will not wait on any more changes.');
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
await this.stop();
|
||||||
|
} catch (err) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let indexer = new Indexer();
|
||||||
|
|
||||||
|
async function renewLock() {
|
||||||
|
try {
|
||||||
|
let lockSuccess = await processlock('indexer:lock', processId, LOCK_EXPIRE_TTL);
|
||||||
|
isCurrentWorker = !!lockSuccess;
|
||||||
|
} catch (err) {
|
||||||
|
log.error('Indexer', 'Failed to get lock process=%s err=%s', processId, err.message);
|
||||||
|
isCurrentWorker = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isCurrentWorker) {
|
||||||
|
await indexer.stop();
|
||||||
|
} else {
|
||||||
|
await indexer.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getLock() {
|
||||||
|
let renewTimer;
|
||||||
|
let keepLock = () => {
|
||||||
|
clearTimeout(renewTimer);
|
||||||
|
renewTimer = setTimeout(() => {
|
||||||
|
renewLock().finally(keepLock);
|
||||||
|
}, LOCK_RENEW_TTL * 1000);
|
||||||
|
};
|
||||||
|
|
||||||
|
renewLock().finally(keepLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
function removeEmptyKeys(obj) {
|
||||||
|
for (let key of Object.keys(obj)) {
|
||||||
|
if (obj[key] === null) {
|
||||||
|
delete obj[key];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
function formatAddresses(addresses) {
|
||||||
|
let result = [];
|
||||||
|
for (let address of [].concat(addresses || [])) {
|
||||||
|
if (address.group) {
|
||||||
|
result = result.concat(formatAddresses(address.group));
|
||||||
|
} else {
|
||||||
|
let name = address.name || '';
|
||||||
|
let addr = address.address || '';
|
||||||
|
try {
|
||||||
|
name = libmime.decodeWords(name);
|
||||||
|
} catch (err) {
|
||||||
|
// ignore?
|
||||||
|
}
|
||||||
|
|
||||||
|
if (/@xn--/.test(addr)) {
|
||||||
|
addr = addr.substr(0, addr.lastIndexOf('@') + 1) + punycode.toUnicode(addr.substr(addr.lastIndexOf('@') + 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
result.push({ name, addres: addr });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
module.exports.start = callback => {
|
module.exports.start = callback => {
|
||||||
if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) {
|
if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) {
|
||||||
return setImmediate(() => callback(null, false));
|
return setImmediate(() => callback(null, false));
|
||||||
|
|
@ -114,23 +311,248 @@ module.exports.start = callback => {
|
||||||
return setTimeout(() => process.exit(1), 3000);
|
return setTimeout(() => process.exit(1), 3000);
|
||||||
}
|
}
|
||||||
|
|
||||||
monitorChanges().catch(err => {
|
indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
|
||||||
|
|
||||||
|
processlock = counters(db.redis).processlock;
|
||||||
|
|
||||||
|
getLock().catch(err => {
|
||||||
errors.notify(err);
|
errors.notify(err);
|
||||||
return setTimeout(() => process.exit(1), 3000);
|
return setTimeout(() => process.exit(1), 3000);
|
||||||
});
|
});
|
||||||
|
|
||||||
const indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
|
const esclient = getClient();
|
||||||
|
|
||||||
indexingQueue.process(async job => {
|
indexingQueue.process(async job => {
|
||||||
try {
|
try {
|
||||||
if (!job || !job.data || !job.data.ev) {
|
if (!job || !job.data) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
const data = job.data;
|
const data = job.data;
|
||||||
console.log('DATA FOR INDEXING', data);
|
|
||||||
|
|
||||||
loggelf({ _msg: 'hellow world' });
|
const dateKeyTdy = new Date().toISOString().substring(0, 10).replace(/-/g, '');
|
||||||
|
const dateKeyYdy = new Date(Date.now() - 24 * 3600 * 1000).toISOString().substring(0, 10).replace(/-/g, '');
|
||||||
|
const tombstoneTdy = `indexer:tomb:${dateKeyTdy}`;
|
||||||
|
const tombstoneYdy = `indexer:tomb:${dateKeyYdy}`;
|
||||||
|
|
||||||
|
switch (data.action) {
|
||||||
|
case 'new': {
|
||||||
|
// check tombstone for race conditions (might be already deleted)
|
||||||
|
|
||||||
|
let [[err1, isDeleted1], [err2, isDeleted2]] = await db.redis
|
||||||
|
.multi()
|
||||||
|
.sismember(tombstoneTdy, data.message)
|
||||||
|
.sismember(tombstoneYdy, data.message)
|
||||||
|
.exec();
|
||||||
|
|
||||||
|
if (err1) {
|
||||||
|
log.verbose('Indexing', 'Failed checking tombstone key=%s erro=%s', tombstoneTdy, err1.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err2) {
|
||||||
|
log.verbose('Indexing', 'Failed checking tombstone key=%s erro=%s', tombstoneYdy, err2.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isDeleted1 || isDeleted2) {
|
||||||
|
log.info('Indexing', 'Document tombstone found, skip index message=%s', data.message);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch message from DB
|
||||||
|
let messageData = await db.database.collection('messages').findOne(
|
||||||
|
{
|
||||||
|
_id: new ObjectId(data.message),
|
||||||
|
// shard key
|
||||||
|
mailbox: new ObjectId(data.mailbox),
|
||||||
|
uid: data.uid
|
||||||
|
},
|
||||||
|
{
|
||||||
|
projection: {
|
||||||
|
bodystructure: false,
|
||||||
|
envelope: false,
|
||||||
|
'mimeTree.childNodes': false,
|
||||||
|
'mimeTree.header': false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
|
||||||
|
let messageObj = removeEmptyKeys({
|
||||||
|
user: messageData.user.toString(),
|
||||||
|
mailbox: messageData.mailbox.toString(),
|
||||||
|
|
||||||
|
thread: messageData.thread ? messageData.thread.toString() : null,
|
||||||
|
uid: messageData.uid,
|
||||||
|
answered: messageData.flags ? messageData.flags.includes('\\Answered') : null,
|
||||||
|
|
||||||
|
attachments:
|
||||||
|
(messageData.attachments &&
|
||||||
|
messageData.attachments.map(attachment =>
|
||||||
|
removeEmptyKeys({
|
||||||
|
cid: attachment.cid || null,
|
||||||
|
contentType: attachment.contentType || null,
|
||||||
|
size: attachment.size,
|
||||||
|
filename: attachment.filename,
|
||||||
|
id: attachment.id,
|
||||||
|
disposition: attachment.disposition
|
||||||
|
})
|
||||||
|
)) ||
|
||||||
|
null,
|
||||||
|
|
||||||
|
bcc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.bcc),
|
||||||
|
cc: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.cc),
|
||||||
|
|
||||||
|
// Time when stored
|
||||||
|
created: now.toISOString(),
|
||||||
|
|
||||||
|
// Internal Date
|
||||||
|
idate: (messageData.idate && messageData.idate.toISOString()) || now.toISOString(),
|
||||||
|
|
||||||
|
// Header Date
|
||||||
|
hdate: (messageData.hdate && messageData.hdate.toISOString()) || now.toISOString(),
|
||||||
|
|
||||||
|
draft: messageData.flags ? messageData.flags.includes('\\Draft') : null,
|
||||||
|
|
||||||
|
flagged: messageData.flags ? messageData.flags.includes('\\Flagged') : null,
|
||||||
|
|
||||||
|
flags: messageData.flags || [],
|
||||||
|
|
||||||
|
from: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.from),
|
||||||
|
|
||||||
|
// do not index authentication and transport headers
|
||||||
|
headers: messageData.headers
|
||||||
|
? messageData.headers.filter(header => !/^x|^received|^arc|^dkim|^authentication/gi.test(header.key))
|
||||||
|
: null,
|
||||||
|
|
||||||
|
inReplyTo: messageData.inReplyTo || null,
|
||||||
|
|
||||||
|
msgid: messageData.msgid || null,
|
||||||
|
|
||||||
|
replyTo: formatAddresses(
|
||||||
|
messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader['reply-to']
|
||||||
|
),
|
||||||
|
|
||||||
|
size: messageData.size || null,
|
||||||
|
|
||||||
|
subject: messageData.subject || '',
|
||||||
|
|
||||||
|
to: formatAddresses(messageData.mimeTree && messageData.mimeTree.parsedHeader && messageData.mimeTree.parsedHeader.to),
|
||||||
|
|
||||||
|
unseen: messageData.flags ? !messageData.flags.includes('\\Seen') : null,
|
||||||
|
|
||||||
|
html: (messageData.html && messageData.html.join('\n')) || null,
|
||||||
|
|
||||||
|
text: messageData.text || null,
|
||||||
|
|
||||||
|
modseq: data.modseq
|
||||||
|
});
|
||||||
|
|
||||||
|
let indexResponse = await esclient.index({
|
||||||
|
id: messageData._id.toString(),
|
||||||
|
index: config.elasticsearch.index,
|
||||||
|
body: messageObj,
|
||||||
|
refresh: false
|
||||||
|
});
|
||||||
|
|
||||||
|
log.verbose(
|
||||||
|
'Indexing',
|
||||||
|
'Document index result=%s message=%s',
|
||||||
|
indexResponse.body && indexResponse.body.result,
|
||||||
|
indexResponse.body && indexResponse.body._id
|
||||||
|
);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'delete': {
|
||||||
|
let deleteResponse;
|
||||||
|
try {
|
||||||
|
deleteResponse = await esclient.delete({
|
||||||
|
id: data.message,
|
||||||
|
index: config.elasticsearch.index,
|
||||||
|
refresh: false
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
|
||||||
|
// set tombstone to prevent indexing this message in case of race conditions
|
||||||
|
await db.redis
|
||||||
|
.multi()
|
||||||
|
.sadd(tombstoneTdy, data.message)
|
||||||
|
.expire(tombstoneTdy, 24 * 3600)
|
||||||
|
.exec();
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.verbose(
|
||||||
|
'Indexing',
|
||||||
|
'Document delete result=%s message=%s',
|
||||||
|
deleteResponse.body && deleteResponse.body.result,
|
||||||
|
deleteResponse.body && deleteResponse.body._id
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'update': {
|
||||||
|
let updateRequest = {
|
||||||
|
id: data.message,
|
||||||
|
index: config.elasticsearch.index,
|
||||||
|
refresh: false
|
||||||
|
};
|
||||||
|
|
||||||
|
if (data.modseq && typeof data.modseq === 'number') {
|
||||||
|
updateRequest.body = {
|
||||||
|
script: {
|
||||||
|
lang: 'painless',
|
||||||
|
source: `
|
||||||
|
if( ctx._source.modseq >= params.modseq) {
|
||||||
|
ctx.op = 'none';
|
||||||
|
} else {
|
||||||
|
ctx._source.draft = params.draft;
|
||||||
|
ctx._source.flagged = params.flagged;
|
||||||
|
ctx._source.flags = params.flags;
|
||||||
|
ctx._source.unseen = params.unseen;
|
||||||
|
ctx._source.modseq = params.modseq;
|
||||||
|
}
|
||||||
|
`,
|
||||||
|
params: {
|
||||||
|
modseq: data.modseq,
|
||||||
|
draft: data.flags.includes('\\Draft'),
|
||||||
|
flagged: data.flags.includes('\\Flagged'),
|
||||||
|
flags: data.flags || [],
|
||||||
|
unseen: !data.flags.includes('\\Seen')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
updateRequest.body = {
|
||||||
|
doc: removeEmptyKeys({
|
||||||
|
draft: data.flags ? data.flags.includes('\\Draft') : null,
|
||||||
|
flagged: data.flags ? data.flags.includes('\\Flagged') : null,
|
||||||
|
flags: data.flags || [],
|
||||||
|
unseen: data.flags ? !data.flags.includes('\\Seen') : null
|
||||||
|
})
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let updateResponse = await esclient.update(updateRequest);
|
||||||
|
|
||||||
|
log.verbose(
|
||||||
|
'Indexing',
|
||||||
|
'Document update result=%s message=%s',
|
||||||
|
updateResponse.body && updateResponse.body.result,
|
||||||
|
updateResponse.body && updateResponse.body._id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// loggelf({ _msg: 'hello world' });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
if (err.meta && err.meta.body && err.meta.body.result === 'not_found') {
|
||||||
|
// missing document, ignore
|
||||||
|
log.error('Indexing', 'Failed to process indexing request, document not found message=%s', err.meta.body._id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
log.error('Indexing', err);
|
log.error('Indexing', err);
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ const TaskHandler = require('../task-handler');
|
||||||
const { publish, FORWARD_ADDED } = require('../events');
|
const { publish, FORWARD_ADDED } = require('../events');
|
||||||
const { ExportStream, ImportStream } = require('../export');
|
const { ExportStream, ImportStream } = require('../export');
|
||||||
|
|
||||||
|
const FEATURE_FLAGS = ['indexing'];
|
||||||
|
|
||||||
module.exports = (db, server, userHandler, settingsHandler) => {
|
module.exports = (db, server, userHandler, settingsHandler) => {
|
||||||
const taskHandler = new TaskHandler({ database: db.database });
|
const taskHandler = new TaskHandler({ database: db.database });
|
||||||
|
|
||||||
|
|
@ -338,6 +340,8 @@ module.exports = (db, server, userHandler, settingsHandler) => {
|
||||||
encryptMessages: booleanSchema.default(false),
|
encryptMessages: booleanSchema.default(false),
|
||||||
encryptForwarded: booleanSchema.default(false),
|
encryptForwarded: booleanSchema.default(false),
|
||||||
|
|
||||||
|
featureFlags: Joi.object(Object.fromEntries(FEATURE_FLAGS.map(flag => [flag, booleanSchema.default(false)]))),
|
||||||
|
|
||||||
sess: sessSchema,
|
sess: sessSchema,
|
||||||
ip: sessIPSchema
|
ip: sessIPSchema
|
||||||
});
|
});
|
||||||
|
|
@ -923,6 +927,8 @@ module.exports = (db, server, userHandler, settingsHandler) => {
|
||||||
|
|
||||||
disabled: booleanSchema,
|
disabled: booleanSchema,
|
||||||
|
|
||||||
|
featureFlags: Joi.object(Object.fromEntries(FEATURE_FLAGS.map(flag => [flag, booleanSchema.default(false)]))),
|
||||||
|
|
||||||
suspended: booleanSchema,
|
suspended: booleanSchema,
|
||||||
|
|
||||||
sess: sessSchema,
|
sess: sessSchema,
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ const fs = require('fs');
|
||||||
const ttlCounterScript = fs.readFileSync(__dirname + '/lua/ttlcounter.lua', 'utf-8');
|
const ttlCounterScript = fs.readFileSync(__dirname + '/lua/ttlcounter.lua', 'utf-8');
|
||||||
const cachedCounterScript = fs.readFileSync(__dirname + '/lua/cachedcounter.lua', 'utf-8');
|
const cachedCounterScript = fs.readFileSync(__dirname + '/lua/cachedcounter.lua', 'utf-8');
|
||||||
const limitedCounterScript = fs.readFileSync(__dirname + '/lua/limitedcounter.lua', 'utf-8');
|
const limitedCounterScript = fs.readFileSync(__dirname + '/lua/limitedcounter.lua', 'utf-8');
|
||||||
|
const processLockScript = fs.readFileSync(__dirname + '/lua/process-lock.lua', 'utf-8');
|
||||||
|
|
||||||
const clientVersion = Date.now();
|
const clientVersion = Date.now();
|
||||||
|
|
||||||
|
|
@ -23,6 +24,11 @@ module.exports = redis => {
|
||||||
lua: limitedCounterScript
|
lua: limitedCounterScript
|
||||||
});
|
});
|
||||||
|
|
||||||
|
redis.defineCommand('processlock', {
|
||||||
|
numberOfKeys: 1,
|
||||||
|
lua: processLockScript
|
||||||
|
});
|
||||||
|
|
||||||
let asyncTTLCounter = async (key, count, max, windowSize) => {
|
let asyncTTLCounter = async (key, count, max, windowSize) => {
|
||||||
if (!max || isNaN(max)) {
|
if (!max || isNaN(max)) {
|
||||||
return {
|
return {
|
||||||
|
|
@ -67,6 +73,10 @@ module.exports = redis => {
|
||||||
value: (res && res[1]) || 0
|
value: (res && res[1]) || 0
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
async processlock(key, identifier, ttl) {
|
||||||
|
return await redis.processlock(key, identifier, ttl);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -48,4 +48,4 @@ const init = async () => {
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = { init };
|
module.exports = { init, getClient };
|
||||||
|
|
|
||||||
|
|
@ -42,12 +42,6 @@ const mappings = {
|
||||||
ignore_above: 24
|
ignore_above: 24
|
||||||
},
|
},
|
||||||
|
|
||||||
// message ID / ObjectId
|
|
||||||
id: {
|
|
||||||
type: 'keyword',
|
|
||||||
ignore_above: 24
|
|
||||||
},
|
|
||||||
|
|
||||||
// mailbox folder ID / ObjectId
|
// mailbox folder ID / ObjectId
|
||||||
mailbox: {
|
mailbox: {
|
||||||
type: 'keyword',
|
type: 'keyword',
|
||||||
|
|
@ -72,7 +66,7 @@ const mappings = {
|
||||||
attachments: {
|
attachments: {
|
||||||
type: 'nested',
|
type: 'nested',
|
||||||
properties: {
|
properties: {
|
||||||
contentId: {
|
cid: {
|
||||||
type: 'keyword',
|
type: 'keyword',
|
||||||
ignore_above: 128
|
ignore_above: 128
|
||||||
},
|
},
|
||||||
|
|
@ -80,10 +74,7 @@ const mappings = {
|
||||||
type: 'keyword',
|
type: 'keyword',
|
||||||
ignore_above: 128
|
ignore_above: 128
|
||||||
},
|
},
|
||||||
embedded: {
|
size: {
|
||||||
type: 'boolean'
|
|
||||||
},
|
|
||||||
encodedSize: {
|
|
||||||
type: 'long'
|
type: 'long'
|
||||||
},
|
},
|
||||||
filename: {
|
filename: {
|
||||||
|
|
@ -95,8 +86,9 @@ const mappings = {
|
||||||
type: 'keyword',
|
type: 'keyword',
|
||||||
ignore_above: 128
|
ignore_above: 128
|
||||||
},
|
},
|
||||||
inline: {
|
disposition: {
|
||||||
type: 'boolean'
|
type: 'keyword',
|
||||||
|
ignore_above: 128
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
@ -183,7 +175,7 @@ const mappings = {
|
||||||
ignore_above: 998
|
ignore_above: 998
|
||||||
},
|
},
|
||||||
|
|
||||||
messageId: {
|
msgid: {
|
||||||
type: 'keyword',
|
type: 'keyword',
|
||||||
ignore_above: 998
|
ignore_above: 998
|
||||||
},
|
},
|
||||||
|
|
@ -200,18 +192,6 @@ const mappings = {
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
sender: {
|
|
||||||
properties: {
|
|
||||||
address: {
|
|
||||||
type: 'keyword',
|
|
||||||
ignore_above: 256
|
|
||||||
},
|
|
||||||
name: {
|
|
||||||
type: 'text'
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
size: {
|
size: {
|
||||||
type: 'long'
|
type: 'long'
|
||||||
},
|
},
|
||||||
|
|
@ -220,10 +200,6 @@ const mappings = {
|
||||||
type: 'text'
|
type: 'text'
|
||||||
},
|
},
|
||||||
|
|
||||||
preview: {
|
|
||||||
type: 'text'
|
|
||||||
},
|
|
||||||
|
|
||||||
to: {
|
to: {
|
||||||
properties: {
|
properties: {
|
||||||
name: {
|
name: {
|
||||||
|
|
@ -240,22 +216,22 @@ const mappings = {
|
||||||
type: 'boolean'
|
type: 'boolean'
|
||||||
},
|
},
|
||||||
|
|
||||||
seen: {
|
|
||||||
type: 'boolean'
|
|
||||||
},
|
|
||||||
|
|
||||||
html: {
|
html: {
|
||||||
type: 'text',
|
type: 'text',
|
||||||
analyzer: 'htmlStripAnalyzer'
|
analyzer: 'htmlStripAnalyzer'
|
||||||
},
|
},
|
||||||
|
|
||||||
plain: {
|
text: {
|
||||||
type: 'text'
|
type: 'text'
|
||||||
},
|
},
|
||||||
|
|
||||||
type: {
|
type: {
|
||||||
type: 'constant_keyword',
|
type: 'constant_keyword',
|
||||||
value: 'email'
|
value: 'email'
|
||||||
|
},
|
||||||
|
|
||||||
|
modseq: {
|
||||||
|
type: 'long'
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -196,20 +196,6 @@ class ImapNotifier extends EventEmitter {
|
||||||
id: entry.uid
|
id: entry.uid
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log(entry.command);
|
|
||||||
|
|
||||||
if (entry.command === 'EXISTS') {
|
|
||||||
console.log('EMAIL ADDED message=%s', entry.message.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (entry.command === 'EXPUNGE') {
|
|
||||||
console.log('EMAIL DELETED %s', entry.message.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (entry.command === 'FETCH') {
|
|
||||||
console.log('EMAIL UPDATED %s %s', entry.message.toString(), JSON.stringify(entry.flags));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let r = await this.database.collection('journal').insertMany(entries, {
|
let r = await this.database.collection('journal').insertMany(entries, {
|
||||||
|
|
@ -324,6 +310,7 @@ class ImapNotifier extends EventEmitter {
|
||||||
if (!counters.has(m)) {
|
if (!counters.has(m)) {
|
||||||
counters.set(m, { total: 0, unseen: 0, unseenChange: false });
|
counters.set(m, { total: 0, unseen: 0, unseenChange: false });
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (entry && entry.command) {
|
switch (entry && entry.command) {
|
||||||
case 'EXISTS':
|
case 'EXISTS':
|
||||||
counters.get(m).total += 1;
|
counters.get(m).total += 1;
|
||||||
|
|
|
||||||
24
lib/lua/process-lock.lua
Normal file
24
lib/lua/process-lock.lua
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
local key = KEYS[1];
|
||||||
|
|
||||||
|
local identifier = ARGV[1];
|
||||||
|
local ttl = tonumber(ARGV[2]) or 0;
|
||||||
|
|
||||||
|
if redis.call("EXISTS", key) == 1 then
|
||||||
|
|
||||||
|
local existing = redis.call("GET", key);
|
||||||
|
if existing == identifier then
|
||||||
|
redis.call("EXPIRE", key, ttl);
|
||||||
|
return 1;
|
||||||
|
else
|
||||||
|
return nil;
|
||||||
|
end
|
||||||
|
|
||||||
|
else
|
||||||
|
local result = redis.call("SET", key, identifier);
|
||||||
|
if result then
|
||||||
|
redis.call("EXPIRE", key, ttl);
|
||||||
|
return 2;
|
||||||
|
else
|
||||||
|
return nil;
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -1515,6 +1515,8 @@ class UserHandler {
|
||||||
disabled: true,
|
disabled: true,
|
||||||
suspended: false,
|
suspended: false,
|
||||||
|
|
||||||
|
featureFlags: data.featureFlags || {},
|
||||||
|
|
||||||
created: new Date()
|
created: new Date()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -1699,6 +1701,20 @@ class UserHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (data.featureFlags && Object.keys(data.featureFlags).length) {
|
||||||
|
let req = this.redis.multi();
|
||||||
|
for (let featureFlag of Object.keys(data.featureFlags)) {
|
||||||
|
if (data.featureFlags[featureFlag]) {
|
||||||
|
req = req.sadd(`feature:${featureFlag}`, user.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await req.exec();
|
||||||
|
} catch (err) {
|
||||||
|
log.error('Redis', 'FEATUREFAIL failed to set feature flags id=%s error=%s', user, err.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.logAuthEvent(user, {
|
await this.logAuthEvent(user, {
|
||||||
action: 'account created',
|
action: 'account created',
|
||||||
|
|
@ -3192,6 +3208,22 @@ class UserHandler {
|
||||||
maxTimeMS: consts.DB_MAX_TIME_USERS
|
maxTimeMS: consts.DB_MAX_TIME_USERS
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if ($set.featureFlags && Object.keys($set.featureFlags).length) {
|
||||||
|
let req = this.redis.multi();
|
||||||
|
for (let featureFlag of Object.keys($set.featureFlags)) {
|
||||||
|
if ($set.featureFlags[featureFlag]) {
|
||||||
|
req = req.sadd(`feature:${featureFlag}`, user.toString());
|
||||||
|
} else {
|
||||||
|
req = req.srem(`feature:${featureFlag}`, user.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await req.exec();
|
||||||
|
} catch (err) {
|
||||||
|
log.error('Redis', 'FEATUREFAIL failed to update feature flags id=%s error=%s', user, err.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log.error('DB', 'UPDATEFAIL id=%s error=%s', user, err.message);
|
log.error('DB', 'UPDATEFAIL id=%s error=%s', user, err.message);
|
||||||
err.message = 'Database Error, failed to update user';
|
err.message = 'Database Error, failed to update user';
|
||||||
|
|
@ -3428,6 +3460,21 @@ class UserHandler {
|
||||||
if (r.insertedId) {
|
if (r.insertedId) {
|
||||||
await this.users.collection('users').deleteOne({ _id: user });
|
await this.users.collection('users').deleteOne({ _id: user });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// remove feature flag entries
|
||||||
|
if (existingAccount.featureFlags && Object.keys(existingAccount.featureFlags).length) {
|
||||||
|
let req = this.redis.multi();
|
||||||
|
for (let featureFlag of Object.keys(existingAccount.featureFlags)) {
|
||||||
|
if (existingAccount.featureFlags[featureFlag]) {
|
||||||
|
req = req.srem(`feature:${featureFlag}`, user.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await req.exec();
|
||||||
|
} catch (err) {
|
||||||
|
log.error('Redis', 'FEATUREFAIL failed to update feature flags id=%s error=%s', user, err.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue