33 changes: 26 additions & 7 deletions parquet/src/bin/parquet-concat.rs
@@ -37,10 +37,12 @@
 //!
 
 use clap::Parser;
+use parquet::bloom_filter::Sbbf;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::errors::{ParquetError, Result};
-use parquet::file::metadata::ParquetMetaDataReader;
+use parquet::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
+use parquet::file::reader::ChunkReader;
 use parquet::file::writer::SerializedFileWriter;
 use std::fs::File;
 use std::sync::Arc;
@@ -56,6 +58,10 @@ struct Args {
     input: Vec<String>,
 }
 
+fn read_bloom_filter<R: ChunkReader>(column: &ColumnChunkMetaData, input: &R) -> Option<Sbbf> {
+    Sbbf::read_from_column_chunk(column, input).ok().flatten()
+}
+
 impl Args {
     fn run(&self) -> Result<()> {
         if self.input.is_empty() {
@@ -71,7 +77,10 @@ impl Args {
             .iter()
             .map(|x| {
                 let reader = File::open(x)?;
-                let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
+                // Enable reading page indexes if present
+                let metadata = ParquetMetaDataReader::new()
+                    .with_page_index_policy(PageIndexPolicy::Optional)
+                    .parse_and_finish(&reader)?;
                 Ok((reader, metadata))
             })
             .collect::<Result<Vec<_>>>()?;
@@ -91,16 +100,26 @@ impl Args {
         let mut writer = SerializedFileWriter::new(output, schema, props)?;
 
         for (input, metadata) in inputs {
-            for rg in metadata.row_groups() {
+            let column_indexes = metadata.column_index();
+            let offset_indexes = metadata.offset_index();
+
+            for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
+                let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
+                let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
                 let mut rg_out = writer.next_row_group()?;
-                for column in rg.columns() {
+                for (col_idx, column) in rg.columns().iter().enumerate() {
+                    let bloom_filter = read_bloom_filter(column, &input);
+                    let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
+
+                    let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
+
                     let result = ColumnCloseResult {
                         bytes_written: column.compressed_size() as _,
                         rows_written: rg.num_rows() as _,
                         metadata: column.clone(),
-                        bloom_filter: None,
-                        column_index: None,
-                        offset_index: None,
+                        bloom_filter,
+                        column_index,
+                        offset_index,
                     };
                     rg_out.append_column(&input, result)?;
                 }
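Taken together, the parquet-concat changes mean bloom filters, column indexes, and offset indexes are now carried into the concatenated output instead of being dropped. Below is a minimal sketch (not part of this PR) of how a caller might verify that after running the tool, assuming the existing CLI shape (`parquet-concat <output> <inputs>...`); it uses only APIs that appear in this diff, and the file name is hypothetical:

```rust
use parquet::bloom_filter::Sbbf;
use parquet::errors::Result;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
use std::fs::File;

fn main() -> Result<()> {
    // Hypothetical output of `parquet-concat`.
    let file = File::open("concatenated.parquet")?;
    let metadata = ParquetMetaDataReader::new()
        .with_page_index_policy(PageIndexPolicy::Optional)
        .parse_and_finish(&file)?;

    // Page indexes are stored per row group, per column; `None` means no
    // index was read for this file.
    println!("column index present: {}", metadata.column_index().is_some());
    println!("offset index present: {}", metadata.offset_index().is_some());

    // Bloom filters are read lazily, one column chunk at a time.
    let mut bloom_filters = 0;
    for rg in metadata.row_groups() {
        for column in rg.columns() {
            if Sbbf::read_from_column_chunk(column, &file)?.is_some() {
                bloom_filters += 1;
            }
        }
    }
    println!("column chunks with a bloom filter: {bloom_filters}");
    Ok(())
}
```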
2 changes: 1 addition & 1 deletion parquet/src/bloom_filter/mod.rs
@@ -333,7 +333,7 @@ impl Sbbf {
     }
 
     /// Read a new bloom filter from the given offset in the given reader.
-    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
+    pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
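The visibility change in `bloom_filter/mod.rs` is what makes the new helper possible: binary targets under `src/bin` compile as separate crates against the library, so `parquet-concat` can only reach the library's public API, and a `pub(crate)` function is invisible to it. The PR's helper folds read errors into `None` via `.ok().flatten()`; a sketch of both that pattern and a stricter alternative for callers who want errors surfaced (function names here are illustrative, not part of the PR):

```rust
use parquet::bloom_filter::Sbbf;
use parquet::errors::Result;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::file::reader::ChunkReader;

/// Treats read/decode failures as "no bloom filter", like the PR's helper.
fn bloom_filter_lossy<R: ChunkReader>(column: &ColumnChunkMetaData, reader: &R) -> Option<Sbbf> {
    Sbbf::read_from_column_chunk(column, reader).ok().flatten()
}

/// Propagates failures instead, so a corrupt bloom filter header is an error
/// rather than a silent downgrade to "absent".
fn bloom_filter_strict<R: ChunkReader>(
    column: &ColumnChunkMetaData,
    reader: &R,
) -> Result<Option<Sbbf>> {
    Sbbf::read_from_column_chunk(column, reader)
}
```

For a best-effort concatenation tool the lossy form is a reasonable default, since a missing or unreadable bloom filter only costs lookup performance, not correctness.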