From 34b3d74b3a5c98a69a5387f4e1f14e92305dce4e Mon Sep 17 00:00:00 2001 From: John Children Date: Mon, 10 Nov 2025 17:36:24 +0000 Subject: [PATCH 1/6] feat: Use an async function to run migrations Enables the use of async functions to run migrations and changes the signature of `new_default_process_async_with_migrations` to take a closure that returns a future instead of a sync closure. --- src/lib.rs | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b4bb117..961ec02 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,8 +20,10 @@ mod search; pub mod synchronous; use std::fs::{metadata, set_permissions}; +use std::future::Future; use std::io::{BufRead, BufReader}; use std::path::Path; +use std::pin::Pin; use std::sync::atomic::AtomicU32; use std::sync::{Arc, Mutex, OnceLock}; use std::{fs::File, io::Write}; @@ -165,16 +167,15 @@ pub async fn new_default_process_async() -> TmpPostgrustResult Result<(), Box>, + migrate: impl Fn(&str) -> Pin>>>>, ) -> TmpPostgrustResult { let factory_mutex = TOKIO_POSTGRES_FACTORY .get_or_try_init(|| async { - TmpPostgrustFactory::try_new_async().await.map(|factory| { - factory - .run_migrations(migrate) - .expect("Failed to run migrations."); - tokio::sync::Mutex::new(Some(factory)) - }) + let factory = TmpPostgrustFactory::try_new_async().await?; + factory + .run_migrations_async(migrate) + .await?; + Ok(tokio::sync::Mutex::new(Some(factory))) }) .await?; let guard = factory_mutex.lock().await; @@ -303,6 +304,24 @@ impl TmpPostgrustFactory { Ok(()) } + /// Run migrations against the cache directory, will cause all subsequent instances + /// to be run against a version of the database where the migrations have been applied. + /// + /// # Errors + /// + /// Will error if Postgresql is unable to start or if the migrate function returns + /// an error. + pub async fn run_migrations_async( + &self, + migrate: impl FnOnce(&str) -> Pin>>>>, + ) -> TmpPostgrustResult<()> { + let process = self.start_postgresql(&self.cache_dir)?; + + migrate(&process.connection_string()).await.map_err(TmpPostgrustError::MigrationsFailed)?; + + Ok(()) + } + /// Start a new postgresql instance and return a process guard that will ensure it is cleaned /// up when dropped. #[instrument(skip(self))] From c5740e2f469ad422aa735963f3119593fb4254a9 Mon Sep 17 00:00:00 2001 From: John Children Date: Wed, 12 Nov 2025 14:04:53 +0000 Subject: [PATCH 2/6] fix: Replace box future with generic for migrate Makes the migration function a bit more flexible --- src/lib.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 961ec02..10e1bf7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,6 @@ use std::fs::{metadata, set_permissions}; use std::future::Future; use std::io::{BufRead, BufReader}; use std::path::Path; -use std::pin::Pin; use std::sync::atomic::AtomicU32; use std::sync::{Arc, Mutex, OnceLock}; use std::{fs::File, io::Write}; @@ -166,9 +165,12 @@ pub async fn new_default_process_async() -> TmpPostgrustResult Pin>>>>, -) -> TmpPostgrustResult { +pub async fn new_default_process_async_with_migrations( + migrate: impl Fn(&str) -> F, +) -> TmpPostgrustResult +where + F: Future>> +{ let factory_mutex = TOKIO_POSTGRES_FACTORY .get_or_try_init(|| async { let factory = TmpPostgrustFactory::try_new_async().await?; @@ -311,10 +313,13 @@ impl TmpPostgrustFactory { /// /// Will error if Postgresql is unable to start or if the migrate function returns /// an error. - pub async fn run_migrations_async( + pub async fn run_migrations_async( &self, - migrate: impl FnOnce(&str) -> Pin>>>>, - ) -> TmpPostgrustResult<()> { + migrate: impl FnOnce(&str) -> F, + ) -> TmpPostgrustResult<()> + where + F: Future>>, + { let process = self.start_postgresql(&self.cache_dir)?; migrate(&process.connection_string()).await.map_err(TmpPostgrustError::MigrationsFailed)?; From 4ab058bdc5a76e4ecf9e0c5a3f49d4a543af17d5 Mon Sep 17 00:00:00 2001 From: John Children Date: Wed, 12 Nov 2025 14:10:07 +0000 Subject: [PATCH 3/6] chore: Add Send trait for async migration closure --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 10e1bf7..764701c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -169,7 +169,7 @@ pub async fn new_default_process_async_with_migrations( migrate: impl Fn(&str) -> F, ) -> TmpPostgrustResult where - F: Future>> + F: Future>> + Send, { let factory_mutex = TOKIO_POSTGRES_FACTORY .get_or_try_init(|| async { @@ -318,7 +318,7 @@ impl TmpPostgrustFactory { migrate: impl FnOnce(&str) -> F, ) -> TmpPostgrustResult<()> where - F: Future>>, + F: Future>> + Send, { let process = self.start_postgresql(&self.cache_dir)?; From 303a23d99e7c1ff67a91a4ab870570f38255758f Mon Sep 17 00:00:00 2001 From: John Children Date: Wed, 12 Nov 2025 14:13:24 +0000 Subject: [PATCH 4/6] chore: Remove FnOnce constraint --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 764701c..5c0a637 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -297,7 +297,7 @@ impl TmpPostgrustFactory { /// an error. pub fn run_migrations( &self, - migrate: impl FnOnce(&str) -> Result<(), Box>, + migrate: impl Fn(&str) -> Result<(), Box>, ) -> TmpPostgrustResult<()> { let process = self.start_postgresql(&self.cache_dir)?; @@ -315,7 +315,7 @@ impl TmpPostgrustFactory { /// an error. pub async fn run_migrations_async( &self, - migrate: impl FnOnce(&str) -> F, + migrate: impl Fn(&str) -> F, ) -> TmpPostgrustResult<()> where F: Future>> + Send, From 59ac27bf4ba22d92989a8145df11e8bc5f2a516a Mon Sep 17 00:00:00 2001 From: John Children Date: Wed, 12 Nov 2025 14:22:20 +0000 Subject: [PATCH 5/6] chore: Use BoxedFuture Seems to be the only way to get reasonable lifetimes --- Cargo.toml | 3 ++- src/lib.rs | 10 +++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2813b3b..c6e6fa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ thiserror = "1.0" tokio = { version = "1.8", features = ["parking_lot", "rt", "sync", "io-util", "process", "macros", "fs"], default-features = false, optional = true } tracing = "0.1" which = "4.0" +futures = { version = "0.3", optional = true } [dev-dependencies] test-log = { version = "0.2", default-features = false, features = ["trace"] } @@ -31,4 +32,4 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["e [features] default = [] -tokio-process = ["tokio"] +tokio-process = ["tokio", "futures"] diff --git a/src/lib.rs b/src/lib.rs index 5c0a637..fa365c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,6 @@ mod search; pub mod synchronous; use std::fs::{metadata, set_permissions}; -use std::future::Future; use std::io::{BufRead, BufReader}; use std::path::Path; use std::sync::atomic::AtomicU32; @@ -166,10 +165,10 @@ pub async fn new_default_process_async() -> TmpPostgrustResult( - migrate: impl Fn(&str) -> F, + migrate: F, ) -> TmpPostgrustResult where - F: Future>> + Send, + F: for<'r> Fn(&'r str) -> futures::future::BoxFuture<'r, Result<(), Box>>, { let factory_mutex = TOKIO_POSTGRES_FACTORY .get_or_try_init(|| async { @@ -313,12 +312,13 @@ impl TmpPostgrustFactory { /// /// Will error if Postgresql is unable to start or if the migrate function returns /// an error. + #[cfg(feature = "tokio-process")] pub async fn run_migrations_async( &self, - migrate: impl Fn(&str) -> F, + migrate: F, ) -> TmpPostgrustResult<()> where - F: Future>> + Send, + F: for<'r> Fn(&'r str) -> futures::future::BoxFuture<'r, Result<(), Box>>, { let process = self.start_postgresql(&self.cache_dir)?; From c59a338a3ee6040d613a8c73808182015597f0db Mon Sep 17 00:00:00 2001 From: John Children Date: Wed, 12 Nov 2025 14:25:16 +0000 Subject: [PATCH 6/6] chore: Remove futures package futures_util is already depended on by tokio --- Cargo.toml | 4 ++-- src/lib.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c6e6fa1..c51132c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ thiserror = "1.0" tokio = { version = "1.8", features = ["parking_lot", "rt", "sync", "io-util", "process", "macros", "fs"], default-features = false, optional = true } tracing = "0.1" which = "4.0" -futures = { version = "0.3", optional = true } +futures-util = { version = "0.3", optional = true } [dev-dependencies] test-log = { version = "0.2", default-features = false, features = ["trace"] } @@ -32,4 +32,4 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["e [features] default = [] -tokio-process = ["tokio", "futures"] +tokio-process = ["tokio", "futures-util"] diff --git a/src/lib.rs b/src/lib.rs index fa365c7..595c69b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -168,7 +168,7 @@ pub async fn new_default_process_async_with_migrations( migrate: F, ) -> TmpPostgrustResult where - F: for<'r> Fn(&'r str) -> futures::future::BoxFuture<'r, Result<(), Box>>, + F: for<'r> Fn(&'r str) -> futures_util::future::BoxFuture<'r, Result<(), Box>>, { let factory_mutex = TOKIO_POSTGRES_FACTORY .get_or_try_init(|| async { @@ -318,7 +318,7 @@ impl TmpPostgrustFactory { migrate: F, ) -> TmpPostgrustResult<()> where - F: for<'r> Fn(&'r str) -> futures::future::BoxFuture<'r, Result<(), Box>>, + F: for<'r> Fn(&'r str) -> futures_util::future::BoxFuture<'r, Result<(), Box>>, { let process = self.start_postgresql(&self.cache_dir)?;