From a930f5473d1d8e7336054ba3b51dcc4d167675f5 Mon Sep 17 00:00:00 2001 From: AdwitaSingh1711 <108565358+AdwitaSingh1711@users.noreply.github.com> Date: Sun, 2 Nov 2025 16:47:42 +0530 Subject: [PATCH 1/7] feat: add timeout to prevent long-running functions (Closes #658) --- python/cocoindex/flow.py | 1 + python/cocoindex/op.py | 9 ++ rust/cocoindex/src/base/spec.rs | 6 + rust/cocoindex/src/builder/analyzer.rs | 8 ++ rust/cocoindex/src/builder/flow_builder.rs | 1 + rust/cocoindex/src/builder/plan.rs | 3 + rust/cocoindex/src/execution/evaluator.rs | 118 ++++++++++++++++--- rust/cocoindex/src/execution/live_updater.rs | 1 + rust/cocoindex/src/ops/interface.rs | 5 + rust/cocoindex/src/ops/py_factory.rs | 18 ++- rust/utils/src/retryable.rs | 3 + 11 files changed, 156 insertions(+), 17 deletions(-) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 84a55c5b..b4eb9b40 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -486,6 +486,7 @@ class _SourceRefreshOptions: class _ExecutionOptions: max_inflight_rows: int | None = None max_inflight_bytes: int | None = None + timeout: int | None = None class FlowBuilder: diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index 694b79d1..4e0e574b 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -154,6 +154,7 @@ class OpArgs: - max_batch_size: The maximum batch size for the executor. Only valid if `batching` is True. - behavior_version: The behavior version of the executor. Cache will be invalidated if it changes. Must be provided if `cache` is True. + - timeout: Timeout in seconds for this function execution. None means use default (300s). - arg_relationship: It specifies the relationship between an input argument and the output, e.g. `(ArgRelationship.CHUNKS_BASE_TEXT, "content")` means the output is chunks for the input argument with name `content`. @@ -164,6 +165,7 @@ class OpArgs: batching: bool = False max_batch_size: int | None = None behavior_version: int | None = None + timeout: int | None = None arg_relationship: tuple[ArgRelationship, str] | None = None @@ -202,6 +204,7 @@ def _register_op_factory( class _WrappedExecutor: _executor: Any + _spec: Any _args_info: list[_ArgInfo] _kwargs_info: dict[str, _ArgInfo] _result_encoder: Callable[[Any], Any] @@ -391,6 +394,12 @@ def enable_cache(self) -> bool: def behavior_version(self) -> int | None: return op_args.behavior_version + def timeout(self) -> int | None: + if op_args.timeout is not None: + return op_args.timeout + + return None + def batching_options(self) -> dict[str, Any] | None: if op_args.batching: return { diff --git a/rust/cocoindex/src/base/spec.rs b/rust/cocoindex/src/base/spec.rs index 2ae81818..7b7a7fe7 100644 --- a/rust/cocoindex/src/base/spec.rs +++ b/rust/cocoindex/src/base/spec.rs @@ -234,6 +234,9 @@ pub struct ExecutionOptions { #[serde(default, skip_serializing_if = "Option::is_none")] pub max_inflight_bytes: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub timeout: Option, } impl ExecutionOptions { @@ -289,6 +292,9 @@ impl fmt::Display for ImportOpSpec { pub struct TransformOpSpec { pub inputs: Vec, pub op: OpSpec, + + #[serde(default)] + pub execution_options: ExecutionOptions, } impl SpecFormatter for TransformOpSpec { diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index c3662d7a..a945a6e6 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -12,6 +12,7 @@ use crate::{ }; use futures::future::{BoxFuture, try_join3}; use futures::{FutureExt, future::try_join_all}; +use tokio::time::Duration; use utils::fingerprint::Fingerprinter; #[derive(Debug)] @@ -804,6 +805,8 @@ impl AnalyzerContext { let output = op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?; let op_name = reactive_op_name.clone(); + let op_kind = op.op.kind.clone(); + let execution_options_timeout = op.execution_options.timeout; async move { trace!("Start building executor for transform op `{op_name}`"); let executor = executor.await.with_context(|| { @@ -811,9 +814,13 @@ impl AnalyzerContext { })?; let enable_cache = executor.enable_cache(); let behavior_version = executor.behavior_version(); + let timeout = executor.timeout() + .or(execution_options_timeout) + .or(Some(Duration::from_secs(300))); trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}"); let function_exec_info = AnalyzedFunctionExecInfo { enable_cache, + timeout, behavior_version, fingerprinter: logic_fingerprinter .with(&behavior_version)?, @@ -828,6 +835,7 @@ impl AnalyzerContext { } Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp { name: op_name, + op_kind, inputs: input_value_mappings, function_exec_info, executor, diff --git a/rust/cocoindex/src/builder/flow_builder.rs b/rust/cocoindex/src/builder/flow_builder.rs index 6c05917a..b6d38bb2 100644 --- a/rust/cocoindex/src/builder/flow_builder.rs +++ b/rust/cocoindex/src/builder/flow_builder.rs @@ -461,6 +461,7 @@ impl FlowBuilder { }) .collect(), op: spec, + execution_options: Default::default(), }), }; diff --git a/rust/cocoindex/src/builder/plan.rs b/rust/cocoindex/src/builder/plan.rs index 33c0989c..02603cdd 100644 --- a/rust/cocoindex/src/builder/plan.rs +++ b/rust/cocoindex/src/builder/plan.rs @@ -4,6 +4,7 @@ use crate::prelude::*; use crate::ops::interface::*; use utils::fingerprint::{Fingerprint, Fingerprinter}; +use std::time::Duration; #[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct AnalyzedLocalFieldReference { @@ -64,6 +65,7 @@ pub struct AnalyzedImportOp { pub struct AnalyzedFunctionExecInfo { pub enable_cache: bool, + pub timeout: Option, pub behavior_version: Option, /// Fingerprinter of the function's behavior. @@ -74,6 +76,7 @@ pub struct AnalyzedFunctionExecInfo { pub struct AnalyzedTransformOp { pub name: String, + pub op_kind: String, pub inputs: Vec, pub function_exec_info: AnalyzedFunctionExecInfo, pub executor: Box, diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index aab987f0..eb295130 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -2,6 +2,8 @@ use crate::prelude::*; use anyhow::{Context, Ok}; use futures::future::try_join_all; +use log::warn; +use tokio::time::Duration; use crate::base::value::EstimatedByteSize; use crate::base::{schema, value}; @@ -366,10 +368,13 @@ async fn evaluate_op_scope( for reactive_op in op_scope.reactive_ops.iter() { match reactive_op { AnalyzedReactiveOp::Transform(op) => { + let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name); + + // eprintln!("🔍 DEBUG: Transform op '{}' (function: {}) starting, timeout: {:?}", + // op.name, op.op_kind, op.function_exec_info.timeout); + // Track transform operation start if let Some(ref op_stats) = operation_in_process_stats { - let transform_key = - format!("transform/{}{}", op_scope.scope_qualifier, op.name); op_stats.start_processing(&transform_key, 1); } @@ -378,6 +383,28 @@ async fn evaluate_op_scope( input_values.push(value?); } + let timeout_duration = op + .function_exec_info + .timeout + .unwrap_or(Duration::from_secs(300)); + let warn_duration = Duration::from_secs(30); + + let op_name_for_warning = op.name.clone(); + let op_kind_for_warning = op.op_kind.clone(); + let warn_handle = tokio::spawn(async move { + tokio::time::sleep(warn_duration).await; + // eprintln!("WARNING: Function '{}' is taking longer than 30s", op_name_for_warning); + // warn!("Function '{}' is taking longer than 30s", op_name_for_warning); + eprintln!( + "⚠️ WARNING: Function '{}' ({}) is taking longer than 30s", + op_kind_for_warning, op_name_for_warning + ); // ✅ Show both + warn!( + "Function '{}' ({}) is taking longer than 30s", + op_kind_for_warning, op_name_for_warning + ); + }); + // Execute with timeout let result = if op.function_exec_info.enable_cache { let output_value_cell = memory.get_cache_entry( || { @@ -391,27 +418,88 @@ async fn evaluate_op_scope( &op.function_exec_info.output_type, /*ttl=*/ None, )?; - evaluate_with_cell(output_value_cell.as_ref(), move || { + + let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || { op.executor.evaluate(input_values) - }) - .await - .and_then(|v| head_scope.define_field(&op.output, &v)) + }); + + // Handle timeout + let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; + if timeout_result.is_err() { + Err(anyhow!( + // "Function '{}' timed out after {} seconds", + "Function '{}' ({}) timed out after {} seconds", + op.op_kind, + op.name, + timeout_duration.as_secs() + )) + } else { + timeout_result + .unwrap() + .and_then(|v| head_scope.define_field(&op.output, &v)) + } } else { - op.executor - .evaluate(input_values) - .await - .and_then(|v| head_scope.define_field(&op.output, &v)) - } - .with_context(|| format!("Evaluating Transform op `{}`", op.name,)); + let eval_future = op.executor.evaluate(input_values); + + // Handle timeout + let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; + if timeout_result.is_err() { + Err(anyhow!( + // "Function '{}' timed out after {} seconds", + "Function '{}' ({}) timed out after {} seconds", + op.op_kind, + op.name, + timeout_duration.as_secs() + )) + } else { + timeout_result + .unwrap() + .and_then(|v| head_scope.define_field(&op.output, &v)) + } + }; + + warn_handle.abort(); // Track transform operation completion if let Some(ref op_stats) = operation_in_process_stats { - let transform_key = - format!("transform/{}{}", op_scope.scope_qualifier, op.name); op_stats.finish_processing(&transform_key, 1); } - result? + result.with_context(|| format!("Evaluating Transform op `{}`", op.name))? + // let result = if op.function_exec_info.enable_cache { + // let output_value_cell = memory.get_cache_entry( + // || { + // Ok(op + // .function_exec_info + // .fingerprinter + // .clone() + // .with(&input_values)? + // .into_fingerprint()) + // }, + // &op.function_exec_info.output_type, + // /*ttl=*/ None, + // )?; + // evaluate_with_cell(output_value_cell.as_ref(), move || { + // op.executor.evaluate(input_values) + // }) + // .await + // .and_then(|v| head_scope.define_field(&op.output, &v)) + // } else { + // op.executor + // .evaluate(input_values) + // .await + // .and_then(|v| head_scope.define_field(&op.output, &v)) + // } + // .with_context(|| format!("Evaluating Transform op `{}`", op.name,)); + + // // Track transform operation completion + // if let Some(ref op_stats) = operation_in_process_stats { + // let transform_key = + // format!("transform/{}{}", op_scope.scope_qualifier, op.name); + // op_stats.finish_processing(&transform_key, 1); + // } + + // result? } AnalyzedReactiveOp::ForEach(op) => { diff --git a/rust/cocoindex/src/execution/live_updater.rs b/rust/cocoindex/src/execution/live_updater.rs index 0fb623a2..2833295c 100644 --- a/rust/cocoindex/src/execution/live_updater.rs +++ b/rust/cocoindex/src/execution/live_updater.rs @@ -172,6 +172,7 @@ impl SourceUpdateTask { let mut change_stream = change_stream; let retry_options = retryable::RetryOptions { retry_timeout: None, + per_call_timeout: None, initial_backoff: std::time::Duration::from_secs(5), max_backoff: std::time::Duration::from_secs(60), }; diff --git a/rust/cocoindex/src/ops/interface.rs b/rust/cocoindex/src/ops/interface.rs index 4980af5d..6a5d4000 100644 --- a/rust/cocoindex/src/ops/interface.rs +++ b/rust/cocoindex/src/ops/interface.rs @@ -185,6 +185,11 @@ pub trait SimpleFunctionExecutor: Send + Sync { fn behavior_version(&self) -> Option { None } + + /// Returns None to use the default timeout (300s) + fn timeout(&self) -> Option { + None + } } #[async_trait] diff --git a/rust/cocoindex/src/ops/py_factory.rs b/rust/cocoindex/src/ops/py_factory.rs index 7bcf3b3a..c0257316 100644 --- a/rust/cocoindex/src/ops/py_factory.rs +++ b/rust/cocoindex/src/ops/py_factory.rs @@ -43,6 +43,7 @@ struct PyFunctionExecutor { enable_cache: bool, behavior_version: Option, + timeout: Option, } impl PyFunctionExecutor { @@ -112,6 +113,10 @@ impl interface::SimpleFunctionExecutor for Arc { fn behavior_version(&self) -> Option { self.behavior_version } + + fn timeout(&self) -> Option { + self.timeout + } } struct PyBatchedFunctionExecutor { @@ -121,6 +126,7 @@ struct PyBatchedFunctionExecutor { enable_cache: bool, behavior_version: Option, + timeout: Option, batching_options: batching::BatchingOptions, } @@ -240,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { .as_ref() .ok_or_else(|| anyhow!("Python execution context is missing"))? .clone(); - let (prepare_fut, enable_cache, behavior_version, batching_options) = + let (prepare_fut, enable_cache, behavior_version, timeout, batching_options) = Python::with_gil(|py| -> anyhow::Result<_> { let prepare_coro = executor .call_method(py, "prepare", (), None) @@ -260,6 +266,11 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { .call_method(py, "behavior_version", (), None) .to_result_with_py_trace(py)? .extract::>(py)?; + let timeout = executor + .call_method(py, "timeout", (), None) + .to_result_with_py_trace(py)? + .extract::>(py)? + .map(std::time::Duration::from_secs); let batching_options = executor .call_method(py, "batching_options", (), None) .to_result_with_py_trace(py)? @@ -270,7 +281,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { Ok(( prepare_fut, enable_cache, - behavior_version, + behavior_version, + timeout, batching_options, )) })?; @@ -284,6 +296,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { result_type, enable_cache, behavior_version, + timeout, batching_options, } .into_fn_executor(), @@ -297,6 +310,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { result_type, enable_cache, behavior_version, + timeout, })) }; Ok(executor) diff --git a/rust/utils/src/retryable.rs b/rust/utils/src/retryable.rs index 622f29bc..43cf5b38 100644 --- a/rust/utils/src/retryable.rs +++ b/rust/utils/src/retryable.rs @@ -109,6 +109,7 @@ pub fn Ok(value: T) -> Result { pub struct RetryOptions { pub retry_timeout: Option, + pub per_call_timeout: Option, pub initial_backoff: Duration, pub max_backoff: Duration, } @@ -117,6 +118,7 @@ impl Default for RetryOptions { fn default() -> Self { Self { retry_timeout: Some(DEFAULT_RETRY_TIMEOUT), + per_call_timeout: None, initial_backoff: Duration::from_millis(100), max_backoff: Duration::from_secs(10), } @@ -125,6 +127,7 @@ impl Default for RetryOptions { pub static HEAVY_LOADED_OPTIONS: RetryOptions = RetryOptions { retry_timeout: Some(DEFAULT_RETRY_TIMEOUT), + per_call_timeout: Some(Duration::from_secs(300)), initial_backoff: Duration::from_secs(1), max_backoff: Duration::from_secs(60), }; From 0d5d6bfdf3354981553b90ccadb246a8acd0d718 Mon Sep 17 00:00:00 2001 From: AdwitaSingh1711 <108565358+AdwitaSingh1711@users.noreply.github.com> Date: Sun, 2 Nov 2025 17:28:51 +0530 Subject: [PATCH 2/7] cleaned evaluator.rs --- rust/cocoindex/src/execution/evaluator.rs | 41 ----------------------- 1 file changed, 41 deletions(-) diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index eb295130..ca517e96 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -370,9 +370,6 @@ async fn evaluate_op_scope( AnalyzedReactiveOp::Transform(op) => { let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name); - // eprintln!("🔍 DEBUG: Transform op '{}' (function: {}) starting, timeout: {:?}", - // op.name, op.op_kind, op.function_exec_info.timeout); - // Track transform operation start if let Some(ref op_stats) = operation_in_process_stats { op_stats.start_processing(&transform_key, 1); @@ -393,8 +390,6 @@ async fn evaluate_op_scope( let op_kind_for_warning = op.op_kind.clone(); let warn_handle = tokio::spawn(async move { tokio::time::sleep(warn_duration).await; - // eprintln!("WARNING: Function '{}' is taking longer than 30s", op_name_for_warning); - // warn!("Function '{}' is taking longer than 30s", op_name_for_warning); eprintln!( "⚠️ WARNING: Function '{}' ({}) is taking longer than 30s", op_kind_for_warning, op_name_for_warning @@ -427,7 +422,6 @@ async fn evaluate_op_scope( let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; if timeout_result.is_err() { Err(anyhow!( - // "Function '{}' timed out after {} seconds", "Function '{}' ({}) timed out after {} seconds", op.op_kind, op.name, @@ -445,7 +439,6 @@ async fn evaluate_op_scope( let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; if timeout_result.is_err() { Err(anyhow!( - // "Function '{}' timed out after {} seconds", "Function '{}' ({}) timed out after {} seconds", op.op_kind, op.name, @@ -466,40 +459,6 @@ async fn evaluate_op_scope( } result.with_context(|| format!("Evaluating Transform op `{}`", op.name))? - // let result = if op.function_exec_info.enable_cache { - // let output_value_cell = memory.get_cache_entry( - // || { - // Ok(op - // .function_exec_info - // .fingerprinter - // .clone() - // .with(&input_values)? - // .into_fingerprint()) - // }, - // &op.function_exec_info.output_type, - // /*ttl=*/ None, - // )?; - // evaluate_with_cell(output_value_cell.as_ref(), move || { - // op.executor.evaluate(input_values) - // }) - // .await - // .and_then(|v| head_scope.define_field(&op.output, &v)) - // } else { - // op.executor - // .evaluate(input_values) - // .await - // .and_then(|v| head_scope.define_field(&op.output, &v)) - // } - // .with_context(|| format!("Evaluating Transform op `{}`", op.name,)); - - // // Track transform operation completion - // if let Some(ref op_stats) = operation_in_process_stats { - // let transform_key = - // format!("transform/{}{}", op_scope.scope_qualifier, op.name); - // op_stats.finish_processing(&transform_key, 1); - // } - - // result? } AnalyzedReactiveOp::ForEach(op) => { From 85a56b6ad5cdfeec8382fcce5951349326a2a569 Mon Sep 17 00:00:00 2001 From: AdwitaSingh1711 <108565358+AdwitaSingh1711@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:53:20 +0530 Subject: [PATCH 3/7] cleaned markers --- rust/cocoindex/src/execution/evaluator.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index ca517e96..a65d5658 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -370,7 +370,6 @@ async fn evaluate_op_scope( AnalyzedReactiveOp::Transform(op) => { let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name); - // Track transform operation start if let Some(ref op_stats) = operation_in_process_stats { op_stats.start_processing(&transform_key, 1); } @@ -391,15 +390,14 @@ async fn evaluate_op_scope( let warn_handle = tokio::spawn(async move { tokio::time::sleep(warn_duration).await; eprintln!( - "⚠️ WARNING: Function '{}' ({}) is taking longer than 30s", + "WARNING: Function '{}' ({}) is taking longer than 30s", op_kind_for_warning, op_name_for_warning - ); // ✅ Show both + ); warn!( "Function '{}' ({}) is taking longer than 30s", op_kind_for_warning, op_name_for_warning ); }); - // Execute with timeout let result = if op.function_exec_info.enable_cache { let output_value_cell = memory.get_cache_entry( || { @@ -418,7 +416,6 @@ async fn evaluate_op_scope( op.executor.evaluate(input_values) }); - // Handle timeout let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; if timeout_result.is_err() { Err(anyhow!( @@ -435,7 +432,6 @@ async fn evaluate_op_scope( } else { let eval_future = op.executor.evaluate(input_values); - // Handle timeout let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; if timeout_result.is_err() { Err(anyhow!( @@ -453,7 +449,6 @@ async fn evaluate_op_scope( warn_handle.abort(); - // Track transform operation completion if let Some(ref op_stats) = operation_in_process_stats { op_stats.finish_processing(&transform_key, 1); } From 28c417a92e5e621cd25e6e74668eead23563260e Mon Sep 17 00:00:00 2001 From: AdwitaSingh1711 <108565358+AdwitaSingh1711@users.noreply.github.com> Date: Fri, 7 Nov 2025 21:11:26 +0530 Subject: [PATCH 4/7] replace tokio::spawn with tokio::select --- rust/cocoindex/src/builder/analyzer.rs | 8 +- rust/cocoindex/src/execution/evaluator.rs | 194 +++++++++++++++++----- 2 files changed, 160 insertions(+), 42 deletions(-) diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index a945a6e6..5e8d57ed 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -15,6 +15,9 @@ use futures::{FutureExt, future::try_join_all}; use tokio::time::Duration; use utils::fingerprint::Fingerprinter; +const TIMEOUT_THRESHOLD: u64 = 1800; +const WARNING_THRESHOLD: u64 = 30; + #[derive(Debug)] pub(super) enum ValueTypeBuilder { Basic(BasicValueType), @@ -814,9 +817,12 @@ impl AnalyzerContext { })?; let enable_cache = executor.enable_cache(); let behavior_version = executor.behavior_version(); + // let timeout = executor.timeout() + // .or(execution_options_timeout) + // .or(Some(Duration::from_secs(300))); let timeout = executor.timeout() .or(execution_options_timeout) - .or(Some(Duration::from_secs(300))); + .or(Some(Duration::from_secs(TIMEOUT_THRESHOLD))); trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}"); let function_exec_info = AnalyzedFunctionExecInfo { enable_cache, diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index a65d5658..b09de7a8 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -13,6 +13,9 @@ use utils::immutable::RefList; use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell}; +const TIMEOUT_THRESHOLD: u64 = 1800; +const WARNING_THRESHOLD: u64 = 30; + #[derive(Debug)] pub struct ScopeValueBuilder { // TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity. @@ -379,25 +382,20 @@ async fn evaluate_op_scope( input_values.push(value?); } + // let timeout_duration = op + // .function_exec_info + // .timeout + // .unwrap_or(Duration::from_secs(300)); + // let warn_duration = Duration::from_secs(30); let timeout_duration = op .function_exec_info .timeout - .unwrap_or(Duration::from_secs(300)); - let warn_duration = Duration::from_secs(30); + .unwrap_or(Duration::from_secs(TIMEOUT_THRESHOLD)); + let warn_duration = Duration::from_secs(WARNING_THRESHOLD); let op_name_for_warning = op.name.clone(); let op_kind_for_warning = op.op_kind.clone(); - let warn_handle = tokio::spawn(async move { - tokio::time::sleep(warn_duration).await; - eprintln!( - "WARNING: Function '{}' ({}) is taking longer than 30s", - op_kind_for_warning, op_name_for_warning - ); - warn!( - "Function '{}' ({}) is taking longer than 30s", - op_kind_for_warning, op_name_for_warning - ); - }); + let result = if op.function_exec_info.enable_cache { let output_value_cell = memory.get_cache_entry( || { @@ -416,38 +414,152 @@ async fn evaluate_op_scope( op.executor.evaluate(input_values) }); - let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; - if timeout_result.is_err() { - Err(anyhow!( - "Function '{}' ({}) timed out after {} seconds", - op.op_kind, - op.name, - timeout_duration.as_secs() - )) - } else { - timeout_result - .unwrap() - .and_then(|v| head_scope.define_field(&op.output, &v)) - } + // Warning + timeout logic + let mut eval_future = Box::pin(eval_future); + let mut warned = false; + let timeout_future = tokio::time::sleep(timeout_duration); + tokio::pin!(timeout_future); + + let res = loop { + tokio::select! { + res = &mut eval_future => { + break Ok(res?); + } + _ = &mut timeout_future => { + break Err(anyhow!( + "Function '{}' ({}) timed out after {} seconds", + op.op_kind, op.name, timeout_duration.as_secs() + )); + } + _ = tokio::time::sleep(warn_duration), if !warned => { + eprintln!( + "WARNING: Function '{}' ({}) is taking longer than 30s", + op_kind_for_warning, op_name_for_warning + ); + warn!( + "Function '{}' ({}) is taking longer than {}s", + op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD + ); + warned = true; + } + } + }; + + res.and_then(|v| head_scope.define_field(&op.output, &v)) } else { let eval_future = op.executor.evaluate(input_values); - let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; - if timeout_result.is_err() { - Err(anyhow!( - "Function '{}' ({}) timed out after {} seconds", - op.op_kind, - op.name, - timeout_duration.as_secs() - )) - } else { - timeout_result - .unwrap() - .and_then(|v| head_scope.define_field(&op.output, &v)) - } + // Warning + timeout logic + let mut eval_future = Box::pin(eval_future); + let mut warned = false; + let timeout_future = tokio::time::sleep(timeout_duration); + tokio::pin!(timeout_future); + + let res = loop { + tokio::select! { + res = &mut eval_future => { + break Ok(res?); + } + _ = &mut timeout_future => { + break Err(anyhow!( + "Function '{}' ({}) timed out after {} seconds", + op.op_kind, op.name, timeout_duration.as_secs() + )); + } + _ = tokio::time::sleep(warn_duration), if !warned => { + eprintln!( + "WARNING: Function '{}' ({}) is taking longer than 30s", + op_kind_for_warning, op_name_for_warning + ); + warn!( + "Function '{}' ({}) is taking longer than {}s", + op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD + ); + warned = true; + } + } + }; + + res.and_then(|v| head_scope.define_field(&op.output, &v)) }; - - warn_handle.abort(); + // let warn_handle = tokio::spawn(async move { + // tokio::time::sleep(warn_duration).await; + // eprintln!( + // "WARNING: Function '{}' ({}) is taking longer than 30s", + // op_kind_for_warning, op_name_for_warning + // ); + // warn!( + // "Function '{}' ({}) is taking longer than 30s", + // op_kind_for_warning, op_name_for_warning + // ); + // }); + + // let mut op_future = Box::pin(op.executor.evaluate(input_values)); + // let mut warned = false; + // let warn_handle = loop{ + // tokio::select!{ + // res = &mut op_future => { + // break res; + // } + // _ = tokio::time::sleep(warn_duration), if !warned => { + // warn!( + // "Function '{}' ({}) is taking longer than {}s", + // op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD + // ); + // warned = true; + // } + // } + // }; + // let result = if op.function_exec_info.enable_cache { + // let output_value_cell = memory.get_cache_entry( + // || { + // Ok(op + // .function_exec_info + // .fingerprinter + // .clone() + // .with(&input_values)? + // .into_fingerprint()) + // }, + // &op.function_exec_info.output_type, + // /*ttl=*/ None, + // )?; + + // let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || { + // op.executor.evaluate(input_values) + // }); + + // let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; + // if timeout_result.is_err() { + // Err(anyhow!( + // "Function '{}' ({}) timed out after {} seconds", + // op.op_kind, + // op.name, + // timeout_duration.as_secs() + // )) + // } else { + // timeout_result + // .unwrap() + // .and_then(|v| head_scope.define_field(&op.output, &v)) + // } + // } else { + // let eval_future = op.executor.evaluate(input_values); + + // let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; + // if timeout_result.is_err() { + // Err(anyhow!( + // "Function '{}' ({}) timed out after {} seconds", + // op.op_kind, + // op.name, + // timeout_duration.as_secs() + // )) + // } else { + // timeout_result + // .unwrap() + // .and_then(|v| head_scope.define_field(&op.output, &v)) + // } + // }; + + // warn_handle.abort(); if let Some(ref op_stats) = operation_in_process_stats { op_stats.finish_processing(&transform_key, 1); From 50fe1cace8c25c8e1bfdd01f1e37c9dc1c0cb27b Mon Sep 17 00:00:00 2001 From: AdwitaSingh1711 <108565358+AdwitaSingh1711@users.noreply.github.com> Date: Sat, 8 Nov 2025 01:01:03 +0530 Subject: [PATCH 5/7] cleaned commented code --- rust/cocoindex/src/builder/analyzer.rs | 3 - rust/cocoindex/src/execution/evaluator.rs | 88 ----------------------- 2 files changed, 91 deletions(-) diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index 5e8d57ed..ddd67bac 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -817,9 +817,6 @@ impl AnalyzerContext { })?; let enable_cache = executor.enable_cache(); let behavior_version = executor.behavior_version(); - // let timeout = executor.timeout() - // .or(execution_options_timeout) - // .or(Some(Duration::from_secs(300))); let timeout = executor.timeout() .or(execution_options_timeout) .or(Some(Duration::from_secs(TIMEOUT_THRESHOLD))); diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index b09de7a8..e8eaf864 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -381,12 +381,6 @@ async fn evaluate_op_scope( for value in assemble_input_values(&op.inputs, scoped_entries) { input_values.push(value?); } - - // let timeout_duration = op - // .function_exec_info - // .timeout - // .unwrap_or(Duration::from_secs(300)); - // let warn_duration = Duration::from_secs(30); let timeout_duration = op .function_exec_info .timeout @@ -413,8 +407,6 @@ async fn evaluate_op_scope( let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || { op.executor.evaluate(input_values) }); - - // Warning + timeout logic let mut eval_future = Box::pin(eval_future); let mut warned = false; let timeout_future = tokio::time::sleep(timeout_duration); @@ -448,8 +440,6 @@ async fn evaluate_op_scope( res.and_then(|v| head_scope.define_field(&op.output, &v)) } else { let eval_future = op.executor.evaluate(input_values); - - // Warning + timeout logic let mut eval_future = Box::pin(eval_future); let mut warned = false; let timeout_future = tokio::time::sleep(timeout_duration); @@ -482,84 +472,6 @@ async fn evaluate_op_scope( res.and_then(|v| head_scope.define_field(&op.output, &v)) }; - // let warn_handle = tokio::spawn(async move { - // tokio::time::sleep(warn_duration).await; - // eprintln!( - // "WARNING: Function '{}' ({}) is taking longer than 30s", - // op_kind_for_warning, op_name_for_warning - // ); - // warn!( - // "Function '{}' ({}) is taking longer than 30s", - // op_kind_for_warning, op_name_for_warning - // ); - // }); - - // let mut op_future = Box::pin(op.executor.evaluate(input_values)); - // let mut warned = false; - // let warn_handle = loop{ - // tokio::select!{ - // res = &mut op_future => { - // break res; - // } - // _ = tokio::time::sleep(warn_duration), if !warned => { - // warn!( - // "Function '{}' ({}) is taking longer than {}s", - // op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD - // ); - // warned = true; - // } - // } - // }; - // let result = if op.function_exec_info.enable_cache { - // let output_value_cell = memory.get_cache_entry( - // || { - // Ok(op - // .function_exec_info - // .fingerprinter - // .clone() - // .with(&input_values)? - // .into_fingerprint()) - // }, - // &op.function_exec_info.output_type, - // /*ttl=*/ None, - // )?; - - // let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || { - // op.executor.evaluate(input_values) - // }); - - // let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; - // if timeout_result.is_err() { - // Err(anyhow!( - // "Function '{}' ({}) timed out after {} seconds", - // op.op_kind, - // op.name, - // timeout_duration.as_secs() - // )) - // } else { - // timeout_result - // .unwrap() - // .and_then(|v| head_scope.define_field(&op.output, &v)) - // } - // } else { - // let eval_future = op.executor.evaluate(input_values); - - // let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await; - // if timeout_result.is_err() { - // Err(anyhow!( - // "Function '{}' ({}) timed out after {} seconds", - // op.op_kind, - // op.name, - // timeout_duration.as_secs() - // )) - // } else { - // timeout_result - // .unwrap() - // .and_then(|v| head_scope.define_field(&op.output, &v)) - // } - // }; - - // warn_handle.abort(); if let Some(ref op_stats) = operation_in_process_stats { op_stats.finish_processing(&transform_key, 1); From 199bed9b1fdf51918e951d0786ad7eb6ca1c7b4a Mon Sep 17 00:00:00 2001 From: AdwitaSingh1711 <108565358+AdwitaSingh1711@users.noreply.github.com> Date: Sat, 8 Nov 2025 22:03:51 +0530 Subject: [PATCH 6/7] added evaluate_with_timeout_and_warning --- rust/cocoindex/src/builder/analyzer.rs | 1 - rust/cocoindex/src/execution/evaluator.rs | 156 ++++++++-------------- 2 files changed, 57 insertions(+), 100 deletions(-) diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index ddd67bac..6fd30b4a 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -16,7 +16,6 @@ use tokio::time::Duration; use utils::fingerprint::Fingerprinter; const TIMEOUT_THRESHOLD: u64 = 1800; -const WARNING_THRESHOLD: u64 = 30; #[derive(Debug)] pub(super) enum ValueTypeBuilder { diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index e8eaf864..fedc472d 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -361,6 +361,43 @@ async fn evaluate_child_op_scope( }) } +async fn evaluate_with_timeout_and_warning( + eval_future: F, + timeout_duration: Duration, + warn_duration: Duration, + op_kind: String, + op_name: String, +) -> Result +where + F: std::future::Future>, +{ + let mut eval_future = Box::pin(eval_future); + let mut warned = false; + let timeout_future = tokio::time::sleep(timeout_duration); + tokio::pin!(timeout_future); + + loop { + tokio::select! { + res = &mut eval_future => { + return res; + } + _ = &mut timeout_future => { + return Err(anyhow!( + "Function '{}' ({}) timed out after {} seconds", + op_kind, op_name, timeout_duration.as_secs() + )); + } + _ = tokio::time::sleep(warn_duration), if !warned => { + warn!( + "Function '{}' ({}) is taking longer than {}s", + op_kind, op_name, WARNING_THRESHOLD + ); + warned = true; + } + } + } +} + async fn evaluate_op_scope( op_scope: &AnalyzedOpScope, scoped_entries: RefList<'_, &ScopeEntry<'_>>, @@ -407,70 +444,28 @@ async fn evaluate_op_scope( let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || { op.executor.evaluate(input_values) }); - let mut eval_future = Box::pin(eval_future); - let mut warned = false; - let timeout_future = tokio::time::sleep(timeout_duration); - tokio::pin!(timeout_future); - - let res = loop { - tokio::select! { - res = &mut eval_future => { - break Ok(res?); - } - _ = &mut timeout_future => { - break Err(anyhow!( - "Function '{}' ({}) timed out after {} seconds", - op.op_kind, op.name, timeout_duration.as_secs() - )); - } - _ = tokio::time::sleep(warn_duration), if !warned => { - eprintln!( - "WARNING: Function '{}' ({}) is taking longer than 30s", - op_kind_for_warning, op_name_for_warning - ); - warn!( - "Function '{}' ({}) is taking longer than {}s", - op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD - ); - warned = true; - } - } - }; - - res.and_then(|v| head_scope.define_field(&op.output, &v)) + let v = evaluate_with_timeout_and_warning( + eval_future, + timeout_duration, + warn_duration, + op_kind_for_warning, + op_name_for_warning, + ) + .await?; + + head_scope.define_field(&op.output, &v) } else { let eval_future = op.executor.evaluate(input_values); - let mut eval_future = Box::pin(eval_future); - let mut warned = false; - let timeout_future = tokio::time::sleep(timeout_duration); - tokio::pin!(timeout_future); - - let res = loop { - tokio::select! { - res = &mut eval_future => { - break Ok(res?); - } - _ = &mut timeout_future => { - break Err(anyhow!( - "Function '{}' ({}) timed out after {} seconds", - op.op_kind, op.name, timeout_duration.as_secs() - )); - } - _ = tokio::time::sleep(warn_duration), if !warned => { - eprintln!( - "WARNING: Function '{}' ({}) is taking longer than 30s", - op_kind_for_warning, op_name_for_warning - ); - warn!( - "Function '{}' ({}) is taking longer than {}s", - op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD - ); - warned = true; - } - } - }; - - res.and_then(|v| head_scope.define_field(&op.output, &v)) + let v = evaluate_with_timeout_and_warning( + eval_future, + timeout_duration, + warn_duration, + op_kind_for_warning, + op_name_for_warning, + ) + .await?; + + head_scope.define_field(&op.output, &v) }; if let Some(ref op_stats) = operation_in_process_stats { @@ -579,43 +574,6 @@ async fn evaluate_op_scope( let collector_entry = scoped_entries .headn(op.collector_ref.scope_up_level as usize) .ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?; - - // Assemble input values - let input_values: Vec = - assemble_input_values(&op.input.fields, scoped_entries) - .collect::>>()?; - - // Create field_values vector for all fields in the merged schema - let mut field_values = op - .field_index_mapping - .iter() - .map(|idx| { - idx.map_or(value::Value::Null, |input_idx| { - input_values[input_idx].clone() - }) - }) - .collect::>(); - - // Handle auto_uuid_field (assumed to be at position 0 for efficiency) - if op.has_auto_uuid_field { - if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx { - let uuid = memory.next_uuid( - op.fingerprinter - .clone() - .with( - &field_values - .iter() - .enumerate() - .filter(|(i, _)| *i != uuid_idx) - .map(|(_, v)| v) - .collect::>(), - )? - .into_fingerprint(), - )?; - field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid)); - } - } - { let mut collected_records = collector_entry.collected_values [op.collector_ref.local.collector_idx as usize] From 6e1b7b3994b6c2bccfd2110fd60de551fc9cc1bd Mon Sep 17 00:00:00 2001 From: AdwitaSingh1711 <108565358+AdwitaSingh1711@users.noreply.github.com> Date: Mon, 10 Nov 2025 04:53:19 +0530 Subject: [PATCH 7/7] fixed merge conflicts --- python/cocoindex/flow.py | 2 +- python/cocoindex/op.py | 5 +- rust/cocoindex/src/builder/analyzer.rs | 6 +- rust/cocoindex/src/builder/plan.rs | 2 +- rust/cocoindex/src/execution/evaluator.rs | 59 ++++++++++++++++---- rust/cocoindex/src/execution/live_updater.rs | 1 - rust/cocoindex/src/ops/interface.rs | 2 +- rust/cocoindex/src/ops/py_factory.rs | 2 +- rust/utils/src/retryable.rs | 3 - 9 files changed, 59 insertions(+), 23 deletions(-) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index b4eb9b40..48969eaf 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -486,7 +486,7 @@ class _SourceRefreshOptions: class _ExecutionOptions: max_inflight_rows: int | None = None max_inflight_bytes: int | None = None - timeout: int | None = None + timeout: datetime.timedelta | None = None class FlowBuilder: diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index 4e0e574b..310ff719 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -49,6 +49,7 @@ ) from .runtime import to_async_call from .index import IndexOptions +import datetime class OpCategory(Enum): @@ -165,7 +166,7 @@ class OpArgs: batching: bool = False max_batch_size: int | None = None behavior_version: int | None = None - timeout: int | None = None + timeout: datetime.timedelta | None = None arg_relationship: tuple[ArgRelationship, str] | None = None @@ -394,7 +395,7 @@ def enable_cache(self) -> bool: def behavior_version(self) -> int | None: return op_args.behavior_version - def timeout(self) -> int | None: + def timeout(self) -> datetime.timedelta | None: if op_args.timeout is not None: return op_args.timeout diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index 6fd30b4a..a244a301 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -12,10 +12,10 @@ use crate::{ }; use futures::future::{BoxFuture, try_join3}; use futures::{FutureExt, future::try_join_all}; -use tokio::time::Duration; +use std::time::Duration; use utils::fingerprint::Fingerprinter; -const TIMEOUT_THRESHOLD: u64 = 1800; +const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800); #[derive(Debug)] pub(super) enum ValueTypeBuilder { @@ -818,7 +818,7 @@ impl AnalyzerContext { let behavior_version = executor.behavior_version(); let timeout = executor.timeout() .or(execution_options_timeout) - .or(Some(Duration::from_secs(TIMEOUT_THRESHOLD))); + .or(Some(TIMEOUT_THRESHOLD)); trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}"); let function_exec_info = AnalyzedFunctionExecInfo { enable_cache, diff --git a/rust/cocoindex/src/builder/plan.rs b/rust/cocoindex/src/builder/plan.rs index 02603cdd..7a2fbddc 100644 --- a/rust/cocoindex/src/builder/plan.rs +++ b/rust/cocoindex/src/builder/plan.rs @@ -3,8 +3,8 @@ use crate::base::spec::FieldName; use crate::prelude::*; use crate::ops::interface::*; -use utils::fingerprint::{Fingerprint, Fingerprinter}; use std::time::Duration; +use utils::fingerprint::{Fingerprint, Fingerprinter}; #[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct AnalyzedLocalFieldReference { diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index fedc472d..f3d83f46 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -13,8 +13,8 @@ use utils::immutable::RefList; use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell}; -const TIMEOUT_THRESHOLD: u64 = 1800; -const WARNING_THRESHOLD: u64 = 30; +const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800); +const WARNING_THRESHOLD: Duration = Duration::from_secs(30); #[derive(Debug)] pub struct ScopeValueBuilder { @@ -390,7 +390,7 @@ where _ = tokio::time::sleep(warn_duration), if !warned => { warn!( "Function '{}' ({}) is taking longer than {}s", - op_kind, op_name, WARNING_THRESHOLD + op_kind, op_name, WARNING_THRESHOLD.as_secs() ); warned = true; } @@ -408,9 +408,10 @@ async fn evaluate_op_scope( for reactive_op in op_scope.reactive_ops.iter() { match reactive_op { AnalyzedReactiveOp::Transform(op) => { - let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name); - + // Track transform operation start if let Some(ref op_stats) = operation_in_process_stats { + let transform_key = + format!("transform/{}{}", op_scope.scope_qualifier, op.name); op_stats.start_processing(&transform_key, 1); } @@ -418,11 +419,9 @@ async fn evaluate_op_scope( for value in assemble_input_values(&op.inputs, scoped_entries) { input_values.push(value?); } - let timeout_duration = op - .function_exec_info - .timeout - .unwrap_or(Duration::from_secs(TIMEOUT_THRESHOLD)); - let warn_duration = Duration::from_secs(WARNING_THRESHOLD); + + let timeout_duration = op.function_exec_info.timeout.unwrap_or(TIMEOUT_THRESHOLD); + let warn_duration = WARNING_THRESHOLD; let op_name_for_warning = op.name.clone(); let op_kind_for_warning = op.op_kind.clone(); @@ -468,7 +467,10 @@ async fn evaluate_op_scope( head_scope.define_field(&op.output, &v) }; + // Track transform operation completion if let Some(ref op_stats) = operation_in_process_stats { + let transform_key = + format!("transform/{}{}", op_scope.scope_qualifier, op.name); op_stats.finish_processing(&transform_key, 1); } @@ -574,6 +576,43 @@ async fn evaluate_op_scope( let collector_entry = scoped_entries .headn(op.collector_ref.scope_up_level as usize) .ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?; + + // Assemble input values + let input_values: Vec = + assemble_input_values(&op.input.fields, scoped_entries) + .collect::>>()?; + + // Create field_values vector for all fields in the merged schema + let mut field_values = op + .field_index_mapping + .iter() + .map(|idx| { + idx.map_or(value::Value::Null, |input_idx| { + input_values[input_idx].clone() + }) + }) + .collect::>(); + + // Handle auto_uuid_field (assumed to be at position 0 for efficiency) + if op.has_auto_uuid_field { + if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx { + let uuid = memory.next_uuid( + op.fingerprinter + .clone() + .with( + &field_values + .iter() + .enumerate() + .filter(|(i, _)| *i != uuid_idx) + .map(|(_, v)| v) + .collect::>(), + )? + .into_fingerprint(), + )?; + field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid)); + } + } + { let mut collected_records = collector_entry.collected_values [op.collector_ref.local.collector_idx as usize] diff --git a/rust/cocoindex/src/execution/live_updater.rs b/rust/cocoindex/src/execution/live_updater.rs index 2833295c..0fb623a2 100644 --- a/rust/cocoindex/src/execution/live_updater.rs +++ b/rust/cocoindex/src/execution/live_updater.rs @@ -172,7 +172,6 @@ impl SourceUpdateTask { let mut change_stream = change_stream; let retry_options = retryable::RetryOptions { retry_timeout: None, - per_call_timeout: None, initial_backoff: std::time::Duration::from_secs(5), max_backoff: std::time::Duration::from_secs(60), }; diff --git a/rust/cocoindex/src/ops/interface.rs b/rust/cocoindex/src/ops/interface.rs index 6a5d4000..7cef1eda 100644 --- a/rust/cocoindex/src/ops/interface.rs +++ b/rust/cocoindex/src/ops/interface.rs @@ -186,7 +186,7 @@ pub trait SimpleFunctionExecutor: Send + Sync { None } - /// Returns None to use the default timeout (300s) + /// Returns None to use the default timeout (1800s) fn timeout(&self) -> Option { None } diff --git a/rust/cocoindex/src/ops/py_factory.rs b/rust/cocoindex/src/ops/py_factory.rs index c0257316..450789e2 100644 --- a/rust/cocoindex/src/ops/py_factory.rs +++ b/rust/cocoindex/src/ops/py_factory.rs @@ -281,7 +281,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { Ok(( prepare_fut, enable_cache, - behavior_version, + behavior_version, timeout, batching_options, )) diff --git a/rust/utils/src/retryable.rs b/rust/utils/src/retryable.rs index 43cf5b38..622f29bc 100644 --- a/rust/utils/src/retryable.rs +++ b/rust/utils/src/retryable.rs @@ -109,7 +109,6 @@ pub fn Ok(value: T) -> Result { pub struct RetryOptions { pub retry_timeout: Option, - pub per_call_timeout: Option, pub initial_backoff: Duration, pub max_backoff: Duration, } @@ -118,7 +117,6 @@ impl Default for RetryOptions { fn default() -> Self { Self { retry_timeout: Some(DEFAULT_RETRY_TIMEOUT), - per_call_timeout: None, initial_backoff: Duration::from_millis(100), max_backoff: Duration::from_secs(10), } @@ -127,7 +125,6 @@ impl Default for RetryOptions { pub static HEAVY_LOADED_OPTIONS: RetryOptions = RetryOptions { retry_timeout: Some(DEFAULT_RETRY_TIMEOUT), - per_call_timeout: Some(Duration::from_secs(300)), initial_backoff: Duration::from_secs(1), max_backoff: Duration::from_secs(60), };