Remove from locked task queue events no longer present in the db

This commit is contained in:
mdecimus 2025-06-07 23:11:22 +02:00
parent fe074ad369
commit 10e932a43a

View file

@ -61,7 +61,13 @@ pub(crate) struct TaskManagerIpc {
tx_fts: mpsc::Sender<Task>, tx_fts: mpsc::Sender<Task>,
tx_bayes: mpsc::Sender<Task>, tx_bayes: mpsc::Sender<Task>,
tx_alarm: mpsc::Sender<Task>, tx_alarm: mpsc::Sender<Task>,
locked: AHashMap<Vec<u8>, Instant>, locked: AHashMap<Vec<u8>, Locked>,
revision: u64,
}
struct Locked {
expires: Instant,
revision: u64,
} }
pub fn spawn_task_manager(inner: Arc<Inner>) { pub fn spawn_task_manager(inner: Arc<Inner>) {
@ -144,6 +150,7 @@ pub fn spawn_task_manager(inner: Arc<Inner>) {
tx_bayes: tx_index_2, tx_bayes: tx_index_2,
tx_alarm: tx_index_3, tx_alarm: tx_index_3,
locked: Default::default(), locked: Default::default(),
revision: 0,
}; };
let rx = inner.ipc.task_tx.clone(); let rx = inner.ipc.task_tx.clone();
loop { loop {
@ -188,6 +195,7 @@ impl TaskQueueManager for Server {
let now_timestamp = now(); let now_timestamp = now();
let now = Instant::now(); let now = Instant::now();
let mut next_event = None; let mut next_event = None;
ipc.revision += 1;
let _ = self let _ = self
.core .core
.storage .storage
@ -199,18 +207,20 @@ impl TaskQueueManager for Server {
if task.due <= now_timestamp { if task.due <= now_timestamp {
match ipc.locked.entry(key.to_vec()) { match ipc.locked.entry(key.to_vec()) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let expires = entry.get_mut(); let locked = entry.get_mut();
if *expires <= now { if locked.expires <= now {
*expires = Instant::now() locked.expires = Instant::now()
+ std::time::Duration::from_secs(task.lock_expiry() + 1); + std::time::Duration::from_secs(task.lock_expiry() + 1);
tasks.push(task); tasks.push(task);
} }
locked.revision = ipc.revision;
} }
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
entry.insert( entry.insert(Locked {
Instant::now() expires: Instant::now()
+ std::time::Duration::from_secs(task.lock_expiry() + 1), + std::time::Duration::from_secs(task.lock_expiry() + 1),
); revision: ipc.revision,
});
tasks.push(task); tasks.push(task);
} }
} }
@ -260,7 +270,8 @@ impl TaskQueueManager for Server {
// Delete expired locks // Delete expired locks
let now = Instant::now(); let now = Instant::now();
ipc.locked.retain(|_, expires| *expires > now); ipc.locked
.retain(|_, locked| locked.expires > now && locked.revision == ipc.revision);
next_event.map_or(LONG_1Y_SLUMBER, |timestamp| { next_event.map_or(LONG_1Y_SLUMBER, |timestamp| {
Duration::from_secs(timestamp.saturating_sub(store::write::now())) Duration::from_secs(timestamp.saturating_sub(store::write::now()))
}) })