diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 0d2ffa5361f6..5ce43382fe43 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -260,5 +260,10 @@ name = "row_selector" harness = false required-features = ["arrow"] +[[bench]] +name = "row_selection_state" +harness = false +required-features = ["arrow"] + [lib] bench = false diff --git a/parquet/benches/row_selection_state.rs b/parquet/benches/row_selection_state.rs new file mode 100644 index 000000000000..893f1c382cc8 --- /dev/null +++ b/parquet/benches/row_selection_state.rs @@ -0,0 +1,506 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint; +use std::sync::Arc; + +use arrow_array::builder::StringViewBuilder; +use arrow_array::{ArrayRef, Float64Array, Int32Array, RecordBatch, StringViewArray}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector, +}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const TOTAL_ROWS: usize = 1 << 20; +const BATCH_SIZE: usize = 1 << 10; +const BASE_SEED: u64 = 0xA55AA55A; +const AVG_SELECTOR_LENGTHS: &[usize] = &[4, 8, 12, 16, 20, 24, 28, 32, 36, 40]; +const COLUMN_WIDTHS: &[usize] = &[2, 4, 8, 16, 32]; +const UTF8VIEW_LENS: &[usize] = &[4, 8, 16, 32, 64, 128, 256]; +const BENCH_MODES: &[BenchMode] = &[BenchMode::ReadSelector, BenchMode::ReadMask]; + +struct DataProfile { + name: &'static str, + build_batch: fn(usize) -> RecordBatch, +} + +const DATA_PROFILES: &[DataProfile] = &[ + DataProfile { + name: "int32", + build_batch: build_int32_batch, + }, + DataProfile { + name: "float64", + build_batch: build_float64_batch, + }, + DataProfile { + name: "utf8view", + build_batch: build_utf8view_batch, + }, +]; + +fn criterion_benchmark(c: &mut Criterion) { + let scenarios = [ + /* uniform50 (50% selected, constant run lengths, starts with skip) + ```text + ┌───────────────┐ + │ │ skip + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ ... │ + └───────────────┘ + ``` */ + Scenario { + name: "uniform50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Constant, + }, + /* spread50 (50% selected, large jitter in run lengths, starts with skip) + ```text + ┌───────────────┐ + │ │ skip (long) + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (short) + │ │ skip (short) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip (medium) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (medium) + │ ... 
│ + └───────────────┘ + ``` */ + Scenario { + name: "spread50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Uniform { spread: 0.9 }, + }, + /* sparse20 (20% selected, bimodal: occasional long runs, starts with skip) + ```text + ┌───────────────┐ + │ │ skip (long) + │ │ + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (short) + │ │ skip (long) + │ │ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (occasional long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ ... │ + └───────────────┘ + ``` */ + Scenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + distribution: RunDistribution::Bimodal { + long_factor: 6.0, + long_prob: 0.1, + }, + }, + /* dense80 (80% selected, bimodal: occasional long runs, starts with select) + ```text + ┌───────────────┐ + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip (short) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ │ skip (very short) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long) + │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + │ ... │ + └───────────────┘ + ``` */ + Scenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + distribution: RunDistribution::Bimodal { + long_factor: 4.0, + long_prob: 0.05, + }, + }, + ]; + + let base_parquet = build_parquet_data(TOTAL_ROWS, build_int32_batch); + let base_scenario = &scenarios[0]; + + for (idx, scenario) in scenarios.iter().enumerate() { + // The first scenario is a special case for backwards compatibility with + // existing benchmark result formats. + let suite = if idx == 0 { "len" } else { "scenario" }; + bench_over_lengths( + c, + suite, + scenario.name, + &base_parquet, + scenario, + BASE_SEED ^ ((idx as u64) << 16), + ); + } + + for (profile_idx, profile) in DATA_PROFILES.iter().enumerate() { + let parquet_data = build_parquet_data(TOTAL_ROWS, profile.build_batch); + bench_over_lengths( + c, + "dtype", + profile.name, + &parquet_data, + base_scenario, + BASE_SEED ^ ((profile_idx as u64) << 24), + ); + } + + for (offset, &column_count) in COLUMN_WIDTHS.iter().enumerate() { + let parquet_data = write_parquet_batch(build_int32_columns_batch(TOTAL_ROWS, column_count)); + let variant_label = format!("C{:02}", column_count); + bench_over_lengths( + c, + "columns", + &variant_label, + &parquet_data, + base_scenario, + BASE_SEED ^ ((offset as u64) << 32), + ); + } + + for (offset, &len) in UTF8VIEW_LENS.iter().enumerate() { + let batch = build_utf8view_batch_with_len(TOTAL_ROWS, len); + let parquet_data = write_parquet_batch(batch); + let variant_label = format!("utf8view-L{:03}", len); + bench_over_lengths( + c, + "utf8view-len", + &variant_label, + &parquet_data, + base_scenario, + BASE_SEED ^ ((offset as u64) << 40), + ); + } +} + +fn bench_over_lengths( + c: &mut Criterion, + suite: &str, + variant: &str, + parquet_data: &Bytes, + scenario: &Scenario, + seed_base: u64, +) { + for (offset, &avg_len) in AVG_SELECTOR_LENGTHS.iter().enumerate() { + let selectors = + generate_selectors(avg_len, TOTAL_ROWS, scenario, seed_base + offset as u64); + let stats = SelectorStats::new(&selectors); + let selection = RowSelection::from(selectors); + let suffix = format!( + "{}-{}-{}-L{:02}-avg{:.1}-sel{:02}", + suite, + scenario.name, + variant, + avg_len, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ); + + let bench_input = BenchInput { + parquet_data: parquet_data.clone(), + selection, + }; + + for &mode in BENCH_MODES { + c.bench_with_input( + BenchmarkId::new(mode.label(), &suffix), + &bench_input, + |b, input| { + b.iter(|| { + let total = + run_read(&input.parquet_data, 
&input.selection, mode.strategy()); + hint::black_box(total); + }); + }, + ); + } + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); + +struct BenchInput { + parquet_data: Bytes, + selection: RowSelection, +} + +fn run_read( + parquet_data: &Bytes, + selection: &RowSelection, + strategy: RowSelectionStrategy, +) -> usize { + let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + .unwrap() + .with_batch_size(BATCH_SIZE) + .with_row_selection(selection.clone()) + .with_row_selection_strategy(strategy) + .build() + .unwrap(); + + let mut total_rows = 0usize; + for batch in reader { + let batch = batch.unwrap(); + total_rows += batch.num_rows(); + } + total_rows +} + +fn build_parquet_data(total_rows: usize, build_batch: fn(usize) -> RecordBatch) -> Bytes { + let batch = build_batch(total_rows); + write_parquet_batch(batch) +} + +fn build_single_column_batch(data_type: DataType, array: ArrayRef) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("value", data_type, false)])); + RecordBatch::try_new(schema, vec![array]).unwrap() +} + +fn build_int32_batch(total_rows: usize) -> RecordBatch { + let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32)); + build_single_column_batch(DataType::Int32, Arc::new(values) as ArrayRef) +} + +fn build_float64_batch(total_rows: usize) -> RecordBatch { + let values = Float64Array::from_iter_values((0..total_rows).map(|v| v as f64)); + build_single_column_batch(DataType::Float64, Arc::new(values) as ArrayRef) +} + +fn build_utf8view_batch(total_rows: usize) -> RecordBatch { + let mut builder = StringViewBuilder::new(); + // Mix short and long values. + for i in 0..total_rows { + match i % 5 { + 0 => builder.append_value("alpha"), + 1 => builder.append_value("beta"), + 2 => builder.append_value("gamma"), + 3 => builder.append_value("delta"), + _ => builder.append_value("a longer utf8 string payload to test view storage"), + } + } + let values: StringViewArray = builder.finish(); + build_single_column_batch(DataType::Utf8View, Arc::new(values) as ArrayRef) +} + +fn build_utf8view_batch_with_len(total_rows: usize, len: usize) -> RecordBatch { + let mut builder = StringViewBuilder::new(); + let value: String = "a".repeat(len); + for _ in 0..total_rows { + builder.append_value(&value); + } + let values: StringViewArray = builder.finish(); + build_single_column_batch(DataType::Utf8View, Arc::new(values) as ArrayRef) +} + +fn build_int32_columns_batch(total_rows: usize, num_columns: usize) -> RecordBatch { + let base_values: ArrayRef = Arc::new(Int32Array::from_iter_values( + (0..total_rows).map(|v| v as i32), + )); + let mut fields = Vec::with_capacity(num_columns); + let mut columns = Vec::with_capacity(num_columns); + for idx in 0..num_columns { + fields.push(Field::new(format!("value{}", idx), DataType::Int32, false)); + columns.push(base_values.clone()); + } + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, columns).unwrap() +} + +fn write_parquet_batch(batch: RecordBatch) -> Bytes { + let schema = batch.schema(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), None).unwrap(); + writer.write(&batch).unwrap(); + let buffer = writer.into_inner().unwrap(); + Bytes::from(buffer) +} + +#[derive(Clone)] +struct Scenario { + name: &'static str, + select_ratio: f64, + start_with_select: bool, + distribution: RunDistribution, +} + +#[derive(Clone)] +enum RunDistribution { + Constant, + Uniform { spread: f64 }, + Bimodal { 
+        long_factor: f64,
+        long_prob: f64,
+    },
+}
+
+fn generate_selectors(
+    avg_selector_len: usize,
+    total_rows: usize,
+    scenario: &Scenario,
+    seed: u64,
+) -> Vec<RowSelector> {
+    assert!(
+        (0.0..=1.0).contains(&scenario.select_ratio),
+        "select_ratio must be in [0, 1]"
+    );
+
+    let mut select_mean = scenario.select_ratio * 2.0 * avg_selector_len as f64;
+    let mut skip_mean = (1.0 - scenario.select_ratio) * 2.0 * avg_selector_len as f64;
+
+    select_mean = select_mean.max(1.0);
+    skip_mean = skip_mean.max(1.0);
+
+    let sum = select_mean + skip_mean;
+    // Rebalance the sampled select/skip run lengths so their sum matches the requested
+    // average selector length while respecting the configured selectivity ratio.
+    let scale = if sum == 0.0 {
+        1.0
+    } else {
+        (2.0 * avg_selector_len as f64) / sum
+    };
+    select_mean *= scale;
+    skip_mean *= scale;
+
+    let mut rng = StdRng::seed_from_u64(seed ^ (avg_selector_len as u64).wrapping_mul(0x9E3779B1));
+    let mut selectors = Vec::with_capacity(total_rows / avg_selector_len.max(1));
+    let mut remaining = total_rows;
+    let mut is_select = scenario.start_with_select;
+
+    while remaining > 0 {
+        let mean = if is_select { select_mean } else { skip_mean };
+        let len = sample_length(mean, &scenario.distribution, &mut rng).max(1);
+        let len = len.min(remaining);
+        selectors.push(if is_select {
+            RowSelector::select(len)
+        } else {
+            RowSelector::skip(len)
+        });
+        remaining -= len;
+        if remaining == 0 {
+            break;
+        }
+        is_select = !is_select;
+    }
+
+    let selection: RowSelection = selectors.into();
+    selection.into()
+}
+
+fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize {
+    match distribution {
+        RunDistribution::Constant => mean.round().max(1.0) as usize,
+        RunDistribution::Uniform { spread } => {
+            let spread = spread.clamp(0.0, 0.99);
+            let lower = (mean * (1.0 - spread)).max(1.0);
+            let upper = (mean * (1.0 + spread)).max(lower + f64::EPSILON);
+            if (upper - lower) < 1.0 {
+                lower.round().max(1.0) as usize
+            } else {
+                let low = lower.floor() as usize;
+                let high = upper.ceil() as usize;
+                rng.random_range(low..=high).max(1)
+            }
+        }
+        RunDistribution::Bimodal {
+            long_factor,
+            long_prob,
+        } => {
+            let long_prob = long_prob.clamp(0.0, 0.5);
+            let short_prob = 1.0 - long_prob;
+            let short_factor = if short_prob == 0.0 {
+                1.0 / long_factor.max(f64::EPSILON)
+            } else {
+                (1.0 - long_prob * long_factor).max(0.0) / short_prob
+            };
+            let use_long = rng.random_bool(long_prob);
+            let factor = if use_long {
+                *long_factor
+            } else {
+                short_factor.max(0.1)
+            };
+            (mean * factor).round().max(1.0) as usize
+        }
+    }
+}
+
+#[derive(Clone, Copy)]
+enum BenchMode {
+    ReadSelector,
+    ReadMask,
+}
+
+impl BenchMode {
+    fn label(self) -> &'static str {
+        match self {
+            BenchMode::ReadSelector => "read_selector",
+            BenchMode::ReadMask => "read_mask",
+        }
+    }
+
+    fn strategy(self) -> RowSelectionStrategy {
+        match self {
+            BenchMode::ReadSelector => RowSelectionStrategy::Selectors,
+            BenchMode::ReadMask => RowSelectionStrategy::Mask,
+        }
+    }
+}
+
+struct SelectorStats {
+    average_selector_len: f64,
+    select_ratio: f64,
+}
+
+impl SelectorStats {
+    fn new(selectors: &[RowSelector]) -> Self {
+        if selectors.is_empty() {
+            return Self {
+                average_selector_len: 0.0,
+                select_ratio: 0.0,
+            };
+        }
+
+        let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+        let selected_rows: usize = selectors
+            .iter()
+            .filter(|s| !s.skip)
+            .map(|s| s.row_count)
+            .sum();
+
+        Self {
+            average_selector_len: total_rows as f64 / selectors.len() as f64,
+            select_ratio: if total_rows == 0 {
+                0.0
+            } else {
+                selected_rows as f64 / total_rows as f64
+            },
+        }
+    }
+}
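The crossover these benchmarks probe is governed by the selector-density heuristic that `ReadPlanBuilder::preferred_selection_strategy` (below) applies. A minimal sketch of that decision, not part of the patch, using only the public `RowSelector` API; the `prefers_mask` helper is hypothetical:

```rust
use parquet::arrow::arrow_reader::RowSelector;

/// Hypothetical helper mirroring the Auto heuristic: prefer a bitmap mask
/// when the average run length (total rows / selector count) is below the
/// threshold, i.e. when the selection is fragmented into many short runs.
fn prefers_mask(selectors: &[RowSelector], threshold: usize) -> bool {
    let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
    total_rows < selectors.len().saturating_mul(threshold)
}

fn main() {
    // Alternating 8-row runs: average run length 8 < 32, so Auto picks Mask.
    let dense = vec![RowSelector::select(8), RowSelector::skip(8)];
    assert!(prefers_mask(&dense, 32));

    // 1000-row runs: skip/select ranges beat walking a bit per row.
    let sparse = vec![RowSelector::select(1000), RowSelector::skip(1000)];
    assert!(!prefers_mask(&sparse, 32));
}
```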
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 4e12c55c9f33..bbd994c29c01 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -17,12 +17,12 @@
 
 //! Contains reader which reads parquet data into arrow [`RecordBatch`]
 
-use arrow_array::Array;
 use arrow_array::cast::AsArray;
-use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_array::{Array, RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
+use arrow_select::filter::filter_record_batch;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
-pub use selection::{RowSelection, RowSelector};
+pub use selection::{RowSelection, RowSelectionCursor, RowSelectionStrategy, RowSelector};
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
@@ -43,6 +43,7 @@ use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+// Exposed so integration tests and benchmarks can temporarily override the threshold.
 pub use read_plan::{ReadPlan, ReadPlanBuilder};
 
 mod filter;
@@ -124,6 +125,8 @@ pub struct ArrowReaderBuilder<T> {
 
     pub(crate) selection: Option<RowSelection>,
 
+    pub(crate) selection_strategy: RowSelectionStrategy,
+
     pub(crate) limit: Option<usize>,
 
     pub(crate) offset: Option<usize>,
@@ -145,6 +148,7 @@ impl<T> Debug for ArrowReaderBuilder<T> {
             .field("projection", &self.projection)
             .field("filter", &self.filter)
             .field("selection", &self.selection)
+            .field("selection_strategy", &self.selection_strategy)
             .field("limit", &self.limit)
             .field("offset", &self.offset)
             .field("metrics", &self.metrics)
@@ -164,6 +168,7 @@ impl<T> ArrowReaderBuilder<T> {
             projection: ProjectionMask::all(),
             filter: None,
             selection: None,
+            selection_strategy: RowSelectionStrategy::default(),
             limit: None,
             offset: None,
             metrics: ArrowReaderMetrics::Disabled,
@@ -212,6 +217,14 @@
         }
     }
 
+    /// Configure how row selections should be materialised during execution
+    pub fn with_row_selection_strategy(self, strategy: RowSelectionStrategy) -> Self {
+        Self {
+            selection_strategy: strategy,
+            ..self
+        }
+    }
+
     /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
     /// data into memory.
/// @@ -886,11 +899,12 @@ impl ParquetRecordBatchReaderBuilder { metadata, schema: _, fields, - batch_size: _, + batch_size, row_groups, projection, mut filter, selection, + selection_strategy, limit, offset, metrics, @@ -899,9 +913,7 @@ impl ParquetRecordBatchReaderBuilder { } = self; // Try to avoid allocate large buffer - let batch_size = self - .batch_size - .min(metadata.file_metadata().num_rows() as usize); + let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize); let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect()); @@ -911,7 +923,9 @@ impl ParquetRecordBatchReaderBuilder { row_groups, }; - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size) + .with_selection(selection) + .with_selection_strategy(selection_strategy); // Update selection based on any filters if let Some(filter) = filter.as_mut() { @@ -934,12 +948,16 @@ impl ParquetRecordBatchReaderBuilder { let array_reader = ArrayReaderBuilder::new(&reader, &metrics) .build_array_reader(fields.as_deref(), &projection)?; - let read_plan = plan_builder + let mut plan_builder = plan_builder .limited(reader.num_rows()) .with_offset(offset) .with_limit(limit) - .build_limited() - .build(); + .build_limited(); + + let preferred_strategy = plan_builder.preferred_selection_strategy(); + plan_builder = plan_builder.with_selection_strategy(preferred_strategy); + + let read_plan = plan_builder.build(); Ok(ParquetRecordBatchReader::new(array_reader, read_plan)) } @@ -1059,9 +1077,82 @@ impl ParquetRecordBatchReader { let mut read_records = 0; let batch_size = self.batch_size(); match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); + Some(selection_cursor) => { + if selection_cursor.is_mask_backed() { + // Stream the record batch reader using contiguous segments of the selection + // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + while !selection_cursor.is_empty() { + let Some(mask_chunk) = selection_cursor.next_mask_chunk(batch_size) else { + return Ok(None); + }; + + if mask_chunk.initial_skip > 0 { + let skipped = + self.array_reader.skip_records(mask_chunk.initial_skip)?; + if skipped != mask_chunk.initial_skip { + return Err(general_err!( + "failed to skip rows, expected {}, got {}", + mask_chunk.initial_skip, + skipped + )); + } + } + + if mask_chunk.chunk_rows == 0 { + if selection_cursor.is_empty() && mask_chunk.selected_rows == 0 { + return Ok(None); + } + continue; + } + + let mask = selection_cursor + .mask_values_for(&mask_chunk) + .ok_or_else(|| general_err!("row selection mask out of bounds"))?; + + let read = self.array_reader.read_records(mask_chunk.chunk_rows)?; + if read == 0 { + return Err(general_err!( + "reached end of column while expecting {} rows", + mask_chunk.chunk_rows + )); + } + if read != mask_chunk.chunk_rows { + return Err(general_err!( + "insufficient rows read from array reader - expected {}, got {}", + mask_chunk.chunk_rows, + read + )); + } + + let array = self.array_reader.consume_batch()?; + // The column reader exposes the projection as a struct array; convert this + // into a record batch before applying the boolean filter mask. 
+ let struct_array = array.as_struct_opt().ok_or_else(|| { + ArrowError::ParquetError( + "Struct array reader should return struct array".to_string(), + ) + })?; + + let filtered_batch = + filter_record_batch(&RecordBatch::from(struct_array), &mask)?; + + if filtered_batch.num_rows() != mask_chunk.selected_rows { + return Err(general_err!( + "filtered rows mismatch selection - expected {}, got {}", + mask_chunk.selected_rows, + filtered_batch.num_rows() + )); + } + + if filtered_batch.num_rows() == 0 { + continue; + } + + return Ok(Some(filtered_batch)); + } + } + while read_records < batch_size && !selection_cursor.is_empty() { + let front = selection_cursor.next_selector(); if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; @@ -1087,7 +1178,7 @@ impl ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); + selection_cursor.return_selector(RowSelector::select(remaining)); need_read } _ => front.row_count, @@ -1194,26 +1285,6 @@ mod tests { use std::path::PathBuf; use std::sync::Arc; - use arrow_array::builder::*; - use arrow_array::cast::AsArray; - use arrow_array::types::{ - Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type, - DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType, - Time64MicrosecondType, - }; - use arrow_array::*; - use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256}; - use arrow_data::{ArrayData, ArrayDataBuilder}; - use arrow_schema::{ - ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit, - }; - use arrow_select::concat::concat_batches; - use bytes::Bytes; - use half::f16; - use num_traits::PrimInt; - use rand::{Rng, RngCore, rng}; - use tempfile::tempfile; - use crate::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector, @@ -1233,6 +1304,27 @@ mod tests { use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; use crate::util::test_common::rand_gen::RandGen; + use arrow::compute::kernels::cmp::eq; + use arrow::compute::or; + use arrow_array::builder::*; + use arrow_array::cast::AsArray; + use arrow_array::types::{ + Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type, + DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType, + Time64MicrosecondType, + }; + use arrow_array::*; + use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256}; + use arrow_data::{ArrayData, ArrayDataBuilder}; + use arrow_schema::{ + ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit, + }; + use arrow_select::concat::concat_batches; + use bytes::Bytes; + use half::f16; + use num_traits::PrimInt; + use rand::{Rng, RngCore, rng}; + use tempfile::tempfile; #[test] fn test_arrow_reader_all_columns() { @@ -4613,6 +4705,93 @@ mod tests { assert_eq!(out, batch.slice(2, 1)); } + #[test] + fn test_row_selection_interleaved_skip() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "v", + ArrowDataType::Int32, + false, + )])); + + let values = Int32Array::from(vec![0, 1, 2, 3, 4]); + let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap(); + + let mut buffer = 
Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
+        writer.write(&batch)?;
+        writer.close()?;
+
+        let selection = RowSelection::from(vec![
+            RowSelector::select(1),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ]);
+
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
+            .with_batch_size(4)
+            .with_row_selection(selection)
+            .build()?;
+
+        let out = reader.next().unwrap()?;
+        assert_eq!(out.num_rows(), 3);
+        let values = out
+            .column(0)
+            .as_primitive::<Int32Type>()
+            .values();
+        assert_eq!(values, &[0, 3, 4]);
+        assert!(reader.next().is_none());
+        Ok(())
+    }
+
+    #[test]
+    fn test_row_selection_mask_sparse_rows() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "v",
+            ArrowDataType::Int32,
+            false,
+        )]));
+
+        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
+        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;
+
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
+        writer.write(&batch)?;
+        writer.close()?;
+
+        let total_rows = batch.num_rows();
+        let ranges = (1..total_rows)
+            .step_by(2)
+            .map(|i| i..i + 1)
+            .collect::<Vec<_>>();
+        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);
+
+        let selectors: Vec<RowSelector> = selection.clone().into();
+        assert!(total_rows < selectors.len() * 8);
+
+        let bytes = Bytes::from(buffer);
+
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
+            .with_batch_size(7)
+            .with_row_selection(selection)
+            .build()?;
+
+        let mut collected = Vec::new();
+        for batch in reader {
+            let batch = batch?;
+            collected.extend_from_slice(
+                batch
+                    .column(0)
+                    .as_primitive::<Int32Type>()
+                    .values(),
+            );
+        }
+
+        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
+        assert_eq!(collected, expected);
+        Ok(())
+    }
+
     fn test_decimal32_roundtrip() {
         let d = |values: Vec<i32>, p: u8| {
             let iter = values.into_iter();
@@ -4995,6 +5174,73 @@
         c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
     }
 
+    #[test]
+    fn test_row_filter_full_page_skip_is_handled() {
+        let first_value: i64 = 1111;
+        let last_value: i64 = 9999;
+        let num_rows: usize = 12;
+
+        // Build data whose row selection has an average run length of 4.
+        // The written pages look like (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999),
+        // so the row selection becomes [1111, (skip 10), 9999].
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", arrow_schema::DataType::Int64, false),
+            Field::new("value", arrow_schema::DataType::Int64, false),
+        ]));
+
+        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
+        int_values[0] = first_value;
+        int_values[num_rows - 1] = last_value;
+        let keys = Int64Array::from(int_values.clone());
+        let values = Int64Array::from(int_values.clone());
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_write_batch_size(2)
+            .set_data_page_row_count_limit(2)
+            .build();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+        let data = Bytes::from(buffer);
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let builder =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
+        let schema = builder.parquet_schema().clone();
+        let filter_mask = ProjectionMask::leaves(&schema, [0]);
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let predicate = ArrowPredicateFn::new(filter_mask, move |batch: RecordBatch| {
+            let column = batch.column(0);
+            let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
+            let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
+            or(&match_first, &match_second)
+        });
+
+        // The batch size is set to 12 so all surviving rows are read in one go after filtering.
+        // If the reader chose the mask strategy for this filter, it could panic, because the
+        // four middle pages are never decoded.
+        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data, options)
+            .unwrap()
+            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+            .with_batch_size(12)
+            .build()
+            .unwrap();
+
+        // Predicate pruning used to panic once mask-backed plans removed whole pages.
+        // Collecting into batches validates that the plan now downgrades to selectors instead.
+        let schema = reader.schema().clone();
+        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        let result = concat_batches(&schema, &batches).unwrap();
+        assert_eq!(result.num_rows(), 2);
+    }
+
     #[test]
     fn test_get_row_group_column_bloom_filter_with_length() {
         // convert to new parquet file with bloom_filter_length
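For context on the knob the hunks above expose: a minimal usage sketch, not part of the patch, of `with_row_selection_strategy` on the synchronous reader, mirroring the benchmark's `run_read`. The data and the selection here are placeholders:

```rust
use bytes::Bytes;
use parquet::arrow::arrow_reader::{
    ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector,
};
use parquet::errors::Result;

// Count the rows surviving a selection, forcing the dense bitmap path
// rather than letting the default Auto heuristic decide.
fn count_selected_rows(parquet_data: Bytes) -> Result<usize> {
    let selection = RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(20),
        RowSelector::skip(100),
    ]);
    let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data)?
        .with_batch_size(1024)
        .with_row_selection(selection)
        .with_row_selection_strategy(RowSelectionStrategy::Mask)
        .build()?;
    let mut total = 0;
    for batch in reader {
        total += batch?.num_rows();
    }
    Ok(total)
}
```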
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs
index 2210f47df2c1..5b5191115f80 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -20,12 +20,12 @@
 
 use crate::arrow::array_reader::ArrayReader;
 use crate::arrow::arrow_reader::{
-    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
+    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor,
+    RowSelectionStrategy, RowSelector,
 };
 use crate::errors::{ParquetError, Result};
 use arrow_array::Array;
 use arrow_select::filter::prep_null_mask_filter;
-use std::collections::VecDeque;
 
 /// A builder for [`ReadPlan`]
 #[derive(Clone, Debug)]
@@ -33,6 +33,8 @@ pub struct ReadPlanBuilder {
     batch_size: usize,
     /// Current to apply, includes all filters
     selection: Option<RowSelection>,
+    /// Strategy to use when materialising the row selection
+    selection_strategy: RowSelectionStrategy,
 }
 
 impl ReadPlanBuilder {
@@ -41,6 +43,7 @@
         Self {
             batch_size,
             selection: None,
+            selection_strategy: RowSelectionStrategy::default(),
         }
     }
 
@@ -50,6 +53,12 @@
         self
     }
 
+    /// Configure the strategy to use when materialising the [`RowSelection`]
+    pub fn with_selection_strategy(mut self, strategy: RowSelectionStrategy) -> Self {
+        self.selection_strategy = strategy;
+        self
+    }
+
     /// Returns the current selection, if any
     pub fn selection(&self) -> Option<&RowSelection> {
         self.selection.as_ref()
     }
@@ -79,6 +88,38 @@
         self.selection.as_ref().map(|s| s.row_count())
     }
 
+    /// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection.
+    pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy {
+        match self.selection_strategy {
+            RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors,
+            RowSelectionStrategy::Mask => RowSelectionStrategy::Mask,
+            RowSelectionStrategy::Auto { threshold, .. } => {
+                let selection = match self.selection.as_ref() {
+                    Some(selection) => selection,
+                    None => return RowSelectionStrategy::Selectors,
+                };
+
+                let trimmed = selection.clone().trim();
+                let selectors: Vec<RowSelector> = trimmed.into();
+                if selectors.is_empty() {
+                    return RowSelectionStrategy::Mask;
+                }
+
+                let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+                let selector_count = selectors.len();
+                if selector_count == 0 {
+                    return RowSelectionStrategy::Mask;
+                }
+
+                if total_rows < selector_count.saturating_mul(threshold) {
+                    RowSelectionStrategy::Mask
+                } else {
+                    RowSelectionStrategy::Selectors
+                }
+            }
+        }
+    }
+
     /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
     ///
     /// If the current `selection` is `Some`, the resulting [`RowSelection`]
@@ -126,12 +167,21 @@
         if !self.selects_any() {
             self.selection = Some(RowSelection::from(vec![]));
         }
+        let selection_strategy = match self.selection_strategy {
+            RowSelectionStrategy::Auto { .. } => self.preferred_selection_strategy(),
+            strategy => strategy,
+        };
         let Self {
             batch_size,
             selection,
+            selection_strategy: _,
         } = self;
 
-        let selection = selection.map(|s| s.trim().into());
+        let selection = selection.map(|s| {
+            let trimmed = s.trim();
+            let selectors: Vec<RowSelector> = trimmed.into();
+            RowSelectionCursor::new(selectors, selection_strategy)
+        });
 
         ReadPlan {
             batch_size,
@@ -233,12 +283,12 @@ pub struct ReadPlan {
     /// The number of rows to read in each batch
     batch_size: usize,
     /// Row ranges to be selected from the data source
-    selection: Option<VecDeque<RowSelector>>,
+    selection: Option<RowSelectionCursor>,
 }
 
 impl ReadPlan {
     /// Returns a mutable reference to the selection, if any
-    pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
+    pub fn selection_mut(&mut self) -> Option<&mut RowSelectionCursor> {
         self.selection.as_mut()
     }
 
@@ -248,3 +298,36 @@
         self.batch_size
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
+        ReadPlanBuilder::new(1024).with_selection(Some(selection))
+    }
+
+    #[test]
+    fn preferred_selection_strategy_prefers_mask_by_default() {
+        let selection = RowSelection::from(vec![RowSelector::select(8)]);
+        let builder = builder_with_selection(selection);
+        assert_eq!(
+            builder.preferred_selection_strategy(),
+            RowSelectionStrategy::Mask
+        );
+    }
+
+    #[test]
+    fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
+        let selection = RowSelection::from(vec![RowSelector::select(8)]);
+        let builder =
+            builder_with_selection(selection).with_selection_strategy(RowSelectionStrategy::Auto {
+                threshold: 1,
+                safe_strategy: true,
+            });
+        assert_eq!(
+            builder.preferred_selection_strategy(),
+            RowSelectionStrategy::Selectors
+        );
+    }
+}
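To make the Auto arithmetic concrete, a sketch (not part of the patch) using the public APIs added above; the function name is hypothetical:

```rust
use parquet::arrow::arrow_reader::{
    ReadPlanBuilder, RowSelection, RowSelectionStrategy, RowSelector,
};

fn auto_resolution_example() {
    // 128 select/skip pairs of 4 rows each: 1024 rows across 256 selectors,
    // so the average run length is 4.
    let selectors: Vec<RowSelector> = (0..128)
        .flat_map(|_| [RowSelector::select(4), RowSelector::skip(4)])
        .collect();
    let builder = ReadPlanBuilder::new(1024)
        .with_selection(Some(RowSelection::from(selectors)))
        .with_selection_strategy(RowSelectionStrategy::Auto {
            threshold: 32,
            safe_strategy: true,
        });
    // 1024 < 256 * 32, so Auto resolves to the dense mask representation.
    assert_eq!(
        builder.preferred_selection_strategy(),
        RowSelectionStrategy::Mask
    );
}
```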
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 1eb7c85d1d88..ff975e2bc42d 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -16,12 +16,39 @@
 // under the License.
 
 use arrow_array::{Array, BooleanArray};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use arrow_select::filter::SlicesIterator;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
 
-use crate::file::page_index::offset_index::PageLocation;
+use crate::arrow::ProjectionMask;
+use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
+
+/// Strategy for materialising [`RowSelection`] during execution.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum RowSelectionStrategy {
+    /// Use a queue of [`RowSelector`] values
+    Selectors,
+    /// Use a boolean mask to materialise the selection
+    Mask,
+    /// Choose between [`Self::Mask`] and [`Self::Selectors`] based on selector density
+    Auto {
+        /// Average selector length below which masks are preferred
+        threshold: usize,
+        /// Fall back to selectors when a mask would be unsafe (e.g. page skipping)
+        safe_strategy: bool,
+    },
+}
+
+impl Default for RowSelectionStrategy {
+    fn default() -> Self {
+        Self::Auto {
+            threshold: 32,
+            safe_strategy: true,
+        }
+    }
+}
 
 /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
 /// scanning a parquet file
@@ -213,6 +240,39 @@
         ranges
     }
 
+    /// Returns true if this selection would skip any data pages within the provided columns
+    fn selection_skips_any_page(
+        &self,
+        projection: &ProjectionMask,
+        columns: &[OffsetIndexMetaData],
+    ) -> bool {
+        columns.iter().enumerate().any(|(leaf_idx, column)| {
+            if !projection.leaf_included(leaf_idx) {
+                return false;
+            }
+
+            let locations = column.page_locations();
+            if locations.is_empty() {
+                return false;
+            }
+
+            let ranges = self.scan_ranges(locations);
+            !ranges.is_empty() && ranges.len() < locations.len()
+        })
+    }
+
+    /// Returns true if selectors should be forced, preventing mask materialisation
+    pub(crate) fn should_force_selectors(
+        &self,
+        projection: &ProjectionMask,
+        offset_index: Option<&[OffsetIndexMetaData]>,
+    ) -> bool {
+        match offset_index {
+            Some(columns) => self.selection_skips_any_page(projection, columns),
+            None => false,
+        }
+    }
+
     /// Splits off the first `row_count` from this [`RowSelection`]
     pub fn split_off(&mut self, row_count: usize) -> Self {
         let mut total_count = 0;
@@ -691,6 +751,183 @@ fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelec
     iter.collect()
 }
 
+/// Cursor for iterating a [`RowSelection`] during execution within a
+/// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan).
+///
+/// This keeps per-reader state such as the current position and delegates the
+/// actual storage strategy to the internal `RowSelectionBacking`.
+#[derive(Debug)]
+pub struct RowSelectionCursor {
+    /// Backing storage describing how the selection is materialised
+    storage: RowSelectionBacking,
+    /// Current absolute offset into the selection
+    position: usize,
+}
+
+/// Backing storage that powers [`RowSelectionCursor`].
+///
+/// The cursor either walks a boolean mask (dense representation) or a queue
+/// of [`RowSelector`] ranges (sparse representation).
+#[derive(Debug)]
+pub enum RowSelectionBacking {
+    Mask(BooleanBuffer),
+    Selectors(VecDeque<RowSelector>),
+}
+
+/// Result of computing the next chunk to read when using a bitmap mask
+#[derive(Debug)]
+pub struct MaskChunk {
+    /// Number of leading rows to skip before reaching selected rows
+    pub initial_skip: usize,
+    /// Total rows covered by this chunk (selected + skipped)
+    pub chunk_rows: usize,
+    /// Rows actually selected within the chunk
+    pub selected_rows: usize,
+    /// Starting offset within the mask where the chunk begins
+    pub mask_start: usize,
+}
+
+impl RowSelectionCursor {
+    /// Create a cursor, choosing an efficient backing representation
+    pub(crate) fn new(selectors: Vec<RowSelector>, strategy: RowSelectionStrategy) -> Self {
+        let storage = match strategy {
+            RowSelectionStrategy::Mask => {
+                RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors))
+            }
+            RowSelectionStrategy::Selectors => RowSelectionBacking::Selectors(selectors.into()),
+            RowSelectionStrategy::Auto { .. } => {
+                panic!("RowSelectionStrategy::Auto must be resolved before creating cursor")
+            }
+        };
+
+        Self {
+            storage,
+            position: 0,
+        }
+    }
+
+    /// Returns `true` when no further rows remain
+    pub fn is_empty(&self) -> bool {
+        match &self.storage {
+            RowSelectionBacking::Mask(mask) => self.position >= mask.len(),
+            RowSelectionBacking::Selectors(selectors) => selectors.is_empty(),
+        }
+    }
+
+    /// Current position within the overall selection
+    pub fn position(&self) -> usize {
+        self.position
+    }
+
+    /// Return the next [`RowSelector`] when using the sparse representation
+    pub fn next_selector(&mut self) -> RowSelector {
+        match &mut self.storage {
+            RowSelectionBacking::Selectors(selectors) => {
+                let selector = selectors.pop_front().unwrap();
+                self.position += selector.row_count;
+                selector
+            }
+            RowSelectionBacking::Mask(_) => {
+                unreachable!("next_selector called for mask-based RowSelectionCursor")
+            }
+        }
+    }
+
+    /// Return a selector to the front, rewinding the position (sparse-only)
+    pub fn return_selector(&mut self, selector: RowSelector) {
+        match &mut self.storage {
+            RowSelectionBacking::Selectors(selectors) => {
+                self.position = self.position.saturating_sub(selector.row_count);
+                selectors.push_front(selector);
+            }
+            RowSelectionBacking::Mask(_) => {
+                unreachable!("return_selector called for mask-based RowSelectionCursor")
+            }
+        }
+    }
+
+    /// Returns `true` if the cursor is backed by a boolean mask
+    pub fn is_mask_backed(&self) -> bool {
+        matches!(self.storage, RowSelectionBacking::Mask(_))
+    }
+
+    /// Advance through the mask representation, producing the next chunk summary
+    pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> {
+        if !self.is_mask_backed() {
+            unreachable!("next_mask_chunk called for selector-based RowSelectionCursor")
+        }
+        let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = {
+            let mask = match &self.storage {
+                RowSelectionBacking::Mask(mask) => mask,
+                RowSelectionBacking::Selectors(_) => return None,
+            };
+
+            if self.position >= mask.len() {
+                return None;
+            }
+
+            let start_position = self.position;
+            let mut cursor = start_position;
+            let mut initial_skip = 0;
+
+            while cursor < mask.len() && !mask.value(cursor) {
+                initial_skip += 1;
+                cursor += 1;
+            }
+
+            let mask_start = cursor;
+            let mut chunk_rows = 0;
+            let mut selected_rows = 0;
+
+            // Advance until enough rows have been selected to satisfy the batch size,
+            // or until the mask is exhausted. This mirrors the behaviour of the legacy
+            // `RowSelector` queue-based iteration.
+            while cursor < mask.len() && selected_rows < batch_size {
+                chunk_rows += 1;
+                if mask.value(cursor) {
+                    selected_rows += 1;
+                }
+                cursor += 1;
+            }
+
+            (initial_skip, chunk_rows, selected_rows, mask_start, cursor)
+        };
+
+        self.position = end_position;
+
+        Some(MaskChunk {
+            initial_skip,
+            chunk_rows,
+            selected_rows,
+            mask_start,
+        })
+    }
+
+    /// Materialise the boolean values for a mask-backed chunk
+    pub fn mask_values_for(&self, chunk: &MaskChunk) -> Option<BooleanArray> {
+        match &self.storage {
+            RowSelectionBacking::Mask(mask) => {
+                if chunk.mask_start.saturating_add(chunk.chunk_rows) > mask.len() {
+                    return None;
+                }
+                Some(BooleanArray::from(
+                    mask.slice(chunk.mask_start, chunk.chunk_rows),
+                ))
+            }
+            RowSelectionBacking::Selectors(_) => None,
+        }
+    }
+}
+
+fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer {
+    let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+    let mut builder = BooleanBufferBuilder::new(total_rows);
+    for selector in selectors {
+        builder.append_n(selector.row_count, !selector.skip);
+    }
+    builder.finish()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
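As a standalone illustration of the dense representation built by `boolean_mask_from_selectors` above, here is a sketch (not part of the patch) using only public `arrow_buffer` and `parquet` APIs; the helper name is hypothetical:

```rust
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
use parquet::arrow::arrow_reader::RowSelector;

// Expand skip/select runs into one bit per row: set for selected rows,
// clear for skipped rows, matching the helper in the diff above.
fn selectors_to_mask(selectors: &[RowSelector]) -> BooleanBuffer {
    let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
    let mut builder = BooleanBufferBuilder::new(total_rows);
    for selector in selectors {
        builder.append_n(selector.row_count, !selector.skip);
    }
    builder.finish()
}

fn main() {
    let mask = selectors_to_mask(&[RowSelector::skip(2), RowSelector::select(3)]);
    assert_eq!(mask.len(), 5);
    assert!(!mask.value(1)); // skipped row
    assert!(mask.value(2)); // first selected row
}
```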
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 44c5465202e7..9f2b22ed29d7 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -486,6 +486,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             projection,
             filter,
             selection,
+            selection_strategy,
             limit,
             offset,
             metrics,
@@ -507,6 +508,7 @@
             projection,
             filter,
             selection,
+            selection_strategy,
             batch_size,
             row_groups,
             limit,
@@ -769,15 +771,17 @@ mod tests {
     use crate::file::metadata::ParquetMetaDataReader;
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
+    use arrow::compute::or;
     use arrow::error::Result as ArrowResult;
     use arrow_array::builder::{ListBuilder, StringBuilder};
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
     use arrow_array::{
-        Array, ArrayRef, Int8Array, Int32Array, RecordBatchReader, Scalar, StringArray,
+        Array, ArrayRef, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray,
         StructArray, UInt64Array,
     };
     use arrow_schema::{DataType, Field, Schema};
+    use arrow_select::concat::concat_batches;
     use futures::{StreamExt, TryStreamExt};
     use rand::{Rng, rng};
     use std::collections::HashMap;
@@ -1203,6 +1207,78 @@
         assert_eq!(actual_rows, expected_rows);
     }
 
+    #[tokio::test]
+    async fn test_row_filter_full_page_skip_is_handled_async() {
+        let first_value: i64 = 1111;
+        let last_value: i64 = 9999;
+        let num_rows: usize = 12;
+
+        // Build data whose row selection has an average run length of 4.
+        // The written pages look like (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999),
+        // so the row selection becomes [1111, (skip 10), 9999].
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", DataType::Int64, false),
+            Field::new("value", DataType::Int64, false),
+        ]));
+
+        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
+        int_values[0] = first_value;
+        int_values[num_rows - 1] = last_value;
+        let keys = Int64Array::from(int_values.clone());
+        let values = Int64Array::from(int_values.clone());
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_write_batch_size(2)
+            .set_data_page_row_count_limit(2)
+            .build();
+
+        let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+        let data = Bytes::from(buffer);
+
+        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            TestReader::new(data.clone()),
+            ArrowReaderOptions::new().with_page_index(true),
+        )
+        .await
+        .unwrap();
+        let schema = builder.parquet_schema().clone();
+        let filter_mask = ProjectionMask::leaves(&schema, [0]);
+
+        let predicate = ArrowPredicateFn::new(filter_mask, move |batch: RecordBatch| {
+            let column = batch.column(0);
+            let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
+            let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
+            or(&match_first, &match_second)
+        });
+
+        // The batch size is set to 12 so all surviving rows are read in one go after filtering.
+        // If the reader chose the mask strategy for this filter, it could panic, because the
+        // four middle pages are never decoded.
+        let stream = ParquetRecordBatchStreamBuilder::new_with_options(
+            TestReader::new(data),
+            ArrowReaderOptions::new().with_page_index(true),
+        )
+        .await
+        .unwrap()
+        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+        .with_batch_size(12)
+        .build()
+        .unwrap();
+
+        // Collecting into batches validates that the plan now downgrades to selectors instead of panicking.
+ let schema = stream.schema().clone(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + let result = concat_batches(&schema, &batches).unwrap(); + assert_eq!(result.num_rows(), 2); + } + #[tokio::test] async fn test_row_filter() { let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index b26a21132c4d..c5d09b5e7e83 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -162,6 +162,7 @@ impl ParquetPushDecoderBuilder { /// Create a [`ParquetPushDecoder`] with the configured options pub fn build(self) -> Result { + let selection_strategy = self.selection_strategy; let Self { input: NoInput, metadata: parquet_metadata, @@ -175,6 +176,7 @@ impl ParquetPushDecoderBuilder { limit, offset, metrics, + selection_strategy: _, max_predicate_cache_size, } = self; @@ -196,6 +198,7 @@ impl ParquetPushDecoderBuilder { metrics, max_predicate_cache_size, buffers, + selection_strategy, ); // Initialize the decoder with the configured options diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index a0ced8aa8522..4545c8d60e6e 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -23,7 +23,7 @@ use crate::arrow::ProjectionMask; use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::arrow_reader::{ - ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, + ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionStrategy, }; use crate::arrow::in_memory_row_group::ColumnChunkData; use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder; @@ -31,6 +31,7 @@ use crate::arrow::push_decoder::reader_builder::filter::CacheInfo; use crate::arrow::schema::ParquetField; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::util::push_buffers::PushBuffers; use bytes::Bytes; use data::DataRequest; @@ -155,6 +156,9 @@ pub(crate) struct RowGroupReaderBuilder { /// The metrics collector metrics: ArrowReaderMetrics, + /// Strategy for materialising row selections + selection_strategy: RowSelectionStrategy, + /// Current state of the decoder. /// /// It is taken when processing, and must be put back before returning @@ -179,6 +183,7 @@ impl RowGroupReaderBuilder { metrics: ArrowReaderMetrics, max_predicate_cache_size: usize, buffers: PushBuffers, + selection_strategy: RowSelectionStrategy, ) -> Self { Self { batch_size, @@ -190,6 +195,7 @@ impl RowGroupReaderBuilder { offset, metrics, max_predicate_cache_size, + selection_strategy, state: Some(RowGroupDecoderState::Finished), buffers, } @@ -233,7 +239,9 @@ impl RowGroupReaderBuilder { "Internal Error: next_row_group called while still reading a row group. 
Expected Finished state, got {state:?}"
             )));
         }
-        let plan_builder = ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+        let plan_builder = ReadPlanBuilder::new(self.batch_size)
+            .with_selection(selection)
+            .with_selection_strategy(self.selection_strategy);
 
         let row_group_info = RowGroupInfo {
             row_group_idx,
@@ -484,7 +492,7 @@
         }
 
         // Apply any limit and offset
-        let plan_builder = plan_builder
+        let mut plan_builder = plan_builder
             .limited(row_count)
             .with_offset(self.offset)
             .with_limit(self.limit)
@@ -524,6 +532,34 @@
             // so don't call with_cache_projection here
             .build();
 
+        match self.selection_strategy {
+            RowSelectionStrategy::Auto {
+                threshold: _threshold,
+                safe_strategy,
+            } => {
+                let preferred_strategy = plan_builder.preferred_selection_strategy();
+                let offset_index = self.row_group_offset_index(row_group_idx);
+                let force_selectors = safe_strategy
+                    && matches!(preferred_strategy, RowSelectionStrategy::Mask)
+                    && plan_builder.selection().is_some_and(|selection| {
+                        selection.should_force_selectors(&self.projection, offset_index)
+                    });
+
+                let resolved_strategy = if force_selectors {
+                    RowSelectionStrategy::Selectors
+                } else {
+                    preferred_strategy
+                };
+
+                plan_builder = plan_builder.with_selection_strategy(resolved_strategy);
+            }
+            _ => {
+                // If a non-auto strategy is specified, override any plan builder strategy
+                plan_builder =
+                    plan_builder.with_selection_strategy(self.selection_strategy);
+            }
+        }
+
         let row_group_info = RowGroupInfo {
             row_group_idx,
             row_count,
@@ -650,14 +686,24 @@
             Some(ProjectionMask::leaves(schema, included_leaves))
         }
     }
+
+    /// Get the offset index for the specified row group, if any
+    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
+        self.metadata
+            .offset_index()
+            .filter(|index| !index.is_empty())
+            .and_then(|index| index.get(row_group_idx))
+            .map(|columns| columns.as_slice())
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
+    #[test]
     // Verify that the size of RowGroupDecoderState does not grow too large
     fn test_structure_size() {
-        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 184);
+        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
    }
}
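Since the strategy knob lives on the shared `ArrowReaderBuilder`, it should also be reachable from the async stream builder. A hedged sketch, not part of the patch, assuming the `async` feature, a Tokio runtime, and `tokio::fs::File` as the reader:

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::{RowSelection, RowSelectionStrategy};
use parquet::errors::Result;

// Pin the sparse selector path instead of letting the default
// Auto { threshold: 32, safe_strategy: true } heuristic decide.
async fn count_rows(file: tokio::fs::File, selection: RowSelection) -> Result<usize> {
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_row_selection(selection)
        .with_row_selection_strategy(RowSelectionStrategy::Selectors)
        .with_batch_size(1 << 10)
        .build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}
```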