diff --git a/backend/api/controllers/result.ts b/backend/api/controllers/result.ts index 9d07d06ad..02b573064 100644 --- a/backend/api/controllers/result.ts +++ b/backend/api/controllers/result.ts @@ -22,7 +22,7 @@ import { } from "../../anticheat/index"; import MonkeyStatusCodes from "../../constants/monkey-status-codes"; import { incrementResult } from "../../utils/prometheus"; -import George from "../../tasks/george"; +import * as George from "../../tasks/george"; try { if (anticheatImplemented() === false) throw new Error("undefined"); diff --git a/backend/api/controllers/user.ts b/backend/api/controllers/user.ts index 0dd517f01..313110a3e 100644 --- a/backend/api/controllers/user.ts +++ b/backend/api/controllers/user.ts @@ -5,7 +5,7 @@ import Logger from "../../utils/logger"; import { MonkeyResponse } from "../../utils/monkey-response"; import { linkAccount } from "../../utils/discord"; import { buildAgentLog } from "../../utils/misc"; -import George from "../../tasks/george"; +import * as George from "../../tasks/george"; import admin from "firebase-admin"; export async function createNewUser( diff --git a/backend/jobs/update-leaderboards.ts b/backend/jobs/update-leaderboards.ts index bf3f605e8..ed775e965 100644 --- a/backend/jobs/update-leaderboards.ts +++ b/backend/jobs/update-leaderboards.ts @@ -1,6 +1,6 @@ import { CronJob } from "cron"; import { announceLbUpdate } from "../dal/bot"; -import George from "../tasks/george"; +import * as George from "../tasks/george"; import * as LeaderboardsDAL from "../dal/leaderboards"; import { getCachedConfiguration } from "../init/configuration"; diff --git a/backend/server.ts b/backend/server.ts index 396bdb647..68754fce4 100644 --- a/backend/server.ts +++ b/backend/server.ts @@ -10,7 +10,7 @@ import { Server } from "http"; import { version } from "./version"; import { recordServerVersion } from "./utils/prometheus"; import * as RedisClient from "./init/redis"; -import George from "./tasks/george"; +import { initJobQueue } from "./tasks/george"; import Logger from "./utils/logger"; async function bootServer(port: number): Promise { @@ -38,7 +38,7 @@ async function bootServer(port: number): Promise { Logger.success("Connected to redis"); Logger.info("Initializing task queues..."); - George.initJobQueue(RedisClient.getConnection()); + await initJobQueue(RedisClient.getConnection()); Logger.success("Task queues initialized"); } diff --git a/backend/tasks/george.ts b/backend/tasks/george.ts index 2f92e9847..4c3d19d1f 100644 --- a/backend/tasks/george.ts +++ b/backend/tasks/george.ts @@ -1,7 +1,5 @@ -import { lock } from "../utils/misc"; import type IORedis from "ioredis"; import { Queue, QueueScheduler } from "bullmq"; -import { isConnected } from "../init/redis"; const QUEUE_NAME = "george-tasks"; @@ -17,89 +15,115 @@ function buildGeorgeTask(command: string, taskArguments: any[]): GeorgeTask { }; } -class George { - static jobQueue: Queue; - static jobQueueScheduler: QueueScheduler; +let jobQueue: Queue; +let jobQueueScheduler: QueueScheduler; - static initJobQueue(redisConnection: IORedis.Redis | undefined): void { - this.jobQueue = new Queue(QUEUE_NAME, { - connection: redisConnection, - defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true, - attempts: 3, - backoff: { - type: "exponential", - delay: 2000, - }, +export async function initJobQueue( + redisConnection: IORedis.Redis | undefined +): Promise { + if (jobQueue || !redisConnection) { + return; + } + + jobQueue = new Queue(QUEUE_NAME, { + connection: redisConnection, + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: true, + attempts: 3, + backoff: { + type: "exponential", + delay: 2000, }, - }); + }, + }); - this.jobQueueScheduler = new QueueScheduler(QUEUE_NAME, { - connection: redisConnection, - }); - } - - static async updateDiscordRole( - discordId: string, - wpm: number - ): Promise { - const command = "updateRole"; - const updateDiscordRoleTask = buildGeorgeTask(command, [discordId, wpm]); - await this.jobQueue.add(command, updateDiscordRoleTask); - } - - static async linkDiscord(discordId: string, uid: string): Promise { - const command = "linkDiscord"; - const linkDiscordTask = buildGeorgeTask(command, [discordId, uid]); - await this.jobQueue.add(command, linkDiscordTask); - } - - static async unlinkDiscord(discordId: string, uid: string): Promise { - const command = "unlinkDiscord"; - const unlinkDiscordTask = buildGeorgeTask(command, [discordId, uid]); - await this.jobQueue.add(command, unlinkDiscordTask); - } - - static async awardChallenge( - discordId: string, - challengeName: string - ): Promise { - const command = "awardChallenge"; - const awardChallengeTask = buildGeorgeTask(command, [ - discordId, - challengeName, - ]); - await this.jobQueue.add(command, awardChallengeTask); - } - - static async announceLbUpdate( - newRecords: any[], - leaderboardId: string - ): Promise { - const command = "announceLbUpdate"; - - const leaderboardUpdateTasks = newRecords.map((record) => { - const taskData = buildGeorgeTask(command, [ - record.discordId ?? record.name, - record.rank, - leaderboardId, - record.wpm, - record.raw, - record.acc, - record.consistency, - ]); - - return { - name: command, - data: taskData, - }; - }); - - await this.jobQueue.addBulk(leaderboardUpdateTasks); - } + jobQueueScheduler = new QueueScheduler(QUEUE_NAME, { + autorun: false, + connection: redisConnection, + }); + await jobQueueScheduler.run(); } -export default lock(George, () => { - return !isConnected(); -}); +async function addToQueue(command: string, task: GeorgeTask): Promise { + if (!jobQueue) { + return; + } + + await jobQueue.add(command, task); +} + +async function addToQueueBulk( + tasks: { name: string; data: GeorgeTask }[] +): Promise { + if (!jobQueue) { + return; + } + + await jobQueue.addBulk(tasks); +} + +export async function updateDiscordRole( + discordId: string, + wpm: number +): Promise { + const command = "updateRole"; + const updateDiscordRoleTask = buildGeorgeTask(command, [discordId, wpm]); + await addToQueue(command, updateDiscordRoleTask); +} + +export async function linkDiscord( + discordId: string, + uid: string +): Promise { + const command = "linkDiscord"; + const linkDiscordTask = buildGeorgeTask(command, [discordId, uid]); + await addToQueue(command, linkDiscordTask); +} + +export async function unlinkDiscord( + discordId: string, + uid: string +): Promise { + const command = "unlinkDiscord"; + const unlinkDiscordTask = buildGeorgeTask(command, [discordId, uid]); + await addToQueue(command, unlinkDiscordTask); +} + +export async function awardChallenge( + discordId: string, + challengeName: string +): Promise { + const command = "awardChallenge"; + const awardChallengeTask = buildGeorgeTask(command, [ + discordId, + challengeName, + ]); + await addToQueue(command, awardChallengeTask); +} + +export async function announceLbUpdate( + newRecords: any[], + leaderboardId: string +): Promise { + const command = "announceLbUpdate"; + + const leaderboardUpdateTasks = newRecords.map((record) => { + const taskData = buildGeorgeTask(command, [ + record.discordId ?? record.name, + record.rank, + leaderboardId, + record.wpm, + record.raw, + record.acc, + record.consistency, + ]); + + return { + name: command, + data: taskData, + }; + }); + + await addToQueueBulk(leaderboardUpdateTasks); +} diff --git a/backend/utils/misc.ts b/backend/utils/misc.ts index bb0dde8e4..6bcf91da4 100644 --- a/backend/utils/misc.ts +++ b/backend/utils/misc.ts @@ -80,34 +80,3 @@ export function padNumbers( number.toString().padStart(maxLength, fillString) ); } - -/** - * Locks all static methods of a class. - * @param target The class to lock. - * @param isLocked Callback to determine if a static method is locked. - * @returns The locked class. - */ -export function lock(target: T, isLocked: () => boolean): T { - const propertyNames = Object.getOwnPropertyNames(target); - - propertyNames.forEach((propertyName) => { - const descriptor = Object.getOwnPropertyDescriptor(target, propertyName); - - const isMethod = descriptor?.value instanceof Function; - if (!isMethod) { - return; - } - - const originalMethod = descriptor.value; - descriptor.value = function (...args: any[]): any { - if (isLocked()) { - return; - } - return originalMethod.apply(this, args); - }; - - Object.defineProperty(target, propertyName, descriptor); - }); - - return target; -}