diff --git a/src/fenic/_backends/local/semantic_operators/parse_pdf.py b/src/fenic/_backends/local/semantic_operators/parse_pdf.py index 6107df80..387872ea 100644 --- a/src/fenic/_backends/local/semantic_operators/parse_pdf.py +++ b/src/fenic/_backends/local/semantic_operators/parse_pdf.py @@ -1,6 +1,6 @@ import logging from textwrap import dedent -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Union import fitz import jinja2 @@ -10,7 +10,11 @@ BaseSingleColumnFilePathOperator, CompletionOnlyRequestSender, ) -from fenic._backends.local.utils.doc_loader import DocFolderLoader +from fenic._backends.local.utils.doc_loader import ( + DocFolderLoader, + resolve_and_coalesce_pages, + validate_pages_argument, +) from fenic._inference.language_model import InferenceConfiguration, LanguageModel from fenic._inference.types import LMRequestFile, LMRequestMessages from fenic.core._logical_plan.resolved_types import ResolvedModelAlias @@ -50,12 +54,14 @@ def __init__( describe_images: bool = False, model_alias: Optional[ResolvedModelAlias] = None, max_output_tokens: Optional[int] = None, + pages: Optional[Union[pl.Series, int, List[Union[int, List[int]]]]] = None, ): self.page_separator = page_separator self.describe_images = describe_images self.model = model self.model_alias = model_alias self.max_output_tokens = max_output_tokens + self.pages = pages DocFolderLoader.check_file_extensions(input.to_list(), "pdf") @@ -108,12 +114,19 @@ def build_request_messages_batch(self) -> Tuple[List[Optional[LMRequestMessages] List of the each chunk size (page count) per PDF (page_counts_per_chunk_per_row)""" messages_batch = [] page_counts_per_chunk_per_row = [] - for path in self.input: + for idx, path in enumerate(self.input): if not path: messages_batch.append(None) page_counts_per_chunk_per_row.append([1]) else: - file_chunks = self._get_file_chunks(path) + # pages can be a literal int, list of ranges, or a logical expression that resolves to an int or list of ranges + row_pages = self.pages.to_list()[idx] if isinstance(self.pages, pl.Series) else self.pages + + # Validate pages if it's not None (validation happens here for column values) + if row_pages is not None: + validate_pages_argument(row_pages) + + file_chunks = self._get_file_chunks(path, row_pages) page_counts_per_chunk = [] for file in file_chunks: messages_batch.append( @@ -123,57 +136,73 @@ def build_request_messages_batch(self) -> Tuple[List[Optional[LMRequestMessages] page_counts_per_chunk_per_row.append(page_counts_per_chunk) return messages_batch, page_counts_per_chunk_per_row - - def _get_file_chunks(self, file_path: str) -> List[LMRequestFile]: + def _get_file_chunks(self, file_path: str, pages: Optional[Union[int, List[Union[int, List[int]]]]] = None) -> List[LMRequestFile]: """Get the page chunks for the PDF file. Limit the pages based on the model's output token limit and internal max pages per chunk. Args: file_path: Path to the PDF file + pages: Optional pages specification (1-indexed). If None, process all pages. 
Returns: List of LMRequestFile objects - List of (start_page, end_page) tuples (inclusive, 0-indexed) """ chunks = [] - range_start_page = 0 - range_tokens = 0 - range_page_count = 0 with fitz.open(file_path) as doc: total_pages = doc.page_count - for page_num in range(total_pages): - text = doc[page_num].get_text("text") - page_tokens = self.model.count_tokens(text) - # Check if we need to start a new range, either by reaching the token limit or the requested page range size - would_exceed_tokens = range_tokens > 0 and (range_tokens + page_tokens) * PDF_MARKDOWN_OUTPUT_TOKEN_MULTIPLIER > self.model.model_parameters.max_output_tokens - would_exceed_page_limit = range_page_count >= PDF_MAX_PAGES_CHUNK - - if would_exceed_tokens or would_exceed_page_limit: - # Save current batch - last_page = page_num - 1 - page_range = (range_start_page, last_page) - with fitz.open() as doc_chunk: - doc_chunk.insert_pdf(doc, from_page=range_start_page, to_page=last_page) - chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=page_range)) - range_start_page = page_num - range_tokens = page_tokens - range_page_count = 1 - else: - range_tokens += page_tokens - range_page_count += 1 - - # Add the last batch if there are remaining pages - if range_start_page < total_pages: - if range_start_page == 0: - # whole pdf fits in one chunk, no need to keep data in memory - chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=None, page_range=(0, total_pages - 1))) - else: - # multi-page chunk - with fitz.open() as doc_chunk: - doc_chunk.insert_pdf(doc, from_page=range_start_page, to_page=total_pages - 1) - chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=(range_start_page, total_pages - 1))) + + # Resolve page ranges + if pages is not None: + resolved_ranges = resolve_and_coalesce_pages(pages, total_pages) + # Filter out ranges that exceed the document's page count + resolved_ranges = [(start, min(end, total_pages - 1)) for start, end in resolved_ranges if start < total_pages] + else: + # Process all pages + resolved_ranges = [(0, total_pages - 1)] + + # Process each range + for range_start, range_end in resolved_ranges: + # Track current chunk within this range + chunk_start_page = range_start + chunk_tokens = 0 + chunk_page_count = 0 + + for page_num in range(range_start, range_end + 1): + text = doc[page_num].get_text("text") + page_tokens = self.model.count_tokens(text) + + # Check if we need to start a new chunk + would_exceed_tokens = chunk_tokens > 0 and (chunk_tokens + page_tokens) * PDF_MARKDOWN_OUTPUT_TOKEN_MULTIPLIER > self.model.model_parameters.max_output_tokens + would_exceed_page_limit = chunk_page_count >= PDF_MAX_PAGES_CHUNK + + if would_exceed_tokens or would_exceed_page_limit: + # Save current chunk + last_page = page_num - 1 + page_range = (chunk_start_page, last_page) + with fitz.open() as doc_chunk: + doc_chunk.insert_pdf(doc, from_page=chunk_start_page, to_page=last_page) + chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=page_range)) + + # Start new chunk + chunk_start_page = page_num + chunk_tokens = page_tokens + chunk_page_count = 1 + else: + chunk_tokens += page_tokens + chunk_page_count += 1 + + # Add the last chunk for this range if there are remaining pages + if chunk_start_page <= range_end: + if chunk_start_page == 0 and range_end == total_pages - 1 and len(resolved_ranges) == 1: + # Whole PDF fits in one chunk, no need to keep data in memory + 
chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=None, page_range=(0, total_pages - 1))) + else: + # Multi-page chunk or partial PDF + with fitz.open() as doc_chunk: + doc_chunk.insert_pdf(doc, from_page=chunk_start_page, to_page=range_end) + chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=(chunk_start_page, range_end))) return chunks diff --git a/src/fenic/_backends/local/transpiler/expr_converter.py b/src/fenic/_backends/local/transpiler/expr_converter.py index e95d9a37..6def3e2d 100644 --- a/src/fenic/_backends/local/transpiler/expr_converter.py +++ b/src/fenic/_backends/local/transpiler/expr_converter.py @@ -729,18 +729,31 @@ def sem_summarize_fn(batch: pl.Series) -> pl.Series: @_convert_expr.register(SemanticParsePDFExpr) def _convert_parse_pdf_expr(self, logical: SemanticParsePDFExpr) -> pl.Expr: def parse_pdf_fn(batch: pl.Series) -> pl.Series: + if batch.dtype == pl.Struct: + fields = batch.struct.fields + docs_series = batch.struct.field(fields[0]) + pages_series_or_static = batch.struct.field(fields[1]) + else: + docs_series = batch + pages_series_or_static = logical.pages + return SemanticParsePDF( - input=batch, + input=docs_series, model=self.session_state.get_language_model(logical.model_alias), page_separator=logical.page_separator, describe_images=logical.describe_images, model_alias=logical.model_alias, max_output_tokens=logical.max_output_tokens, + pages=pages_series_or_static, ).execute() - - return self._convert_expr(logical.expr).map_batches( - parse_pdf_fn, return_dtype=pl.Utf8 - ) + if isinstance(logical.pages, LogicalExpr): + return pl.struct(self._convert_expr(logical.expr), self._convert_expr(logical.pages)).map_batches( + parse_pdf_fn, return_dtype=pl.Utf8 + ) + else: + return self._convert_expr(logical.expr).map_batches( + parse_pdf_fn, return_dtype=pl.Utf8 + ) @_convert_expr.register(ArrayJoinExpr) def _convert_array_join_expr(self, logical: ArrayJoinExpr) -> pl.Expr: diff --git a/src/fenic/_backends/local/utils/doc_loader.py b/src/fenic/_backends/local/utils/doc_loader.py index 0195e164..bafe2497 100644 --- a/src/fenic/_backends/local/utils/doc_loader.py +++ b/src/fenic/_backends/local/utils/doc_loader.py @@ -6,7 +6,7 @@ import re from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from typing import List, Literal, Optional, Tuple +from typing import List, Literal, Optional, Tuple, Union import fitz # PyMuPDF import polars as pl @@ -26,6 +26,94 @@ logger = logging.getLogger(__name__) + +def validate_pages_argument(pages: Optional[Union[int, List[Union[int, List[int]]]]]) -> None: + """Validate the pages argument. 
+
+    Args:
+        pages: Either an int, or a list of ints or pairs of ints (ranges), or None
+
+    Raises:
+        ValidationError: If the pages argument is invalid
+    """
+    if pages is None:
+        return
+    if isinstance(pages, int):
+        if pages <= 0:
+            raise ValidationError("Page numbers must be positive integers")
+    elif isinstance(pages, list):
+        for item in pages:
+            if isinstance(item, int):
+                if item <= 0:
+                    raise ValidationError("Page numbers must be positive integers")
+            elif isinstance(item, list):
+                if len(item) != 2:
+                    raise ValidationError("Page ranges must be pairs of two numbers")
+                if not all(isinstance(x, int) for x in item):
+                    raise ValidationError("Page range values must be integers")
+                if item[0] <= 0 or item[1] <= 0:
+                    raise ValidationError("Page numbers must be positive integers")
+                if item[1] < item[0]:
+                    raise ValidationError(f"Invalid page range [{item[0]}, {item[1]}]: end page must be >= start page")
+            else:
+                raise ValidationError(f"Invalid pages element type: {type(item).__name__}. Expected int or list of two ints")
+    else:
+        raise ValidationError(f"Invalid pages type: {type(pages).__name__}. Expected int, list, or Column")
+
+
+def resolve_and_coalesce_pages(pages: Union[int, List[Union[int, List[int]]]], total_pages: int) -> List[Tuple[int, int]]:
+    """Resolve and coalesce page specifications into sorted, non-overlapping ranges.
+
+    Converts page numbers and ranges into a sorted list of non-overlapping page ranges.
+    All page numbers are 1-indexed as input but converted to 0-indexed ranges for internal use.
+
+    Args:
+        pages: Either a single page number (int) or a list of page numbers and/or ranges.
+            Page numbers are 1-indexed. Ranges are represented as [start, end] (inclusive).
+        total_pages: Total number of pages in the document; range ends are capped at the last page.
+
+    Returns:
+        List of (start, end) tuples representing 0-indexed page ranges (inclusive).
+        Ranges are sorted and non-overlapping; overlapping or adjacent ranges are merged.
+
+    Examples:
+        >>> resolve_and_coalesce_pages(5, total_pages=10)
+        [(4, 4)]
+        >>> resolve_and_coalesce_pages([1, 3, 5], total_pages=10)
+        [(0, 0), (2, 2), (4, 4)]
+        >>> resolve_and_coalesce_pages([1, [2, 4], 3, 5], total_pages=10)
+        [(0, 4)]
+        >>> resolve_and_coalesce_pages([[1, 3], [2, 5], 7], total_pages=10)
+        [(0, 4), (6, 6)]
+    """
+    # Convert to list of (start, end) tuples (0-indexed, inclusive)
+    ranges = []
+    if isinstance(pages, int):
+        # Single page: convert 1-indexed to 0-indexed
+        ranges.append((pages - 1, min(pages - 1, total_pages - 1)))
+    else:
+        for item in pages:
+            # Convert 1-indexed pages and ranges to 0-indexed, capping ends at the document's last page
+            ranges.append((item - 1, min(item - 1, total_pages - 1)) if isinstance(item, int) else (item[0] - 1, min(item[1] - 1, total_pages - 1)))
+
+    # Sort by start page
+    ranges.sort()
+
+    # Coalesce overlapping ranges
+    if not ranges:
+        return []
+    coalesced = [ranges[0]]
+    for start, end in ranges[1:]:
+        last_start, last_end = coalesced[-1]
+        # Check if ranges overlap or are adjacent
+        if start <= last_end + 1:
+            # Merge ranges
+            coalesced[-1] = (last_start, max(last_end, end))
+        else:
+            # No overlap, add new range
+            coalesced.append((start, end))
+    return coalesced
+
 class DocFolderLoader:
     """A class that encapsulates folder traversal and multi-threaded file processing.
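
A quick sketch of how the two helpers above compose (illustrative only; the `total_pages=10` document length is an assumed value):

```python
from fenic._backends.local.utils.doc_loader import (
    resolve_and_coalesce_pages,
    validate_pages_argument,
)

spec = [1, [3, 5], 4, 7]       # 1-indexed pages and [start, end] ranges
validate_pages_argument(spec)  # raises fenic's ValidationError on malformed specs

# Overlapping/adjacent entries are merged and results are 0-indexed:
# page 1 -> (0, 0); 3-5 and 4 -> (2, 4); page 7 -> (6, 6)
assert resolve_and_coalesce_pages(spec, total_pages=10) == [(0, 0), (2, 4), (6, 6)]

# Range ends past the end of the document are capped at the last page.
assert resolve_and_coalesce_pages([[8, 12]], total_pages=10) == [(7, 9)]
```
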
diff --git a/src/fenic/api/functions/semantic.py b/src/fenic/api/functions/semantic.py
index 4c8dcbc6..e6e11d3b 100644
--- a/src/fenic/api/functions/semantic.py
+++ b/src/fenic/api/functions/semantic.py
@@ -4,6 +4,7 @@
 
 from pydantic import BaseModel, ConfigDict, validate_call
 
+from fenic._backends.local.utils.doc_loader import validate_pages_argument
 from fenic.api.column import Column, ColumnOrName
 from fenic.core._logical_plan.expressions import (
     AliasExpr,
@@ -597,6 +598,7 @@ def parse_pdf(
     page_separator: Optional[str] = None,
     describe_images: bool = False,  # for images that aren't tables
     max_output_tokens: Optional[int] = None,
+    pages: Optional[Union[Column, int, List[Union[int, List[int]]]]] = None,
 ) -> Column:
     r"""Parses a column of PDF paths into markdown.
@@ -609,12 +611,19 @@ def parse_pdf(
         page_separator: Optional page separator to use for the parsing. If the separator includes the {page} placeholder, the model will replace it with the current page number.
         describe_images: Flag to describe images in the PDF. If True, the prompt will ask the model to include a description of the image in the markdown output. If False, the prompt asks the model to ignore images that aren't tables or charts.
         max_output_tokens: Optional maximum number of output tokens per ~3 pages of PDF (does not include reasoning tokens). If None, don't constrain the model's output.
+        pages: Optional pages or page ranges to parse. Can be:
+            - An int (single page number, 1-indexed)
+            - A list of ints and/or pairs of ints (e.g., [1, [3, 5], 7] to parse pages 1, 3-5, and 7)
+            - A Column expression that resolves to an int, or to a list of ints or ranges
+            If None, all pages are parsed. A Column used for pages is constrained by its dtype: it must hold either lists of ranges (two-element lists) or lists of page numbers, not a mix of both. Rows may contain None to parse all pages; an empty list parses no pages.
 
     Note: For Gemini models, this function uses the google file API, uploading PDF files to Google's file store and deleting them after each request.
 
     Raises:
         ExecutionError: If paths in the column are not valid PDF files.
+        ValidationError: If the pages argument is invalid.
    Example: Parse PDF paths in a column into markdown
        ```python
        pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
        pdf_markdown = pdf_metadata.select(semantic.parse_pdf(col("file_path"), page_separator="--- PAGE BREAK ---")
        pdf_markdown.select(col("markdown_content")).show()
        ```
+
+        Example: Parsing PDFs with a page range - take only pages 1-2 and 5-7
+        ```python
+        pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
+        pdf_markdown = pdf_metadata.select(semantic.parse_pdf(col("file_path"), pages=[[1, 2], [5, 7]]).alias("markdown_content"))
+        pdf_markdown.show()
+        ```
+
+        Example: Parsing PDFs with a page range column - take only the first and last page
+        ```python
+        pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
+        pdf_markdown = pdf_metadata.select(semantic.parse_pdf(col("file_path"), pages=array(lit(1), col("page_count"))).alias("markdown_content"))
+        pdf_markdown.show()
+        ```
    """
    resolved_model_alias = _resolve_model_alias(model_alias)
 
+    # Validate pages if it's not a Column (column values are validated at execution time)
+    if not isinstance(pages, Column):
+        validate_pages_argument(pages)
+
    return Column._from_logical_expr(
        SemanticParsePDFExpr(
            Column._from_col_or_name(column)._logical_expr,
@@ -643,5 +670,6 @@ def parse_pdf(
            page_separator=page_separator,
            describe_images=describe_images,
            max_output_tokens=max_output_tokens,
+            pages=pages if not isinstance(pages, Column) else pages._logical_expr,
        )
    )
\ No newline at end of file
diff --git a/src/fenic/core/_logical_plan/expressions/semantic.py b/src/fenic/core/_logical_plan/expressions/semantic.py
index 260e6b2d..dfd6f3a8 100644
--- a/src/fenic/core/_logical_plan/expressions/semantic.py
+++ b/src/fenic/core/_logical_plan/expressions/semantic.py
@@ -632,12 +632,14 @@ def __init__(
        page_separator: Optional[str] = None,
        describe_images: bool = False,
        max_output_tokens: Optional[int] = None,
+        pages: Optional[Union[LogicalExpr, int, List[Union[int, List[int]]]]] = None,
    ):
        self.expr = expr
        self.model_alias = model_alias
        self.page_separator = page_separator
        self.describe_images = describe_images
        self.max_output_tokens = max_output_tokens
+        self.pages = pages
 
        # Initialize validator for composition-based type validation
        self._validator = SignatureValidator(self.function_name)
@@ -649,7 +651,10 @@ def validator(self) -> SignatureValidator:
 
    def children(self) -> List[LogicalExpr]:
        """Return the child expressions."""
-        return [self.expr]
+        children = [self.expr]
+        if isinstance(self.pages, LogicalExpr):
+            children.append(self.pages)
+        return children
 
    def to_column_field(self, plan: LogicalPlan, session_state: BaseSessionState) -> ColumnField:
        """Handle signature validation and completion parameter validation."""
@@ -668,4 +673,5 @@ def _eq_specific(self, other: SemanticParsePDFExpr) -> bool:
        return (self.model_alias == other.model_alias
                and self.page_separator == other.page_separator
                and self.describe_images == other.describe_images
-                and self.max_output_tokens == other.max_output_tokens)
\ No newline at end of file
+                and self.max_output_tokens == other.max_output_tokens
+                and self.pages == other.pages)
\ No newline at end of file
diff --git a/src/fenic/core/_logical_plan/signatures/semantic.py b/src/fenic/core/_logical_plan/signatures/semantic.py
index 86ccb5b4..0e099370 100644
--- a/src/fenic/core/_logical_plan/signatures/semantic.py
+++ b/src/fenic/core/_logical_plan/signatures/semantic.py
@@ -8,7 +8,7 @@
    ReturnTypeStrategy,
 )
 from fenic.core._logical_plan.signatures.registry import FunctionRegistry
-from fenic.core._logical_plan.signatures.type_signature import Exact
+from 
fenic.core._logical_plan.signatures.type_signature import Exact, VariadicAny from fenic.core.types.datatypes import MarkdownType, StringType @@ -50,9 +50,10 @@ def register_semantic_signatures(): )) # Parse PDF - parse PDF files with OCR/VLMs + # Accepts 1 arg (file_path) or 2 args (file_path, pages column) FunctionRegistry.register("semantic.parse_pdf", FunctionSignature( function_name="semantic.parse_pdf", - type_signature=Exact([StringType]), + type_signature=VariadicAny(expected_min_args=1), return_type=MarkdownType )) diff --git a/tests/_backends/local/functions/test_semantic_parse_pdf.py b/tests/_backends/local/functions/test_semantic_parse_pdf.py index 55c49f8c..09594735 100644 --- a/tests/_backends/local/functions/test_semantic_parse_pdf.py +++ b/tests/_backends/local/functions/test_semantic_parse_pdf.py @@ -5,15 +5,16 @@ import pytest from pydantic import BaseModel +from pydantic import ValidationError as PydanticValidationError -from fenic import SemanticConfig, Session, SessionConfig, col, semantic +from fenic import SemanticConfig, Session, SessionConfig, col, lit, semantic from fenic.api.session.config import ( GoogleDeveloperLanguageModel, OpenAILanguageModel, OpenRouterLanguageModel, ) from fenic.core._inference.model_catalog import ModelProvider, model_catalog -from fenic.core.error import ValidationError +from fenic.core.error import ExecutionError, ValidationError from fenic.core.types import ColumnField, MarkdownType from tests.conftest import _save_pdf_file @@ -29,9 +30,9 @@ # keeping the more expensive models off by default # test_processing_engine is an OpenRouter tool choice for processing PDFs vlms_to_test = [ + (OpenRouterLanguageModel, "google/gemini-2.0-flash-lite-001", "native"), (OpenRouterLanguageModel, "openai/gpt-4.1-nano", "mistral-ocr"), (OpenRouterLanguageModel, "openai/gpt-4.1-nano", "pdf-text"), - (OpenRouterLanguageModel, "google/gemini-2.0-flash-lite-001", "native"), #(OpenAILanguageModel, "gpt-5-nano", None), (OpenAILanguageModel, "gpt-4o-mini", None), #(OpenAILanguageModel, "o3", None), @@ -155,6 +156,327 @@ def test_semantic_parse_pdf_without_models(): session.create_dataframe({"pdf_path": ["test.pdf"]}).select(semantic.parse_pdf(col("pdf_path")).alias("markdown_content")) session.stop() + +def test_semantic_parse_pdf_invalid_pages(temp_dir_just_one_file): + """Test that invalid pages argument raises ValidationError.""" + + # Session with models for column tests + session_config = SessionConfig( + app_name="semantic_parse_pdf_invalid_pages_with_models", + semantic=SemanticConfig( + language_models={"test_model": GoogleDeveloperLanguageModel( + model_name="gemini-2.0-flash-lite", + rpm=10, + tpm=1_000_000, + )} + ), + ) + session = Session.get_or_create(session_config) + + # Create a dummy PDF file for testing + dummy_pdf = os.path.join(temp_dir_just_one_file, "dummy.pdf") + _save_pdf_file(Path(dummy_pdf), page_count=10, text_content=["Test content"]) + + try: + df = session.create_dataframe({"pdf_path": [dummy_pdf]}) + + # Test 1: Negative page number (static) + with pytest.raises(ValidationError, match="Page numbers must be positive integers"): + df.select(semantic.parse_pdf(col("pdf_path"), pages=-1)) + + # Test 1b: Negative page number (column) + with pytest.raises(ExecutionError, match="Page numbers must be positive integers"): + df.select(semantic.parse_pdf(col("pdf_path"), pages=lit(-1))).collect() + + # Test 2: Zero page number (static) + with pytest.raises(ValidationError, match="Page numbers must be positive integers"): + 
df.select(semantic.parse_pdf(col("pdf_path"), pages=0))
+
+        # Test 2b: Zero page number (column)
+        with pytest.raises(ExecutionError, match="Page numbers must be positive integers"):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=lit(0))).collect()
+
+        # Test 3: Invalid range - end < start (static)
+        with pytest.raises(ValidationError, match="end page must be >= start page"):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=[[5, 3]]))
+
+        # Test 3b: Invalid range - end < start (column)
+        df_with_pages = session.create_dataframe({"pages": [[[5, 3]]], "pdf_path": [dummy_pdf]})
+        with pytest.raises(ExecutionError, match="end page must be >= start page"):
+            df_with_pages.select(semantic.parse_pdf(col("pdf_path"), pages=col("pages"))).collect()
+
+        # Test 4: Invalid range - single element (static)
+        with pytest.raises(ValidationError, match="Page ranges must be pairs of two numbers"):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=[[5]]))
+
+        # Test 4b: Invalid range - single element (column)
+        df_with_pages = session.create_dataframe({"pages": [[[5]]], "pdf_path": [dummy_pdf]})
+        with pytest.raises(ExecutionError, match="Page ranges must be pairs of two numbers"):
+            df_with_pages.select(semantic.parse_pdf(col("pdf_path"), pages=col("pages"))).collect()
+
+        # Test 5: Invalid range - three elements (static)
+        with pytest.raises(ValidationError, match="Page ranges must be pairs of two numbers"):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=[[1, 2, 3]]))
+
+        # Test 5b: Invalid range - three elements (column)
+        df_with_pages = session.create_dataframe({"pages": [[[1, 2, 3]]], "pdf_path": [dummy_pdf]})
+        with pytest.raises(ExecutionError, match="Page ranges must be pairs of two numbers"):
+            df_with_pages.select(semantic.parse_pdf(col("pdf_path"), pages=col("pages"))).collect()
+
+        # Test 6: Negative page in range (static)
+        with pytest.raises(ValidationError, match="Page numbers must be positive integers"):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=[[-1, 3]]))
+
+        # Test 6b: Negative page in range (column)
+        df_with_pages = session.create_dataframe({"pages": [[[-1, 3]]], "pdf_path": [dummy_pdf]})
+        with pytest.raises(ExecutionError, match="Page numbers must be positive integers"):
+            df_with_pages.select(semantic.parse_pdf(col("pdf_path"), pages=col("pages"))).collect()
+
+        # Test 7: Empty page range (static)
+        with pytest.raises(PydanticValidationError):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=[[[1, 2], []]]))
+        with pytest.raises(PydanticValidationError):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=[[[1, 2], None]]))
+
+        # Test 7b: Empty page range (column)
+        df_with_pages = session.create_dataframe({"pages": [[[1, 2], []]], "pdf_path": [dummy_pdf]})
+        with pytest.raises(ExecutionError, match="Page ranges must be pairs of two numbers"):
+            df_with_pages.select(semantic.parse_pdf(col("pdf_path"), pages=col("pages"))).collect()
+        df_with_pages = session.create_dataframe({"pages": [[[1, 2], None]], "pdf_path": [dummy_pdf]})
+        with pytest.raises(ExecutionError, match="Invalid pages element type: NoneType. 
Expected int or list of two ints"):
+            df_with_pages.select(semantic.parse_pdf(col("pdf_path"), pages=col("pages"))).collect()
+
+        # Test 8: Invalid pages type (static)
+        with pytest.raises(PydanticValidationError):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages="test"))
+
+        # Test 8b: Invalid pages type (column)
+        df_with_pages = session.create_dataframe({"pages": ["test"], "pdf_path": [dummy_pdf]})
+        with pytest.raises(ExecutionError, match="Invalid pages type: str. Expected int, list, or Column"):
+            df_with_pages.select(semantic.parse_pdf(col("pdf_path"), pages=col("pages"))).collect()
+
+        # Test 9: Invalid pages element type (static)
+        with pytest.raises(PydanticValidationError):
+            df.select(semantic.parse_pdf(col("pdf_path"), pages=[[1, 2], [3, "test"]]))
+
+    finally:
+        session.stop()
+
+
+def test_semantic_parse_pdf_with_static_page_ranges(temp_dir_just_one_file):
+    """Test PDF parsing with static page ranges (mix of int and list)."""
+    # Just test with one model (not with google api)
+    test_model_class, test_model_name, _ = vlms_to_test[0]
+
+    # Create a PDF with 10 pages, each with different content
+    page_contents = [f"Page {i+1} content: This is unique text for page {i+1}." for i in range(10)]
+    pdf_path = os.path.join(temp_dir_just_one_file, "test_10_pages.pdf")
+    _save_pdf_file(Path(pdf_path),
+                   title="Test PDF",
+                   author="Test Author",
+                   page_count=10,
+                   text_content=page_contents)
+
+    local_session = _setup_session_with_vlm(test_model_class=test_model_class, model_name=test_model_name)
+
+    try:
+        df = local_session.create_dataframe({"pdf_path": [pdf_path]})
+
+        # Parse only pages 1, 3-5, and 7 (1-indexed)
+        # Should get content from pages 1, 3, 4, 5, 7
+        result = df.select(
+            semantic.parse_pdf(col("pdf_path"), pages=[1, [3, 5], 7]).alias("markdown_content")
+        ).collect()
+
+        markdown = result.data["markdown_content"][0]
+        assert markdown is not None and markdown != ""
+
+        # Check that we got the right pages
+        assert "Page 1 content" in markdown
+        assert "Page 3 content" in markdown
+        assert "Page 4 content" in markdown
+        assert "Page 5 content" in markdown
+        assert "Page 7 content" in markdown
+
+        # Check that we didn't get other pages
+        assert "Page 2 content" not in markdown
+        assert "Page 6 content" not in markdown
+        assert "Page 8 content" not in markdown
+        assert "Page 9 content" not in markdown
+        assert "Page 10 content" not in markdown
+    finally:
+        local_session.stop()
+
+
+def test_semantic_parse_pdf_with_column_page_lists(temp_dir_just_one_file):
+    """Test PDF parsing with a pages column of page-number lists."""
+    # Just test with one model (not with google api)
+    test_model_class, test_model_name, _ = vlms_to_test[0]
+
+    # Create three PDFs with different page counts
+    page_contents_1 = [f"PDF1 Page {i+1}: Unique content for first PDF page {i+1}." for i in range(8)]
+    page_contents_2 = [f"PDF2 Page {i+1}: Unique content for second PDF page {i+1}." for i in range(8)]
+    page_contents_3 = [f"PDF3 Page {i+1}: Unique content for third PDF page {i+1}." 
for i in range(3)]
+
+    pdf_path_1 = os.path.join(temp_dir_just_one_file, "test_pdf_1.pdf")
+    pdf_path_2 = os.path.join(temp_dir_just_one_file, "test_pdf_2.pdf")
+    pdf_path_3 = os.path.join(temp_dir_just_one_file, "test_pdf_3.pdf")
+
+    _save_pdf_file(Path(pdf_path_1),
+                   title="Test PDF 1",
+                   author="Test Author",
+                   page_count=10,
+                   text_content=page_contents_1)
+
+    _save_pdf_file(Path(pdf_path_2),
+                   title="Test PDF 2",
+                   author="Test Author",
+                   page_count=8,
+                   text_content=page_contents_2)
+
+    _save_pdf_file(Path(pdf_path_3),
+                   title="Test PDF 3",
+                   author="Test Author",
+                   page_count=5,
+                   text_content=page_contents_3)
+    local_session = _setup_session_with_vlm(test_model_class=test_model_class, model_name=test_model_name)
+
+    try:
+        # Row 1: parse pages 2, 5, and 7 from PDF 1
+        # Row 2: parse page 3 from PDF 2
+        # Row 3: empty list, parse no pages; Row 4: None, parse all pages
+        df = local_session.create_dataframe({
+            "pdf_path": [pdf_path_1, pdf_path_2, pdf_path_3, pdf_path_3],
+            "pages": [[2, 5, 7], [3], [], None]  # Column with different page specs per row
+        })
+
+        result = df.select(
+            semantic.parse_pdf(col("pdf_path"), pages=col("pages")).alias("markdown_content")
+        ).collect()
+
+        # Check first row (PDF 1, pages 2, 5, and 7)
+        markdown_1 = result.data["markdown_content"][0]
+        assert markdown_1 is not None and markdown_1 != ""
+        assert "PDF1 Page 2:" in markdown_1
+        assert "PDF1 Page 5:" in markdown_1
+        assert "PDF1 Page 7:" in markdown_1
+        # Should not contain other pages
+        assert "PDF1 Page 1:" not in markdown_1
+        assert "PDF1 Page 3:" not in markdown_1
+        assert "PDF1 Page 4:" not in markdown_1
+        assert "PDF1 Page 6:" not in markdown_1
+        assert "PDF1 Page 8:" not in markdown_1
+
+        # Check second row (PDF 2, page 3)
+        markdown_2 = result.data["markdown_content"][1]
+        assert markdown_2 is not None and markdown_2 != ""
+        assert "PDF2 Page 3:" in markdown_2
+        # Should not contain other pages
+        assert "PDF2 Page 1:" not in markdown_2
+        assert "PDF2 Page 2:" not in markdown_2
+        assert "PDF2 Page 4:" not in markdown_2
+        assert "PDF2 Page 5:" not in markdown_2
+        assert "PDF2 Page 6:" not in markdown_2
+        assert "PDF2 Page 7:" not in markdown_2
+        assert "PDF2 Page 8:" not in markdown_2
+
+        # Check third row (empty list): should get no pages
+        markdown_3 = result.data["markdown_content"][2]
+        assert markdown_3 == ""
+
+        # Check fourth row (None): should get all pages
+        markdown_4 = result.data["markdown_content"][3]
+        assert markdown_4 is not None and markdown_4 != ""
+        assert "PDF3 Page 1:" in markdown_4
+        assert "PDF3 Page 2:" in markdown_4
+        assert "PDF3 Page 3:" in markdown_4
+    finally:
+        local_session.stop()
+
+def test_semantic_parse_pdf_with_column_page_ranges(temp_dir_just_one_file):
+    """Test PDF parsing with a pages column of [start, end] page ranges."""
+    # Just test with one model (not with google api)
+    test_model_class, test_model_name, _ = vlms_to_test[0]
+
+    # Create three PDFs with different page counts
+    page_contents_1 = [f"PDF1 Page {i+1}: Unique content for first PDF page {i+1}." for i in range(8)]
+    page_contents_2 = [f"PDF2 Page {i+1}: Unique content for second PDF page {i+1}." for i in range(8)]
+    page_contents_3 = [f"PDF3 Page {i+1}: Unique content for third PDF page {i+1}." 
for i in range(3)] + + pdf_path_1 = os.path.join(temp_dir_just_one_file, "test_pdf_1.pdf") + pdf_path_2 = os.path.join(temp_dir_just_one_file, "test_pdf_2.pdf") + pdf_path_3 = os.path.join(temp_dir_just_one_file, "test_pdf_3.pdf") + + _save_pdf_file(Path(pdf_path_1), + title="Test PDF 1", + author="Test Author", + page_count=10, + text_content=page_contents_1) + + _save_pdf_file(Path(pdf_path_2), + title="Test PDF 2", + author="Test Author", + page_count=8, + text_content=page_contents_2) + + _save_pdf_file(Path(pdf_path_3), + title="Test PDF 3", + author="Test Author", + page_count=5, + text_content=page_contents_3) + local_session = _setup_session_with_vlm(test_model_class=test_model_class, model_name=test_model_name) + + try: + # Row 1: Parse pages 2, 5-7 from PDF 1 + # Row 2: Parse page 3 from PDF 2 + df = local_session.create_dataframe({ + "pdf_path": [pdf_path_1, pdf_path_2, pdf_path_3, pdf_path_3], + "pages": [[[2,2], [5, 7]], [[3,3]], [], None] # Column with different page specs per row + }) + + result = df.select( + semantic.parse_pdf(col("pdf_path"), pages=col("pages")).alias("markdown_content") + ).collect() + + # Check first row (PDF 1, pages 2, 5-7) + markdown_1 = result.data["markdown_content"][0] + assert markdown_1 is not None and markdown_1 != "" + assert "PDF1 Page 2:" in markdown_1 + assert "PDF1 Page 5:" in markdown_1 + assert "PDF1 Page 6:" in markdown_1 + assert "PDF1 Page 7:" in markdown_1 + # Should not contain other pages + assert "PDF1 Page 1:" not in markdown_1 + assert "PDF1 Page 3:" not in markdown_1 + assert "PDF1 Page 4:" not in markdown_1 + assert "PDF1 Page 8:" not in markdown_1 + + # Check second row (PDF 2, page 3) + markdown_2 = result.data["markdown_content"][1] + assert markdown_2 is not None and markdown_2 != "" + assert "PDF2 Page 3:" in markdown_2 + # Should not contain other pages + assert "PDF2 Page 1:" not in markdown_2 + assert "PDF2 Page 2:" not in markdown_2 + assert "PDF2 Page 4:" not in markdown_2 + assert "PDF2 Page 5:" not in markdown_2 + assert "PDF2 Page 6:" not in markdown_2 + assert "PDF2 Page 7:" not in markdown_2 + assert "PDF2 Page 8:" not in markdown_2 + + # Check third row (empty list) should get no pages + markdown_3 = result.data["markdown_content"][2] + assert markdown_3 == "" + + # Check fourth row (None), should get all pages + markdown_4 = result.data["markdown_content"][3] + assert markdown_4 is not None and markdown_4 != "" + assert "PDF3 Page 1:" in markdown_4 + assert "PDF3 Page 2:" in markdown_4 + assert "PDF3 Page 3:" in markdown_4 + + finally: + local_session.stop() + def _make_test_pdf_paths(text_content: list[str], temp_dir: str, pdf_count: int, diff --git a/tests/_backends/local/semantic_operators/test_parse_pdf.py b/tests/_backends/local/semantic_operators/test_parse_pdf.py index 872b0760..fe34e195 100644 --- a/tests/_backends/local/semantic_operators/test_parse_pdf.py +++ b/tests/_backends/local/semantic_operators/test_parse_pdf.py @@ -523,7 +523,6 @@ def test_pdf_chunking_based_on_internal_limit(self, temp_dir_just_one_file, mock check_chunk_content_and_order(result3[2], chunks=1, chunk_max_size=test_chunk_max_size) check_chunk_content_and_order(result3[3], chunks=2, chunk_max_size=test_chunk_max_size) - def test_pdf_chunking_with_page_separator(self, temp_dir_just_one_file, mock_language_model, monkeypatch): # create a pdfs with varying page counts. Mock max_output_tokens to be something larger than the total number of tokens in the pdfs. 
page_counts = [1, 5, 10, 20] @@ -572,4 +571,122 @@ def test_pdf_chunking_with_page_separator(self, temp_dir_just_one_file, mock_lan check_chunk_page_separators(result3[0], pages=page_counts[0], chunk_max_size=test_chunk_max_size) check_chunk_page_separators(result3[1], pages=page_counts[1], chunk_max_size=test_chunk_max_size) check_chunk_page_separators(result3[2], pages=page_counts[2], chunk_max_size=test_chunk_max_size) - check_chunk_page_separators(result3[3], pages=page_counts[3], chunk_max_size=test_chunk_max_size) \ No newline at end of file + check_chunk_page_separators(result3[3], pages=page_counts[3], chunk_max_size=test_chunk_max_size) + + + + def test_pdf_chunking_with_page_ranges(self, temp_dir_just_one_file, mock_language_model, monkeypatch): + """Test PDF parsing with page ranges specified.""" + # Create two PDFs: one with 15 pages, one with 5 pages + file_15_pages = os.path.join(temp_dir_just_one_file, "file_15_pages.pdf") + file_5_pages = os.path.join(temp_dir_just_one_file, "file_5_pages.pdf") + _save_pdf_file(file_15_pages, page_count=15, text_content="dummy text") + _save_pdf_file(file_5_pages, page_count=5, text_content="dummy text") + + mock_language_model.max_output_tokens = 100_000 + mock_language_model.count_tokens.return_value = 50 + mock_language_model.get_completions.side_effect = mock_get_completions + + test_chunk_max_size = 3 + monkeypatch.setattr("fenic._backends.local.semantic_operators.parse_pdf.PDF_MAX_PAGES_CHUNK", test_chunk_max_size) + + # Test 1: Single page number (page 5, 1-indexed) + input1 = pl.Series("input", [file_15_pages, file_5_pages]) + pages1 = pl.Series("pages", [5, 5]) # Request page 5 for both PDFs + + parse_pdf1 = ParsePDF( + input=input1, + model=mock_language_model, + pages=pages1, + ) + + result1 = parse_pdf1.execute() + assert result1.shape == (2,) + # Each PDF request should have exactly one chunk + _test_chunk_count(result1[0], expected_chunks=1) + _test_chunk_count(result1[1], expected_chunks=1) + # First PDF (15 pages) should have page 5 (0-indexed as page 4) + assert "start_page:'4'" in result1[0] + # Second PDF (5 pages) should have page 5 (0-indexed as page 4) + assert "start_page:'4'" in result1[1] + + # Test 2: Overlapping ranges and single ints + # Request pages: 1, [3, 5], 4, 7 -> should coalesce to [1], [3-5], [7] -> 0-indexed: [0], [2-4], [6] + input2 = pl.Series("input", [file_15_pages, file_5_pages]) + pages2 = pl.Series("pages", [[1, [3, 5], 4, 7], [1, [3, 5], 4, 7]], dtype=pl.Object) + + parse_pdf2 = ParsePDF( + input=input2, + model=mock_language_model, + pages=pages2, + ) + + result2 = parse_pdf2.execute() + assert result2.shape == (2,) + + # First PDF (15 pages): should process pages 0, 2-4, 6 (0-indexed) + # With chunk_max_size=3, we should get: [0], [2,3,4], [6] + # So we expect start_pages: 0, 2, 6 + assert "start_page:'0'" in result2[0] + assert "start_page:'2'" in result2[0] + assert "start_page:'6'" in result2[0] + + # Second PDF (5 pages): pages 0, 2-4 are valid, page 6 is out of range + # Should get: [0], [2,3,4] + assert "start_page:'0'" in result2[1] + assert "start_page:'2'" in result2[1] + _test_chunk_count(result2[1], expected_chunks=2) + + # Test 3: static overlapping ranges and single ints + # Request pages: 1, [3, 5], 4, 7 -> should coalesce to [1], [3-5], [7] -> 0-indexed: [0], [2-4], [6] + input2 = pl.Series("input", [file_15_pages, file_5_pages]) + pages2 = [1, [3, 5], 4, 7] + + parse_pdf2 = ParsePDF( + input=input2, + model=mock_language_model, + pages=pages2, + ) + + result2 = 
parse_pdf2.execute() + assert result2.shape == (2,) + + # First PDF (15 pages): should process pages 0, 2-4, 6 (0-indexed) + # With chunk_max_size=3, we should get: [0], [2,3,4], [6] + # So we expect start_pages: 0, 2, 6 + assert "start_page:'0'" in result2[0] + assert "start_page:'2'" in result2[0] + assert "start_page:'6'" in result2[0] + + # Second PDF (5 pages): pages 0, 2-4 are valid, page 6 is out of range + # Should get: [0], [2,3,4] + assert "start_page:'0'" in result2[1] + assert "start_page:'2'" in result2[1] + _test_chunk_count(result2[1], expected_chunks=2) + + # Test 4: Column with mixed single int and list of ranges + input3 = pl.Series("input", [file_15_pages, file_5_pages]) + # Row 1: single int (page 3), Row 2: list with ranges [1, [2, 4]] + pages3 = pl.Series("pages", [3, [1, [2, 4]]], dtype=pl.Object) + + parse_pdf3 = ParsePDF( + input=input3, + model=mock_language_model, + pages=pages3, + ) + + result3 = parse_pdf3.execute() + assert result3.shape == (2,) + + # First PDF: page 3 (0-indexed as 2) + assert "start_page:'2'" in result3[0] + _test_chunk_count(result3[0], expected_chunks=1) + + # Second PDF: pages 1, [2, 4] -> 0-indexed [0], [1-3], coalesced to [0-3] + # With chunk_max_size=3, should get: [0,1,2], [3] + assert "start_page:'0'" in result3[1] + assert "start_page:'3'" in result3[1] + _test_chunk_count(result3[1], expected_chunks=2) + +def _test_chunk_count(response_string: str, expected_chunks: int) -> None: + assert response_string.count("start_page:'") == expected_chunks \ No newline at end of file
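
Taken together, a minimal end-to-end sketch of the new `pages` parameter (the session/model configuration and the PDF path below are illustrative placeholders, not part of this diff):

```python
from fenic import SemanticConfig, Session, SessionConfig, col, semantic
from fenic.api.session.config import GoogleDeveloperLanguageModel

# Hypothetical session config; any configured vision-capable model works the same way.
session = Session.get_or_create(SessionConfig(
    app_name="parse_pdf_pages_demo",
    semantic=SemanticConfig(
        language_models={"vlm": GoogleDeveloperLanguageModel(
            model_name="gemini-2.0-flash-lite",
            rpm=10,
            tpm=1_000_000,
        )},
    ),
))

df = session.create_dataframe({"pdf_path": ["report.pdf"]})  # placeholder path

# Static spec: page 1, pages 3-5, and page 7 (1-indexed; overlaps are coalesced).
result = df.select(
    semantic.parse_pdf(col("pdf_path"), pages=[1, [3, 5], 7]).alias("markdown_content")
).collect()
markdown = result.data["markdown_content"][0]

session.stop()
```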