diff --git a/parquet/src/bin/parquet-concat.rs b/parquet/src/bin/parquet-concat.rs
index e8ce4ca1dbed..a6f1aef78110 100644
--- a/parquet/src/bin/parquet-concat.rs
+++ b/parquet/src/bin/parquet-concat.rs
@@ -37,10 +37,12 @@
 //!
 
 use clap::Parser;
+use parquet::bloom_filter::Sbbf;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::errors::{ParquetError, Result};
-use parquet::file::metadata::ParquetMetaDataReader;
+use parquet::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
+use parquet::file::reader::ChunkReader;
 use parquet::file::writer::SerializedFileWriter;
 use std::fs::File;
 use std::sync::Arc;
@@ -56,6 +58,10 @@ struct Args {
     input: Vec<String>,
 }
 
+fn read_bloom_filter<R: ChunkReader>(column: &ColumnChunkMetaData, input: &R) -> Option<Sbbf> {
+    Sbbf::read_from_column_chunk(column, input).ok().flatten()
+}
+
 impl Args {
     fn run(&self) -> Result<()> {
         if self.input.is_empty() {
@@ -71,7 +77,10 @@ impl Args {
             .iter()
             .map(|x| {
                 let reader = File::open(x)?;
-                let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
+                // Enable reading page indexes if present
+                let metadata = ParquetMetaDataReader::new()
+                    .with_page_index_policy(PageIndexPolicy::Optional)
+                    .parse_and_finish(&reader)?;
                 Ok((reader, metadata))
             })
             .collect::<Result<Vec<_>>>()?;
@@ -91,16 +100,26 @@ impl Args {
         let mut writer = SerializedFileWriter::new(output, schema, props)?;
 
         for (input, metadata) in inputs {
-            for rg in metadata.row_groups() {
+            let column_indexes = metadata.column_index();
+            let offset_indexes = metadata.offset_index();
+
+            for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
+                let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
+                let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
                 let mut rg_out = writer.next_row_group()?;
-                for column in rg.columns() {
+                for (col_idx, column) in rg.columns().iter().enumerate() {
+                    let bloom_filter = read_bloom_filter(column, &input);
+                    let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
+
+                    let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
+
                     let result = ColumnCloseResult {
                         bytes_written: column.compressed_size() as _,
                         rows_written: rg.num_rows() as _,
                         metadata: column.clone(),
-                        bloom_filter: None,
-                        column_index: None,
-                        offset_index: None,
+                        bloom_filter,
+                        column_index,
+                        offset_index,
                     };
                     rg_out.append_column(&input, result)?;
                 }
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 290a887b2960..364928d366aa 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -333,7 +333,7 @@ impl Sbbf {
     }
 
     /// Read a new bloom filter from the given offset in the given reader.
-    pub(crate) fn read_from_column_chunk(
+    pub fn read_from_column_chunk(
         column_metadata: &ColumnChunkMetaData,
         reader: &R,
     ) -> Result<Option<Self>, ParquetError> {
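
Usage sketch (illustrative only, not part of the patch): with read_from_column_chunk made public by the second hunk, code outside the parquet crate can probe a column chunk's bloom filter directly. The file name "data.parquet" and the probe value "foo" below are placeholder assumptions, and error handling is minimal.

use parquet::bloom_filter::Sbbf;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaDataReader;
use std::fs::File;

fn main() -> Result<()> {
    // Placeholder input file; File implements ChunkReader
    let file = File::open("data.parquet")?;
    let metadata = ParquetMetaDataReader::new().parse_and_finish(&file)?;
    for rg in metadata.row_groups() {
        for column in rg.columns() {
            // Returns Ok(None) when the column chunk carries no bloom filter
            if let Some(sbbf) = Sbbf::read_from_column_chunk(column, &file)? {
                // check() is probabilistic: false means definitely absent,
                // true means possibly present
                println!(
                    "{}: maybe contains \"foo\": {}",
                    column.column_path(),
                    sbbf.check(&"foo")
                );
            }
        }
    }
    Ok(())
}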