Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
c.bench_function("decode metadata with stats mask", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
.unwrap();
})
});

let options = ParquetMetaDataOptions::new().with_skip_encoding_stats(true);
c.bench_function("decode metadata with skip PES", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
.unwrap();
})
});

let buf: Bytes = black_box(encoded_meta()).into();
c.bench_function("decode parquet metadata (wide)", |b| {
b.iter(|| {
Expand All @@ -187,6 +203,20 @@ fn criterion_benchmark(c: &mut Criterion) {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
c.bench_function("decode metadata (wide) with stats mask", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});

let options = ParquetMetaDataOptions::new().with_skip_encoding_stats(true);
c.bench_function("decode metadata (wide) with skip PES", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
91 changes: 91 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,35 @@ impl ArrowReaderOptions {
self
}

/// Enable or disable reducing the [`encoding_stats`] in the Parquet `ColumnMetaData`
/// to a bitmask while decoding.
///
/// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
/// might be desirable.
///
/// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
/// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
/// [`encoding_stats`]:
/// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
pub fn with_encoding_stats_as_mask(self, val: bool) -> Self {
    let mut options = self;
    options.metadata_options.set_encoding_stats_as_mask(val);
    options
}

/// Enable or disable skipping the [`encoding_stats`] field of the Parquet
/// `ColumnMetaData` entirely while decoding.
///
/// [`encoding_stats`]:
/// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
pub fn with_skip_encoding_stats(self, val: bool) -> Self {
    let mut options = self;
    options.metadata_options.set_skip_encoding_stats(val);
    options
}

/// Provide a list of column indices for which to decode `encoding_stats`.
///
/// NOTE(review): presumably these indices override a blanket skip/mask setting for the
/// named columns — confirm against `ParquetMetaDataOptions::set_keep_encoding_stats`.
pub fn with_keep_encoding_stats(self, keep: &[usize]) -> Self {
    let mut options = self;
    options.metadata_options.set_keep_encoding_stats(keep);
    options
}

/// Provide the file decryption properties to use when reading encrypted parquet files.
///
/// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
Expand Down Expand Up @@ -1282,6 +1311,68 @@ mod tests {
assert_eq!(expected.as_ref(), builder.metadata.as_ref());
}

#[test]
fn test_page_encoding_stats_mask() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_tiny_pages.parquet")).unwrap();

    let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
    let builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

    let row_group_metadata = builder.metadata.row_group(0);

    // Verify the per-column encoding masks against the known encodings of this test file.
    for (col_idx, expected) in [(0, Encoding::PLAIN), (2, Encoding::PLAIN_DICTIONARY)] {
        let mask = row_group_metadata
            .column(col_idx)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(mask.is_only(expected));
    }
}

#[test]
fn test_page_encoding_stats_skipped() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_tiny_pages.parquet")).unwrap();

    // Skipping decode entirely: neither the full stats nor the mask should be present.
    let arrow_options = ArrowReaderOptions::new().with_skip_encoding_stats(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        arrow_options,
    )
    .unwrap();

    for column in builder.metadata.row_group(0).columns() {
        assert!(column.page_encoding_stats().is_none());
        assert!(column.page_encoding_stats_mask().is_none());
    }

    // Keep only column 0 while converting to a mask: only that column should carry a
    // mask, and the full stats list should be absent for every column.
    let arrow_options = ArrowReaderOptions::new()
        .with_encoding_stats_as_mask(true)
        .with_keep_encoding_stats(&[0]);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        arrow_options,
    )
    .unwrap();

    for (idx, column) in builder.metadata.row_group(0).columns().iter().enumerate() {
        assert!(column.page_encoding_stats().is_none());
        assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
    }
}

#[test]
fn test_arrow_reader_single_column() {
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
Expand Down
15 changes: 15 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,11 @@ impl EncodingMask {
self.0 & (1 << (val as i32)) != 0
}

/// Returns `true` when the bit for the given [`Encoding`] is the *only* bit set
/// in this mask.
pub fn is_only(&self, val: Encoding) -> bool {
    let only_that_bit = 1 << (val as i32);
    self.0 == only_that_bit
}

/// Test if all [`Encoding`]s in a given set are present in this mask.
pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a Encoding>) -> bool {
encodings.all(|&e| self.is_set(e))
Expand Down Expand Up @@ -2498,4 +2503,14 @@ mod tests {
"Parquet error: Attempt to create invalid mask: 0x2"
);
}

#[test]
fn test_encoding_mask_is_only() {
    // A single-encoding mask is "only" that encoding.
    let single = EncodingMask::new_from_encodings([Encoding::PLAIN].iter());
    assert!(single.is_only(Encoding::PLAIN));

    // A mask with two bits set is not "only" either of them.
    let multi =
        EncodingMask::new_from_encodings([Encoding::PLAIN, Encoding::PLAIN_DICTIONARY].iter());
    assert!(!multi.is_only(Encoding::PLAIN));
}
}
55 changes: 47 additions & 8 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ pub struct ColumnChunkMetaData {
statistics: Option<Statistics>,
geo_statistics: Option<Box<geo_statistics::GeospatialStatistics>>,
encoding_stats: Option<Vec<PageEncodingStats>>,
encoding_stats_mask: Option<EncodingMask>,
bloom_filter_offset: Option<i64>,
bloom_filter_length: Option<i32>,
offset_index_offset: Option<i64>,
Expand Down Expand Up @@ -1050,12 +1051,43 @@ impl ColumnChunkMetaData {
self.geo_statistics.as_deref()
}

/// Returns the offset for the page encoding stats,
/// or `None` if no page encoding stats are available.
/// Returns the full list of page encoding statistics, or `None` if no page encoding
/// statistics are available (e.g. not written, or decoding of them was skipped).
pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
    match &self.encoding_stats {
        Some(stats) => Some(stats),
        None => None,
    }
}

/// Returns the page encoding statistics reduced to a bitmask, or `None` if statistics
/// are not available.
///
/// [`PageEncodingStats`] was added to the Parquet specification specifically so readers
/// can quickly determine whether all pages in a column chunk are dictionary encoded
/// (see <https://github.com/apache/parquet-format/pull/16>).
/// Fully decoding the page encoding statistics can be very costly, however, and is not
/// needed for that use case. As a cheaper alternative, this crate can collapse the list
/// of `PageEncodingStats` into a bitmask of just the encodings used for data pages
/// (see [`ParquetMetaDataOptions::set_encoding_stats_as_mask`]).
/// An all-dictionary-encoded chunk can then be detected like so:
///
/// ```rust
/// use parquet::basic::Encoding;
/// use parquet::file::metadata::ColumnChunkMetaData;
/// // test if all data pages in the column chunk are dictionary encoded
/// fn is_all_dictionary_encoded(col_meta: &ColumnChunkMetaData) -> bool {
///     // check that dictionary encoding was used
///     col_meta.dictionary_page_offset().is_some()
///         && col_meta.page_encoding_stats_mask().is_some_and(|mask| {
///             // mask should only have one bit set, either for PLAIN_DICTIONARY or
///             // RLE_DICTIONARY
///             mask.is_only(Encoding::PLAIN_DICTIONARY) || mask.is_only(Encoding::RLE_DICTIONARY)
///         })
/// }
/// ```
pub fn page_encoding_stats_mask(&self) -> Option<&EncodingMask> {
    self.encoding_stats_mask.as_ref()
}

/// Returns the offset for the bloom filter.
pub fn bloom_filter_offset(&self) -> Option<i64> {
self.bloom_filter_offset
Expand Down Expand Up @@ -1178,6 +1210,7 @@ impl ColumnChunkMetaDataBuilder {
statistics: None,
geo_statistics: None,
encoding_stats: None,
encoding_stats_mask: None,
bloom_filter_offset: None,
bloom_filter_length: None,
offset_index_offset: None,
Expand Down Expand Up @@ -1278,6 +1311,12 @@ impl ColumnChunkMetaDataBuilder {
self
}

/// Sets the page encoding stats bitmask for this column chunk.
pub fn set_page_encoding_stats_mask(self, value: EncodingMask) -> Self {
    let mut builder = self;
    builder.0.encoding_stats_mask = Some(value);
    builder
}

/// Clears the page encoding stats for this column chunk.
pub fn clear_page_encoding_stats(mut self) -> Self {
self.0.encoding_stats = None;
Expand Down Expand Up @@ -1882,9 +1921,9 @@ mod tests {
.build();

#[cfg(not(feature = "encryption"))]
let base_expected_size = 2766;
let base_expected_size = 2798;
#[cfg(feature = "encryption")]
let base_expected_size = 2934;
let base_expected_size = 2966;

assert_eq!(parquet_meta.memory_size(), base_expected_size);

Expand Down Expand Up @@ -1913,9 +1952,9 @@ mod tests {
.build();

#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 3192;
let bigger_expected_size = 3224;
#[cfg(feature = "encryption")]
let bigger_expected_size = 3360;
let bigger_expected_size = 3392;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for adding the mask to ColumnChunkMetaData. An alternative might be to create an enum with Vec and mask variants if we don't want more bloat.


// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
Expand Down Expand Up @@ -1962,7 +2001,7 @@ mod tests {
.set_row_groups(row_group_meta.clone())
.build();

let base_expected_size = 2058;
let base_expected_size = 2074;
assert_eq!(parquet_meta_data.memory_size(), base_expected_size);

let footer_key = "0123456789012345".as_bytes();
Expand All @@ -1988,7 +2027,7 @@ mod tests {
.set_file_decryptor(Some(decryptor))
.build();

let expected_size_with_decryptor = 3072;
let expected_size_with_decryptor = 3088;
assert!(expected_size_with_decryptor > base_expected_size);

assert_eq!(
Expand Down
Loading
Loading