Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ thiserror = "1.0"
tokio = { version = "1.8", features = ["parking_lot", "rt", "sync", "io-util", "process", "macros", "fs"], default-features = false, optional = true }
tracing = "0.1"
which = "4.0"
futures-util = { version = "0.3", optional = true }

[dev-dependencies]
test-log = { version = "0.2", default-features = false, features = ["trace"] }
Expand All @@ -31,4 +32,4 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["e

[features]
default = []
tokio-process = ["tokio"]
tokio-process = ["tokio", "futures-util"]
44 changes: 34 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,19 @@ pub async fn new_default_process_async() -> TmpPostgrustResult<asynchronous::Pro
/// Will panic if a `TmpPostgrustFactory::try_new_async` returns an error the first time the function
/// is called.
#[cfg(feature = "tokio-process")]
pub async fn new_default_process_async_with_migrations(
migrate: impl Fn(&str) -> Result<(), Box<dyn std::error::Error + Send + Sync>>,
) -> TmpPostgrustResult<asynchronous::ProcessGuard> {
pub async fn new_default_process_async_with_migrations<F>(
migrate: F,
) -> TmpPostgrustResult<asynchronous::ProcessGuard>
where
F: for<'r> Fn(&'r str) -> futures_util::future::BoxFuture<'r, Result<(), Box<dyn std::error::Error + Send + Sync>>>,
{
let factory_mutex = TOKIO_POSTGRES_FACTORY
.get_or_try_init(|| async {
TmpPostgrustFactory::try_new_async().await.map(|factory| {
factory
.run_migrations(migrate)
.expect("Failed to run migrations.");
tokio::sync::Mutex::new(Some(factory))
})
let factory = TmpPostgrustFactory::try_new_async().await?;
factory
.run_migrations_async(migrate)
.await?;
Ok(tokio::sync::Mutex::new(Some(factory)))
})
.await?;
let guard = factory_mutex.lock().await;
Expand Down Expand Up @@ -294,7 +296,7 @@ impl TmpPostgrustFactory {
/// an error.
pub fn run_migrations(
&self,
migrate: impl FnOnce(&str) -> Result<(), Box<dyn std::error::Error + Send + Sync>>,
migrate: impl Fn(&str) -> Result<(), Box<dyn std::error::Error + Send + Sync>>,
) -> TmpPostgrustResult<()> {
let process = self.start_postgresql(&self.cache_dir)?;

Expand All @@ -303,6 +305,28 @@ impl TmpPostgrustFactory {
Ok(())
}

/// Run migrations against the cache directory, will cause all subsequent instances
/// to be run against a version of the database where the migrations have been applied.
///
/// # Errors
///
/// Will error if Postgresql is unable to start or if the migrate function returns
/// an error.
#[cfg(feature = "tokio-process")]
pub async fn run_migrations_async<F>(
&self,
migrate: F,
) -> TmpPostgrustResult<()>
where
F: for<'r> Fn(&'r str) -> futures_util::future::BoxFuture<'r, Result<(), Box<dyn std::error::Error + Send + Sync>>>,
{
let process = self.start_postgresql(&self.cache_dir)?;

migrate(&process.connection_string()).await.map_err(TmpPostgrustError::MigrationsFailed)?;

Ok(())
}

/// Start a new postgresql instance and return a process guard that will ensure it is cleaned
/// up when dropped.
#[instrument(skip(self))]
Expand Down