diff --git a/crates/store/src/backend/composite/read_replica.rs b/crates/store/src/backend/composite/read_replica.rs index de1b9f25..38e4c681 100644 --- a/crates/store/src/backend/composite/read_replica.rs +++ b/crates/store/src/backend/composite/read_replica.rs @@ -29,7 +29,12 @@ pub struct SQLReadReplica { } impl SQLReadReplica { - pub fn open(config: &mut Config, prefix: impl AsKey, stores: &Stores) -> Option { + pub async fn open( + config: &mut Config, + prefix: impl AsKey, + stores: &Stores, + create_tables: bool, + ) -> Option { let prefix = prefix.as_key(); let primary_id = config.value_require((&prefix, "primary"))?.to_string(); let replica_ids = config @@ -75,6 +80,30 @@ impl SQLReadReplica { } } if !replicas.is_empty() { + if create_tables { + match &primary { + #[cfg(feature = "postgres")] + Store::PostgreSQL(store) => { + if let Err(err) = store.create_tables().await { + config.new_build_error( + (&prefix, "primary"), + format!("Failed to create tables: {err}"), + ); + } + } + #[cfg(feature = "mysql")] + Store::MySQL(store) => { + if let Err(err) = store.create_tables().await { + config.new_build_error( + (&prefix, "primary"), + format!("Failed to create tables: {err}"), + ); + } + } + _ => panic!("Invalid store type"), + } + } + Some(Self { primary, replicas, diff --git a/crates/store/src/backend/mysql/main.rs b/crates/store/src/backend/mysql/main.rs index c0c9669b..8f518cc6 100644 --- a/crates/store/src/backend/mysql/main.rs +++ b/crates/store/src/backend/mysql/main.rs @@ -14,7 +14,11 @@ use crate::*; use super::{into_error, MysqlStore}; impl MysqlStore { - pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option { + pub async fn open( + config: &mut Config, + prefix: impl AsKey, + create_tables: bool, + ) -> Option { let prefix = prefix.as_key(); let mut opts = OptsBuilder::default() .ip_or_hostname(config.value_require((&prefix, "host"))?.to_string()) @@ -74,14 +78,16 @@ impl MysqlStore { conn_pool: Pool::new(opts), }; - if let Err(err) = db.create_tables().await { - config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}")); + if create_tables { + if let Err(err) = db.create_tables().await { + config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}")); + } } Some(db) } - pub(super) async fn create_tables(&self) -> trc::Result<()> { + pub(crate) async fn create_tables(&self) -> trc::Result<()> { let mut conn = self.conn_pool.get_conn().await.map_err(into_error)?; for table in [ diff --git a/crates/store/src/backend/postgres/main.rs b/crates/store/src/backend/postgres/main.rs index b2700088..5195c4da 100644 --- a/crates/store/src/backend/postgres/main.rs +++ b/crates/store/src/backend/postgres/main.rs @@ -15,7 +15,11 @@ use tokio_postgres::NoTls; use utils::{config::utils::AsKey, rustls_client_config}; impl PostgresStore { - pub async fn open(config: &mut utils::config::Config, prefix: impl AsKey) -> Option { + pub async fn open( + config: &mut utils::config::Config, + prefix: impl AsKey, + create_tables: bool, + ) -> Option { let prefix = prefix.as_key(); let mut cfg = Config::new(); cfg.dbname = config @@ -61,14 +65,16 @@ impl PostgresStore { .ok()?, }; - if let Err(err) = db.create_tables().await { - config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}")); + if create_tables { + if let Err(err) = db.create_tables().await { + config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}")); + } } Some(db) } - pub(super) async fn create_tables(&self) -> trc::Result<()> { + pub(crate) async fn create_tables(&self) -> trc::Result<()> { let conn = self.conn_pool.get().await.map_err(into_error)?; for table in [ diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index 71fd4ed1..bfe13c89 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -130,7 +130,11 @@ impl Stores { } #[cfg(feature = "postgres")] "postgresql" => { - if let Some(db) = PostgresStore::open(config, prefix).await.map(Store::from) { + if let Some(db) = + PostgresStore::open(config, prefix, config.is_active_store(id)) + .await + .map(Store::from) + { self.stores.insert(store_id.clone(), db.clone()); self.fts_stores.insert(store_id.clone(), db.clone().into()); self.blob_stores.insert( @@ -142,7 +146,10 @@ impl Stores { } #[cfg(feature = "mysql")] "mysql" => { - if let Some(db) = MysqlStore::open(config, prefix).await.map(Store::from) { + if let Some(db) = MysqlStore::open(config, prefix, config.is_active_store(id)) + .await + .map(Store::from) + { self.stores.insert(store_id.clone(), db.clone()); self.fts_stores.insert(store_id.clone(), db.clone().into()); self.blob_stores.insert( @@ -222,10 +229,15 @@ impl Stores { for (id, protocol) in composite_stores { let prefix = ("store", id.as_str()); match protocol.as_str() { - "composite-read" => { + "sql-read-replica" => { if let Some(db) = crate::backend::composite::read_replica::SQLReadReplica::open( - config, prefix, self, - ) { + config, + prefix, + self, + config.is_active_store(&id), + ) + .await + { self.stores.insert(id, Store::SQLReadReplica(db.into())); } } @@ -346,3 +358,26 @@ impl Stores { } } } + +trait IsActiveStore { + fn is_active_store(&self, id: &str) -> bool; +} + +impl IsActiveStore for Config { + fn is_active_store(&self, id: &str) -> bool { + for key in [ + "storage.data", + "storage.blob", + "storage.lookup", + "storage.fts", + ] { + if let Some(store_id) = self.value(key) { + if store_id == id { + return true; + } + } + } + + false + } +}