diff --git a/crates/store/src/backend/s3/mod.rs b/crates/store/src/backend/s3/mod.rs
index cfbc4e61..f519380d 100644
--- a/crates/store/src/backend/s3/mod.rs
+++ b/crates/store/src/backend/s3/mod.rs
@@ -15,6 +15,7 @@ use utils::{
 pub struct S3Store {
     bucket: Bucket,
     prefix: Option<String>,
+    max_retries: u32,
 }
 
 impl S3Store {
@@ -64,6 +65,9 @@ impl S3Store {
                 config.new_build_error(prefix.as_str(), format!("Failed to create bucket: {err:?}"))
             })
             .ok()?,
+            max_retries: config
+                .property_or_default((&prefix, "max-retries"), "3")
+                .unwrap_or(3),
             prefix: config.value((&prefix, "key-prefix")).map(|s| s.to_string()),
         })
     }
@@ -74,56 +78,101 @@ impl S3Store {
         range: Range<usize>,
     ) -> trc::Result<Option<Vec<u8>>> {
         let path = self.build_key(key);
-        let response = if range.start != 0 || range.end != usize::MAX {
-            self.bucket
-                .get_object_range(
-                    path,
-                    range.start as u64,
-                    Some(range.end.saturating_sub(1) as u64),
-                )
-                .await
-        } else {
-            self.bucket.get_object(path).await
-        }
-        .map_err(into_error)?;
+        let mut retries_left = self.max_retries;
 
-        match response.status_code() {
-            200..=299 => Ok(Some(response.to_vec())),
-            404 => Ok(None),
-            code => Err(trc::StoreEvent::S3Error
-                .reason(String::from_utf8_lossy(response.as_slice()))
-                .ctx(trc::Key::Code, code)),
+        loop {
+            let response = if range.start != 0 || range.end != usize::MAX {
+                self.bucket
+                    .get_object_range(
+                        &path,
+                        range.start as u64,
+                        Some(range.end.saturating_sub(1) as u64),
+                    )
+                    .await
+            } else {
+                self.bucket.get_object(&path).await
+            }
+            .map_err(into_error)?;
+
+            match response.status_code() {
+                200..=299 => return Ok(Some(response.to_vec())),
+                404 => return Ok(None),
+                500..=599 if retries_left > 0 => {
+                    // Exponential backoff on 5xx: 1s, 2s, 4s, ... (exponent capped at 16)
+                    tokio::time::sleep(Duration::from_secs(
+                        1 << (self.max_retries - retries_left).min(16),
+                    ))
+                    .await;
+
+                    retries_left -= 1;
+                }
+                code => {
+                    return Err(trc::StoreEvent::S3Error
+                        .reason(String::from_utf8_lossy(response.as_slice()))
+                        .ctx(trc::Key::Code, code))
+                }
+            }
         }
     }
 
     pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> {
-        let response = self
-            .bucket
-            .put_object(self.build_key(key), data)
-            .await
-            .map_err(into_error)?;
+        let mut retries_left = self.max_retries;
 
-        match response.status_code() {
-            200..=299 => Ok(()),
-            code => Err(trc::StoreEvent::S3Error
-                .reason(String::from_utf8_lossy(response.as_slice()))
-                .ctx(trc::Key::Code, code)),
+        loop {
+            let response = self
+                .bucket
+                .put_object(self.build_key(key), data)
+                .await
+                .map_err(into_error)?;
+
+            match response.status_code() {
+                200..=299 => return Ok(()),
+                500..=599 if retries_left > 0 => {
+                    // Exponential backoff on 5xx: 1s, 2s, 4s, ... (exponent capped at 16)
+                    tokio::time::sleep(Duration::from_secs(
+                        1 << (self.max_retries - retries_left).min(16),
+                    ))
+                    .await;
+
+                    retries_left -= 1;
+                }
+                code => {
+                    return Err(trc::StoreEvent::S3Error
+                        .reason(String::from_utf8_lossy(response.as_slice()))
+                        .ctx(trc::Key::Code, code))
+                }
+            }
         }
     }
 
     pub(crate) async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
-        let response = self
-            .bucket
-            .delete_object(self.build_key(key))
-            .await
-            .map_err(into_error)?;
+        let mut retries_left = self.max_retries;
 
-        match response.status_code() {
-            200..=299 => Ok(true),
-            404 => Ok(false),
-            code => Err(trc::StoreEvent::S3Error
-                .reason(String::from_utf8_lossy(response.as_slice()))
-                .ctx(trc::Key::Code, code)),
+        loop {
+            let response = self
+                .bucket
+                .delete_object(self.build_key(key))
+                .await
+                .map_err(into_error)?;
+
+            match response.status_code() {
+                200..=299 => return Ok(true),
+                404 => return Ok(false),
+                500..=599 if retries_left > 0 => {
+                    // Exponential backoff on 5xx: 1s, 2s, 4s, ... (exponent capped at 16)
+                    tokio::time::sleep(Duration::from_secs(
+                        1 << (self.max_retries - retries_left).min(16),
+                    ))
+                    .await;
+
+                    retries_left -= 1;
+                }
+                code => {
+                    return Err(trc::StoreEvent::S3Error
+                        .reason(String::from_utf8_lossy(response.as_slice()))
+                        .ctx(trc::Key::Code, code))
+                }
+            }
         }
     }
 