Refactor queues (#3783) Bruception

This commit is contained in:
Bruce Berrios 2022-11-30 13:39:16 -05:00 committed by GitHub
parent 618c9ec931
commit c1dfc53e58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 163 additions and 153 deletions

View file

@ -31,7 +31,7 @@ import {
incrementResult,
incrementDailyLeaderboard,
} from "../../utils/prometheus";
import * as George from "../../tasks/george";
import GeorgeQueue from "../../queues/george-queue";
import { getDailyLeaderboard } from "../../utils/daily-leaderboards";
import AutoRoleList from "../../constants/auto-roles";
import * as UserDAL from "../../dal/user";
@ -334,7 +334,7 @@ export async function addResult(
if (result.mode === "time" && String(result.mode2) === "60") {
incrementBananas(uid, result.wpm);
if (isPb && user.discordId) {
George.updateDiscordRole(user.discordId, result.wpm);
GeorgeQueue.updateDiscordRole(user.discordId, result.wpm);
}
}
@ -343,7 +343,7 @@ export async function addResult(
AutoRoleList.includes(result.challenge) &&
user.discordId
) {
George.awardChallenge(user.discordId, result.challenge);
GeorgeQueue.awardChallenge(user.discordId, result.challenge);
} else {
delete result.challenge;
}

View file

@ -5,7 +5,7 @@ import Logger from "../../utils/logger";
import { MonkeyResponse } from "../../utils/monkey-response";
import * as DiscordUtils from "../../utils/discord";
import { buildAgentLog, sanitizeString } from "../../utils/misc";
import * as George from "../../tasks/george";
import GeorgeQueue from "../../queues/george-queue";
import admin from "firebase-admin";
import { deleteAllApeKeys } from "../../dal/ape-keys";
import { deleteAllPresets } from "../../dal/preset";
@ -285,7 +285,7 @@ export async function linkDiscord(
await UserDAL.linkDiscord(uid, discordId, discordAvatar);
George.linkDiscord(discordId, uid);
GeorgeQueue.linkDiscord(discordId, uid);
Logger.logToDb("user_discord_link", `linked to ${discordId}`, uid);
return new MonkeyResponse("Discord account linked", {
@ -304,7 +304,7 @@ export async function unlinkDiscord(
throw new MonkeyError(404, "User does not have a linked Discord account");
}
George.unlinkDiscord(userInfo.discordId, uid);
GeorgeQueue.unlinkDiscord(userInfo.discordId, uid);
await UserDAL.unlinkDiscord(uid);
Logger.logToDb("user_discord_unlinked", userInfo.discordId, uid);

View file

@ -7,7 +7,7 @@ import {
} from "../utils/misc";
import { getCachedConfiguration } from "../init/configuration";
import { DailyLeaderboard } from "../utils/daily-leaderboards";
import { announceDailyLeaderboardTopResults } from "../tasks/george";
import GeorgeQueue from "../queues/george-queue";
import { addToInboxBulk } from "../dal/user";
import { buildMonkeyMail } from "../utils/monkey-mail";
@ -105,7 +105,7 @@ async function announceDailyLeaderboard(
);
const leaderboardId = `${mode} ${mode2} ${language}`;
await announceDailyLeaderboardTopResults(
await GeorgeQueue.announceDailyLeaderboardTopResults(
leaderboardId,
yesterday,
topResults

View file

@ -1,5 +1,5 @@
import { CronJob } from "cron";
import * as George from "../tasks/george";
import GeorgeQueue from "../queues/george-queue";
import * as LeaderboardsDAL from "../dal/leaderboards";
import { getCachedConfiguration } from "../init/configuration";
@ -52,7 +52,7 @@ async function updateLeaderboardAndNotifyChanges(
if (newRecords.length > 0) {
const leaderboardId = `time ${leaderboardTime} english`;
await George.announceLeaderboardUpdate(newRecords, leaderboardId);
await GeorgeQueue.announceLeaderboardUpdate(newRecords, leaderboardId);
}
}

View file

@ -0,0 +1,101 @@
import { MonkeyQueue } from "./monkey-queue";
const QUEUE_NAME = "george-tasks";
interface GeorgeTask {
name: string;
args: any[];
}
function buildGeorgeTask(taskName: string, taskArgs: any[]): GeorgeTask {
return {
name: taskName,
args: taskArgs,
};
}
class GeorgeQueue extends MonkeyQueue<GeorgeTask> {
async updateDiscordRole(discordId: string, wpm: number): Promise<void> {
const taskName = "updateRole";
const updateDiscordRoleTask = buildGeorgeTask(taskName, [discordId, wpm]);
await this.add(taskName, updateDiscordRoleTask);
}
async linkDiscord(discordId: string, uid: string): Promise<void> {
const taskName = "linkDiscord";
const linkDiscordTask = buildGeorgeTask(taskName, [discordId, uid]);
await this.add(taskName, linkDiscordTask);
}
async unlinkDiscord(discordId: string, uid: string): Promise<void> {
const taskName = "unlinkDiscord";
const unlinkDiscordTask = buildGeorgeTask(taskName, [discordId, uid]);
await this.add(taskName, unlinkDiscordTask);
}
async awardChallenge(
discordId: string,
challengeName: string
): Promise<void> {
const taskName = "awardChallenge";
const awardChallengeTask = buildGeorgeTask(taskName, [
discordId,
challengeName,
]);
await this.add(taskName, awardChallengeTask);
}
async announceLeaderboardUpdate(
newRecords: any[],
leaderboardId: string
): Promise<void> {
const taskName = "announceLeaderboardUpdate";
const leaderboardUpdateTasks = newRecords.map((record) => {
const taskData = buildGeorgeTask(taskName, [
record.discordId ?? record.name,
record.rank,
leaderboardId,
record.wpm,
record.raw,
record.acc,
record.consistency,
]);
return {
name: taskName,
data: taskData,
};
});
await this.addBulk(leaderboardUpdateTasks);
}
async announceDailyLeaderboardTopResults(
leaderboardId: string,
leaderboardTimestamp: number,
topResults: any[]
): Promise<void> {
const taskName = "announceDailyLeaderboardTopResults";
const dailyLeaderboardTopResultsTask = buildGeorgeTask(taskName, [
leaderboardId,
leaderboardTimestamp,
topResults,
]);
await this.add(taskName, dailyLeaderboardTopResultsTask);
}
}
export default new GeorgeQueue(QUEUE_NAME, {
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: "exponential",
delay: 2000,
},
},
});

View file

@ -0,0 +1,3 @@
import GeorgeQueue from "./george-queue";
export default [GeorgeQueue];

View file

@ -0,0 +1,45 @@
import IORedis from "ioredis";
import { Queue, QueueOptions, QueueScheduler } from "bullmq";
export class MonkeyQueue<T> {
jobQueue: Queue;
_queueScheduler: QueueScheduler;
queueName: string;
queueOpts: QueueOptions;
constructor(queueName: string, queueOpts: QueueOptions) {
this.queueName = queueName;
this.queueOpts = queueOpts;
}
init(redisConnection?: IORedis.Redis): void {
if (this.jobQueue || !redisConnection) {
return;
}
this.jobQueue = new Queue(this.queueName, {
...this.queueOpts,
connection: redisConnection,
});
this._queueScheduler = new QueueScheduler(this.queueName, {
connection: redisConnection,
});
}
async add(taskName: string, task: T): Promise<void> {
if (!this.jobQueue) {
return;
}
await this.jobQueue.add(taskName, task);
}
async addBulk(tasks: { name: string; data: T }[]): Promise<void> {
if (!this.jobQueue) {
return;
}
await this.jobQueue.addBulk(tasks);
}
}

View file

@ -11,7 +11,7 @@ import { Server } from "http";
import { version } from "./version";
import { recordServerVersion } from "./utils/prometheus";
import * as RedisClient from "./init/redis";
import { initJobQueue } from "./tasks/george";
import queues from "./queues";
import Logger from "./utils/logger";
async function bootServer(port: number): Promise<Server> {
@ -41,7 +41,9 @@ async function bootServer(port: number): Promise<Server> {
Logger.success("Connected to redis");
Logger.info("Initializing task queues...");
initJobQueue(RedisClient.getConnection());
queues.forEach((queue) => {
queue.init(RedisClient.getConnection());
});
Logger.success("Task queues initialized");
}

View file

@ -1,141 +0,0 @@
import type IORedis from "ioredis";
import { Queue, QueueScheduler } from "bullmq";
const QUEUE_NAME = "george-tasks";
interface GeorgeTask {
name: string;
args: any[];
}
function buildGeorgeTask(taskName: string, taskArgs: any[]): GeorgeTask {
return {
name: taskName,
args: taskArgs,
};
}
let jobQueue: Queue;
let _queueScheduler: QueueScheduler;
export function initJobQueue(redisConnection: IORedis.Redis | undefined): void {
if (jobQueue || !redisConnection) {
return;
}
jobQueue = new Queue(QUEUE_NAME, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: "exponential",
delay: 2000,
},
},
});
_queueScheduler = new QueueScheduler(QUEUE_NAME, {
connection: redisConnection,
});
}
async function addToQueue(taskName: string, task: GeorgeTask): Promise<void> {
if (!jobQueue) {
return;
}
await jobQueue.add(taskName, task);
}
async function addToQueueBulk(
tasks: { name: string; data: GeorgeTask }[]
): Promise<void> {
if (!jobQueue) {
return;
}
await jobQueue.addBulk(tasks);
}
export async function updateDiscordRole(
discordId: string,
wpm: number
): Promise<void> {
const taskName = "updateRole";
const updateDiscordRoleTask = buildGeorgeTask(taskName, [discordId, wpm]);
await addToQueue(taskName, updateDiscordRoleTask);
}
export async function linkDiscord(
discordId: string,
uid: string
): Promise<void> {
const taskName = "linkDiscord";
const linkDiscordTask = buildGeorgeTask(taskName, [discordId, uid]);
await addToQueue(taskName, linkDiscordTask);
}
export async function unlinkDiscord(
discordId: string,
uid: string
): Promise<void> {
const taskName = "unlinkDiscord";
const unlinkDiscordTask = buildGeorgeTask(taskName, [discordId, uid]);
await addToQueue(taskName, unlinkDiscordTask);
}
export async function awardChallenge(
discordId: string,
challengeName: string
): Promise<void> {
const taskName = "awardChallenge";
const awardChallengeTask = buildGeorgeTask(taskName, [
discordId,
challengeName,
]);
await addToQueue(taskName, awardChallengeTask);
}
export async function announceLeaderboardUpdate(
newRecords: any[],
leaderboardId: string
): Promise<void> {
const taskName = "announceLeaderboardUpdate";
const leaderboardUpdateTasks = newRecords.map((record) => {
const taskData = buildGeorgeTask(taskName, [
record.discordId ?? record.name,
record.rank,
leaderboardId,
record.wpm,
record.raw,
record.acc,
record.consistency,
]);
return {
name: taskName,
data: taskData,
};
});
await addToQueueBulk(leaderboardUpdateTasks);
}
export async function announceDailyLeaderboardTopResults(
leaderboardId: string,
leaderboardTimestamp: number,
topResults: any[]
): Promise<void> {
const taskName = "announceDailyLeaderboardTopResults";
const dailyLeaderboardTopResultsTask = buildGeorgeTask(taskName, [
leaderboardId,
leaderboardTimestamp,
topResults,
]);
await addToQueue(taskName, dailyLeaderboardTopResultsTask);
}