fixed session recordings not getting cleaned up - fixes #310

This commit is contained in:
Eugene Pankov 2022-10-02 12:41:23 +02:00
parent fd993c42f2
commit a8e21b38d4
No known key found for this signature in database
GPG key ID: 5896FCBBDD1CF4F4
3 changed files with 44 additions and 7 deletions

View file

@ -5,7 +5,7 @@ use anyhow::Result;
use sea_orm::sea_query::Expr;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectOptions, Database, DatabaseConnection, EntityTrait,
QueryFilter, TransactionTrait,
ModelTrait, QueryFilter, TransactionTrait,
};
use tracing::*;
use uuid::Uuid;
@ -20,6 +20,7 @@ use warpgate_db_entities::{
use warpgate_db_migrations::migrate_database;
use crate::consts::{BUILTIN_ADMIN_ROLE_NAME, BUILTIN_ADMIN_TARGET_NAME};
use crate::recordings::SessionRecordings;
pub async fn connect_to_db(config: &WarpgateConfig) -> Result<DatabaseConnection> {
let mut url = url::Url::parse(&config.store.database_url.expose_secret()[..])?;
@ -281,7 +282,11 @@ async fn migrate_config_into_db(
Ok(())
}
pub async fn cleanup_db(db: &mut DatabaseConnection, retention: &Duration) -> Result<()> {
pub async fn cleanup_db(
db: &mut DatabaseConnection,
recordings: &mut SessionRecordings,
retention: &Duration,
) -> Result<()> {
use warpgate_db_entities::{Recording, Session};
let cutoff = chrono::Utc::now() - chrono::Duration::from_std(*retention)?;
@ -290,12 +295,22 @@ pub async fn cleanup_db(db: &mut DatabaseConnection, retention: &Duration) -> Re
.exec(db)
.await?;
Recording::Entity::delete_many()
let recordings_to_delete = Recording::Entity::find()
.filter(Expr::col(Session::Column::Ended).is_not_null())
.filter(Expr::col(Session::Column::Ended).lt(cutoff))
.exec(db)
.all(db)
.await?;
for recording in recordings_to_delete {
if let Err(error) = recordings
.remove(&recording.session_id, &recording.name)
.await
{
error!(session=%recording.session_id, name=%recording.name, %error, "Failed to remove recording");
}
recording.delete(db).await?;
}
Session::Entity::delete_many()
.filter(Expr::col(Session::Column::Ended).is_not_null())
.filter(Expr::col(Session::Column::Ended).lt(cutoff))

View file

@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bytes::Bytes;
@ -104,7 +104,23 @@ impl SessionRecordings {
live.get(id).map(|sender| sender.subscribe())
}
pub fn path_for(&self, session_id: &SessionId, name: &dyn AsRef<std::path::Path>) -> PathBuf {
pub async fn remove<P: AsRef<Path>>(&self, session_id: &SessionId, name: P) -> Result<()> {
let path = self.path_for(session_id, name);
tokio::fs::remove_file(&path).await?;
if let Some(parent) = path.parent() {
if tokio::fs::read_dir(parent)
.await?
.next_entry()
.await?
.is_none()
{
tokio::fs::remove_dir(parent).await?;
}
}
Ok(())
}
pub fn path_for<P: AsRef<Path>>(&self, session_id: &SessionId, name: P) -> PathBuf {
self.path.join(session_id.to_string()).join(&name)
}
}

View file

@ -57,7 +57,13 @@ pub(crate) async fn command(cli: &crate::Cli) -> Result<()> {
let retention = { services.config.lock().await.store.log.retention };
let interval = retention / 10;
#[allow(clippy::explicit_auto_deref)]
match cleanup_db(&mut *services.db.lock().await, &retention).await {
match cleanup_db(
&mut *services.db.lock().await,
&mut *services.recordings.lock().await,
&retention,
)
.await
{
Err(error) => error!(?error, "Failed to cleanup the database"),
Ok(_) => debug!("Database cleaned up, next in {:?}", interval),
}