58 changes: 3 additions & 55 deletions datafusion-cli/src/main.rs
@@ -443,10 +443,7 @@ mod tests {
     use super::*;
     use datafusion::{
         common::test_util::batches_to_string,
-        execution::cache::{
-            DefaultListFilesCache, cache_manager::CacheManagerConfig,
-            cache_unit::DefaultFileStatisticsCache,
-        },
+        execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig},
         prelude::{ParquetReadOptions, col, lit, split_part},
     };
     use insta::assert_snapshot;
@@ -647,9 +644,9 @@ mod tests {
         +-----------------------------------+-----------------+---------------------+------+------------------+
         | filename                          | file_size_bytes | metadata_size_bytes | hits | extra            |
         +-----------------------------------+-----------------+---------------------+------+------------------+
-        | alltypes_plain.parquet            | 1851            | 8882                | 5    | page_index=false |
+        | alltypes_plain.parquet            | 1851            | 8882                | 8    | page_index=false |
         | alltypes_tiny_pages.parquet       | 454233          | 269266              | 2    | page_index=true  |
-        | lz4_raw_compressed_larger.parquet | 380836          | 1347                | 3    | page_index=false |
+        | lz4_raw_compressed_larger.parquet | 380836          | 1347                | 4    | page_index=false |
         +-----------------------------------+-----------------+---------------------+------+------------------+
         ");

@@ -689,55 +686,6 @@ mod tests {

         // When the cache manager creates a StatisticsCache by default,
         // the contents will show up here
-        let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
-        let df = ctx.sql(sql).await?;
-        let rbs = df.collect().await?;
-        assert_snapshot!(batches_to_string(&rbs),@r"
-        ++
-        ++
-        ");
-
-        Ok(())
-    }
-
-    // Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
-    #[tokio::test]
-    async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
-        // Install a specific StatisticsCache implementation
-        let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
-        let cache_config = CacheManagerConfig::default()
-            .with_files_statistics_cache(Some(file_statistics_cache.clone()));
-        let runtime = RuntimeEnvBuilder::new()
-            .with_cache_manager(cache_config)
-            .build()?;
-        let config = SessionConfig::new().with_collect_statistics(true);
-        let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
-
-        ctx.register_udtf(
-            "statistics_cache",
-            Arc::new(StatisticsCacheFunc::new(
-                ctx.task_ctx().runtime_env().cache_manager.clone(),
-            )),
-        );
-
-        for filename in [
-            "alltypes_plain",
-            "alltypes_tiny_pages",
-            "lz4_raw_compressed_larger",
-        ] {
-            ctx.sql(
-                format!(
-                    "create external table {filename}
-                    stored as parquet
-                    location '../parquet-testing/data/{filename}.parquet'",
-                )
-                .as_str(),
-            )
-            .await?
-            .collect()
-            .await?;
-        }
-
         let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
         let df = ctx.sql(sql).await?;
         let rbs = df.collect().await?;
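Context for the deletion above: the override test existed only because the `CacheManager` did not create a statistics cache on its own (the issue linked in the removed comment). Now that a default statistics cache is created, the plain session in the surviving `test_statistics_cache` exercises the same path, which is consistent with the higher expected hit counts in the metadata-cache snapshot (5 → 8, 3 → 4). A minimal sketch of the default path, assuming the `StatisticsCacheFunc` UDTF from datafusion-cli is in scope and using a hypothetical Parquet location:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

// Sketch only: `StatisticsCacheFunc` is the UDTF defined in datafusion-cli,
// and the Parquet path below is hypothetical.
async fn show_default_statistics_cache() -> Result<()> {
    // No CacheManagerConfig override: the manager's default statistics cache is used.
    let config = SessionConfig::new().with_collect_statistics(true);
    let ctx = SessionContext::new_with_config(config);

    ctx.register_udtf(
        "statistics_cache",
        Arc::new(StatisticsCacheFunc::new(
            ctx.task_ctx().runtime_env().cache_manager.clone(),
        )),
    );

    ctx.sql("CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'data/t.parquet'")
        .await?
        .collect()
        .await?;

    // Scanning the table collects file statistics and stores them in the cache.
    ctx.sql("SELECT count(*) FROM t").await?.collect().await?;

    // Inspect the cache contents through the UDTF, as the test does.
    ctx.sql("SELECT split_part(path, '/', -1) AS filename, num_rows FROM statistics_cache()")
        .await?
        .show()
        .await?;
    Ok(())
}
```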
30 changes: 15 additions & 15 deletions datafusion/catalog-listing/src/table.rs
@@ -36,7 +36,6 @@ use datafusion_datasource::{
 };
 use datafusion_execution::cache::TableScopedPath;
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
-use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
@@ -188,7 +187,7 @@ pub struct ListingTable {
     /// The SQL definition for this table, if any
     definition: Option<String>,
     /// Cache for collected file statistics
-    collected_statistics: Arc<dyn FileStatisticsCache>,
+    collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
     /// Constraints applied to this table
     constraints: Constraints,
     /// Column default expressions for columns that are not physically present in the data files
@@ -232,7 +231,7 @@ impl ListingTable {
             schema_source,
             options,
             definition: None,
-            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
+            collected_statistics: None,
             constraints: Constraints::default(),
             column_defaults: HashMap::new(),
             expr_adapter_factory: config.expr_adapter_factory,
@@ -261,10 +260,8 @@ impl ListingTable {
     /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
     /// multiple times in the same session.
     ///
-    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
     pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
-        self.collected_statistics =
-            cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
+        self.collected_statistics = cache;
         self
     }

@@ -810,7 +807,8 @@ impl ListingTable {
         let meta = &part_file.object_meta;

         // Check cache first - if we have valid cached statistics and ordering
-        if let Some(cached) = self.collected_statistics.get(path)
+        if let Some(cache) = &self.collected_statistics
+            && let Some(cached) = cache.get(path)
             && cached.is_valid_for(meta)
         {
             // Return cached statistics and ordering
@@ -827,14 +825,16 @@ impl ListingTable {
         let statistics = Arc::new(file_meta.statistics);

         // Store in cache
-        self.collected_statistics.put(
-            path,
-            CachedFileMetadata::new(
-                meta.clone(),
-                Arc::clone(&statistics),
-                file_meta.ordering.clone(),
-            ),
-        );
+        if let Some(cache) = &self.collected_statistics {
+            cache.put(
+                path,
+                CachedFileMetadata::new(
+                    meta.clone(),
+                    Arc::clone(&statistics),
+                    file_meta.ordering.clone(),
+                ),
+            );
+        }

         Ok((statistics, file_meta.ordering))
     }
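Net effect of the `table.rs` changes: `collected_statistics` is now `Option<Arc<dyn FileStatisticsCache>>`, so a `ListingTable` built without a session-provided cache skips the get/put paths entirely instead of allocating a private `DefaultFileStatisticsCache` that nothing else can see or share. A rough sketch of attaching an explicit, shareable cache through `with_cache`, assuming the usual `ListingTableConfig` setup and a hypothetical table location:

```rust
use std::sync::Arc;

use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::error::Result;
use datafusion::execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion::prelude::SessionContext;

async fn table_with_shared_stats_cache(ctx: &SessionContext) -> Result<ListingTable> {
    // Hypothetical location; infer listing options and schema from the files there.
    let url = ListingTableUrl::parse("file:///data/my_table/")?;
    let config = ListingTableConfig::new(url).infer(&ctx.state()).await?;

    // One cache instance could be shared across several tables so statistics
    // fetched once are reused within the session.
    let table = ListingTable::try_new(config)?
        .with_cache(Some(Arc::new(DefaultFileStatisticsCache::default())));
    Ok(table)
}
```

With the field optional, `with_cache(None)` becomes an explicit opt-out, and the cache lookup and store shown in the last two hunks degrade to no-ops when no cache is configured.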