Add later queue (#3789) bruception

* Add later queue

* Fix test

* renamed property

* update name

* missing rename

* try 23 times

Co-authored-by: Miodec <jack@monkeytype.com>
This commit is contained in:
Bruce Berrios 2023-01-23 08:07:11 -05:00 committed by GitHub
parent 4bfa2739dd
commit 74fe4c864d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 205 additions and 85 deletions

View file

@ -22,6 +22,7 @@ const dailyLeaderboardsConfig = {
dailyLeaderboardCacheSize: 3,
topResultsToAnnounce: 3,
xpRewardBrackets: [],
scheduleRewardsModeRules: [],
};
describe("Daily Leaderboards", () => {

View file

@ -71,6 +71,7 @@ export const BASE_CONFIGURATION: MonkeyTypes.Configuration = {
maxResults: 0,
leaderboardExpirationTimeInDays: 0,
validModeRules: [],
scheduleRewardsModeRules: [],
// GOTCHA! MUST ATLEAST BE 1, LRUCache module will make process crash and die
dailyLeaderboardCacheSize: 1,
topResultsToAnnounce: 1, // This should never be 0. Setting to zero will announce all results.
@ -398,6 +399,28 @@ export const CONFIGURATION_FORM_SCHEMA: ObjectSchema = {
},
},
},
scheduleRewardsModeRules: {
type: "array",
label: "Schedule Rewards Mode Rules",
items: {
type: "object",
label: "Rule",
fields: {
language: {
type: "string",
label: "Language",
},
mode: {
type: "string",
label: "Mode",
},
mode2: {
type: "string",
label: "Secondary Mode",
},
},
},
},
dailyLeaderboardCacheSize: {
type: "number",
label: "Daily Leaderboard Cache Size",

View file

@ -1,5 +1,4 @@
import updateLeaderboards from "./update-leaderboards";
import deleteOldLogs from "./delete-old-logs";
import announceDailyLeaderboards from "./announce-daily-leaderboards";
export default [updateLeaderboards, deleteOldLogs, announceDailyLeaderboards];
export default [updateLeaderboards, deleteOldLogs];

View file

@ -1,3 +1,4 @@
import LaterQueue from "./later-queue";
import GeorgeQueue from "./george-queue";
export default [GeorgeQueue];
export default [GeorgeQueue, LaterQueue];

View file

@ -0,0 +1,69 @@
import LRUCache from "lru-cache";
import Logger from "../utils/logger";
import { MonkeyQueue } from "./monkey-queue";
import { getCurrentDayTimestamp } from "../utils/misc";
const QUEUE_NAME = "later";
type LaterTasks = "daily-leaderboard-results";
export interface LaterTask {
taskName: LaterTasks;
ctx: any;
}
const ONE_MINUTE_IN_MILLISECONDS = 1000 * 60;
const ONE_DAY_IN_MILLISECONDS = 1000 * 60 * 60 * 24;
class LaterQueue extends MonkeyQueue<LaterTask> {
private scheduledJobCache = new LRUCache<string, boolean>({
max: 100,
});
async scheduleForTomorrow(
taskName: LaterTasks,
taskId: string,
taskContext: any
): Promise<void> {
const currentDayTimestamp = getCurrentDayTimestamp();
const jobId = `${taskName}:${currentDayTimestamp}:${taskId}`;
if (this.scheduledJobCache.has(jobId)) {
return;
}
const task: LaterTask = {
taskName,
ctx: {
...taskContext,
yesterdayTimestamp: currentDayTimestamp,
},
};
const nowTimestamp = Date.now();
const delay =
currentDayTimestamp +
ONE_DAY_IN_MILLISECONDS -
nowTimestamp +
ONE_MINUTE_IN_MILLISECONDS;
await this.add("todo-tomorrow", task, {
delay,
jobId, // Prevent duplicate jobs
backoff: 60 * ONE_MINUTE_IN_MILLISECONDS, // Try again every hour on failure
attempts: 23,
});
this.scheduledJobCache.set(jobId, true);
Logger.info(`Scheduled ${taskName} for ${new Date(nowTimestamp + delay)}`);
}
}
export default new LaterQueue(QUEUE_NAME, {
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: false,
},
});

View file

@ -1,11 +1,17 @@
import IORedis from "ioredis";
import { Queue, QueueOptions, QueueScheduler } from "bullmq";
import {
BulkJobOptions,
JobsOptions,
Queue,
QueueOptions,
QueueScheduler,
} from "bullmq";
export class MonkeyQueue<T> {
jobQueue: Queue;
_queueScheduler: QueueScheduler;
queueName: string;
queueOpts: QueueOptions;
private jobQueue: Queue;
private _queueScheduler: QueueScheduler;
public readonly queueName: string;
private queueOpts: QueueOptions;
constructor(queueName: string, queueOpts: QueueOptions) {
this.queueName = queueName;
@ -27,15 +33,17 @@ export class MonkeyQueue<T> {
});
}
async add(taskName: string, task: T): Promise<void> {
async add(taskName: string, task: T, jobOpts?: JobsOptions): Promise<void> {
if (!this.jobQueue) {
return;
}
await this.jobQueue.add(taskName, task);
await this.jobQueue.add(taskName, task, jobOpts);
}
async addBulk(tasks: { name: string; data: T }[]): Promise<void> {
async addBulk(
tasks: { name: string; data: T; opts?: BulkJobOptions }[]
): Promise<void> {
if (!this.jobQueue) {
return;
}

View file

@ -12,6 +12,7 @@ import { version } from "./version";
import { recordServerVersion } from "./utils/prometheus";
import * as RedisClient from "./init/redis";
import queues from "./queues";
import workers from "./workers";
import Logger from "./utils/logger";
async function bootServer(port: number): Promise<Server> {
@ -39,12 +40,19 @@ async function bootServer(port: number): Promise<Server> {
if (RedisClient.isConnected()) {
Logger.success("Connected to redis");
const connection = RedisClient.getConnection();
Logger.info("Initializing task queues...");
Logger.info("Initializing queues...");
queues.forEach((queue) => {
queue.init(RedisClient.getConnection());
queue.init(connection);
});
Logger.success("Task queues initialized");
Logger.success("Queues initialized");
Logger.info("Initializing workers...");
workers.forEach((worker) => {
worker(connection).run();
});
Logger.success("Workers initialized");
}
initializeDailyLeaderboardsCache(liveConfiguration.dailyLeaderboards);

View file

@ -77,6 +77,7 @@ declare namespace MonkeyTypes {
leaderboardExpirationTimeInDays: number;
maxResults: number;
validModeRules: ValidModeRule[];
scheduleRewardsModeRules: ValidModeRule[];
dailyLeaderboardCacheSize: number;
topResultsToAnnounce: number;
xpRewardBrackets: RewardBracket[];

View file

@ -1,6 +1,7 @@
import _ from "lodash";
import LRUCache from "lru-cache";
import * as RedisClient from "../init/redis";
import LaterQueue from "../queues/later-queue";
import { getCurrentDayTimestamp, matchesAPattern, kogascore } from "./misc";
interface DailyLeaderboardEntry {
@ -27,12 +28,16 @@ export class DailyLeaderboard {
private leaderboardScoresKeyName: string;
private leaderboardModeKey: string;
private customTime: number;
private modeRule: MonkeyTypes.ValidModeRule;
constructor(modeRule: MonkeyTypes.ValidModeRule, customTime = -1) {
const { language, mode, mode2 } = modeRule;
constructor(language: string, mode: string, mode2: string, customTime = -1) {
this.leaderboardModeKey = `${language}:${mode}:${mode2}`;
this.leaderboardResultsKeyName = `${resultsNamespace}:${this.leaderboardModeKey}`;
this.leaderboardScoresKeyName = `${scoresNamespace}:${this.leaderboardModeKey}`;
this.customTime = customTime;
this.modeRule = modeRule;
}
private getTodaysLeaderboardKeys(): {
@ -87,6 +92,21 @@ export class DailyLeaderboard {
JSON.stringify(entry)
);
if (
isValidModeRule(
this.modeRule,
dailyLeaderboardsConfig.scheduleRewardsModeRules
)
) {
await LaterQueue.scheduleForTomorrow(
"daily-leaderboard-results",
this.leaderboardModeKey,
{
modeRule: this.modeRule,
}
);
}
if (rank === null) {
return -1;
}
@ -183,6 +203,20 @@ export function initializeDailyLeaderboardsCache(
});
}
function isValidModeRule(
modeRule: MonkeyTypes.ValidModeRule,
modeRules: MonkeyTypes.ValidModeRule[]
): boolean {
const { language, mode, mode2 } = modeRule;
return modeRules.some((rule) => {
const matchesLanguage = matchesAPattern(language, rule.language);
const matchesMode = matchesAPattern(mode, rule.mode);
const matchesMode2 = matchesAPattern(mode2, rule.mode2);
return matchesLanguage && matchesMode && matchesMode2;
});
}
export function getDailyLeaderboard(
language: string,
mode: string,
@ -192,12 +226,8 @@ export function getDailyLeaderboard(
): DailyLeaderboard | null {
const { validModeRules, enabled } = dailyLeaderboardsConfig;
const isValidMode = validModeRules.some((rule) => {
const matchesLanguage = matchesAPattern(language, rule.language);
const matchesMode = matchesAPattern(mode, rule.mode);
const matchesMode2 = matchesAPattern(mode2, rule.mode2);
return matchesLanguage && matchesMode && matchesMode2;
});
const modeRule = { language, mode, mode2 };
const isValidMode = isValidModeRule(modeRule, validModeRules);
if (!enabled || !isValidMode || !DAILY_LEADERBOARDS) {
return null;
@ -206,12 +236,7 @@ export function getDailyLeaderboard(
const key = `${language}:${mode}:${mode2}:${customTimestamp}`;
if (!DAILY_LEADERBOARDS.has(key)) {
const dailyLeaderboard = new DailyLeaderboard(
language,
mode,
mode2,
customTimestamp
);
const dailyLeaderboard = new DailyLeaderboard(modeRule, customTimestamp);
DAILY_LEADERBOARDS.set(key, dailyLeaderboard);
}

View file

@ -0,0 +1,3 @@
import LaterWorker from "./later-worker";
export default [LaterWorker];

View file

@ -1,46 +1,31 @@
import _ from "lodash";
import { CronJob } from "cron";
import {
getCurrentDayTimestamp,
getOrdinalNumberString,
mapRange,
} from "../utils/misc";
import { getCachedConfiguration } from "../init/configuration";
import { DailyLeaderboard } from "../utils/daily-leaderboards";
import GeorgeQueue from "../queues/george-queue";
import IORedis from "ioredis";
import { Worker, Job } from "bullmq";
import Logger from "../utils/logger";
import { addToInboxBulk } from "../dal/user";
import GeorgeQueue from "../queues/george-queue";
import { buildMonkeyMail } from "../utils/monkey-mail";
import { DailyLeaderboard } from "../utils/daily-leaderboards";
import { getCachedConfiguration } from "../init/configuration";
import { getOrdinalNumberString, mapRange } from "../utils/misc";
import LaterQueue, { LaterTask } from "../queues/later-queue";
const CRON_SCHEDULE = "1 0 * * *"; // At 00:01.
const ONE_DAY_IN_MILLISECONDS = 24 * 60 * 60 * 1000;
interface DailyLeaderboardMailContext {
yesterdayTimestamp: number;
modeRule: MonkeyTypes.ValidModeRule;
}
const leaderboardsToAnnounce = [
{
language: "english",
mode: "time",
mode2: "15",
},
{
language: "english",
mode: "time",
mode2: "60",
},
];
async function announceDailyLeaderboard(
language: string,
mode: string,
mode2: string,
dailyLeaderboardsConfig: MonkeyTypes.Configuration["dailyLeaderboards"],
inboxConfig: MonkeyTypes.Configuration["users"]["inbox"]
async function handleDailyLeaderboardResults(
ctx: DailyLeaderboardMailContext
): Promise<void> {
const yesterday = getCurrentDayTimestamp() - ONE_DAY_IN_MILLISECONDS;
const dailyLeaderboard = new DailyLeaderboard(
language,
mode,
mode2,
yesterday
);
const { yesterdayTimestamp, modeRule } = ctx;
const { language, mode, mode2 } = modeRule;
const {
dailyLeaderboards: dailyLeaderboardsConfig,
users: { inbox: inboxConfig },
} = await getCachedConfiguration(false);
const dailyLeaderboard = new DailyLeaderboard(modeRule, yesterdayTimestamp);
const allResults = await dailyLeaderboard.getResults(
0,
@ -51,6 +36,7 @@ async function announceDailyLeaderboard(
if (allResults.length === 0) {
return;
}
const { maxResults, xpRewardBrackets } = dailyLeaderboardsConfig;
if (inboxConfig.enabled && xpRewardBrackets.length > 0) {
@ -108,32 +94,28 @@ async function announceDailyLeaderboard(
const leaderboardId = `${mode} ${mode2} ${language}`;
await GeorgeQueue.announceDailyLeaderboardTopResults(
leaderboardId,
yesterday,
yesterdayTimestamp,
topResults
);
}
async function announceDailyLeaderboards(): Promise<void> {
const {
dailyLeaderboards,
users: { inbox },
maintenance,
} = await getCachedConfiguration();
if (!dailyLeaderboards.enabled || maintenance) {
return;
async function jobHandler(job: Job): Promise<void> {
const { taskName, ctx }: LaterTask = job.data;
Logger.info(`Starting job: ${taskName}`);
const start = performance.now();
if (taskName === "daily-leaderboard-results") {
await handleDailyLeaderboardResults(ctx);
}
await Promise.allSettled(
leaderboardsToAnnounce.map(async ({ language, mode, mode2 }) => {
return announceDailyLeaderboard(
language,
mode,
mode2,
dailyLeaderboards,
inbox
);
})
);
const elapsed = performance.now() - start;
Logger.success(`Job: ${taskName} - completed in ${elapsed}ms`);
}
export default new CronJob(CRON_SCHEDULE, announceDailyLeaderboards);
export default (redisConnection?: IORedis.Redis): Worker =>
new Worker(LaterQueue.queueName, jobHandler, {
autorun: false,
connection: redisConnection,
});