diff --git a/crates/services/src/task_manager/mod.rs b/crates/services/src/task_manager/mod.rs index 51104c06..422a80db 100644 --- a/crates/services/src/task_manager/mod.rs +++ b/crates/services/src/task_manager/mod.rs @@ -61,7 +61,13 @@ pub(crate) struct TaskManagerIpc { tx_fts: mpsc::Sender, tx_bayes: mpsc::Sender, tx_alarm: mpsc::Sender, - locked: AHashMap, Instant>, + locked: AHashMap, Locked>, + revision: u64, +} + +struct Locked { + expires: Instant, + revision: u64, } pub fn spawn_task_manager(inner: Arc) { @@ -144,6 +150,7 @@ pub fn spawn_task_manager(inner: Arc) { tx_bayes: tx_index_2, tx_alarm: tx_index_3, locked: Default::default(), + revision: 0, }; let rx = inner.ipc.task_tx.clone(); loop { @@ -188,6 +195,7 @@ impl TaskQueueManager for Server { let now_timestamp = now(); let now = Instant::now(); let mut next_event = None; + ipc.revision += 1; let _ = self .core .storage @@ -199,18 +207,20 @@ impl TaskQueueManager for Server { if task.due <= now_timestamp { match ipc.locked.entry(key.to_vec()) { Entry::Occupied(mut entry) => { - let expires = entry.get_mut(); - if *expires <= now { - *expires = Instant::now() + let locked = entry.get_mut(); + if locked.expires <= now { + locked.expires = Instant::now() + std::time::Duration::from_secs(task.lock_expiry() + 1); tasks.push(task); } + locked.revision = ipc.revision; } Entry::Vacant(entry) => { - entry.insert( - Instant::now() + entry.insert(Locked { + expires: Instant::now() + std::time::Duration::from_secs(task.lock_expiry() + 1), - ); + revision: ipc.revision, + }); tasks.push(task); } } @@ -260,7 +270,8 @@ impl TaskQueueManager for Server { // Delete expired locks let now = Instant::now(); - ipc.locked.retain(|_, expires| *expires > now); + ipc.locked + .retain(|_, locked| locked.expires > now && locked.revision == ipc.revision); next_event.map_or(LONG_1Y_SLUMBER, |timestamp| { Duration::from_secs(timestamp.saturating_sub(store::write::now())) })