diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 84a55c5b..48969eaf 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -486,6 +486,7 @@ class _SourceRefreshOptions: class _ExecutionOptions: max_inflight_rows: int | None = None max_inflight_bytes: int | None = None + timeout: datetime.timedelta | None = None class FlowBuilder: diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index 694b79d1..dd888f26 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -49,6 +49,7 @@ ) from .runtime import to_async_call from .index import IndexOptions +import datetime class OpCategory(Enum): @@ -154,6 +155,7 @@ class OpArgs: - max_batch_size: The maximum batch size for the executor. Only valid if `batching` is True. - behavior_version: The behavior version of the executor. Cache will be invalidated if it changes. Must be provided if `cache` is True. + - timeout: Timeout for this function execution, given as a `datetime.timedelta`. None means use the default (1800s). - arg_relationship: It specifies the relationship between an input argument and the output, e.g. `(ArgRelationship.CHUNKS_BASE_TEXT, "content")` means the output is chunks for the input argument with name `content`. 
@@ -164,6 +166,7 @@ class OpArgs: batching: bool = False max_batch_size: int | None = None behavior_version: int | None = None + timeout: datetime.timedelta | None = None arg_relationship: tuple[ArgRelationship, str] | None = None @@ -202,6 +205,7 @@ def _register_op_factory( class _WrappedExecutor: _executor: Any + _spec: Any _args_info: list[_ArgInfo] _kwargs_info: dict[str, _ArgInfo] _result_encoder: Callable[[Any], Any] @@ -391,6 +395,9 @@ def enable_cache(self) -> bool: def behavior_version(self) -> int | None: return op_args.behavior_version + def timeout(self) -> datetime.timedelta | None: + return op_args.timeout + def batching_options(self) -> dict[str, Any] | None: if op_args.batching: return { diff --git a/rust/cocoindex/src/base/spec.rs b/rust/cocoindex/src/base/spec.rs index 2ae81818..7b7a7fe7 100644 --- a/rust/cocoindex/src/base/spec.rs +++ b/rust/cocoindex/src/base/spec.rs @@ -234,6 +234,9 @@ pub struct ExecutionOptions { #[serde(default, skip_serializing_if = "Option::is_none")] pub max_inflight_bytes: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub timeout: Option, } impl ExecutionOptions { @@ -289,6 +292,9 @@ impl fmt::Display for ImportOpSpec { pub struct TransformOpSpec { pub inputs: Vec, pub op: OpSpec, + + #[serde(default)] + pub execution_options: ExecutionOptions, } impl SpecFormatter for TransformOpSpec { diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index c3662d7a..a244a301 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -12,8 +12,11 @@ use crate::{ }; use futures::future::{BoxFuture, try_join3}; use futures::{FutureExt, future::try_join_all}; +use std::time::Duration; use utils::fingerprint::Fingerprinter; +const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800); + #[derive(Debug)] pub(super) enum ValueTypeBuilder { Basic(BasicValueType), @@ -804,6 +807,8 @@ impl AnalyzerContext { let output = 
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?; let op_name = reactive_op_name.clone(); + let op_kind = op.op.kind.clone(); + let execution_options_timeout = op.execution_options.timeout; async move { trace!("Start building executor for transform op `{op_name}`"); let executor = executor.await.with_context(|| { @@ -811,9 +816,13 @@ impl AnalyzerContext { })?; let enable_cache = executor.enable_cache(); let behavior_version = executor.behavior_version(); + let timeout = executor.timeout() + .or(execution_options_timeout) + .or(Some(TIMEOUT_THRESHOLD)); trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}"); let function_exec_info = AnalyzedFunctionExecInfo { enable_cache, + timeout, behavior_version, fingerprinter: logic_fingerprinter .with(&behavior_version)?, @@ -828,6 +837,7 @@ impl AnalyzerContext { } Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp { name: op_name, + op_kind, inputs: input_value_mappings, function_exec_info, executor, diff --git a/rust/cocoindex/src/builder/flow_builder.rs b/rust/cocoindex/src/builder/flow_builder.rs index 6c05917a..b6d38bb2 100644 --- a/rust/cocoindex/src/builder/flow_builder.rs +++ b/rust/cocoindex/src/builder/flow_builder.rs @@ -461,6 +461,7 @@ impl FlowBuilder { }) .collect(), op: spec, + execution_options: Default::default(), }), }; diff --git a/rust/cocoindex/src/builder/plan.rs b/rust/cocoindex/src/builder/plan.rs index 33c0989c..7a2fbddc 100644 --- a/rust/cocoindex/src/builder/plan.rs +++ b/rust/cocoindex/src/builder/plan.rs @@ -3,6 +3,7 @@ use crate::base::spec::FieldName; use crate::prelude::*; use crate::ops::interface::*; +use std::time::Duration; use utils::fingerprint::{Fingerprint, Fingerprinter}; #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -64,6 +65,7 @@ pub struct AnalyzedImportOp { pub struct AnalyzedFunctionExecInfo { pub enable_cache: bool, + pub timeout: Option, pub behavior_version: 
Option, /// Fingerprinter of the function's behavior. @@ -74,6 +76,7 @@ pub struct AnalyzedFunctionExecInfo { pub struct AnalyzedTransformOp { pub name: String, + pub op_kind: String, pub inputs: Vec, pub function_exec_info: AnalyzedFunctionExecInfo, pub executor: Box, diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index aab987f0..f3d83f46 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -2,6 +2,8 @@ use crate::prelude::*; use anyhow::{Context, Ok}; use futures::future::try_join_all; +use log::warn; +use tokio::time::Duration; use crate::base::value::EstimatedByteSize; use crate::base::{schema, value}; @@ -11,6 +13,9 @@ use utils::immutable::RefList; use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell}; +const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800); +const WARNING_THRESHOLD: Duration = Duration::from_secs(30); + #[derive(Debug)] pub struct ScopeValueBuilder { // TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity. @@ -356,6 +361,43 @@ async fn evaluate_child_op_scope( }) } +async fn evaluate_with_timeout_and_warning( + eval_future: F, + timeout_duration: Duration, + warn_duration: Duration, + op_kind: String, + op_name: String, +) -> Result +where + F: std::future::Future>, +{ + let mut eval_future = Box::pin(eval_future); + let mut warned = false; + let timeout_future = tokio::time::sleep(timeout_duration); + tokio::pin!(timeout_future); + + loop { + tokio::select! 
{ + res = &mut eval_future => { + return res; + } + _ = &mut timeout_future => { + return Err(anyhow!( + "Function '{}' ({}) timed out after {} seconds", + op_kind, op_name, timeout_duration.as_secs() + )); + } + _ = tokio::time::sleep(warn_duration), if !warned => { + warn!( + "Function '{}' ({}) is taking longer than {}s", + op_kind, op_name, warn_duration.as_secs() + ); + warned = true; + } + } + } +} + async fn evaluate_op_scope( op_scope: &AnalyzedOpScope, scoped_entries: RefList<'_, &ScopeEntry<'_>>, @@ -378,6 +420,12 @@ async fn evaluate_op_scope( input_values.push(value?); } + let timeout_duration = op.function_exec_info.timeout.unwrap_or(TIMEOUT_THRESHOLD); + let warn_duration = WARNING_THRESHOLD; + + let op_name_for_warning = op.name.clone(); + let op_kind_for_warning = op.op_kind.clone(); + let result = if op.function_exec_info.enable_cache { let output_value_cell = memory.get_cache_entry( || { @@ -391,18 +439,33 @@ async fn evaluate_op_scope( &op.function_exec_info.output_type, /*ttl=*/ None, )?; - evaluate_with_cell(output_value_cell.as_ref(), move || { + + let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || { op.executor.evaluate(input_values) - }) - .await - .and_then(|v| head_scope.define_field(&op.output, &v)) + }); + let v = evaluate_with_timeout_and_warning( + eval_future, + timeout_duration, + warn_duration, + op_kind_for_warning, + op_name_for_warning, + ) + .await; // no `?` here: an early return would skip the stats update and error context below + + v.and_then(|v| head_scope.define_field(&op.output, &v)) } else { - op.executor - .evaluate(input_values) - .await - .and_then(|v| head_scope.define_field(&op.output, &v)) - } - .with_context(|| format!("Evaluating Transform op `{}`", op.name,)); + let eval_future = op.executor.evaluate(input_values); + let v = evaluate_with_timeout_and_warning( + eval_future, + timeout_duration, + warn_duration, + op_kind_for_warning, + op_name_for_warning, + ) + .await; // no `?` here: an early return would skip the stats update and error context below + + v.and_then(|v| head_scope.define_field(&op.output, &v)) + }; // Track transform operation completion if let Some(ref op_stats) 
= operation_in_process_stats { @@ -411,7 +474,7 @@ async fn evaluate_op_scope( op_stats.finish_processing(&transform_key, 1); } - result? + result.with_context(|| format!("Evaluating Transform op `{}`", op.name))? } AnalyzedReactiveOp::ForEach(op) => { diff --git a/rust/cocoindex/src/ops/interface.rs b/rust/cocoindex/src/ops/interface.rs index 4980af5d..7cef1eda 100644 --- a/rust/cocoindex/src/ops/interface.rs +++ b/rust/cocoindex/src/ops/interface.rs @@ -185,6 +185,11 @@ pub trait SimpleFunctionExecutor: Send + Sync { fn behavior_version(&self) -> Option { None } + + /// Returns None to use the default timeout (1800s) + fn timeout(&self) -> Option { + None + } } #[async_trait] diff --git a/rust/cocoindex/src/ops/py_factory.rs b/rust/cocoindex/src/ops/py_factory.rs index 7bcf3b3a..584b760b 100644 --- a/rust/cocoindex/src/ops/py_factory.rs +++ b/rust/cocoindex/src/ops/py_factory.rs @@ -43,6 +43,7 @@ struct PyFunctionExecutor { enable_cache: bool, behavior_version: Option, + timeout: Option, } impl PyFunctionExecutor { @@ -112,6 +113,10 @@ impl interface::SimpleFunctionExecutor for Arc { fn behavior_version(&self) -> Option { self.behavior_version } + + fn timeout(&self) -> Option { + self.timeout + } } struct PyBatchedFunctionExecutor { @@ -121,6 +126,7 @@ struct PyBatchedFunctionExecutor { enable_cache: bool, behavior_version: Option, + timeout: Option, batching_options: batching::BatchingOptions, } @@ -240,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { .as_ref() .ok_or_else(|| anyhow!("Python execution context is missing"))? 
.clone(); - let (prepare_fut, enable_cache, behavior_version, batching_options) = + let (prepare_fut, enable_cache, behavior_version, timeout, batching_options) = Python::with_gil(|py| -> anyhow::Result<_> { let prepare_coro = executor .call_method(py, "prepare", (), None) @@ -260,6 +266,17 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { .call_method(py, "behavior_version", (), None) .to_result_with_py_trace(py)? .extract::>(py)?; + let timeout = executor + .call_method(py, "timeout", (), None) + .to_result_with_py_trace(py)?; + let timeout = if timeout.is_none(py) { + None + } else { + let td = timeout.into_bound(py); + let total_seconds = + td.call_method0("total_seconds")?.extract::()?; + Some(std::time::Duration::try_from_secs_f64(total_seconds).map_err(|e| anyhow!("Invalid `timeout` returned by Python executor: {e}"))?) // from_secs_f64 would panic on negative or non-finite input + }; let batching_options = executor .call_method(py, "batching_options", (), None) .to_result_with_py_trace(py)? @@ -271,6 +288,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { prepare_fut, enable_cache, behavior_version, + timeout, batching_options, )) })?; @@ -284,6 +302,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { result_type, enable_cache, behavior_version, + timeout, batching_options, } .into_fn_executor(), @@ -297,6 +316,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { result_type, enable_cache, behavior_version, + timeout, })) }; Ok(executor)