111 changes: 70 additions & 41 deletions src/fenic/_backends/local/semantic_operators/parse_pdf.py
@@ -1,6 +1,6 @@
import logging
from textwrap import dedent
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Union

import fitz
import jinja2
@@ -10,7 +10,11 @@
BaseSingleColumnFilePathOperator,
CompletionOnlyRequestSender,
)
from fenic._backends.local.utils.doc_loader import DocFolderLoader
from fenic._backends.local.utils.doc_loader import (
DocFolderLoader,
resolve_and_coalesce_pages,
validate_pages_argument,
)
from fenic._inference.language_model import InferenceConfiguration, LanguageModel
from fenic._inference.types import LMRequestFile, LMRequestMessages
from fenic.core._logical_plan.resolved_types import ResolvedModelAlias
@@ -50,12 +54,14 @@ def __init__(
describe_images: bool = False,
model_alias: Optional[ResolvedModelAlias] = None,
max_output_tokens: Optional[int] = None,
pages: Optional[Union[pl.Series, int, List[Union[int, List[int]]]]] = None,
):
self.page_separator = page_separator
self.describe_images = describe_images
self.model = model
self.model_alias = model_alias
self.max_output_tokens = max_output_tokens
self.pages = pages

DocFolderLoader.check_file_extensions(input.to_list(), "pdf")

@@ -108,12 +114,19 @@ def build_request_messages_batch(self) -> Tuple[List[Optional[LMRequestMessages]
List of chunk sizes (page counts) per PDF (page_counts_per_chunk_per_row)"""
messages_batch = []
page_counts_per_chunk_per_row = []
for path in self.input:
for idx, path in enumerate(self.input):
if not path:
messages_batch.append(None)
page_counts_per_chunk_per_row.append([1])
else:
file_chunks = self._get_file_chunks(path)
# pages can be a literal int, list of ranges, or a logical expression that resolves to an int or list of ranges
row_pages = self.pages.to_list()[idx] if isinstance(self.pages, pl.Series) else self.pages

# Validate pages if it's not None (validation happens here for column values)
if row_pages is not None:
validate_pages_argument(row_pages)

file_chunks = self._get_file_chunks(path, row_pages)
page_counts_per_chunk = []
for file in file_chunks:
messages_batch.append(
@@ -123,57 +136,73 @@ def build_request_messages_batch(self) -> Tuple[List[Optional[LMRequestMessages]
page_counts_per_chunk_per_row.append(page_counts_per_chunk)
return messages_batch, page_counts_per_chunk_per_row


def _get_file_chunks(self, file_path: str) -> List[LMRequestFile]:
def _get_file_chunks(self, file_path: str, pages: Optional[Union[int, List[Union[int, List[int]]]]] = None) -> List[LMRequestFile]:
"""Get the page chunks for the PDF file.

Limit the pages based on the model's output token limit and internal max pages per chunk.

Args:
file_path: Path to the PDF file
pages: Optional pages specification (1-indexed). If None, process all pages.

Returns:
List of LMRequestFile objects, each carrying an inclusive, 0-indexed (start_page, end_page) page range
"""
chunks = []
range_start_page = 0
range_tokens = 0
range_page_count = 0

with fitz.open(file_path) as doc:
total_pages = doc.page_count
for page_num in range(total_pages):
text = doc[page_num].get_text("text")
page_tokens = self.model.count_tokens(text)
# Check if we need to start a new range, either by reaching the token limit or the requested page range size
would_exceed_tokens = range_tokens > 0 and (range_tokens + page_tokens) * PDF_MARKDOWN_OUTPUT_TOKEN_MULTIPLIER > self.model.model_parameters.max_output_tokens
would_exceed_page_limit = range_page_count >= PDF_MAX_PAGES_CHUNK

if would_exceed_tokens or would_exceed_page_limit:
# Save current batch
last_page = page_num - 1
page_range = (range_start_page, last_page)
with fitz.open() as doc_chunk:
doc_chunk.insert_pdf(doc, from_page=range_start_page, to_page=last_page)
chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=page_range))
range_start_page = page_num
range_tokens = page_tokens
range_page_count = 1
else:
range_tokens += page_tokens
range_page_count += 1

# Add the last batch if there are remaining pages
if range_start_page < total_pages:
if range_start_page == 0:
# whole pdf fits in one chunk, no need to keep data in memory
chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=None, page_range=(0, total_pages - 1)))
else:
# multi-page chunk
with fitz.open() as doc_chunk:
doc_chunk.insert_pdf(doc, from_page=range_start_page, to_page=total_pages - 1)
chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=(range_start_page, total_pages - 1)))

# Resolve page ranges
if pages is not None:
resolved_ranges = resolve_and_coalesce_pages(pages, total_pages)
# Filter out ranges that exceed the document's page count
resolved_ranges = [(start, min(end, total_pages - 1)) for start, end in resolved_ranges if start < total_pages]
else:
# Process all pages
resolved_ranges = [(0, total_pages - 1)]

# Process each range
for range_start, range_end in resolved_ranges:
# Track current chunk within this range
chunk_start_page = range_start
chunk_tokens = 0
chunk_page_count = 0

for page_num in range(range_start, range_end + 1):
text = doc[page_num].get_text("text")
page_tokens = self.model.count_tokens(text)

# Check if we need to start a new chunk
would_exceed_tokens = chunk_tokens > 0 and (chunk_tokens + page_tokens) * PDF_MARKDOWN_OUTPUT_TOKEN_MULTIPLIER > self.model.model_parameters.max_output_tokens
would_exceed_page_limit = chunk_page_count >= PDF_MAX_PAGES_CHUNK

if would_exceed_tokens or would_exceed_page_limit:
# Save current chunk
last_page = page_num - 1
page_range = (chunk_start_page, last_page)
with fitz.open() as doc_chunk:
doc_chunk.insert_pdf(doc, from_page=chunk_start_page, to_page=last_page)
chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=page_range))

# Start new chunk
chunk_start_page = page_num
chunk_tokens = page_tokens
chunk_page_count = 1
else:
chunk_tokens += page_tokens
chunk_page_count += 1

# Add the last chunk for this range if there are remaining pages
if chunk_start_page <= range_end:
if chunk_start_page == 0 and range_end == total_pages - 1 and len(resolved_ranges) == 1:
# Whole PDF fits in one chunk, no need to keep data in memory
chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=None, page_range=(0, total_pages - 1)))
else:
# Multi-page chunk or partial PDF
with fitz.open() as doc_chunk:
doc_chunk.insert_pdf(doc, from_page=chunk_start_page, to_page=range_end)
chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=(chunk_start_page, range_end)))

return chunks

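For readers of the diff: the core of `_get_file_chunks` is a greedy packing loop, so here is a minimal, self-contained sketch of that policy. The budget, multiplier, and per-page token counts are made-up stand-ins for `self.model.model_parameters.max_output_tokens`, `PDF_MARKDOWN_OUTPUT_TOKEN_MULTIPLIER`, and `self.model.count_tokens()`; this is an illustration, not the PR's code.

```python
from typing import List, Tuple

def chunk_pages(page_tokens: List[int], max_tokens: int, max_pages: int,
                multiplier: float = 1.5) -> List[Tuple[int, int]]:
    """Greedily pack consecutive pages into 0-indexed, inclusive (start, end) ranges."""
    chunks: List[Tuple[int, int]] = []
    start, tokens, count = 0, 0, 0
    for page, t in enumerate(page_tokens):
        # Close the current chunk if adding this page would blow the estimated
        # output-token budget, or if the page cap is already reached.
        exceeds_tokens = tokens > 0 and (tokens + t) * multiplier > max_tokens
        exceeds_pages = count >= max_pages
        if exceeds_tokens or exceeds_pages:
            chunks.append((start, page - 1))
            start, tokens, count = page, t, 1  # current page opens the next chunk
        else:
            tokens, count = tokens + t, count + 1
    if start < len(page_tokens):
        chunks.append((start, len(page_tokens) - 1))  # flush the tail
    return chunks

# Four 100-token pages with a 2-page cap split into two chunks:
# chunk_pages([100, 100, 100, 100], max_tokens=1000, max_pages=2) -> [(0, 1), (2, 3)]
```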
23 changes: 18 additions & 5 deletions src/fenic/_backends/local/transpiler/expr_converter.py
@@ -729,18 +729,31 @@ def sem_summarize_fn(batch: pl.Series) -> pl.Series:
@_convert_expr.register(SemanticParsePDFExpr)
def _convert_parse_pdf_expr(self, logical: SemanticParsePDFExpr) -> pl.Expr:
def parse_pdf_fn(batch: pl.Series) -> pl.Series:
if batch.dtype == pl.Struct:
fields = batch.struct.fields
docs_series = batch.struct.field(fields[0])
pages_series_or_static = batch.struct.field(fields[1])
else:
docs_series = batch
pages_series_or_static = logical.pages

return SemanticParsePDF(
input=batch,
input=docs_series,
model=self.session_state.get_language_model(logical.model_alias),
page_separator=logical.page_separator,
describe_images=logical.describe_images,
model_alias=logical.model_alias,
max_output_tokens=logical.max_output_tokens,
pages=pages_series_or_static,
).execute()

return self._convert_expr(logical.expr).map_batches(
parse_pdf_fn, return_dtype=pl.Utf8
)
if isinstance(logical.pages, LogicalExpr):
return pl.struct(self._convert_expr(logical.expr), self._convert_expr(logical.pages)).map_batches(
parse_pdf_fn, return_dtype=pl.Utf8
)
else:
return self._convert_expr(logical.expr).map_batches(
parse_pdf_fn, return_dtype=pl.Utf8
)

@_convert_expr.register(ArrayJoinExpr)
def _convert_array_join_expr(self, logical: ArrayJoinExpr) -> pl.Expr:
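The branch above is the heart of this file's change: when `pages` is a logical expression, it is zipped with the document paths into one struct column so a single `map_batches` UDF receives both values per row. A minimal polars sketch of the mechanism, with toy column names rather than the PR's:

```python
import polars as pl

df = pl.DataFrame({
    "path": ["a.pdf", "b.pdf"],
    "pages": [[1, 2], [3]],
})

def udf(batch: pl.Series) -> pl.Series:
    # map_batches over a struct expression hands the UDF a struct Series;
    # unpack the fields positionally, as parse_pdf_fn does above.
    fields = batch.struct.fields
    paths = batch.struct.field(fields[0])
    pages = batch.struct.field(fields[1])
    return pl.Series([f"{p} -> pages {g}" for p, g in zip(paths, pages)])

out = df.select(pl.struct("path", "pages").map_batches(udf, return_dtype=pl.Utf8))
print(out)  # one Utf8 row per input row, e.g. "a.pdf -> pages [1, 2]"
```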
90 changes: 89 additions & 1 deletion src/fenic/_backends/local/utils/doc_loader.py
@@ -6,7 +6,7 @@
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Literal, Optional, Tuple
from typing import List, Literal, Optional, Tuple, Union

import fitz # PyMuPDF
import polars as pl
@@ -26,6 +26,94 @@

logger = logging.getLogger(__name__)


def validate_pages_argument(pages: Optional[Union[int, List[Union[int, List[int]]]]]) -> None:
Contributor:

What do you think of making the page range argument a proper Pydantic type with built-in validation, like our other configuration objects?

Contributor Author (@YoungVor, Oct 21, 2025):

I think that's a good idea; it would be clearer to the user and easier to validate. I wanted to have something working first, but can certainly make the change before we merge. For the column case the type would still be array[int] or array[array[int]] though, right? But we can convert it to the Pydantic type during the validate step while resolving the column.

Contributor Author (@YoungVor):

We spoke in person:

  • for the Column case, it will continue to be array[int] OR array[array[int]]
  • we'll keep the ability to do array[array[int]], because it lets the user whitelist an entire document without enumerating the pages
  • the drawback is that validating the page mask must be done dynamically on plan conversion
  • we'll fail loudly (instead of nulling the row) if the page mask is malformed
  • adding a convenience Pydantic model is a nice-to-have for the static (not Column) case; I'll add it (see the sketch after validate_pages_argument below)

"""Validate the pages argument.
Args:
pages: Either an int, or a list of ints or pairs of ints (ranges), or None
Raises:
ValidationError: If the pages argument is invalid
"""
if pages is None:
return
if isinstance(pages, int):
if pages <= 0:
raise ValidationError("Page numbers must be positive integers")
elif isinstance(pages, list):
for item in pages:
if isinstance(item, int):
if item <= 0:
raise ValidationError("Page numbers must be positive integers")
elif isinstance(item, list):
if len(item) != 2:
raise ValidationError("Page ranges must be pairs of two numbers")
if not all(isinstance(x, int) for x in item):
raise ValidationError("Page range values must be integers")
if item[0] <= 0 or item[1] <= 0:
raise ValidationError("Page numbers must be positive integers")
if item[1] < item[0]:
raise ValidationError(f"Invalid page range [{item[0]}, {item[1]}]: end page must be >= start page")
else:
raise ValidationError(f"Invalid pages element type: {type(item).__name__}. Expected int or list of two ints")
else:
raise ValidationError(f"Invalid pages type: {type(pages).__name__}. Expected int, list, or Column")


def resolve_and_coalesce_pages(pages: Union[int, List[Union[int, List[int]]]], total_pages: int) -> List[Tuple[int, int]]:
"""Resolve and coalesce page specifications into sorted, non-overlapping ranges.
Converts page numbers and ranges into a sorted list of non-overlapping page ranges.
All page numbers are 1-indexed as input but converted to 0-indexed ranges for internal use.
Args:
pages: Either a single page number (int) or a list of page numbers and/or ranges.
Page numbers are 1-indexed. Ranges are represented as [start, end] (inclusive).
total_pages: Total number of pages in the document; range ends are capped at this count.
Returns:
List of (start, end) tuples representing 0-indexed page ranges (inclusive).
Ranges are sorted and non-overlapping; overlapping or adjacent inputs are merged.
Examples:
>>> resolve_and_coalesce_pages(5, 10)
[(4, 4)]
>>> resolve_and_coalesce_pages([1, 3, 5], 10)
[(0, 0), (2, 2), (4, 4)]
>>> resolve_and_coalesce_pages([1, [2, 4], 3, 5], 10)
[(0, 4)]
>>> resolve_and_coalesce_pages([[1, 3], [2, 5], 7], 10)
[(0, 4), (6, 6)]
"""
# Convert to list of (start, end) tuples (0-indexed, inclusive)
ranges = []
if isinstance(pages, int):
# Single page: convert 1-indexed to 0-indexed
ranges.append((pages - 1, min(pages - 1, total_pages - 1)))
else:
for item in pages:
# Convert 1-indexed pages/ranges to 0-indexed; every range end is capped at the last page of the document
ranges.append((item - 1, min(item - 1, total_pages - 1)) if isinstance(item, int) else (item[0] - 1, min(item[1] - 1, total_pages - 1)))

# Sort by start page
ranges.sort()

# Coalesce overlapping ranges
if not ranges:
return []
coalesced = [ranges[0]]
for start, end in ranges[1:]:
last_start, last_end = coalesced[-1]
# Check if ranges overlap or are adjacent
if start <= last_end + 1:
# Merge ranges
coalesced[-1] = (last_start, max(last_end, end))
else:
# No overlap, add new range
coalesced.append((start, end))
return coalesced
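A quick sketch of how the caller consumes these ranges, mirroring the filtering in `_get_file_chunks` above: resolve against the real page count, then drop ranges that start past the end of the document and clamp the rest.

```python
# Illustrative only: page 99 of a 10-page document is dropped, while pages
# 1 and 2-4 (adjacent once 0-indexed) coalesce into a single range.
ranges = resolve_and_coalesce_pages([1, [2, 4], 99], total_pages=10)
ranges = [(start, min(end, 9)) for start, end in ranges if start < 10]
assert ranges == [(0, 3)]
```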

class DocFolderLoader:
"""A class that encapsulates folder traversal and multi-threaded file processing.
28 changes: 28 additions & 0 deletions src/fenic/api/functions/semantic.py
@@ -4,6 +4,7 @@

from pydantic import BaseModel, ConfigDict, validate_call

from fenic._backends.local.utils.doc_loader import validate_pages_argument
from fenic.api.column import Column, ColumnOrName
from fenic.core._logical_plan.expressions import (
AliasExpr,
@@ -597,6 +598,7 @@ def parse_pdf(
page_separator: Optional[str] = None,
describe_images: bool = False, # for images that aren't tables
max_output_tokens: Optional[int] = None,
pages: Optional[Union[Column, int, List[Union[int, List[int]]]]] = None,
) -> Column:
r"""Parses a column of PDF paths into markdown.

@@ -609,12 +611,19 @@
page_separator: Optional page separator to use for the parsing. If the separator includes the {page} placeholder, the model will replace it with the current page number.
describe_images: Flag to describe images in the PDF. If True, the prompt will ask the model to include a description of the image in the markdown output. If False, the prompt asks the model to ignore images that aren't tables or charts.
max_output_tokens: Optional maximum number of output tokens per ~3 pages of PDF (does not include reasoning tokens). If None, don't constrain the model's output.
pages: Optional pages or page ranges to parse. Can be:
- An int (single page number, 1-indexed)
- A list of ints and/or pairs of ints (e.g., [1, [3, 5], 7] to parse pages 1, 3-5, and 7)
- A Column expression that resolves to an int or list of ints or ranges
If None, all pages will be parsed.

Note:
For Gemini models, this function uses the google file API, uploading PDF files to Google's file store and deleting them after each request.
A Column expression for pages is limited by its dtype: it must be either a list of ranges (a list of two-element lists) or a list of page numbers, not a mix of both. Rows may contain None or an empty list to parse all pages.

Raises:
ExecutionError: If paths in the column are not valid PDF files.
ValidationError: If the pages argument is invalid.

Example: Parse PDF paths in a column into markdown
```python
@@ -633,15 +642,34 @@
pdf_markdown = pdf_metadata.select(semantic.parse_pdf(col("file_path"), page_separator="--- PAGE BREAK ---"))
pdf_markdown.select(col("markdown_content")).show()
```

Example: Parsing PDFs with a page range - parse only pages 1-2 and 5-7
```python
pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
pdf_markdown = semantic.parse_pdf(col("file_path"), pages=[[1,2], [5,7]])
pdf_markdown.select(col("markdown_content")).show()
```

Example: Parsing PDFs with a page range column - take only the first and last page
```python
pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
pdf_markdown = semantic.parse_pdf(col("file_path"), pages=array(lit(1), col("page_count"))
pdf_markdown.select(col("markdown_content")).show()
```
"""
resolved_model_alias = _resolve_model_alias(model_alias)

# Validate pages if it's not a Column
if not isinstance(pages, Column):
validate_pages_argument(pages)

return Column._from_logical_expr(
SemanticParsePDFExpr(
Column._from_col_or_name(column)._logical_expr,
model_alias=resolved_model_alias,
page_separator=page_separator,
describe_images=describe_images,
max_output_tokens=max_output_tokens,
pages=pages if not isinstance(pages, Column) else pages._logical_expr,
)
)
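For completeness, the one static case the docstring examples above do not show is a single 1-indexed page; a usage sketch assuming the same session and reader names as those examples:

```python
# Parse only the first page of each PDF.
pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
first_pages = pdf_metadata.select(semantic.parse_pdf(col("file_path"), pages=1))
first_pages.show()
```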