diff --git a/crates/store/src/backend/mysql/write.rs b/crates/store/src/backend/mysql/write.rs index 8d78ddbe..f206e1dd 100644 --- a/crates/store/src/backend/mysql/write.rs +++ b/crates/store/src/backend/mysql/write.rs @@ -37,23 +37,29 @@ impl MysqlStore { let mut conn = self.conn_pool.get_conn().await.map_err(into_error)?; loop { - match self.write_trx(&mut conn, &batch).await { + let err = match self.write_trx(&mut conn, &batch).await { Ok(result) => { return Ok(result); } - Err(CommitError::Mysql(Error::Server(err))) + Err(err) => err, + }; + + let _ = conn.query_drop("ROLLBACK;").await; + + match err { + CommitError::Mysql(Error::Server(err)) if [1062, 1213].contains(&err.code) && retry_count < MAX_COMMIT_ATTEMPTS && start.elapsed() < MAX_COMMIT_TIME => {} - Err(CommitError::Retry) => { + CommitError::Retry => { if retry_count > MAX_COMMIT_ATTEMPTS || start.elapsed() > MAX_COMMIT_TIME { return Err(trc::StoreEvent::AssertValueFailed.into()); } } - Err(CommitError::Mysql(err)) => { + CommitError::Mysql(err) => { return Err(into_error(err)); } - Err(CommitError::Internal(err)) => { + CommitError::Internal(err) => { return Err(err); } } @@ -318,13 +324,7 @@ impl MysqlStore { } } - match trx.commit().await { - Ok(_) => Ok(result), - Err(err) => { - let _ = conn.query_drop("ROLLBACK;").await; - Err(err.into()) - } - } + trx.commit().await.map(|_| result).map_err(Into::into) } pub(crate) async fn purge_store(&self) -> trc::Result<()> {