Migrate from Bull to BullMQ for background processing

Andris Reinman 2023-03-10 15:47:47 +02:00
parent 4370a79192
commit e7cef85d0f
8 changed files with 467 additions and 349 deletions


@@ -1,8 +1,7 @@
module.exports = {
upgrade: true,
reject: [
// FIXME: v4.x.x throws if not maxRetriesPerRequest: null, enableReadyCheck: false
// https://github.com/OptimalBits/bull/blob/develop/CHANGELOG.md#breaking-changes
'bull'
// mongodb 5.x driver does not support callbacks, only promises
'mongodb'
]
};
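
With bull replaced by bullmq, the reject entry and its FIXME about maxRetriesPerRequest: null / enableReadyCheck: false are no longer needed; those options now live on the shared Redis connection (see lib/db.js below). A minimal sketch of such a connection, assuming ioredis; host and port are placeholders:

const Redis = require('ioredis');

// BullMQ workers rely on blocking Redis commands, so command retries must be
// unlimited; the ready check is commonly disabled as well.
const connection = new Redis({
    host: '127.0.0.1', // placeholder
    port: 6379, // placeholder
    maxRetriesPerRequest: null,
    enableReadyCheck: false
});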


@@ -4,7 +4,7 @@ const log = require('npmlog');
const config = require('wild-config');
const Gelf = require('gelf');
const os = require('os');
const Queue = require('bull');
const { Queue, Worker } = require('bullmq');
const db = require('./lib/db');
const errors = require('./lib/errors');
const crypto = require('crypto');
@@ -16,6 +16,7 @@ const { getClient } = require('./lib/elasticsearch');
let loggelf;
let processlock;
let queueWorkers = {};
const LOCK_EXPIRE_TTL = 5;
const LOCK_RENEW_TTL = 2;
@@ -71,14 +72,6 @@ class Indexer {
return;
}
let hasFeatureFlag = await db.redis.sismember(`feature:indexing`, entry.user.toString());
if (!hasFeatureFlag) {
log.silly('Indexer', `Feature flag not set, skipping user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
return;
} else {
log.verbose('Indexer', `Feature flag set, processing user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
}
switch (entry.command) {
case 'EXISTS':
payload = {
@@ -111,7 +104,15 @@ class Indexer {
}
if (payload) {
await indexingQueue.add(payload, {
let hasFeatureFlag = await db.redis.sismember(`feature:indexing`, entry.user.toString());
if (!hasFeatureFlag) {
log.silly('Indexer', `Feature flag not set, skipping user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
return;
} else {
log.verbose('Indexer', `Feature flag set, processing user=%s command=%s message=%s`, entry.user, entry.command, entry.message);
}
await indexingQueue.add('journal', payload, {
removeOnComplete: 100,
removeOnFail: 100,
attempts: 5,
@@ -311,7 +312,7 @@ module.exports.start = callback => {
return setTimeout(() => process.exit(1), 3000);
}
indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
indexingQueue = new Queue('indexing', db.queueConf);
processlock = counters(db.redis).processlock;
@@ -321,7 +322,9 @@ module.exports.start = callback => {
});
const esclient = getClient();
indexingQueue.process(async job => {
queueWorkers.indexing = new Worker(
'indexing',
async job => {
try {
if (!job || !job.data) {
return false;
@@ -556,7 +559,14 @@ module.exports.start = callback => {
log.error('Indexing', err);
throw err;
}
});
},
Object.assign(
{
concurrency: 1
},
db.queueConf
)
);
callback();
});
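
The pattern applied above, and repeated in the other services: Bull used a single Queue object for both adding and processing jobs via queue.process(), while BullMQ splits this into a Queue for producers and a separate Worker for consumers, and add() takes an explicit job name as its first argument. A condensed sketch of that shape, assuming the { connection, prefix } object exported as db.queueConf:

const { Queue, Worker } = require('bullmq');

const queueConf = { connection: { maxRetriesPerRequest: null }, prefix: 'wd:bull' };

// Producer side: the job name ('journal' here) is now explicit
const indexingQueue = new Queue('indexing', queueConf);
async function enqueue(payload) {
    await indexingQueue.add('journal', payload, { removeOnComplete: 100, removeOnFail: 100, attempts: 5 });
}

// Consumer side: queue.process(fn) becomes a dedicated Worker
const indexingWorker = new Worker(
    'indexing',
    async job => {
        // handle job.data
        return true;
    },
    Object.assign({ concurrency: 1 }, queueConf)
);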


@@ -1,17 +1,40 @@
'use strict';
const config = require('wild-config');
const tools = require('./tools');
const mongodb = require('mongodb');
const Redis = require('ioredis');
const redisUrl = require('./redis-url');
const log = require('npmlog');
const packageData = require('../package.json');
const MongoClient = mongodb.MongoClient;
module.exports.database = false;
module.exports.gridfs = false;
module.exports.users = false;
module.exports.senderDb = false;
module.exports.redis = false;
module.exports.redisConfig = false;
const REDIS_CONF = Object.assign(
{
// some defaults
maxRetriesPerRequest: null,
showFriendlyErrorStack: true,
retryStrategy(times) {
const delay = !times ? 1000 : Math.min(2 ** times * 500, 15 * 1000);
log.info('Redis', 'Connection retry times=%s delay=%s', times, delay);
return delay;
},
connectionName: `${packageData.name}@${packageData.version}[${process.pid}]`
},
typeof config.dbs.redis === 'string' ? redisUrl(config.dbs.redis) : config.dbs.redis || {}
);
module.exports.redisConfig = REDIS_CONF;
module.exports.queueConf = {
connection: Object.assign({ connectionName: `${REDIS_CONF.connectionName}[notify]` }, REDIS_CONF),
prefix: `wd:bull`
};
module.exports.redis = new Redis(REDIS_CONF);
let getDBConnection = (main, config, callback) => {
if (main) {
@@ -70,10 +93,7 @@ module.exports.connect = callback => {
}
module.exports.senderDb = sdb || module.exports.database;
module.exports.redisConfig = tools.redisConfig(config.dbs.redis);
module.exports.redis = new Redis(module.exports.redisConfig);
module.exports.redis.connect(() => callback(null, module.exports.database));
callback();
});
});
});
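
The retryStrategy above gives the Redis client a capped exponential backoff between reconnection attempts; the delays it produces, computed from the formula as written, are:

const retryStrategy = times => (!times ? 1000 : Math.min(2 ** times * 500, 15 * 1000));
// times=0 -> 1000 ms, times=1 -> 1000 ms, times=2 -> 2000 ms,
// times=3 -> 4000 ms, times=4 -> 8000 ms, times>=5 -> capped at 15000 ms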


@@ -2,7 +2,7 @@
// Pushes events to processing queue
const Queue = require('bull');
const { Queue } = require('bullmq');
const log = require('npmlog');
let webhooksQueue;
@@ -54,13 +54,8 @@ module.exports = {
if (!webhooksQueue) {
webhooksQueue = new Queue('webhooks', {
createClient(type /*, config*/) {
if (type === 'bclient') {
// most probably never called
return redisClient.duplicate();
}
return redisClient;
}
connection: redisClient,
prefix: `wd:bull`
});
}
@@ -73,7 +68,7 @@ module.exports = {
});
try {
let job = await webhooksQueue.add(data, {
let job = await webhooksQueue.add('wehook', data, {
removeOnComplete: true,
removeOnFail: 500,
attempts: 5,
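
Bull's createClient() factory is replaced by BullMQ's connection option, which accepts an existing ioredis instance, and add() again needs a job name. A minimal sketch of that shape; the Redis client below stands in for the one handed to this module:

const { Queue } = require('bullmq');
const Redis = require('ioredis');

// stand-in for the redisClient provided to this module
const redisClient = new Redis({ maxRetriesPerRequest: null });

const webhooksQueue = new Queue('webhooks', { connection: redisClient, prefix: 'wd:bull' });

async function queueWebhook(data) {
    // 'wehook' is the job name used in this file
    await webhooksQueue.add('wehook', data, { removeOnComplete: true, removeOnFail: 500, attempts: 5 });
}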


@@ -79,6 +79,7 @@ class MailboxHandler {
return callback(err);
}
console.log('MAILBOX CREATED');
publish(this.redis, {
ev: MAILBOX_CREATED,
user,

lib/redis-url.js (new file, 74 lines)

@@ -0,0 +1,74 @@
'use strict';
module.exports = redisConf => {
let parsedRedisUrl = new URL(redisConf);
let parsedUrl = {};
let usernameAllowed = false;
for (let key of parsedRedisUrl.searchParams.keys()) {
let value = parsedRedisUrl.searchParams.get(key);
if (!value) {
continue;
}
switch (key) {
case 'password':
parsedUrl.password = value;
break;
case 'db':
{
if (value && !isNaN(value)) {
parsedUrl.db = Number(value);
}
}
break;
case 'allowUsernameInURI':
if (/^(true|1|yes|y)$/i.test(value)) {
usernameAllowed = true;
}
break;
}
}
for (let key of ['hostname', 'port', 'password', 'pathname', 'protocol', 'username']) {
let value = parsedRedisUrl[key];
if (!value) {
continue;
}
switch (key) {
case 'hostname':
parsedUrl.host = value;
break;
case 'port':
parsedUrl.port = Number(value);
break;
case 'password':
parsedUrl.password = value;
break;
case 'username':
if (usernameAllowed) {
parsedUrl.username = value;
}
break;
case 'pathname': {
let pathname = value.slice(1);
if (pathname && !isNaN(pathname)) {
parsedUrl.db = Number(pathname);
}
break;
}
case 'protocol':
if (value.toLowerCase() === 'rediss:') {
parsedUrl.tls = {};
}
break;
}
}
return parsedUrl;
};
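
A quick usage sketch for the parser above; the connection URL is a made-up example:

const redisUrl = require('./lib/redis-url');

// rediss: switches on TLS, the path selects the database,
// and the password comes from the userinfo part of the URL
let conf = redisUrl('rediss://:secret@redis.example.com:6380/3');
// -> { host: 'redis.example.com', port: 6380, password: 'secret', db: 3, tls: {} }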


@@ -42,7 +42,6 @@
"supertest": "6.3.3"
},
"dependencies": {
"restify-cors-middleware2": "2.2.1",
"@fidm/x509": "1.2.1",
"@opensearch-project/opensearch": "2.2.0",
"@phc/pbkdf2": "1.1.14",
@@ -54,7 +53,7 @@
"base32.js": "0.1.0",
"bcryptjs": "2.4.3",
"bson": "5.0.1",
"bull": "3.29.3",
"bullmq": "3.10.1",
"fido2-lib": "3.3.5",
"gelf": "2.0.1",
"generate-password": "1.7.0",
@@ -77,7 +76,7 @@
"mailsplit": "5.4.0",
"mobileconfig": "2.4.0",
"mongo-cursor-pagination": "8.1.3",
"mongodb": "4.12.1",
"mongodb": "4.14.0",
"mongodb-extended-json": "1.11.1",
"msgpack5": "6.0.2",
"node-forge": "1.3.1",
@@ -90,6 +89,7 @@
"pwnedpasswords": "1.0.6",
"qrcode": "1.5.1",
"restify": "11.1.0",
"restify-cors-middleware2": "2.2.1",
"restify-logger": "2.0.1",
"saslprep": "1.0.3",
"seq-index": "1.1.0",


@@ -4,7 +4,7 @@ const log = require('npmlog');
const config = require('wild-config');
const Gelf = require('gelf');
const os = require('os');
const Queue = require('bull');
const { Queue, Worker } = require('bullmq');
const db = require('./lib/db');
const tools = require('./lib/tools');
const { ObjectId } = require('mongodb');
@@ -13,6 +13,7 @@ const packageData = require('./package.json');
const { MARKED_SPAM, MARKED_HAM } = require('./lib/events');
let loggelf;
let queueWorkers = {};
async function postWebhook(webhook, data) {
let res;
@@ -117,10 +118,11 @@ module.exports.start = callback => {
}
};
const webhooksQueue = new Queue('webhooks', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
const webhooksPostQueue = new Queue('webhooks_post', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis);
const webhooksPostQueue = new Queue('webhooks_post', db.queueConf);
webhooksQueue.process(async job => {
queueWorkers.webhooks = new Worker(
'webhooks',
async job => {
try {
if (!job || !job.data || !job.data.ev) {
return false;
@@ -215,6 +217,7 @@ module.exports.start = callback => {
count++;
try {
await webhooksPostQueue.add(
'webhook',
{ data: Object.assign({ id: `${whid.toHexString()}:${count}` }, data), webhook },
{
removeOnComplete: true,
@@ -235,15 +238,31 @@
log.error('Webhooks', err);
throw err;
}
});
},
Object.assign(
{
concurrency: 1
},
db.queueConf
)
);
webhooksPostQueue.process(async job => {
queueWorkers.webhooksPost = new Worker(
'webhooks_post',
async job => {
if (!job || !job.data) {
return false;
}
const { data, webhook } = job.data;
return await postWebhook(webhook, data);
});
},
Object.assign(
{
concurrency: 1
},
db.queueConf
)
);
callback();
};
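
The Worker instances are kept in queueWorkers instead of being created anonymously, which leaves room for stopping them cleanly. A hypothetical sketch of that, not part of this diff, using BullMQ's close():

async function stopWorkers() {
    for (let worker of Object.values(queueWorkers)) {
        // close() waits for the currently processing job before resolving
        await worker.close();
    }
}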