From 948f9d98390e07ab76005b5ace8551a0ed5d6500 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Tue, 9 Sep 2025 21:49:23 +0530 Subject: [PATCH 01/12] Accumulate Metrics for Workflow --- .../activities/metadata_extraction/sql.py | 5 +- application_sdk/outputs/__init__.py | 80 ++++++++++++++++++- application_sdk/outputs/iceberg.py | 4 +- application_sdk/outputs/json.py | 4 +- application_sdk/outputs/parquet.py | 4 +- application_sdk/server/fastapi/__init__.py | 1 + application_sdk/services/objectstore.py | 2 +- tests/unit/outputs/test_iceberg.py | 3 + tests/unit/outputs/test_json_output.py | 6 +- tests/unit/outputs/test_parquet_output.py | 31 ++++--- 10 files changed, 120 insertions(+), 20 deletions(-) diff --git a/application_sdk/activities/metadata_extraction/sql.py b/application_sdk/activities/metadata_extraction/sql.py index db2faa43c..5667ca2dc 100644 --- a/application_sdk/activities/metadata_extraction/sql.py +++ b/application_sdk/activities/metadata_extraction/sql.py @@ -21,6 +21,7 @@ from application_sdk.observability.logger_adaptor import get_logger from application_sdk.outputs.json import JsonOutput from application_sdk.outputs.parquet import ParquetOutput +from application_sdk.outputs import WorkflowPhase from application_sdk.services.atlan_storage import AtlanStorage from application_sdk.services.secretstore import SecretStore from application_sdk.transformers import TransformerInterface @@ -260,6 +261,7 @@ async def query_executor( ) parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_prefix=output_prefix, output_path=output_path, output_suffix=output_suffix, @@ -491,8 +493,9 @@ async def transform_data( ) raw_input = raw_input.get_batched_daft_dataframe() transformed_output = JsonOutput( - output_path=output_path, output_suffix="transformed", + phase=WorkflowPhase.TRANSFORM, + output_path=output_path, output_prefix=output_prefix, typename=typename, chunk_start=workflow_args.get("chunk_start"), diff --git a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py index 7f7535c95..21ef1c994 100644 --- a/application_sdk/outputs/__init__.py +++ b/application_sdk/outputs/__init__.py @@ -6,6 +6,7 @@ import inspect from abc import ABC, abstractmethod +from enum import Enum from typing import ( TYPE_CHECKING, Any, @@ -23,19 +24,29 @@ from temporalio import activity from application_sdk.activities.common.models import ActivityStatistics -from application_sdk.activities.common.utils import get_object_store_prefix +from application_sdk.activities.common.utils import get_object_store_prefix, build_output_path from application_sdk.common.dataframe_utils import is_empty_dataframe from application_sdk.observability.logger_adaptor import get_logger from application_sdk.services.objectstore import ObjectStore +from application_sdk.constants import TEMPORARY_PATH logger = get_logger(__name__) activity.logger = logger + if TYPE_CHECKING: import daft # type: ignore import pandas as pd +class WorkflowPhase(Enum): + """Enumeration of workflow phases for data processing.""" + + EXTRACT = "Extract" + TRANSFORM = "Transform" + PUBLISH = "Publish" + + class Output(ABC): """Abstract base class for output handlers. 
@@ -51,6 +62,7 @@ class Output(ABC): output_path: str output_prefix: str + phase: WorkflowPhase total_record_count: int chunk_count: int statistics: List[int] = [] @@ -214,7 +226,7 @@ async def get_statistics( Exception: If there's an error writing the statistics """ try: - statistics = await self.write_statistics() + statistics = await self.write_statistics(typename) if not statistics: raise ValueError("No statistics data available") statistics = ActivityStatistics.model_validate(statistics) @@ -225,7 +237,7 @@ async def get_statistics( logger.error(f"Error getting statistics: {str(e)}") raise - async def write_statistics(self) -> Optional[Dict[str, Any]]: + async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dict[str, Any]]: """Write statistics about the output to a JSON file. This method writes statistics including total record count and chunk count @@ -253,6 +265,68 @@ async def write_statistics(self) -> Optional[Dict[str, Any]]: source=output_file_name, destination=destination_file_path, ) + + if typename: + statistics["typename"] = typename + # Update aggregated statistics at run root in object store + try: + await self._update_run_aggregate(destination_file_path, statistics) + except Exception as e: + logger.warning(f"Failed to update aggregated statistics: {str(e)}") return statistics except Exception as e: logger.error(f"Error writing statistics: {str(e)}") + + async def _update_run_aggregate( + self, per_path_destination: str, statistics: Dict[str, Any] + ) -> None: + """Aggregate stats into a single file at the workflow run root. + + Args: + per_path_destination: Object store destination path for this stats file + (used as key in the aggregate map) + statistics: The statistics dictionary to store + """ + # Build the workflow run root path directly using utility functions (no path manipulation!) + # build_output_path() returns: "artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" + # We need the local path: "./local/tmp/artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" + workflow_run_root_relative = build_output_path() + output_file_name = f"{TEMPORARY_PATH}{workflow_run_root_relative}/statistics.json.ignore" + destination_file_path = get_object_store_prefix(output_file_name) + + + # Load existing aggregate from object store if present + # New structure: {"Extract": [...], "Transform": [...], "Publish": [...]} + aggregate_by_phase: Dict[str, List[Dict[str, Any]]] = { + "Extract": [], + "Transform": [], + "Publish": [] + } + + try: + # Download existing aggregate file if present + await ObjectStore.download_file( + source=destination_file_path, + destination=output_file_name, + ) + # Load existing JSON structure + with open(output_file_name, "r") as f: + existing_aggregate = orjson.loads(f.read()) + # Phase-based structure + aggregate_by_phase.update(existing_aggregate) + except Exception: + logger.info( + "No existing aggregate found or failed to read. Initializing a new aggregate structure." 
+ ) + + # Add this entry to the appropriate phase + aggregate_by_phase[phase].append(statistics) + + with open(output_file_name, "w") as f: + f.write(orjson.dumps(aggregate_by_phase).decode("utf-8")) + + # Upload aggregate to object store + await ObjectStore.upload_file( + source=output_file_name, + destination=destination_file_path, + ) diff --git a/application_sdk/outputs/iceberg.py b/application_sdk/outputs/iceberg.py index c253dbb34..2e8e2a066 100644 --- a/application_sdk/outputs/iceberg.py +++ b/application_sdk/outputs/iceberg.py @@ -6,7 +6,7 @@ from application_sdk.observability.logger_adaptor import get_logger from application_sdk.observability.metrics_adaptor import MetricType, get_metrics -from application_sdk.outputs import Output +from application_sdk.outputs import Output, WorkflowPhase logger = get_logger(__name__) activity.logger = logger @@ -26,6 +26,7 @@ def __init__( iceberg_catalog: Catalog, iceberg_namespace: str, iceberg_table: Union[str, Table], + phase: WorkflowPhase, mode: str = "append", total_record_count: int = 0, chunk_count: int = 0, @@ -42,6 +43,7 @@ def __init__( """ self.total_record_count = total_record_count self.chunk_count = chunk_count + self.phase = phase self.iceberg_catalog = iceberg_catalog self.iceberg_namespace = iceberg_namespace self.iceberg_table = iceberg_table diff --git a/application_sdk/outputs/json.py b/application_sdk/outputs/json.py index 561f4e3a8..4767bbc62 100644 --- a/application_sdk/outputs/json.py +++ b/application_sdk/outputs/json.py @@ -9,7 +9,7 @@ from application_sdk.constants import DAPR_MAX_GRPC_MESSAGE_LENGTH from application_sdk.observability.logger_adaptor import get_logger from application_sdk.observability.metrics_adaptor import MetricType, get_metrics -from application_sdk.outputs import Output +from application_sdk.outputs import Output, WorkflowPhase from application_sdk.services.objectstore import ObjectStore logger = get_logger(__name__) @@ -82,6 +82,7 @@ class JsonOutput(Output): def __init__( self, output_suffix: str, + phase: WorkflowPhase, output_path: Optional[str] = None, output_prefix: Optional[str] = None, typename: Optional[str] = None, @@ -117,6 +118,7 @@ def __init__( self.output_path = output_path self.output_suffix = output_suffix self.output_prefix = output_prefix + self.phase = phase self.typename = typename self.chunk_start = chunk_start self.total_record_count = total_record_count diff --git a/application_sdk/outputs/parquet.py b/application_sdk/outputs/parquet.py index fd75bf0f2..b5f119db0 100644 --- a/application_sdk/outputs/parquet.py +++ b/application_sdk/outputs/parquet.py @@ -7,7 +7,7 @@ from application_sdk.constants import DAPR_MAX_GRPC_MESSAGE_LENGTH from application_sdk.observability.logger_adaptor import get_logger from application_sdk.observability.metrics_adaptor import MetricType, get_metrics -from application_sdk.outputs import Output +from application_sdk.outputs import Output, WorkflowPhase from application_sdk.services.objectstore import ObjectStore logger = get_logger(__name__) @@ -41,6 +41,7 @@ class ParquetOutput(Output): def __init__( self, + phase: WorkflowPhase, output_path: str = "", output_suffix: str = "", output_prefix: str = "", @@ -75,6 +76,7 @@ def __init__( self.output_path = output_path self.output_suffix = output_suffix self.output_prefix = output_prefix + self.phase = phase self.typename = typename self.chunk_size = chunk_size self.buffer_size = buffer_size diff --git a/application_sdk/server/fastapi/__init__.py b/application_sdk/server/fastapi/__init__.py index 
190a05fb6..dab1f88e4 100644 --- a/application_sdk/server/fastapi/__init__.py +++ b/application_sdk/server/fastapi/__init__.py @@ -755,3 +755,4 @@ async def start( ) ) await server.serve() + diff --git a/application_sdk/services/objectstore.py b/application_sdk/services/objectstore.py index 3dbcfaf2d..f4404ab1e 100644 --- a/application_sdk/services/objectstore.py +++ b/application_sdk/services/objectstore.py @@ -270,7 +270,7 @@ async def upload_prefix( ) await cls.upload_file(file_path, store_key, store_name) - logger.info(f"Completed uploading directory {source} to object store") + logger.info(f"Completed uploading directory ---- {source} to object store") except Exception as e: logger.error( f"An unexpected error occurred while uploading directory: {str(e)}" diff --git a/tests/unit/outputs/test_iceberg.py b/tests/unit/outputs/test_iceberg.py index 9c8c68721..a52e64504 100644 --- a/tests/unit/outputs/test_iceberg.py +++ b/tests/unit/outputs/test_iceberg.py @@ -5,6 +5,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.table import Table +from application_sdk.outputs import WorkflowPhase from application_sdk.outputs.iceberg import IcebergOutput @@ -24,6 +25,7 @@ def iceberg_output(mock_catalog: Catalog) -> IcebergOutput: iceberg_catalog=mock_catalog, iceberg_namespace="test_namespace", iceberg_table="test_table", + phase=WorkflowPhase.EXTRACT, mode="append", ) @@ -34,6 +36,7 @@ def test_iceberg_output_initialization(mock_catalog: Catalog) -> None: iceberg_catalog=mock_catalog, iceberg_namespace="test_namespace", iceberg_table="test_table", + phase=WorkflowPhase.EXTRACT, mode="append", ) diff --git a/tests/unit/outputs/test_json_output.py b/tests/unit/outputs/test_json_output.py index 3efa5ddf0..85873e6ec 100644 --- a/tests/unit/outputs/test_json_output.py +++ b/tests/unit/outputs/test_json_output.py @@ -7,6 +7,7 @@ import pytest from hypothesis import HealthCheck, given, settings +from application_sdk.outputs import WorkflowPhase from application_sdk.outputs.json import JsonOutput from application_sdk.test_utils.hypothesis.strategies.outputs.json_output import ( chunk_size_strategy, @@ -32,8 +33,9 @@ async def test_init(base_output_path: str, config: Dict[str, Any]) -> None: # Create a safe output path by joining base_output_path with config's output_path safe_path = str(Path(base_output_path) / config["output_path"]) json_output = JsonOutput( # type: ignore - output_path=safe_path, output_suffix=config["output_suffix"], + phase=WorkflowPhase.EXTRACT, + output_path=safe_path, output_prefix=config["output_prefix"], chunk_size=config["chunk_size"], ) @@ -48,6 +50,7 @@ async def test_init(base_output_path: str, config: Dict[str, Any]) -> None: async def test_write_dataframe_empty(base_output_path: str) -> None: json_output = JsonOutput( # type: ignore output_suffix="tests/raw", + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_prefix="test_prefix", chunk_size=100000, @@ -137,6 +140,7 @@ async def test_write_dataframe_multiple_chunks( async def test_write_dataframe_error(base_output_path: str) -> None: json_output = JsonOutput( # type: ignore output_suffix="tests/raw", + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_prefix="test_prefix", chunk_size=100000, diff --git a/tests/unit/outputs/test_parquet_output.py b/tests/unit/outputs/test_parquet_output.py index 42e1fcc87..36228e14a 100644 --- a/tests/unit/outputs/test_parquet_output.py +++ b/tests/unit/outputs/test_parquet_output.py @@ -5,6 +5,7 @@ import pandas as pd import pytest +from 
application_sdk.outputs import WorkflowPhase from application_sdk.outputs.parquet import ParquetOutput @@ -45,7 +46,7 @@ class TestParquetOutputInit: def test_init_default_values(self, base_output_path: str): """Test ParquetOutput initialization with default values.""" - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) # The output path gets modified by adding suffix, so check it ends with the base path assert base_output_path in parquet_output.output_path @@ -64,6 +65,7 @@ def test_init_default_values(self, base_output_path: str): def test_init_custom_values(self, base_output_path: str): """Test ParquetOutput initialization with custom values.""" parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_suffix="test_suffix", output_prefix="test_prefix", @@ -91,6 +93,7 @@ def test_init_custom_values(self, base_output_path: str): def test_init_creates_output_directory(self, base_output_path: str): """Test that initialization creates the output directory.""" parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_suffix="test_dir", typename="test_table", @@ -106,7 +109,7 @@ class TestParquetOutputPathGen: def test_path_gen_with_markers(self, base_output_path: str): """Test path generation with start and end markers.""" - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) path = parquet_output.path_gen(start_marker="start_123", end_marker="end_456") @@ -114,7 +117,7 @@ def test_path_gen_with_markers(self, base_output_path: str): def test_path_gen_without_chunk_start(self, base_output_path: str): """Test path generation without chunk start.""" - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) path = parquet_output.path_gen(chunk_count=5) @@ -122,7 +125,7 @@ def test_path_gen_without_chunk_start(self, base_output_path: str): def test_path_gen_with_chunk_start(self, base_output_path: str): """Test path generation with chunk start.""" - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) path = parquet_output.path_gen(chunk_start=10, chunk_count=3) @@ -135,7 +138,7 @@ class TestParquetOutputWriteDataframe: @pytest.mark.asyncio async def test_write_empty_dataframe(self, base_output_path: str): """Test writing an empty DataFrame.""" - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) empty_df = pd.DataFrame() await parquet_output.write_dataframe(empty_df) @@ -159,6 +162,7 @@ async def test_write_dataframe_success( mock_prefix.return_value = "test/output/path" parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_suffix="test" ) @@ -188,6 +192,7 @@ async def test_write_dataframe_with_custom_path_gen( mock_prefix.return_value = "test/output/path" parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, start_marker="test_start", end_marker="test_end", @@ -213,7 +218,7 @@ async def test_write_dataframe_error_handling( with patch("pandas.DataFrame.to_parquet") as mock_to_parquet: mock_to_parquet.side_effect = Exception("Test error") 
- parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) with pytest.raises(Exception, match="Test error"): await parquet_output.write_dataframe(sample_dataframe) @@ -230,7 +235,7 @@ async def test_write_daft_dataframe_empty(self, base_output_path: str): mock_df.count_rows.return_value = 0 mock_daft.return_value = mock_df - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) await parquet_output.write_daft_dataframe(mock_df) @@ -256,6 +261,7 @@ async def test_write_daft_dataframe_success(self, base_output_path: str): mock_df.write_parquet = MagicMock() parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, ) @@ -295,6 +301,7 @@ async def test_write_daft_dataframe_with_parameter_overrides( mock_df.write_parquet = MagicMock() parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, ) @@ -331,6 +338,7 @@ async def test_write_daft_dataframe_with_default_parameters( mock_df.write_parquet = MagicMock() parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, ) @@ -364,7 +372,7 @@ async def test_write_daft_dataframe_with_execution_configuration( mock_df.count_rows.return_value = 1000 mock_df.write_parquet = MagicMock() - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) await parquet_output.write_daft_dataframe(mock_df) @@ -384,7 +392,7 @@ async def test_write_daft_dataframe_error_handling(self, base_output_path: str): mock_df = MagicMock() mock_df.count_rows.side_effect = Exception("Count rows error") - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) with pytest.raises(Exception, match="Count rows error"): await parquet_output.write_daft_dataframe(mock_df) @@ -396,6 +404,7 @@ class TestParquetOutputUtilityMethods: def test_get_full_path(self, base_output_path: str): """Test get_full_path method.""" parquet_output = ParquetOutput( + phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_suffix="test_suffix", typename="test_table", @@ -425,7 +434,7 @@ async def test_pandas_write_metrics( mock_metrics = MagicMock() mock_get_metrics.return_value = mock_metrics - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) await parquet_output.write_dataframe(sample_dataframe) @@ -456,7 +465,7 @@ async def test_daft_write_metrics(self, base_output_path: str): mock_df.count_rows.return_value = 1000 mock_df.write_parquet = MagicMock() - parquet_output = ParquetOutput(output_path=base_output_path) + parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) await parquet_output.write_daft_dataframe(mock_df) From f20f3f59c3d8dbdacb8a8dc13aead2d6604585cc Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Wed, 10 Sep 2025 01:28:51 +0530 Subject: [PATCH 02/12] Added logs --- application_sdk/outputs/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py index 21ef1c994..232e16097 100644 --- a/application_sdk/outputs/__init__.py +++ 
b/application_sdk/outputs/__init__.py @@ -287,6 +287,7 @@ async def _update_run_aggregate( (used as key in the aggregate map) statistics: The statistics dictionary to store """ + logger.info(f"Starting _update_run_aggregate for phase: {self.phase} (value: {self.phase.value})") # Build the workflow run root path directly using utility functions (no path manipulation!) # build_output_path() returns: "artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" # We need the local path: "./local/tmp/artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" @@ -314,16 +315,19 @@ async def _update_run_aggregate( existing_aggregate = orjson.loads(f.read()) # Phase-based structure aggregate_by_phase.update(existing_aggregate) + logger.info(f"Successfully loaded existing aggregates") except Exception: logger.info( "No existing aggregate found or failed to read. Initializing a new aggregate structure." ) # Add this entry to the appropriate phase - aggregate_by_phase[phase].append(statistics) + logger.info(f"Adding statistics to phase '{self.phase.value}'") + aggregate_by_phase[self.phase.value].append(statistics) with open(output_file_name, "w") as f: f.write(orjson.dumps(aggregate_by_phase).decode("utf-8")) + logger.info(f"Successfully updated aggregate with entries for phase '{self.phase.value}'") # Upload aggregate to object store await ObjectStore.upload_file( From 4884ad560c148695e3fe075775b736f437b3db02 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Tue, 4 Nov 2025 01:01:07 +0530 Subject: [PATCH 03/12] Simplified metrics --- .../activities/metadata_extraction/sql.py | 3 - application_sdk/outputs/__init__.py | 58 ++++++++++++------- application_sdk/outputs/iceberg.py | 4 +- application_sdk/outputs/json.py | 4 +- application_sdk/outputs/parquet.py | 4 +- docs/docs/concepts/outputs.md | 47 +++++++++++++++ tests/unit/outputs/test_iceberg.py | 3 - tests/unit/outputs/test_json_output.py | 4 -- tests/unit/outputs/test_parquet_output.py | 33 +++++------ 9 files changed, 101 insertions(+), 59 deletions(-) diff --git a/application_sdk/activities/metadata_extraction/sql.py b/application_sdk/activities/metadata_extraction/sql.py index 5667ca2dc..7985b3dd9 100644 --- a/application_sdk/activities/metadata_extraction/sql.py +++ b/application_sdk/activities/metadata_extraction/sql.py @@ -21,7 +21,6 @@ from application_sdk.observability.logger_adaptor import get_logger from application_sdk.outputs.json import JsonOutput from application_sdk.outputs.parquet import ParquetOutput -from application_sdk.outputs import WorkflowPhase from application_sdk.services.atlan_storage import AtlanStorage from application_sdk.services.secretstore import SecretStore from application_sdk.transformers import TransformerInterface @@ -261,7 +260,6 @@ async def query_executor( ) parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, output_prefix=output_prefix, output_path=output_path, output_suffix=output_suffix, @@ -494,7 +492,6 @@ async def transform_data( raw_input = raw_input.get_batched_daft_dataframe() transformed_output = JsonOutput( output_suffix="transformed", - phase=WorkflowPhase.TRANSFORM, output_path=output_path, output_prefix=output_prefix, typename=typename, diff --git a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py index 232e16097..3be0c8eed 100644 --- a/application_sdk/outputs/__init__.py +++ b/application_sdk/outputs/__init__.py @@ -39,14 +39,6 @@ import pandas as pd -class WorkflowPhase(Enum): - """Enumeration of workflow phases for data processing.""" - - EXTRACT = "Extract" - 
TRANSFORM = "Transform" - PUBLISH = "Publish" - - class Output(ABC): """Abstract base class for output handlers. @@ -62,11 +54,23 @@ class Output(ABC): output_path: str output_prefix: str - phase: WorkflowPhase total_record_count: int chunk_count: int statistics: List[int] = [] + def _infer_phase_from_path(self) -> Optional[str]: + """Infer phase from output path by checking for raw/transformed directories. + + Returns: + Optional[str]: "Extract" for raw, "Transform" for transformed, else None. + """ + path_parts = str(self.output_path).split("/") + if "raw" in path_parts: + return "Extract" + if "transformed" in path_parts: + return "Transform" + return None + def estimate_dataframe_file_size( self, dataframe: "pd.DataFrame", file_type: Literal["json", "parquet"] ) -> int: @@ -287,7 +291,12 @@ async def _update_run_aggregate( (used as key in the aggregate map) statistics: The statistics dictionary to store """ - logger.info(f"Starting _update_run_aggregate for phase: {self.phase} (value: {self.phase.value})") + inferred_phase = self._infer_phase_from_path() + if inferred_phase is None: + logger.info("Phase could not be inferred from path. Skipping aggregation.") + return + + logger.info(f"Starting _update_run_aggregate for phase: {inferred_phase}") # Build the workflow run root path directly using utility functions (no path manipulation!) # build_output_path() returns: "artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" # We need the local path: "./local/tmp/artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" @@ -297,13 +306,13 @@ async def _update_run_aggregate( # Load existing aggregate from object store if present - # New structure: {"Extract": [...], "Transform": [...], "Publish": [...]} - aggregate_by_phase: Dict[str, List[Dict[str, Any]]] = { - "Extract": [], - "Transform": [], - "Publish": [] + # Structure: {"Extract": {"typename": {"record_count": N}}, "Transform": {...}, "Publish": {...}} + aggregate_by_phase: Dict[str, Dict[str, Dict[str, Any]]] = { + "Extract": {}, + "Transform": {}, + "Publish": {} } - + try: # Download existing aggregate file if present await ObjectStore.download_file( @@ -321,13 +330,22 @@ async def _update_run_aggregate( "No existing aggregate found or failed to read. Initializing a new aggregate structure." 
) - # Add this entry to the appropriate phase - logger.info(f"Adding statistics to phase '{self.phase.value}'") - aggregate_by_phase[self.phase.value].append(statistics) + # Accumulate statistics by typename within the phase + typename = statistics.get("typename", "unknown") + + if typename not in aggregate_by_phase[inferred_phase]: + aggregate_by_phase[inferred_phase][typename] = { + "record_count": 0 + } + + logger.info(f"Accumulating statistics for phase '{inferred_phase}', typename '{typename}': +{statistics['total_record_count']} records") + + # Accumulate the record count + aggregate_by_phase[inferred_phase][typename]["record_count"] += statistics["total_record_count"] with open(output_file_name, "w") as f: f.write(orjson.dumps(aggregate_by_phase).decode("utf-8")) - logger.info(f"Successfully updated aggregate with entries for phase '{self.phase.value}'") + logger.info(f"Successfully updated aggregate with accumulated stats for phase '{inferred_phase}'") # Upload aggregate to object store await ObjectStore.upload_file( diff --git a/application_sdk/outputs/iceberg.py b/application_sdk/outputs/iceberg.py index 2e8e2a066..c253dbb34 100644 --- a/application_sdk/outputs/iceberg.py +++ b/application_sdk/outputs/iceberg.py @@ -6,7 +6,7 @@ from application_sdk.observability.logger_adaptor import get_logger from application_sdk.observability.metrics_adaptor import MetricType, get_metrics -from application_sdk.outputs import Output, WorkflowPhase +from application_sdk.outputs import Output logger = get_logger(__name__) activity.logger = logger @@ -26,7 +26,6 @@ def __init__( iceberg_catalog: Catalog, iceberg_namespace: str, iceberg_table: Union[str, Table], - phase: WorkflowPhase, mode: str = "append", total_record_count: int = 0, chunk_count: int = 0, @@ -43,7 +42,6 @@ def __init__( """ self.total_record_count = total_record_count self.chunk_count = chunk_count - self.phase = phase self.iceberg_catalog = iceberg_catalog self.iceberg_namespace = iceberg_namespace self.iceberg_table = iceberg_table diff --git a/application_sdk/outputs/json.py b/application_sdk/outputs/json.py index 4767bbc62..561f4e3a8 100644 --- a/application_sdk/outputs/json.py +++ b/application_sdk/outputs/json.py @@ -9,7 +9,7 @@ from application_sdk.constants import DAPR_MAX_GRPC_MESSAGE_LENGTH from application_sdk.observability.logger_adaptor import get_logger from application_sdk.observability.metrics_adaptor import MetricType, get_metrics -from application_sdk.outputs import Output, WorkflowPhase +from application_sdk.outputs import Output from application_sdk.services.objectstore import ObjectStore logger = get_logger(__name__) @@ -82,7 +82,6 @@ class JsonOutput(Output): def __init__( self, output_suffix: str, - phase: WorkflowPhase, output_path: Optional[str] = None, output_prefix: Optional[str] = None, typename: Optional[str] = None, @@ -118,7 +117,6 @@ def __init__( self.output_path = output_path self.output_suffix = output_suffix self.output_prefix = output_prefix - self.phase = phase self.typename = typename self.chunk_start = chunk_start self.total_record_count = total_record_count diff --git a/application_sdk/outputs/parquet.py b/application_sdk/outputs/parquet.py index b5f119db0..fd75bf0f2 100644 --- a/application_sdk/outputs/parquet.py +++ b/application_sdk/outputs/parquet.py @@ -7,7 +7,7 @@ from application_sdk.constants import DAPR_MAX_GRPC_MESSAGE_LENGTH from application_sdk.observability.logger_adaptor import get_logger from application_sdk.observability.metrics_adaptor import MetricType, get_metrics -from 
application_sdk.outputs import Output, WorkflowPhase +from application_sdk.outputs import Output from application_sdk.services.objectstore import ObjectStore logger = get_logger(__name__) @@ -41,7 +41,6 @@ class ParquetOutput(Output): def __init__( self, - phase: WorkflowPhase, output_path: str = "", output_suffix: str = "", output_prefix: str = "", @@ -76,7 +75,6 @@ def __init__( self.output_path = output_path self.output_suffix = output_suffix self.output_prefix = output_prefix - self.phase = phase self.typename = typename self.chunk_size = chunk_size self.buffer_size = buffer_size diff --git a/docs/docs/concepts/outputs.md b/docs/docs/concepts/outputs.md index c7aa70e77..f76940e75 100644 --- a/docs/docs/concepts/outputs.md +++ b/docs/docs/concepts/outputs.md @@ -103,6 +103,53 @@ async def query_executor( * **`ParquetOutput`:** Similar to `JsonOutput` but writes DataFrames to Parquet format files. Uses `daft.DataFrame.write_parquet()` or `pandas.DataFrame.to_parquet()`. Also uploads files to object storage after local processing. * **`IcebergOutput`:** Writes DataFrames directly to an Iceberg table using `pyiceberg`. +## Phase Inference for Aggregation + +Outputs automatically participate in workflow-level statistics aggregation based on their `output_path`: + +- Paths containing directories starting with `"raw"` (e.g., `"/raw/table"`, `"/raw/database"`) are categorized as **Extract** phase +- Paths containing `"transformed"` directories are categorized as **Transform** phase +- Other paths skip aggregation entirely + +This replaces the previous explicit `WorkflowPhase` parameter. Aggregation accumulates statistics by phase and typename in object store. + +#### Example Structure +```json +{ + "Extract": { + "database": {"record_count": 1500}, + "schema": {"record_count": 8500}, + "table": {"record_count": 25784}, + "column": {"record_count": 482476} + }, + "Transform": { + "database": {"record_count": 1500}, + "schema": {"record_count": 8600}, + "table": {"record_count": 25247}, + "column": {"record_count": 460084} + } +} +``` + +#### Code Example +```python +# Extract phase output - auto-categorized and accumulated +parquet_output = ParquetOutput( + output_path="/tmp/artifacts", + output_suffix="raw/table" +) # Stats accumulated under Extract.table.record_count + +# Transform phase output - auto-categorized and accumulated +json_output = JsonOutput( + output_suffix="transformed", + output_path="/tmp/artifacts" +) # Stats accumulated under Transform.database.record_count +``` + +### Removed Features +- `WorkflowPhase` enum and `phase` constructor parameter have been removed +- No explicit phase passing required; inference is automatic and safe + ## Summary The `outputs` module complements the `inputs` module by providing classes to write data processed within activities. `JsonOutput` and `ParquetOutput` are commonly used for saving intermediate DataFrames to local files (and then uploading them to object storage), making the data available for subsequent activities like transformations. 
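The accumulation rule documented above is easiest to see in isolation. The following is a minimal, standalone sketch rather than SDK code: it mirrors the `_infer_phase_from_path` check and the per-phase, per-typename accumulation in `_update_run_aggregate`, but swaps the object-store round trip and `orjson` for a plain dict and stdlib `json` so it runs on its own.

```python
import json
from typing import Any, Dict, Optional

# Run-level aggregate shaped like the JSON example above: phase -> typename -> counters.
Aggregate = Dict[str, Dict[str, Dict[str, Any]]]


def infer_phase_from_path(output_path: str) -> Optional[str]:
    """Mirror of the path-based inference: a 'raw' path segment maps to Extract,
    a 'transformed' segment maps to Transform, anything else is skipped."""
    parts = str(output_path).split("/")
    if "raw" in parts:
        return "Extract"
    if "transformed" in parts:
        return "Transform"
    return None


def accumulate(aggregate: Aggregate, output_path: str, statistics: Dict[str, Any]) -> Aggregate:
    """Fold one per-chunk statistics dict into the run-level aggregate."""
    phase = infer_phase_from_path(output_path)
    if phase is None:
        return aggregate  # paths outside raw/transformed do not participate
    typename = statistics.get("typename", "unknown")
    entry = aggregate.setdefault(phase, {}).setdefault(typename, {"record_count": 0})
    entry["record_count"] += statistics["total_record_count"]
    return aggregate


aggregate: Aggregate = {"Extract": {}, "Transform": {}, "Publish": {}}
accumulate(aggregate, "/tmp/run/raw/table", {"typename": "table", "total_record_count": 25784})
accumulate(aggregate, "/tmp/run/transformed/table", {"typename": "table", "total_record_count": 25247})
print(json.dumps(aggregate, indent=2))
```

Running the sketch prints an aggregate with the same shape as the example structure in the docs diff above.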
\ No newline at end of file diff --git a/tests/unit/outputs/test_iceberg.py b/tests/unit/outputs/test_iceberg.py index a52e64504..9c8c68721 100644 --- a/tests/unit/outputs/test_iceberg.py +++ b/tests/unit/outputs/test_iceberg.py @@ -5,7 +5,6 @@ from pyiceberg.catalog import Catalog from pyiceberg.table import Table -from application_sdk.outputs import WorkflowPhase from application_sdk.outputs.iceberg import IcebergOutput @@ -25,7 +24,6 @@ def iceberg_output(mock_catalog: Catalog) -> IcebergOutput: iceberg_catalog=mock_catalog, iceberg_namespace="test_namespace", iceberg_table="test_table", - phase=WorkflowPhase.EXTRACT, mode="append", ) @@ -36,7 +34,6 @@ def test_iceberg_output_initialization(mock_catalog: Catalog) -> None: iceberg_catalog=mock_catalog, iceberg_namespace="test_namespace", iceberg_table="test_table", - phase=WorkflowPhase.EXTRACT, mode="append", ) diff --git a/tests/unit/outputs/test_json_output.py b/tests/unit/outputs/test_json_output.py index 85873e6ec..444f9708c 100644 --- a/tests/unit/outputs/test_json_output.py +++ b/tests/unit/outputs/test_json_output.py @@ -7,7 +7,6 @@ import pytest from hypothesis import HealthCheck, given, settings -from application_sdk.outputs import WorkflowPhase from application_sdk.outputs.json import JsonOutput from application_sdk.test_utils.hypothesis.strategies.outputs.json_output import ( chunk_size_strategy, @@ -34,7 +33,6 @@ async def test_init(base_output_path: str, config: Dict[str, Any]) -> None: safe_path = str(Path(base_output_path) / config["output_path"]) json_output = JsonOutput( # type: ignore output_suffix=config["output_suffix"], - phase=WorkflowPhase.EXTRACT, output_path=safe_path, output_prefix=config["output_prefix"], chunk_size=config["chunk_size"], @@ -50,7 +48,6 @@ async def test_init(base_output_path: str, config: Dict[str, Any]) -> None: async def test_write_dataframe_empty(base_output_path: str) -> None: json_output = JsonOutput( # type: ignore output_suffix="tests/raw", - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_prefix="test_prefix", chunk_size=100000, @@ -140,7 +137,6 @@ async def test_write_dataframe_multiple_chunks( async def test_write_dataframe_error(base_output_path: str) -> None: json_output = JsonOutput( # type: ignore output_suffix="tests/raw", - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_prefix="test_prefix", chunk_size=100000, diff --git a/tests/unit/outputs/test_parquet_output.py b/tests/unit/outputs/test_parquet_output.py index 36228e14a..003e489ec 100644 --- a/tests/unit/outputs/test_parquet_output.py +++ b/tests/unit/outputs/test_parquet_output.py @@ -5,7 +5,6 @@ import pandas as pd import pytest -from application_sdk.outputs import WorkflowPhase from application_sdk.outputs.parquet import ParquetOutput @@ -46,7 +45,7 @@ class TestParquetOutputInit: def test_init_default_values(self, base_output_path: str): """Test ParquetOutput initialization with default values.""" - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) # The output path gets modified by adding suffix, so check it ends with the base path assert base_output_path in parquet_output.output_path @@ -65,7 +64,6 @@ def test_init_default_values(self, base_output_path: str): def test_init_custom_values(self, base_output_path: str): """Test ParquetOutput initialization with custom values.""" parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, 
output_suffix="test_suffix", output_prefix="test_prefix", @@ -93,7 +91,6 @@ def test_init_custom_values(self, base_output_path: str): def test_init_creates_output_directory(self, base_output_path: str): """Test that initialization creates the output directory.""" parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_suffix="test_dir", typename="test_table", @@ -109,7 +106,7 @@ class TestParquetOutputPathGen: def test_path_gen_with_markers(self, base_output_path: str): """Test path generation with start and end markers.""" - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) path = parquet_output.path_gen(start_marker="start_123", end_marker="end_456") @@ -117,7 +114,7 @@ def test_path_gen_with_markers(self, base_output_path: str): def test_path_gen_without_chunk_start(self, base_output_path: str): """Test path generation without chunk start.""" - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) path = parquet_output.path_gen(chunk_count=5) @@ -125,7 +122,7 @@ def test_path_gen_without_chunk_start(self, base_output_path: str): def test_path_gen_with_chunk_start(self, base_output_path: str): """Test path generation with chunk start.""" - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) path = parquet_output.path_gen(chunk_start=10, chunk_count=3) @@ -138,7 +135,7 @@ class TestParquetOutputWriteDataframe: @pytest.mark.asyncio async def test_write_empty_dataframe(self, base_output_path: str): """Test writing an empty DataFrame.""" - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) empty_df = pd.DataFrame() await parquet_output.write_dataframe(empty_df) @@ -162,7 +159,6 @@ async def test_write_dataframe_success( mock_prefix.return_value = "test/output/path" parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, output_suffix="test" ) @@ -192,7 +188,6 @@ async def test_write_dataframe_with_custom_path_gen( mock_prefix.return_value = "test/output/path" parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, start_marker="test_start", end_marker="test_end", @@ -218,7 +213,7 @@ async def test_write_dataframe_error_handling( with patch("pandas.DataFrame.to_parquet") as mock_to_parquet: mock_to_parquet.side_effect = Exception("Test error") - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) with pytest.raises(Exception, match="Test error"): await parquet_output.write_dataframe(sample_dataframe) @@ -235,7 +230,7 @@ async def test_write_daft_dataframe_empty(self, base_output_path: str): mock_df.count_rows.return_value = 0 mock_daft.return_value = mock_df - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) await parquet_output.write_daft_dataframe(mock_df) @@ -261,7 +256,6 @@ async def test_write_daft_dataframe_success(self, base_output_path: str): mock_df.write_parquet = MagicMock() parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, ) 
@@ -301,7 +295,6 @@ async def test_write_daft_dataframe_with_parameter_overrides( mock_df.write_parquet = MagicMock() parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, output_path=base_output_path, ) @@ -338,7 +331,7 @@ async def test_write_daft_dataframe_with_default_parameters( mock_df.write_parquet = MagicMock() parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, + output_path=base_output_path, ) @@ -372,7 +365,7 @@ async def test_write_daft_dataframe_with_execution_configuration( mock_df.count_rows.return_value = 1000 mock_df.write_parquet = MagicMock() - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) await parquet_output.write_daft_dataframe(mock_df) @@ -392,7 +385,7 @@ async def test_write_daft_dataframe_error_handling(self, base_output_path: str): mock_df = MagicMock() mock_df.count_rows.side_effect = Exception("Count rows error") - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) with pytest.raises(Exception, match="Count rows error"): await parquet_output.write_daft_dataframe(mock_df) @@ -404,7 +397,7 @@ class TestParquetOutputUtilityMethods: def test_get_full_path(self, base_output_path: str): """Test get_full_path method.""" parquet_output = ParquetOutput( - phase=WorkflowPhase.EXTRACT, + output_path=base_output_path, output_suffix="test_suffix", typename="test_table", @@ -434,7 +427,7 @@ async def test_pandas_write_metrics( mock_metrics = MagicMock() mock_get_metrics.return_value = mock_metrics - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) await parquet_output.write_dataframe(sample_dataframe) @@ -465,7 +458,7 @@ async def test_daft_write_metrics(self, base_output_path: str): mock_df.count_rows.return_value = 1000 mock_df.write_parquet = MagicMock() - parquet_output = ParquetOutput(phase=WorkflowPhase.EXTRACT, output_path=base_output_path) + parquet_output = ParquetOutput(output_path=base_output_path) await parquet_output.write_daft_dataframe(mock_df) From b84c6b7702e1a4d770e14ac00a6ea186aa53a825 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Thu, 6 Nov 2025 01:18:50 +0530 Subject: [PATCH 04/12] Corrected README --- docs/docs/concepts/outputs.md | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/docs/docs/concepts/outputs.md b/docs/docs/concepts/outputs.md index f76940e75..c9a441990 100644 --- a/docs/docs/concepts/outputs.md +++ b/docs/docs/concepts/outputs.md @@ -131,25 +131,6 @@ This replaces the previous explicit `WorkflowPhase` parameter. Aggregation accum } ``` -#### Code Example -```python -# Extract phase output - auto-categorized and accumulated -parquet_output = ParquetOutput( - output_path="/tmp/artifacts", - output_suffix="raw/table" -) # Stats accumulated under Extract.table.record_count - -# Transform phase output - auto-categorized and accumulated -json_output = JsonOutput( - output_suffix="transformed", - output_path="/tmp/artifacts" -) # Stats accumulated under Transform.database.record_count -``` - -### Removed Features -- `WorkflowPhase` enum and `phase` constructor parameter have been removed -- No explicit phase passing required; inference is automatic and safe - ## Summary The `outputs` module complements the `inputs` module by providing classes to write data processed within activities. 
`JsonOutput` and `ParquetOutput` are commonly used for saving intermediate DataFrames to local files (and then uploading them to object storage), making the data available for subsequent activities like transformations. \ No newline at end of file From 58c5f6782fec9847e950ee19c95baf221a273935 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Thu, 6 Nov 2025 01:19:27 +0530 Subject: [PATCH 05/12] Corrected README --- application_sdk/services/objectstore.py | 2 +- docs/docs/concepts/outputs.md | 28 ------------------------- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/application_sdk/services/objectstore.py b/application_sdk/services/objectstore.py index f4404ab1e..3dbcfaf2d 100644 --- a/application_sdk/services/objectstore.py +++ b/application_sdk/services/objectstore.py @@ -270,7 +270,7 @@ async def upload_prefix( ) await cls.upload_file(file_path, store_key, store_name) - logger.info(f"Completed uploading directory ---- {source} to object store") + logger.info(f"Completed uploading directory {source} to object store") except Exception as e: logger.error( f"An unexpected error occurred while uploading directory: {str(e)}" diff --git a/docs/docs/concepts/outputs.md b/docs/docs/concepts/outputs.md index c9a441990..c7aa70e77 100644 --- a/docs/docs/concepts/outputs.md +++ b/docs/docs/concepts/outputs.md @@ -103,34 +103,6 @@ async def query_executor( * **`ParquetOutput`:** Similar to `JsonOutput` but writes DataFrames to Parquet format files. Uses `daft.DataFrame.write_parquet()` or `pandas.DataFrame.to_parquet()`. Also uploads files to object storage after local processing. * **`IcebergOutput`:** Writes DataFrames directly to an Iceberg table using `pyiceberg`. -## Phase Inference for Aggregation - -Outputs automatically participate in workflow-level statistics aggregation based on their `output_path`: - -- Paths containing directories starting with `"raw"` (e.g., `"/raw/table"`, `"/raw/database"`) are categorized as **Extract** phase -- Paths containing `"transformed"` directories are categorized as **Transform** phase -- Other paths skip aggregation entirely - -This replaces the previous explicit `WorkflowPhase` parameter. Aggregation accumulates statistics by phase and typename in object store. - -#### Example Structure -```json -{ - "Extract": { - "database": {"record_count": 1500}, - "schema": {"record_count": 8500}, - "table": {"record_count": 25784}, - "column": {"record_count": 482476} - }, - "Transform": { - "database": {"record_count": 1500}, - "schema": {"record_count": 8600}, - "table": {"record_count": 25247}, - "column": {"record_count": 460084} - } -} -``` - ## Summary The `outputs` module complements the `inputs` module by providing classes to write data processed within activities. `JsonOutput` and `ParquetOutput` are commonly used for saving intermediate DataFrames to local files (and then uploading them to object storage), making the data available for subsequent activities like transformations. 
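The other half of `_update_run_aggregate` is the persistence cycle: download the existing `statistics.json.ignore` from the run root if present, merge into it, and upload it back. Below is a hedged stand-in for that cycle that uses a local directory and stdlib `json` in place of `ObjectStore.download_file`/`ObjectStore.upload_file` and `orjson`; the `FAKE_STORE` path and both helper names are hypothetical.

```python
import json
import os
from typing import Any, Dict

# Hypothetical local directory standing in for the object store.
FAKE_STORE = "/tmp/fake_object_store"


def load_aggregate(store_key: str) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Fetch the existing run-level aggregate, or initialize a fresh phase map."""
    path = os.path.join(FAKE_STORE, store_key)
    aggregate: Dict[str, Dict[str, Dict[str, Any]]] = {"Extract": {}, "Transform": {}, "Publish": {}}
    try:
        with open(path, "r") as f:
            aggregate.update(json.load(f))
    except (FileNotFoundError, json.JSONDecodeError):
        pass  # no aggregate yet, or unreadable: start from the empty structure
    return aggregate


def save_aggregate(store_key: str, aggregate: Dict[str, Any]) -> None:
    """Write the merged aggregate back to the stand-in store."""
    path = os.path.join(FAKE_STORE, store_key)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        json.dump(aggregate, f)


agg = load_aggregate("run-123/statistics.json.ignore")
agg["Extract"].setdefault("table", {"record_count": 0})["record_count"] += 100
save_aggregate("run-123/statistics.json.ignore", agg)
```

Because this read-merge-write is not atomic, concurrent activities in the same run can race on the file, which appears to be what the `#TODO Do we need locking here ?` note and the later `lock_per_run` decorator (patches 08 and 10 below) are addressing.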
\ No newline at end of file From 995c7dff6ac09378ae1e2aaf307e4377a9cf766e Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Thu, 6 Nov 2025 01:28:10 +0530 Subject: [PATCH 06/12] Revert changes --- application_sdk/activities/metadata_extraction/sql.py | 1 + application_sdk/server/fastapi/__init__.py | 1 - tests/unit/outputs/test_json_output.py | 2 +- tests/unit/outputs/test_parquet_output.py | 1 - 4 files changed, 2 insertions(+), 3 deletions(-) diff --git a/application_sdk/activities/metadata_extraction/sql.py b/application_sdk/activities/metadata_extraction/sql.py index 9fdf115a7..dda62d193 100644 --- a/application_sdk/activities/metadata_extraction/sql.py +++ b/application_sdk/activities/metadata_extraction/sql.py @@ -831,6 +831,7 @@ async def transform_data( raw_input = raw_input.get_batched_daft_dataframe() transformed_output = JsonOutput( + output_path=output_path, output_suffix="transformed", typename=typename, chunk_start=workflow_args.get("chunk_start"), diff --git a/application_sdk/server/fastapi/__init__.py b/application_sdk/server/fastapi/__init__.py index a73b28ad2..c2b536a06 100644 --- a/application_sdk/server/fastapi/__init__.py +++ b/application_sdk/server/fastapi/__init__.py @@ -811,4 +811,3 @@ async def start( ) ) await server.serve() - diff --git a/tests/unit/outputs/test_json_output.py b/tests/unit/outputs/test_json_output.py index e23da75ca..d6bacb4c2 100644 --- a/tests/unit/outputs/test_json_output.py +++ b/tests/unit/outputs/test_json_output.py @@ -32,8 +32,8 @@ async def test_init(base_output_path: str, config: Dict[str, Any]) -> None: # Create a safe output path by joining base_output_path with config's output_path safe_path = str(Path(base_output_path) / config["output_path"]) json_output = JsonOutput( # type: ignore - output_suffix=config["output_suffix"], output_path=safe_path, + output_suffix=config["output_suffix"], output_prefix=config["output_prefix"], chunk_size=config["chunk_size"], ) diff --git a/tests/unit/outputs/test_parquet_output.py b/tests/unit/outputs/test_parquet_output.py index 145560d49..cddba4735 100644 --- a/tests/unit/outputs/test_parquet_output.py +++ b/tests/unit/outputs/test_parquet_output.py @@ -402,7 +402,6 @@ async def test_write_daft_dataframe_with_default_parameters( mock_df.write_parquet.return_value = mock_result parquet_output = ParquetOutput( - output_path=base_output_path, ) From 61bb35808500dfb0a8b3a52246c34f9333643205 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Thu, 6 Nov 2025 02:20:53 +0530 Subject: [PATCH 07/12] Added typename for transform --- application_sdk/activities/metadata_extraction/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application_sdk/activities/metadata_extraction/sql.py b/application_sdk/activities/metadata_extraction/sql.py index dda62d193..9ab39092f 100644 --- a/application_sdk/activities/metadata_extraction/sql.py +++ b/application_sdk/activities/metadata_extraction/sql.py @@ -850,7 +850,7 @@ async def transform_data( dataframe=dataframe, **workflow_args ) await transformed_output.write_daft_dataframe(transform_metadata) - return await transformed_output.get_statistics() + return await transformed_output.get_statistics(typename=typename) @activity.defn @auto_heartbeater From 774730d29e1664d3a1dcbe0fd21f2d76353ac277 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Thu, 6 Nov 2025 17:50:44 +0530 Subject: [PATCH 08/12] Removed comments --- application_sdk/outputs/__init__.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git 
a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py index b5d8da4a6..c23f76601 100644 --- a/application_sdk/outputs/__init__.py +++ b/application_sdk/outputs/__init__.py @@ -445,6 +445,7 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic except Exception as e: logger.error(f"Error writing statistics: {str(e)}") + #TODO Do we need locking here ? async def _update_run_aggregate( self, per_path_destination: str, statistics: Dict[str, Any] ) -> None: @@ -461,14 +462,10 @@ async def _update_run_aggregate( return logger.info(f"Starting _update_run_aggregate for phase: {inferred_phase}") - # Build the workflow run root path directly using utility functions (no path manipulation!) - # build_output_path() returns: "artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" - # We need the local path: "./local/tmp/artifacts/apps/{app}/workflows/{workflow_id}/{run_id}" workflow_run_root_relative = build_output_path() output_file_name = f"{TEMPORARY_PATH}{workflow_run_root_relative}/statistics.json.ignore" destination_file_path = get_object_store_prefix(output_file_name) - # Load existing aggregate from object store if present # Structure: {"Extract": {"typename": {"record_count": N}}, "Transform": {...}, "Publish": {...}} aggregate_by_phase: Dict[str, Dict[str, Dict[str, Any]]] = { From 505ab3fa3e82d6bc4c723411b003980c858a9b05 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Thu, 6 Nov 2025 18:02:29 +0530 Subject: [PATCH 09/12] Added UI --- tests/unit/outputs/test_output.py | 114 ++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/tests/unit/outputs/test_output.py b/tests/unit/outputs/test_output.py index 23468e75a..30d08f933 100644 --- a/tests/unit/outputs/test_output.py +++ b/tests/unit/outputs/test_output.py @@ -1,6 +1,7 @@ """Unit tests for output interface.""" from typing import Any +import json from unittest.mock import AsyncMock, mock_open, patch import pandas as pd @@ -161,3 +162,116 @@ async def test_write_statistics_error(self): assert result is None mock_logger.assert_called_once() assert "Error writing statistics" in mock_logger.call_args[0][0] + + async def test__update_run_aggregate_skips_when_phase_unknown(self): + """Skips aggregation when phase cannot be inferred from output_path.""" + # Ensure no 'raw' or 'transformed' in path so phase is None + self.output.output_path = "/tmp/no-phase/path" + stats = {"typename": "table", "total_record_count": 10} + + with patch( + "application_sdk.services.objectstore.ObjectStore.download_file", + new_callable=AsyncMock, + ) as mock_dl, patch( + "application_sdk.services.objectstore.ObjectStore.upload_file", + new_callable=AsyncMock, + ) as mock_ul, patch("builtins.open", mock_open()) as m: + await self.output._update_run_aggregate("ignored", stats) + mock_dl.assert_not_awaited() + mock_ul.assert_not_awaited() + # No reads/writes when phase is unknown + assert m.call_count == 0 + + async def test__update_run_aggregate_creates_new_aggregate_extract(self): + """Creates a new aggregate structure and writes stats for Extract phase.""" + # Make phase inference return "Extract" + self.output.output_path = "/tmp/run/raw/path" + stats = {"typename": "table", "total_record_count": 7} + + with patch( + "application_sdk.outputs.build_output_path", return_value="workflow/run" + ), patch( + "application_sdk.outputs.get_object_store_prefix", + return_value="os://bucket/statistics.json.ignore", + ), patch( + "application_sdk.services.objectstore.ObjectStore.download_file", + 
new_callable=AsyncMock, + ) as mock_dl, patch( + "application_sdk.services.objectstore.ObjectStore.upload_file", + new_callable=AsyncMock, + ) as mock_ul, patch("builtins.open", mock_open()) as m: + # Simulate no existing aggregate in object store + mock_dl.side_effect = Exception("not found") + + await self.output._update_run_aggregate("ignored", stats) + + handle = m() + # One write with the aggregated payload + write_calls = handle.write.call_args_list + assert len(write_calls) == 1 + payload = write_calls[0].args[0] + data = json.loads(payload) + + assert data["Extract"]["table"]["record_count"] == 7 + # Other phases should exist, even if empty + assert "Transform" in data and "Publish" in data + + mock_ul.assert_awaited_once() + + async def test__update_run_aggregate_accumulates_existing_transform(self): + """Accumulates total_record_count into existing Transform aggregate.""" + # Make phase inference return "Transform" + self.output.output_path = "/tmp/run/transformed/path" + stats = {"typename": "table", "total_record_count": 3} + + existing = {"Extract": {}, "Transform": {"table": {"record_count": 5}}, "Publish": {}} + m = mock_open(read_data=json.dumps(existing)) + + with patch( + "application_sdk.outputs.build_output_path", return_value="workflow/run" + ), patch( + "application_sdk.outputs.get_object_store_prefix", + return_value="os://bucket/statistics.json.ignore", + ), patch( + "application_sdk.services.objectstore.ObjectStore.download_file", + new_callable=AsyncMock, + ) as mock_dl, patch( + "application_sdk.services.objectstore.ObjectStore.upload_file", + new_callable=AsyncMock, + ) as mock_ul, patch("builtins.open", m) as mo: + await self.output._update_run_aggregate("ignored", stats) + + handle = mo() + written = handle.write.call_args[0][0] + data = json.loads(written) + + # 5 existing + 3 new + assert data["Transform"]["table"]["record_count"] == 8 + mock_dl.assert_awaited_once() + mock_ul.assert_awaited_once() + + async def test__update_run_aggregate_defaults_unknown_typename(self): + """Uses 'unknown' typename when not provided in statistics.""" + self.output.output_path = "/tmp/run/raw/path" + stats = {"total_record_count": 4} # no 'typename' + + with patch( + "application_sdk.outputs.build_output_path", return_value="workflow/run" + ), patch( + "application_sdk.outputs.get_object_store_prefix", + return_value="os://bucket/statistics.json.ignore", + ), patch( + "application_sdk.services.objectstore.ObjectStore.download_file", + new_callable=AsyncMock, + ) as mock_dl, patch( + "application_sdk.services.objectstore.ObjectStore.upload_file", + new_callable=AsyncMock, + ) as mock_ul, patch("builtins.open", mock_open()) as m: + mock_dl.side_effect = Exception("not found") + + await self.output._update_run_aggregate("ignored", stats) + + payload = m().write.call_args[0][0] + data = json.loads(payload) + assert data["Extract"]["unknown"]["record_count"] == 4 + mock_ul.assert_awaited_once() From 0b043625eea5da492ec8558d84781c2c8c402793 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Thu, 13 Nov 2025 09:59:05 +0530 Subject: [PATCH 10/12] Adding functionality for method level locking --- application_sdk/decorators/__init__.py | 2 + application_sdk/decorators/method_lock.py | 89 ++++++++++++ application_sdk/outputs/__init__.py | 2 + tests/unit/decorators/test_method_lock.py | 168 ++++++++++++++++++++++ 4 files changed, 261 insertions(+) create mode 100644 application_sdk/decorators/method_lock.py create mode 100644 tests/unit/decorators/test_method_lock.py diff --git 
a/application_sdk/decorators/__init__.py b/application_sdk/decorators/__init__.py index e69de29bb..139597f9c 100644 --- a/application_sdk/decorators/__init__.py +++ b/application_sdk/decorators/__init__.py @@ -0,0 +1,2 @@ + + diff --git a/application_sdk/decorators/method_lock.py b/application_sdk/decorators/method_lock.py new file mode 100644 index 000000000..ebaba5bcc --- /dev/null +++ b/application_sdk/decorators/method_lock.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import asyncio +from functools import wraps +from typing import Any, Awaitable, Callable, Optional + +from temporalio import activity + +from application_sdk.clients.redis import RedisClientAsync +from application_sdk.constants import ( + APPLICATION_NAME, + IS_LOCKING_DISABLED, +) +from application_sdk.observability.logger_adaptor import get_logger + +logger = get_logger(__name__) + + +def lock_per_run( + lock_name: Optional[str] = None, ttl_seconds: int = 10 +) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]: + """Serialize an async method within an activity per workflow run. + + Uses Redis SET NX EX for acquisition and an owner-verified release. + The lock key is namespaced and scoped to the current workflow run: + ``{APPLICATION_NAME}:meth:{method_name}:run:{workflow_run_id}``. + + Args: + lock_name: Optional explicit lock name. Defaults to the wrapped method's name. + ttl_seconds: Lock TTL in seconds. Should cover worst-case wait + execution time. + + Returns: + A decorator for async callables to guard them with a per-run distributed lock. + """ + + def _decorate( + fn: Callable[..., Awaitable[Any]] + ) -> Callable[..., Awaitable[Any]]: + @wraps(fn) + async def _wrapped(*args: Any, **kwargs: Any) -> Any: + if IS_LOCKING_DISABLED: + return await fn(*args, **kwargs) + + run_id = activity.info().workflow_run_id + name = lock_name or fn.__name__ + + resource_id = f"{APPLICATION_NAME}:meth:{name}:run:{run_id}" + owner_id = f"{APPLICATION_NAME}:{run_id}" + + async with RedisClientAsync() as rc: + # Acquire with retry + retry_count = 0 + while True: + logger.debug(f"Attempting to acquire lock: {resource_id}, owner: {owner_id}") + acquired = await rc._acquire_lock( + resource_id, owner_id, ttl_seconds + ) + if acquired: + logger.info(f"Lock acquired: {resource_id}, owner: {owner_id}") + break + retry_count += 1 + logger.debug( + f"Lock not available, retrying (attempt {retry_count}): {resource_id}" + ) + await asyncio.sleep(5) + + try: + return await fn(*args, **kwargs) + finally: + # Best-effort release; TTL guarantees cleanup if this fails + try: + logger.debug(f"Releasing lock: {resource_id}, owner: {owner_id}") + released, result = await rc._release_lock(resource_id, owner_id) + if released: + logger.info(f"Lock released successfully: {resource_id}") + else: + logger.warning( + f"Lock release failed (may already be released): {resource_id}, result: {result}" + ) + except Exception as e: + logger.warning( + f"Exception during lock release for {resource_id}: {e}. TTL will handle cleanup." 
+ ) + + return _wrapped + + return _decorate + + diff --git a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py index c23f76601..2059c9417 100644 --- a/application_sdk/outputs/__init__.py +++ b/application_sdk/outputs/__init__.py @@ -31,6 +31,7 @@ from application_sdk.observability.metrics_adaptor import MetricType from application_sdk.services.objectstore import ObjectStore from application_sdk.constants import TEMPORARY_PATH +from application_sdk.decorators.method_lock import lock_per_run logger = get_logger(__name__) activity.logger = logger @@ -446,6 +447,7 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic logger.error(f"Error writing statistics: {str(e)}") #TODO Do we need locking here ? + @lock_per_run() async def _update_run_aggregate( self, per_path_destination: str, statistics: Dict[str, Any] ) -> None: diff --git a/tests/unit/decorators/test_method_lock.py b/tests/unit/decorators/test_method_lock.py new file mode 100644 index 000000000..db2c77ece --- /dev/null +++ b/tests/unit/decorators/test_method_lock.py @@ -0,0 +1,168 @@ +"""Unit tests for method-level lock decorator.""" + +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import pytest + +from application_sdk.decorators.method_lock import lock_per_run + + +class TestLockPerRun: + """Test lock_per_run decorator.""" + + @patch("application_sdk.decorators.method_lock.RedisClientAsync") + @patch("application_sdk.decorators.method_lock.activity") + @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) + async def test_lock_per_run_acquires_and_releases_success( + self, mock_activity, mock_redis_client_class + ): + """Test successful lock acquisition and release.""" + # Setup mocks + mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-123") + mock_client = AsyncMock() + mock_client._acquire_lock.return_value = True + mock_client._release_lock.return_value = (True, None) + mock_redis_client_class.return_value.__aenter__.return_value = mock_client + + # Execute + @lock_per_run() + async def sample(x: int) -> str: + return f"ok-{x}" + + result = await sample(42) + + # Verify + assert result == "ok-42" + mock_client._acquire_lock.assert_called_once() + mock_client._release_lock.assert_called_once() + # Verify lock key format + call_args = mock_client._acquire_lock.call_args + assert call_args[0][0].endswith(":run:run-123") + assert ":meth:sample:" in call_args[0][0] + + @patch("application_sdk.decorators.method_lock.RedisClientAsync") + @patch("application_sdk.decorators.method_lock.activity") + @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) + @patch("application_sdk.decorators.method_lock.asyncio.sleep") + async def test_lock_per_run_retries_on_failure( + self, mock_sleep, mock_activity, mock_redis_client_class + ): + """Test lock retries when acquisition fails initially.""" + # Setup mocks + mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-456") + mock_client = AsyncMock() + # First call fails, second succeeds + mock_client._acquire_lock.side_effect = [False, True] + mock_client._release_lock.return_value = (True, None) + mock_redis_client_class.return_value.__aenter__.return_value = mock_client + + # Execute + @lock_per_run() + async def sample() -> str: + return "ok" + + result = await sample() + + # Verify + assert result == "ok" + assert mock_client._acquire_lock.call_count == 2 + mock_sleep.assert_called_once() # Should sleep once between retries + 
mock_client._release_lock.assert_called_once() + + @patch("application_sdk.decorators.method_lock.activity") + @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", True) + async def test_lock_per_run_skips_when_disabled(self, mock_activity): + """Test lock is skipped when IS_LOCKING_DISABLED is True.""" + # Setup mock + mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-abc") + + # Execute + @lock_per_run() + async def sample() -> str: + return "ok" + + result = await sample() + + # Verify - function executes without Redis calls + assert result == "ok" + + @patch("application_sdk.decorators.method_lock.RedisClientAsync") + @patch("application_sdk.decorators.method_lock.activity") + @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) + async def test_lock_per_run_releases_on_exception( + self, mock_activity, mock_redis_client_class + ): + """Test lock is released even when wrapped function raises exception.""" + # Setup mocks + mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-err") + mock_client = AsyncMock() + mock_client._acquire_lock.return_value = True + mock_client._release_lock.return_value = (True, None) + mock_redis_client_class.return_value.__aenter__.return_value = mock_client + + # Execute + @lock_per_run() + async def failing() -> None: + raise RuntimeError("boom") + + # Verify exception is raised + with pytest.raises(RuntimeError, match="boom"): + await failing() + + # Verify release was called even on exception + mock_client._acquire_lock.assert_called_once() + mock_client._release_lock.assert_called_once() + + @patch("application_sdk.decorators.method_lock.RedisClientAsync") + @patch("application_sdk.decorators.method_lock.activity") + @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) + async def test_lock_per_run_custom_lock_name( + self, mock_activity, mock_redis_client_class + ): + """Test custom lock name is used when provided.""" + # Setup mocks + mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-custom") + mock_client = AsyncMock() + mock_client._acquire_lock.return_value = True + mock_client._release_lock.return_value = (True, None) + mock_redis_client_class.return_value.__aenter__.return_value = mock_client + + # Execute + @lock_per_run(lock_name="custom_lock") + async def sample() -> str: + return "ok" + + await sample() + + # Verify custom lock name in resource_id + call_args = mock_client._acquire_lock.call_args + resource_id = call_args[0][0] + assert ":meth:custom_lock:" in resource_id + assert ":meth:sample:" not in resource_id + + @patch("application_sdk.decorators.method_lock.RedisClientAsync") + @patch("application_sdk.decorators.method_lock.activity") + @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) + async def test_lock_per_run_custom_ttl( + self, mock_activity, mock_redis_client_class + ): + """Test custom TTL is used when provided.""" + # Setup mocks + mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-ttl") + mock_client = AsyncMock() + mock_client._acquire_lock.return_value = True + mock_client._release_lock.return_value = (True, None) + mock_redis_client_class.return_value.__aenter__.return_value = mock_client + + # Execute + @lock_per_run(ttl_seconds=300) + async def sample() -> str: + return "ok" + + await sample() + + # Verify custom TTL is passed to acquire_lock + call_args = mock_client._acquire_lock.call_args + ttl_seconds = call_args[0][2] + assert ttl_seconds == 300 
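Note (reviewer sketch, not part of the patch): lock_per_run relies on RedisClientAsync._acquire_lock and _release_lock, which PATCH 10 does not show. The sketch below is a minimal illustration of the SET NX EX acquire and owner-verified release that the decorator assumes, written against redis-py's asyncio client; the class shape, connection settings, and redis-py version are assumptions, not the SDK's actual client.

import redis.asyncio as redis  # assumption: redis-py >= 5.0

# Owner-verified delete: release the key only if this owner still holds it.
_RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""


class RedisClientAsync:  # illustrative stand-in for the SDK client
    async def __aenter__(self) -> "RedisClientAsync":
        self._redis = redis.Redis()  # connection settings are an assumption
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self._redis.aclose()

    async def _acquire_lock(
        self, resource_id: str, owner_id: str, ttl_seconds: int
    ) -> bool:
        # SET key value NX EX ttl: succeeds only if the key does not exist yet.
        return bool(
            await self._redis.set(resource_id, owner_id, nx=True, ex=ttl_seconds)
        )

    async def _release_lock(self, resource_id: str, owner_id: str):
        # Returns (released, raw_result) to match how the decorator unpacks it.
        result = await self._redis.eval(_RELEASE_SCRIPT, 1, resource_id, owner_id)
        return bool(result), result

The TTL is the safety net in this scheme: if the release fails (crash, network error), the key expires on its own, which is why the decorator only logs a warning when the release does not succeed.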
From aa59f9664ac113a959fb9bd0bc6c8b2a3f9e4bb5 Mon Sep 17 00:00:00 2001 From: Prasanna Sairam Date: Fri, 14 Nov 2025 12:41:47 +0530 Subject: [PATCH 11/12] Adding statistics files for each chunk, removing aggregate --- application_sdk/decorators/__init__.py | 1 - application_sdk/decorators/method_lock.py | 89 ------------ application_sdk/outputs/__init__.py | 98 +++---------- tests/unit/decorators/test_method_lock.py | 168 ---------------------- tests/unit/outputs/test_output.py | 112 --------------- 5 files changed, 16 insertions(+), 452 deletions(-) delete mode 100644 application_sdk/decorators/method_lock.py delete mode 100644 tests/unit/decorators/test_method_lock.py diff --git a/application_sdk/decorators/__init__.py b/application_sdk/decorators/__init__.py index 139597f9c..8b1378917 100644 --- a/application_sdk/decorators/__init__.py +++ b/application_sdk/decorators/__init__.py @@ -1,2 +1 @@ - diff --git a/application_sdk/decorators/method_lock.py b/application_sdk/decorators/method_lock.py deleted file mode 100644 index ebaba5bcc..000000000 --- a/application_sdk/decorators/method_lock.py +++ /dev/null @@ -1,89 +0,0 @@ -from __future__ import annotations - -import asyncio -from functools import wraps -from typing import Any, Awaitable, Callable, Optional - -from temporalio import activity - -from application_sdk.clients.redis import RedisClientAsync -from application_sdk.constants import ( - APPLICATION_NAME, - IS_LOCKING_DISABLED, -) -from application_sdk.observability.logger_adaptor import get_logger - -logger = get_logger(__name__) - - -def lock_per_run( - lock_name: Optional[str] = None, ttl_seconds: int = 10 -) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]: - """Serialize an async method within an activity per workflow run. - - Uses Redis SET NX EX for acquisition and an owner-verified release. - The lock key is namespaced and scoped to the current workflow run: - ``{APPLICATION_NAME}:meth:{method_name}:run:{workflow_run_id}``. - - Args: - lock_name: Optional explicit lock name. Defaults to the wrapped method's name. - ttl_seconds: Lock TTL in seconds. Should cover worst-case wait + execution time. - - Returns: - A decorator for async callables to guard them with a per-run distributed lock. 
- """ - - def _decorate( - fn: Callable[..., Awaitable[Any]] - ) -> Callable[..., Awaitable[Any]]: - @wraps(fn) - async def _wrapped(*args: Any, **kwargs: Any) -> Any: - if IS_LOCKING_DISABLED: - return await fn(*args, **kwargs) - - run_id = activity.info().workflow_run_id - name = lock_name or fn.__name__ - - resource_id = f"{APPLICATION_NAME}:meth:{name}:run:{run_id}" - owner_id = f"{APPLICATION_NAME}:{run_id}" - - async with RedisClientAsync() as rc: - # Acquire with retry - retry_count = 0 - while True: - logger.debug(f"Attempting to acquire lock: {resource_id}, owner: {owner_id}") - acquired = await rc._acquire_lock( - resource_id, owner_id, ttl_seconds - ) - if acquired: - logger.info(f"Lock acquired: {resource_id}, owner: {owner_id}") - break - retry_count += 1 - logger.debug( - f"Lock not available, retrying (attempt {retry_count}): {resource_id}" - ) - await asyncio.sleep(5) - - try: - return await fn(*args, **kwargs) - finally: - # Best-effort release; TTL guarantees cleanup if this fails - try: - logger.debug(f"Releasing lock: {resource_id}, owner: {owner_id}") - released, result = await rc._release_lock(resource_id, owner_id) - if released: - logger.info(f"Lock released successfully: {resource_id}") - else: - logger.warning( - f"Lock release failed (may already be released): {resource_id}, result: {result}" - ) - except Exception as e: - logger.warning( - f"Exception during lock release for {resource_id}: {e}. TTL will handle cleanup." - ) - - return _wrapped - - return _decorate - - diff --git a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py index 2059c9417..4fb71a173 100644 --- a/application_sdk/outputs/__init__.py +++ b/application_sdk/outputs/__init__.py @@ -30,8 +30,6 @@ from application_sdk.observability.logger_adaptor import get_logger from application_sdk.observability.metrics_adaptor import MetricType from application_sdk.services.objectstore import ObjectStore -from application_sdk.constants import TEMPORARY_PATH -from application_sdk.decorators.method_lock import lock_per_run logger = get_logger(__name__) activity.logger = logger @@ -423,10 +421,22 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic "partitions": self.partitions, } - # Write the statistics to a json file - output_file_name = f"{self.output_path}/statistics.json.ignore" - with open(output_file_name, "w") as f: - f.write(orjson.dumps(statistics).decode("utf-8")) + # Ensure typename is included in the statistics payload (if provided) + if typename: + statistics["typename"] = typename + + # Write the statistics to a json file inside a dedicated statistics/ folder + statistics_dir = os.path.join(self.output_path, "statistics") + os.makedirs(statistics_dir, exist_ok=True) + output_file_name = f"{statistics_dir}/statistics.json.ignore" + # If chunk_start is provided, include it in the statistics filename + try: + cs = getattr(self, "chunk_start", None) + if cs is not None: + output_file_name = f"{statistics_dir}/statistics-chunk-{cs}.json.ignore" + except Exception: + # If accessing chunk_start fails, fallback to default filename + pass destination_file_path = get_object_store_prefix(output_file_name) # Push the file to the object store @@ -435,83 +445,7 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic destination=destination_file_path, ) - if typename: - statistics["typename"] = typename - # Update aggregated statistics at run root in object store - try: - await self._update_run_aggregate(destination_file_path, 
statistics) - except Exception as e: - logger.warning(f"Failed to update aggregated statistics: {str(e)}") return statistics except Exception as e: logger.error(f"Error writing statistics: {str(e)}") - #TODO Do we need locking here ? - @lock_per_run() - async def _update_run_aggregate( - self, per_path_destination: str, statistics: Dict[str, Any] - ) -> None: - """Aggregate stats into a single file at the workflow run root. - - Args: - per_path_destination: Object store destination path for this stats file - (used as key in the aggregate map) - statistics: The statistics dictionary to store - """ - inferred_phase = self._infer_phase_from_path() - if inferred_phase is None: - logger.info("Phase could not be inferred from path. Skipping aggregation.") - return - - logger.info(f"Starting _update_run_aggregate for phase: {inferred_phase}") - workflow_run_root_relative = build_output_path() - output_file_name = f"{TEMPORARY_PATH}{workflow_run_root_relative}/statistics.json.ignore" - destination_file_path = get_object_store_prefix(output_file_name) - - # Load existing aggregate from object store if present - # Structure: {"Extract": {"typename": {"record_count": N}}, "Transform": {...}, "Publish": {...}} - aggregate_by_phase: Dict[str, Dict[str, Dict[str, Any]]] = { - "Extract": {}, - "Transform": {}, - "Publish": {} - } - - try: - # Download existing aggregate file if present - await ObjectStore.download_file( - source=destination_file_path, - destination=output_file_name, - ) - # Load existing JSON structure - with open(output_file_name, "r") as f: - existing_aggregate = orjson.loads(f.read()) - # Phase-based structure - aggregate_by_phase.update(existing_aggregate) - logger.info(f"Successfully loaded existing aggregates") - except Exception: - logger.info( - "No existing aggregate found or failed to read. Initializing a new aggregate structure." 
- ) - - # Accumulate statistics by typename within the phase - typename = statistics.get("typename", "unknown") - - if typename not in aggregate_by_phase[inferred_phase]: - aggregate_by_phase[inferred_phase][typename] = { - "record_count": 0 - } - - logger.info(f"Accumulating statistics for phase '{inferred_phase}', typename '{typename}': +{statistics['total_record_count']} records") - - # Accumulate the record count - aggregate_by_phase[inferred_phase][typename]["record_count"] += statistics["total_record_count"] - - with open(output_file_name, "w") as f: - f.write(orjson.dumps(aggregate_by_phase).decode("utf-8")) - logger.info(f"Successfully updated aggregate with accumulated stats for phase '{inferred_phase}'") - - # Upload aggregate to object store - await ObjectStore.upload_file( - source=output_file_name, - destination=destination_file_path, - ) diff --git a/tests/unit/decorators/test_method_lock.py b/tests/unit/decorators/test_method_lock.py deleted file mode 100644 index db2c77ece..000000000 --- a/tests/unit/decorators/test_method_lock.py +++ /dev/null @@ -1,168 +0,0 @@ -"""Unit tests for method-level lock decorator.""" - -from types import SimpleNamespace -from unittest.mock import AsyncMock, patch - -import pytest - -from application_sdk.decorators.method_lock import lock_per_run - - -class TestLockPerRun: - """Test lock_per_run decorator.""" - - @patch("application_sdk.decorators.method_lock.RedisClientAsync") - @patch("application_sdk.decorators.method_lock.activity") - @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) - async def test_lock_per_run_acquires_and_releases_success( - self, mock_activity, mock_redis_client_class - ): - """Test successful lock acquisition and release.""" - # Setup mocks - mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-123") - mock_client = AsyncMock() - mock_client._acquire_lock.return_value = True - mock_client._release_lock.return_value = (True, None) - mock_redis_client_class.return_value.__aenter__.return_value = mock_client - - # Execute - @lock_per_run() - async def sample(x: int) -> str: - return f"ok-{x}" - - result = await sample(42) - - # Verify - assert result == "ok-42" - mock_client._acquire_lock.assert_called_once() - mock_client._release_lock.assert_called_once() - # Verify lock key format - call_args = mock_client._acquire_lock.call_args - assert call_args[0][0].endswith(":run:run-123") - assert ":meth:sample:" in call_args[0][0] - - @patch("application_sdk.decorators.method_lock.RedisClientAsync") - @patch("application_sdk.decorators.method_lock.activity") - @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) - @patch("application_sdk.decorators.method_lock.asyncio.sleep") - async def test_lock_per_run_retries_on_failure( - self, mock_sleep, mock_activity, mock_redis_client_class - ): - """Test lock retries when acquisition fails initially.""" - # Setup mocks - mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-456") - mock_client = AsyncMock() - # First call fails, second succeeds - mock_client._acquire_lock.side_effect = [False, True] - mock_client._release_lock.return_value = (True, None) - mock_redis_client_class.return_value.__aenter__.return_value = mock_client - - # Execute - @lock_per_run() - async def sample() -> str: - return "ok" - - result = await sample() - - # Verify - assert result == "ok" - assert mock_client._acquire_lock.call_count == 2 - mock_sleep.assert_called_once() # Should sleep once between retries - 
mock_client._release_lock.assert_called_once() - - @patch("application_sdk.decorators.method_lock.activity") - @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", True) - async def test_lock_per_run_skips_when_disabled(self, mock_activity): - """Test lock is skipped when IS_LOCKING_DISABLED is True.""" - # Setup mock - mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-abc") - - # Execute - @lock_per_run() - async def sample() -> str: - return "ok" - - result = await sample() - - # Verify - function executes without Redis calls - assert result == "ok" - - @patch("application_sdk.decorators.method_lock.RedisClientAsync") - @patch("application_sdk.decorators.method_lock.activity") - @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) - async def test_lock_per_run_releases_on_exception( - self, mock_activity, mock_redis_client_class - ): - """Test lock is released even when wrapped function raises exception.""" - # Setup mocks - mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-err") - mock_client = AsyncMock() - mock_client._acquire_lock.return_value = True - mock_client._release_lock.return_value = (True, None) - mock_redis_client_class.return_value.__aenter__.return_value = mock_client - - # Execute - @lock_per_run() - async def failing() -> None: - raise RuntimeError("boom") - - # Verify exception is raised - with pytest.raises(RuntimeError, match="boom"): - await failing() - - # Verify release was called even on exception - mock_client._acquire_lock.assert_called_once() - mock_client._release_lock.assert_called_once() - - @patch("application_sdk.decorators.method_lock.RedisClientAsync") - @patch("application_sdk.decorators.method_lock.activity") - @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) - async def test_lock_per_run_custom_lock_name( - self, mock_activity, mock_redis_client_class - ): - """Test custom lock name is used when provided.""" - # Setup mocks - mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-custom") - mock_client = AsyncMock() - mock_client._acquire_lock.return_value = True - mock_client._release_lock.return_value = (True, None) - mock_redis_client_class.return_value.__aenter__.return_value = mock_client - - # Execute - @lock_per_run(lock_name="custom_lock") - async def sample() -> str: - return "ok" - - await sample() - - # Verify custom lock name in resource_id - call_args = mock_client._acquire_lock.call_args - resource_id = call_args[0][0] - assert ":meth:custom_lock:" in resource_id - assert ":meth:sample:" not in resource_id - - @patch("application_sdk.decorators.method_lock.RedisClientAsync") - @patch("application_sdk.decorators.method_lock.activity") - @patch("application_sdk.decorators.method_lock.IS_LOCKING_DISABLED", False) - async def test_lock_per_run_custom_ttl( - self, mock_activity, mock_redis_client_class - ): - """Test custom TTL is used when provided.""" - # Setup mocks - mock_activity.info.return_value = SimpleNamespace(workflow_run_id="run-ttl") - mock_client = AsyncMock() - mock_client._acquire_lock.return_value = True - mock_client._release_lock.return_value = (True, None) - mock_redis_client_class.return_value.__aenter__.return_value = mock_client - - # Execute - @lock_per_run(ttl_seconds=300) - async def sample() -> str: - return "ok" - - await sample() - - # Verify custom TTL is passed to acquire_lock - call_args = mock_client._acquire_lock.call_args - ttl_seconds = call_args[0][2] - assert ttl_seconds == 300 
diff --git a/tests/unit/outputs/test_output.py b/tests/unit/outputs/test_output.py index 30d08f933..83669f17c 100644 --- a/tests/unit/outputs/test_output.py +++ b/tests/unit/outputs/test_output.py @@ -163,115 +163,3 @@ async def test_write_statistics_error(self): mock_logger.assert_called_once() assert "Error writing statistics" in mock_logger.call_args[0][0] - async def test__update_run_aggregate_skips_when_phase_unknown(self): - """Skips aggregation when phase cannot be inferred from output_path.""" - # Ensure no 'raw' or 'transformed' in path so phase is None - self.output.output_path = "/tmp/no-phase/path" - stats = {"typename": "table", "total_record_count": 10} - - with patch( - "application_sdk.services.objectstore.ObjectStore.download_file", - new_callable=AsyncMock, - ) as mock_dl, patch( - "application_sdk.services.objectstore.ObjectStore.upload_file", - new_callable=AsyncMock, - ) as mock_ul, patch("builtins.open", mock_open()) as m: - await self.output._update_run_aggregate("ignored", stats) - mock_dl.assert_not_awaited() - mock_ul.assert_not_awaited() - # No reads/writes when phase is unknown - assert m.call_count == 0 - - async def test__update_run_aggregate_creates_new_aggregate_extract(self): - """Creates a new aggregate structure and writes stats for Extract phase.""" - # Make phase inference return "Extract" - self.output.output_path = "/tmp/run/raw/path" - stats = {"typename": "table", "total_record_count": 7} - - with patch( - "application_sdk.outputs.build_output_path", return_value="workflow/run" - ), patch( - "application_sdk.outputs.get_object_store_prefix", - return_value="os://bucket/statistics.json.ignore", - ), patch( - "application_sdk.services.objectstore.ObjectStore.download_file", - new_callable=AsyncMock, - ) as mock_dl, patch( - "application_sdk.services.objectstore.ObjectStore.upload_file", - new_callable=AsyncMock, - ) as mock_ul, patch("builtins.open", mock_open()) as m: - # Simulate no existing aggregate in object store - mock_dl.side_effect = Exception("not found") - - await self.output._update_run_aggregate("ignored", stats) - - handle = m() - # One write with the aggregated payload - write_calls = handle.write.call_args_list - assert len(write_calls) == 1 - payload = write_calls[0].args[0] - data = json.loads(payload) - - assert data["Extract"]["table"]["record_count"] == 7 - # Other phases should exist, even if empty - assert "Transform" in data and "Publish" in data - - mock_ul.assert_awaited_once() - - async def test__update_run_aggregate_accumulates_existing_transform(self): - """Accumulates total_record_count into existing Transform aggregate.""" - # Make phase inference return "Transform" - self.output.output_path = "/tmp/run/transformed/path" - stats = {"typename": "table", "total_record_count": 3} - - existing = {"Extract": {}, "Transform": {"table": {"record_count": 5}}, "Publish": {}} - m = mock_open(read_data=json.dumps(existing)) - - with patch( - "application_sdk.outputs.build_output_path", return_value="workflow/run" - ), patch( - "application_sdk.outputs.get_object_store_prefix", - return_value="os://bucket/statistics.json.ignore", - ), patch( - "application_sdk.services.objectstore.ObjectStore.download_file", - new_callable=AsyncMock, - ) as mock_dl, patch( - "application_sdk.services.objectstore.ObjectStore.upload_file", - new_callable=AsyncMock, - ) as mock_ul, patch("builtins.open", m) as mo: - await self.output._update_run_aggregate("ignored", stats) - - handle = mo() - written = handle.write.call_args[0][0] - data = 
json.loads(written) - - # 5 existing + 3 new - assert data["Transform"]["table"]["record_count"] == 8 - mock_dl.assert_awaited_once() - mock_ul.assert_awaited_once() - - async def test__update_run_aggregate_defaults_unknown_typename(self): - """Uses 'unknown' typename when not provided in statistics.""" - self.output.output_path = "/tmp/run/raw/path" - stats = {"total_record_count": 4} # no 'typename' - - with patch( - "application_sdk.outputs.build_output_path", return_value="workflow/run" - ), patch( - "application_sdk.outputs.get_object_store_prefix", - return_value="os://bucket/statistics.json.ignore", - ), patch( - "application_sdk.services.objectstore.ObjectStore.download_file", - new_callable=AsyncMock, - ) as mock_dl, patch( - "application_sdk.services.objectstore.ObjectStore.upload_file", - new_callable=AsyncMock, - ) as mock_ul, patch("builtins.open", mock_open()) as m: - mock_dl.side_effect = Exception("not found") - - await self.output._update_run_aggregate("ignored", stats) - - payload = m().write.call_args[0][0] - data = json.loads(payload) - assert data["Extract"]["unknown"]["record_count"] == 4 - mock_ul.assert_awaited_once() From 7ecb6057ccd8cddb538450c5a77527b45f447f94 Mon Sep 17 00:00:00 2001 From: Anurag Badoni Date: Fri, 14 Nov 2025 22:57:36 +0530 Subject: [PATCH 12/12] fix path --- application_sdk/outputs/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/application_sdk/outputs/__init__.py b/application_sdk/outputs/__init__.py index 4fb71a173..1af98f918 100644 --- a/application_sdk/outputs/__init__.py +++ b/application_sdk/outputs/__init__.py @@ -438,6 +438,10 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic # If accessing chunk_start fails, fallback to default filename pass + # Write the statistics dictionary to the JSON file + with open(output_file_name, "wb") as f: + f.write(orjson.dumps(statistics)) + destination_file_path = get_object_store_prefix(output_file_name) # Push the file to the object store await ObjectStore.upload_file( @@ -448,4 +452,5 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic return statistics except Exception as e: logger.error(f"Error writing statistics: {str(e)}") + raise