diff --git a/application_sdk/activities/metadata_extraction/sql.py b/application_sdk/activities/metadata_extraction/sql.py
index dda62d193..9ab39092f 100644
--- a/application_sdk/activities/metadata_extraction/sql.py
+++ b/application_sdk/activities/metadata_extraction/sql.py
@@ -850,7 +850,7 @@ async def transform_data(
             dataframe=dataframe, **workflow_args
         )
         await transformed_output.write_daft_dataframe(transform_metadata)
-        return await transformed_output.get_statistics()
+        return await transformed_output.get_statistics(typename=typename)
 
     @activity.defn
     @auto_heartbeater
diff --git a/application_sdk/decorators/__init__.py b/application_sdk/decorators/__init__.py
index e69de29bb..8b1378917 100644
--- a/application_sdk/decorators/__init__.py
+++ b/application_sdk/decorators/__init__.py
@@ -0,0 +1 @@
+
diff --git a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py
index 4db007c60..1af98f918 100644
--- a/application_sdk/outputs/__init__.py
+++ b/application_sdk/outputs/__init__.py
@@ -25,7 +25,7 @@
 from temporalio import activity
 
 from application_sdk.activities.common.models import ActivityStatistics
-from application_sdk.activities.common.utils import get_object_store_prefix
+from application_sdk.activities.common.utils import get_object_store_prefix, build_output_path
 from application_sdk.common.dataframe_utils import is_empty_dataframe
 from application_sdk.observability.logger_adaptor import get_logger
 from application_sdk.observability.metrics_adaptor import MetricType
@@ -34,6 +34,7 @@
 logger = get_logger(__name__)
 activity.logger = logger
 
+
 if TYPE_CHECKING:
     import daft  # type: ignore
     import pandas as pd
@@ -71,6 +72,19 @@ class Output(ABC):
     current_buffer_size_bytes: int
     partitions: List[int]
 
+    def _infer_phase_from_path(self) -> Optional[str]:
+        """Infer phase from output path by checking for raw/transformed directories.
+
+        Returns:
+            Optional[str]: "Extract" for raw, "Transform" for transformed, else None.
+        """
+        path_parts = str(self.output_path).split("/")
+        if "raw" in path_parts:
+            return "Extract"
+        if "transformed" in path_parts:
+            return "Transform"
+        return None
+
     def estimate_dataframe_record_size(self, dataframe: "pd.DataFrame") -> int:
         """Estimate File size of a DataFrame by sampling a few records."""
         if len(dataframe) == 0:
@@ -330,7 +344,7 @@ async def get_statistics(
             Exception: If there's an error writing the statistics
         """
         try:
-            statistics = await self.write_statistics()
+            statistics = await self.write_statistics(typename)
             if not statistics:
                 raise ValueError("No statistics data available")
             statistics = ActivityStatistics.model_validate(statistics)
@@ -390,7 +404,7 @@ async def _flush_buffer(self, chunk: "pd.DataFrame", chunk_part: int):
             logger.error(f"Error flushing buffer to files: {str(e)}")
             raise e
 
-    async def write_statistics(self) -> Optional[Dict[str, Any]]:
+    async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dict[str, Any]]:
         """Write statistics about the output to a JSON file.
 
         This method writes statistics including total record count and chunk count
@@ -407,10 +421,26 @@ async def write_statistics(self) -> Optional[Dict[str, Any]]:
                 "partitions": self.partitions,
             }
 
-            # Write the statistics to a json file
-            output_file_name = f"{self.output_path}/statistics.json.ignore"
-            with open(output_file_name, "w") as f:
-                f.write(orjson.dumps(statistics).decode("utf-8"))
+            # Ensure typename is included in the statistics payload (if provided)
+            if typename:
+                statistics["typename"] = typename
+
+            # Write the statistics to a JSON file inside a dedicated statistics/ folder
+            statistics_dir = os.path.join(self.output_path, "statistics")
+            os.makedirs(statistics_dir, exist_ok=True)
+            output_file_name = f"{statistics_dir}/statistics.json.ignore"
+            # If chunk_start is provided, include it in the statistics filename
+            try:
+                cs = getattr(self, "chunk_start", None)
+                if cs is not None:
+                    output_file_name = f"{statistics_dir}/statistics-chunk-{cs}.json.ignore"
+            except Exception:
+                # If accessing chunk_start fails, fall back to the default filename
+                pass
+
+            # Write the statistics dictionary to the JSON file
+            with open(output_file_name, "wb") as f:
+                f.write(orjson.dumps(statistics))
 
             destination_file_path = get_object_store_prefix(output_file_name)
             # Push the file to the object store
@@ -418,6 +448,9 @@ async def write_statistics(self) -> Optional[Dict[str, Any]]:
                 source=output_file_name,
                 destination=destination_file_path,
             )
+            return statistics
         except Exception as e:
             logger.error(f"Error writing statistics: {str(e)}")
+            raise
+
 
diff --git a/tests/unit/outputs/test_output.py b/tests/unit/outputs/test_output.py
index 23468e75a..83669f17c 100644
--- a/tests/unit/outputs/test_output.py
+++ b/tests/unit/outputs/test_output.py
@@ -1,6 +1,7 @@
 """Unit tests for output interface."""
 
 from typing import Any
+import json
 from unittest.mock import AsyncMock, mock_open, patch
 
 import pandas as pd
@@ -161,3 +162,4 @@ async def test_write_statistics_error(self):
         assert result is None
         mock_logger.assert_called_once()
         assert "Error writing statistics" in mock_logger.call_args[0][0]
+