From b2f9da8e59f76dcaf0b43617086ceb2a750472b7 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 18 Oct 2025 11:04:24 +0200 Subject: [PATCH 1/5] add async_call rustpython --- Cargo.lock | 164 ++++++++++++++++++++++++++++++++++++++- Cargo.toml | 6 +- src/lib.rs | 5 +- src/pyo3_runner.rs | 7 +- src/rustpython_runner.rs | 102 ++++++++++++++++++++---- 5 files changed, 260 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec443d8..cabf18d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,15 @@ dependencies = [ "libc", ] +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + [[package]] name = "anyhow" version = "1.0.100" @@ -64,6 +73,7 @@ dependencies = [ "dunce", "once_cell", "pyo3", + "rustpython", "rustpython-stdlib", "rustpython-vm", "serde_json", @@ -228,6 +238,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "ansi_term", + "atty", + "bitflags 1.3.2", + "strsim", + "textwrap 0.11.0", + "unicode-width 0.1.14", + "vec_map", +] + [[package]] name = "clipboard-win" version = "5.4.1" @@ -334,6 +359,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dns-lookup" version = "2.1.1" @@ -370,6 +416,17 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" +[[package]] +name = "env_logger" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" +dependencies = [ + "atty", + "log", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -428,6 +485,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" dependencies = [ "crc32fast", + "libz-sys", "miniz_oxide", ] @@ -727,6 +785,16 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +[[package]] +name = "libredox" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +dependencies = [ + "bitflags 2.9.4", + "libc", +] + [[package]] name = "libsqlite3-sys" version = "0.28.0" @@ -738,6 +806,17 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-sys" +version = "1.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -1343,6 +1422,17 @@ dependencies = [ "bitflags 2.9.4", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom 0.2.16", + "libredox", + "thiserror 1.0.69", +] + [[package]] name = "regex-automata" version = "0.1.10" @@ -1418,6 +1508,27 @@ dependencies = [ "windows-sys 0.61.1", ] +[[package]] +name = "rustpython" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef4653f77eb5866c7ba903bd0480bcfa9f6d41f73156895b1813687f8dbd026" +dependencies = [ + "atty", + "cfg-if", + "clap", + "dirs-next", + "env_logger", + "libc", + "log", + "rustpython-compiler", + "rustpython-parser", + "rustpython-pylib", + "rustpython-stdlib", + "rustpython-vm", + "rustyline", +] + [[package]] name = "rustpython-ast" version = "0.4.0" @@ -1530,7 +1641,7 @@ dependencies = [ "rustpython-parser-core", "syn 1.0.109", "syn-ext", - "textwrap", + "textwrap 0.15.2", ] [[package]] @@ -1613,6 +1724,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "rustpython-pylib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852d8eb34fce6cbce4eab3b42a87ef996197230168c7fca9d98173057ce27050" +dependencies = [ + "glob", + "rustpython-compiler-core", + "rustpython-derive", +] + [[package]] name = "rustpython-sre_engine" version = "0.4.0" @@ -1650,6 +1772,7 @@ dependencies = [ "junction", "libc", "libsqlite3-sys", + "libz-sys", "mac_address", "malachite-bigint", "md-5", @@ -1965,6 +2088,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "strum" version = "0.24.1" @@ -2061,6 +2190,15 @@ dependencies = [ "windows-sys 0.61.1", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "termios" version = "0.3.3" @@ -2070,6 +2208,15 @@ dependencies = [ "libc", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width 0.1.14", +] + [[package]] name = "textwrap" version = "0.15.2" @@ -2469,6 +2616,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.5" @@ -2598,6 +2751,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.1", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index d3a9927..9e45ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ tokio = { version = "1.47.0", features = ["sync", "macros", "rt", "rt-multi-thre default = ["pyo3"] # default = ["rustpython"] pyo3 = ["dep:pyo3"] -rustpython = ["dep:rustpython-vm", "dep:rustpython-stdlib"] +rustpython = ["dep:rustpython-vm", "dep:rustpython", "dep:rustpython-stdlib"] [dependencies.pyo3] version = "0.26.0" @@ -33,6 +33,10 @@ version = "0.4.0" optional = true features = ["threading", "serde", "importlib"] +[dependencies.rustpython] +version = "0.4.0" +optional = true + [dependencies.rustpython-stdlib] version = "0.4.0" optional = true diff --git a/src/lib.rs b/src/lib.rs index 3e9126e..20dbcf5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,7 +92,6 @@ pub struct PyRunner { sender: mpsc::Sender, } - impl Default for PyRunner { fn default() -> Self { PyRunner::new() @@ -124,7 +123,9 @@ impl PyRunner { #[cfg(feature = "rustpython")] { - rustpython_runner::python_thread_main(receiver); + use tokio::runtime::Builder; + let rt = Builder::new_current_thread().enable_all().build().unwrap(); + rt.block_on(rustpython_runner::python_thread_main(receiver)); } }); diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index ed9fea1..a773c29 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -48,7 +48,9 @@ pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) match result { Ok(func) => { - py.detach(|| tokio::spawn(handle_call_async_function(func, args, cmd.responder))); + py.detach(|| { + tokio::spawn(handle_call_async_function(func, args, cmd.responder)) + }); return; // The response is sent async, so we can return early. } Err(e) => Err(e), @@ -190,8 +192,7 @@ fn py_any_to_json(obj: &pyo3::Bound<'_, PyAny>) -> PyResult { return Ok(Value::String(s.to_string())); } if let Ok(list) = obj.cast::() { - let items: PyResult> = - list.iter().map(|item| py_any_to_json(&item)).collect(); + let items: PyResult> = list.iter().map(|item| py_any_to_json(&item)).collect(); return Ok(Value::Array(items?)); } if let Ok(dict) = obj.cast::() { diff --git a/src/rustpython_runner.rs b/src/rustpython_runner.rs index 15be641..1628a56 100644 --- a/src/rustpython_runner.rs +++ b/src/rustpython_runner.rs @@ -4,25 +4,23 @@ // git clone https://github.com/marcomq/async_py use crate::{CmdType, PyCommand}; +use rustpython::InterpreterConfig; use rustpython_vm::{ - builtins::{PyBaseException, PyBool, PyDict, PyFloat, PyInt, PyList, PyStr}, - convert::ToPyObject, - eval, - scope::Scope, - AsObject, Interpreter, PyObjectRef, PyRef, PyResult, Settings, VirtualMachine, + builtins::{PyBaseException, PyBool, PyDict, PyDictRef, PyFloat, PyInt, PyList, PyStr}, convert::ToPyObject, eval, function::ArgMapping, scope::Scope, AsObject, PyObjectRef, PyRef, PyResult, Settings, VirtualMachine }; use serde_json::{json, Map, Number, Value}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; /// The main loop for the RustPython thread. -pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { +pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) { let mut settings = Settings::default(); settings.path_list.push("Lib".to_owned()); - let interp = Interpreter::with_init(settings, |vm| { - vm.add_native_modules(rustpython_stdlib::get_module_inits()); - }); + let interp = InterpreterConfig::new() + .settings(settings) + .init_stdlib() + .interpreter(); - interp.enter(|vm| { + let scope = interp.enter(|vm| { let scope = vm.new_scope_with_builtins(); vm.run_code_string( scope.clone(), @@ -30,7 +28,10 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { "".into(), ) .unwrap(); - while let Some(cmd) = receiver.blocking_recv() { + scope + }); + while let Some(cmd) = receiver.recv().await { + interp.enter(|vm| { let result = match &cmd.cmd_type { CmdType::RunCode(code) => vm .run_code_string(scope.clone(), code, "".to_owned()) @@ -48,10 +49,23 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { .and_then(|obj| py_to_json(vm, &obj)) } CmdType::CallAsyncFunction { name, args } => { - dbg!(name, args); - unimplemented!("Async functions are not supported yet in RustPython") + // We need to clone the globals from the current scope to pass to the async task. + let locals = Some(scope.locals.clone()); + let globals = scope.globals.clone(); + tokio::spawn(handle_call_async_function( + locals, + globals, + name.clone(), + args.clone(), + cmd.responder, + )); + // The response is sent async, so we can return early. + return; + } + CmdType::Stop => { + receiver.close(); + return; } - CmdType::Stop => break, }; let response = result.map_err(|err| { format!( @@ -61,8 +75,8 @@ pub(crate) fn python_thread_main(mut receiver: mpsc::Receiver) { ) }); let _ = cmd.responder.send(response); - } - }); + }); + } } fn read_variable(vm: &VirtualMachine, scope: Scope, var_name: &str) -> PyResult { @@ -83,6 +97,60 @@ fn call_function(vm: &VirtualMachine, scope: Scope, name: &str, args: Vec func.call(py_args, vm) } +async fn handle_call_async_function( + locals: Option, + globals: PyDictRef, + name: String, + args: Vec, + responder: oneshot::Sender>, +) { + // Each async task runs in its own interpreter to allow for concurrency. + // They share the globals from the main interpreter's scope. + let name_clone = name.clone(); + let result = tokio::task::spawn_blocking(move || { + let mut settings = Settings::default(); + settings.path_list.push("Lib".to_owned()); + let interp = InterpreterConfig::new() + .settings(settings) + .init_stdlib() + .interpreter(); + + interp.enter(|vm| { + let scope = Scope::with_builtins(locals, globals, vm); + let result: PyResult = (|| { + let asyncio = vm.import("asyncio", 0)?; + let loop_obj = asyncio.get_attr("new_event_loop", vm)?.call(vec![], vm)?; + asyncio + .get_attr("set_event_loop", vm)? + .call(vec![loop_obj.clone()], vm)?; + + let py_args: Vec = + args.into_iter().map(|v| json_to_py(vm, v)).collect(); + + let coroutine = read_variable(vm, scope.clone(), &name)?.call(py_args, vm)?; + + let result_obj = loop_obj + .get_attr("run_until_complete", vm)? + .call(vec![coroutine], vm)?; + loop_obj.get_attr("close", vm)?.call(vec![], vm)?; + py_to_json(vm, &result_obj) + })(); + result + }) + }) + .await + .unwrap(); // unwrap the JoinError + + let response = result.map_err(|err| { + format!( + "Cannot apply async function call '{}': {}", + name_clone, + print_err_msg(err) + ) + }); + let _ = responder.send(response); +} + /// Converts a `serde_json::Value` to a `PyObjectRef`. fn json_to_py(vm: &VirtualMachine, val: Value) -> PyObjectRef { match val { From 681148554c5480b4d85d1a628920b84a8f1c9466 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 18 Oct 2025 11:32:35 +0200 Subject: [PATCH 2/5] using workaround --- Cargo.toml | 4 ++-- src/rustpython_runner.rs | 42 +++++++++++++++++++++++++++------------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9e45ad1..416b471 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,8 @@ once_cell = "1.19" tokio = { version = "1.47.0", features = ["sync", "macros", "rt", "rt-multi-thread"] } [features] -default = ["pyo3"] -# default = ["rustpython"] +# default = ["pyo3"] +default = ["rustpython"] pyo3 = ["dep:pyo3"] rustpython = ["dep:rustpython-vm", "dep:rustpython", "dep:rustpython-stdlib"] diff --git a/src/rustpython_runner.rs b/src/rustpython_runner.rs index 1628a56..4879972 100644 --- a/src/rustpython_runner.rs +++ b/src/rustpython_runner.rs @@ -13,22 +13,32 @@ use tokio::sync::{mpsc, oneshot}; /// The main loop for the RustPython thread. pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) { - let mut settings = Settings::default(); - settings.path_list.push("Lib".to_owned()); - let interp = InterpreterConfig::new() - .settings(settings) + // Create a single, shared sys.modules dictionary that is thread-safe. + let (shared_sys_modules, shared_sys_path) = InterpreterConfig::new() .init_stdlib() - .interpreter(); + .interpreter() + .enter(|vm| { + let modules = vm.sys_module.get_attr("modules", vm).unwrap().downcast::().unwrap(); + let path = vm.sys_module.get_attr("path", vm).unwrap().downcast::().unwrap(); + (modules, path) + }); + + let interp = InterpreterConfig::new().init_stdlib().interpreter(); + let (scope, main_globals) = interp.enter(|vm| { + // Replace the default sys.modules with our shared one. + vm.sys_module.set_attr("modules", shared_sys_modules.clone().to_pyobject(vm), vm).unwrap(); + // Replace the default sys.path with our shared one. + vm.sys_module.set_attr("path", shared_sys_path.clone().to_pyobject(vm), vm).unwrap(); - let scope = interp.enter(|vm| { let scope = vm.new_scope_with_builtins(); + // Add current directory to path for local module imports in tests. vm.run_code_string( scope.clone(), "import sys; sys.path.append('./')", "".into(), ) .unwrap(); - scope + (scope.clone(), scope.globals.clone()) }); while let Some(cmd) = receiver.recv().await { interp.enter(|vm| { @@ -50,13 +60,12 @@ pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) } CmdType::CallAsyncFunction { name, args } => { // We need to clone the globals from the current scope to pass to the async task. - let locals = Some(scope.locals.clone()); - let globals = scope.globals.clone(); tokio::spawn(handle_call_async_function( - locals, - globals, + main_globals.clone(), name.clone(), args.clone(), + shared_sys_modules.clone(), + shared_sys_path.clone(), cmd.responder, )); // The response is sent async, so we can return early. @@ -98,10 +107,11 @@ fn call_function(vm: &VirtualMachine, scope: Scope, name: &str, args: Vec } async fn handle_call_async_function( - locals: Option, globals: PyDictRef, name: String, args: Vec, + shared_sys_modules: PyDictRef, + shared_sys_path: PyRef, responder: oneshot::Sender>, ) { // Each async task runs in its own interpreter to allow for concurrency. @@ -116,7 +126,13 @@ async fn handle_call_async_function( .interpreter(); interp.enter(|vm| { - let scope = Scope::with_builtins(locals, globals, vm); + // Use the globals from the main thread and the shared sys.modules. + vm.sys_module.set_attr("modules", shared_sys_modules.to_pyobject(vm), vm).unwrap(); + // Use the shared sys.path. + vm.sys_module.set_attr("path", shared_sys_path.to_pyobject(vm), vm).unwrap(); + // We create a new scope but use the globals from the main interpreter. + // This gives us access to functions defined there. + let scope = Scope::with_builtins(None, globals, vm); let result: PyResult = (|| { let asyncio = vm.import("asyncio", 0)?; let loop_obj = asyncio.get_attr("new_event_loop", vm)?.call(vec![], vm)?; From de42ba2eb98eb07fff308e6370b4d4573792950d Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 18 Oct 2025 11:40:50 +0200 Subject: [PATCH 3/5] cleanup --- Cargo.toml | 4 ++-- README.md | 1 - src/lib.rs | 3 +-- src/rustpython_runner.rs | 11 ++--------- 4 files changed, 5 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 416b471..9e45ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,8 @@ once_cell = "1.19" tokio = { version = "1.47.0", features = ["sync", "macros", "rt", "rt-multi-thread"] } [features] -# default = ["pyo3"] -default = ["rustpython"] +default = ["pyo3"] +# default = ["rustpython"] pyo3 = ["dep:pyo3"] rustpython = ["dep:rustpython-vm", "dep:rustpython", "dep:rustpython-stdlib"] diff --git a/README.md b/README.md index 983a317..6e7146c 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,6 @@ Each call will use its own event loop. This may not be very efficient and change Make sure to use `call_async_function` for async python functions. Using `call_function` will probably raise an error. -`call_async_function` is not available for RustPython. ### Using a venv It is generally recommended to use a venv to install pip packages. diff --git a/src/lib.rs b/src/lib.rs index 20dbcf5..daa1c4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -348,7 +348,6 @@ impl PyRunner { /// * `args`: A vector of `serde_json::Value` to pass as arguments to the function. /// /// **Note:** This function is safe to call from any context (sync or async). - #[cfg(feature = "pyo3")] pub fn call_async_function_sync( &self, name: &str, @@ -570,11 +569,11 @@ async def add_and_sleep(a, b, sleep_time): let result2 = executor.call_async_function("add_and_sleep", vec![5.into(), 10.into(), 0.1.into()]); let (result1, result2) = tokio::join!(result1, result2); + // The order of execution is guaranteed by the last timing parameters assert_eq!(result1.unwrap(), Value::Number(17.into())); assert_eq!(result2.unwrap(), Value::Number(16.into())); } - #[cfg(feature = "pyo3")] #[test] fn test_run_with_async_function_sync() { let executor = PyRunner::new(); diff --git a/src/rustpython_runner.rs b/src/rustpython_runner.rs index 4879972..ee60d7d 100644 --- a/src/rustpython_runner.rs +++ b/src/rustpython_runner.rs @@ -134,21 +134,14 @@ async fn handle_call_async_function( // This gives us access to functions defined there. let scope = Scope::with_builtins(None, globals, vm); let result: PyResult = (|| { - let asyncio = vm.import("asyncio", 0)?; - let loop_obj = asyncio.get_attr("new_event_loop", vm)?.call(vec![], vm)?; - asyncio - .get_attr("set_event_loop", vm)? - .call(vec![loop_obj.clone()], vm)?; + let asyncio_run = vm.import("asyncio", 0)?.get_attr("run", vm)?; let py_args: Vec = args.into_iter().map(|v| json_to_py(vm, v)).collect(); let coroutine = read_variable(vm, scope.clone(), &name)?.call(py_args, vm)?; - let result_obj = loop_obj - .get_attr("run_until_complete", vm)? - .call(vec![coroutine], vm)?; - loop_obj.get_attr("close", vm)?.call(vec![], vm)?; + let result_obj = asyncio_run.call(vec![coroutine], vm)?; py_to_json(vm, &result_obj) })(); result From 0053900c2010532d81e29b0e1a8877f5359f686b Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 18 Oct 2025 13:45:06 +0200 Subject: [PATCH 4/5] optimize and resolve issues --- src/lib.rs | 1 - src/pyo3_runner.rs | 9 +- src/rustpython_runner.rs | 378 ++++++++++++++++++++++++++++----------- 3 files changed, 278 insertions(+), 110 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index daa1c4b..e2354e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -548,7 +548,6 @@ def add(a, b): ); } - #[cfg(feature = "pyo3")] #[tokio::test] async fn test_run_with_async_function() { let executor = PyRunner::new(); diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index a773c29..0bdbf2a 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -152,15 +152,16 @@ async fn handle_call_async_function( responder: oneshot::Sender>, ) { let result = Python::attach(|py| { + // Note: This approach creates a new asyncio event loop for each async call, + // which can be inefficient for a high volume of calls. It ensures that each + // call is isolated but does not share an event loop for concurrent execution + // on the Python side. let func = func.bind(py); let t_args = vec_to_py_tuple(&py, args)?; let coroutine = func.call1(t_args)?; let asyncio = py.import("asyncio")?; - let loop_obj = asyncio.call_method0("new_event_loop")?; - asyncio.call_method1("set_event_loop", (loop_obj.clone(),))?; - let result = loop_obj.call_method1("run_until_complete", (coroutine,))?; - loop_obj.call_method0("close")?; + let result = asyncio.call_method1("run", (coroutine,))?; py_any_to_json(&result) }); diff --git a/src/rustpython_runner.rs b/src/rustpython_runner.rs index ee60d7d..a922232 100644 --- a/src/rustpython_runner.rs +++ b/src/rustpython_runner.rs @@ -6,30 +6,49 @@ use crate::{CmdType, PyCommand}; use rustpython::InterpreterConfig; use rustpython_vm::{ - builtins::{PyBaseException, PyBool, PyDict, PyDictRef, PyFloat, PyInt, PyList, PyStr}, convert::ToPyObject, eval, function::ArgMapping, scope::Scope, AsObject, PyObjectRef, PyRef, PyResult, Settings, VirtualMachine + builtins::{PyBaseException, PyBool, PyDict, PyDictRef, PyFloat, PyInt, PyList, PyStr}, + convert::ToPyObject, + eval, + scope::Scope, + AsObject, PyObjectRef, PyRef, PyResult, VirtualMachine, }; use serde_json::{json, Map, Number, Value}; -use tokio::sync::{mpsc, oneshot}; +use std::collections::HashMap; +use tokio::sync::{mpsc, oneshot, Notify}; + +/// Holds the Python objects related to the async infrastructure. +struct AsyncPyState { + loop_obj: PyObjectRef, + make_callback_fn: PyObjectRef, +} /// The main loop for the RustPython thread. +/// +/// This function implements a sophisticated single-interpreter model for concurrency. +/// It spawns a dedicated Python thread that runs an `asyncio` event loop. +/// Asynchronous Python function calls from Rust are scheduled onto this single, +/// persistent event loop using `run_coroutine_threadsafe`. +/// +/// A `tokio::sync::Notify` channel is used as a signaling mechanism to efficiently +/// wake up the Rust `select!` loop only when results are available in the Python-side queue, +/// avoiding the need for polling. pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) { - // Create a single, shared sys.modules dictionary that is thread-safe. - let (shared_sys_modules, shared_sys_path) = InterpreterConfig::new() - .init_stdlib() - .interpreter() - .enter(|vm| { - let modules = vm.sys_module.get_attr("modules", vm).unwrap().downcast::().unwrap(); - let path = vm.sys_module.get_attr("path", vm).unwrap().downcast::().unwrap(); - (modules, path) - }); - let interp = InterpreterConfig::new().init_stdlib().interpreter(); - let (scope, main_globals) = interp.enter(|vm| { - // Replace the default sys.modules with our shared one. - vm.sys_module.set_attr("modules", shared_sys_modules.clone().to_pyobject(vm), vm).unwrap(); - // Replace the default sys.path with our shared one. - vm.sys_module.set_attr("path", shared_sys_path.clone().to_pyobject(vm), vm).unwrap(); + // pending responders (id -> oneshot sender) + let mut pending: HashMap>> = HashMap::new(); + let mut next_id: usize = 1; + let mut async_state: Option = None; + // Create a notifier that the Python thread can use to wake up the Rust select! loop. + let notify = std::sync::Arc::new(Notify::new()); + let rust_notify_fn = { + let notify = notify.clone(); + move || { + notify.notify_one(); + } + }; + + let (scope, main_globals) = interp.enter(|vm| { let scope = vm.new_scope_with_builtins(); // Add current directory to path for local module imports in tests. vm.run_code_string( @@ -38,54 +57,195 @@ pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) "".into(), ) .unwrap(); + + // Add the native rust_notify function to Python globals. + scope + .globals + .set_item( + "_rust_notify", + vm.new_function("_rust_notify", rust_notify_fn) + .to_pyobject(vm), + vm, + ) + .unwrap(); + (scope.clone(), scope.globals.clone()) }); - while let Some(cmd) = receiver.recv().await { - interp.enter(|vm| { - let result = match &cmd.cmd_type { - CmdType::RunCode(code) => vm - .run_code_string(scope.clone(), code, "".to_owned()) - .map(|_| Value::Null), - CmdType::EvalCode(code) => eval::eval(vm, code, scope.clone(), "") - .and_then(|obj| py_to_json(vm, &obj)), - CmdType::RunFile(file) => vm - .run_script(scope.clone(), file.to_str().unwrap()) - .map(|_| Value::Null), - CmdType::ReadVariable(var_name) => { - read_variable(vm, scope.clone(), var_name).and_then(|obj| py_to_json(vm, &obj)) + + loop { + tokio::select! { + // Branch 1: Wait for a new command from the channel. + Some(cmd) = receiver.recv() => { + if let CmdType::Stop = cmd.cmd_type { + receiver.close(); + } else { + handle_command(&interp, &scope, &mut async_state, &mut pending, &mut next_id, cmd); + } + }, + // Branch 2: Wait for a notification from Python that a result is ready. + // The `if !pending.is_empty()` guard ensures this branch is only + // active when there are outstanding async tasks. + _ = notify.notified(), if !pending.is_empty() => { + handle_notification(&interp, &main_globals, &mut pending); + } + // If the receiver is closed and there are no more pending tasks, exit. + else => { + if receiver.is_closed() && pending.is_empty() { + break; } - CmdType::CallFunction { name, args } => { - call_function(vm, scope.clone(), name, args.clone()) - .and_then(|obj| py_to_json(vm, &obj)) + } + } + } +} + +/// Sets up the asyncio event loop and related infrastructure in the Python interpreter. +/// This is called on-demand when the first async function is executed. +fn setup_async_infrastructure( + vm: &VirtualMachine, + scope: &Scope, +) -> PyResult { + // Initialize a dedicated asyncio loop, a result queue and helper callbacks + // in the Python interpreter. The loop will be run in a separate Python + // thread so that run_coroutine_threadsafe can schedule coroutines onto it. + vm.run_code_string( + scope.clone(), + r#" +import asyncio, threading, queue, traceback +_loop = asyncio.new_event_loop() +def _run_loop(): + asyncio.set_event_loop(_loop) + _loop.run_forever() +_thread = threading.Thread(target=_run_loop, daemon=True) +_thread.start() +_result_queue = queue.Queue() +def _async_done_cb(fut, id): + try: + res = fut.result() + _result_queue.put({'id': id, 'ok': True, 'payload': res}) + _rust_notify() + except Exception as e: + tb = traceback.format_exception_only(type(e), e) + _result_queue.put({'id': id, 'ok': False, 'payload': ''.join(tb)}) +def _make_callback(id): + def _cb(fut): + _async_done_cb(fut, id) + return _cb +"#, + "".into(), + )?; + + let globals = &scope.globals; + let loop_obj = globals.get_item("_loop", vm)?; + let make_callback_fn = globals.get_item("_make_callback", vm)?; + Ok(AsyncPyState { loop_obj, make_callback_fn }) +} + +/// Processes a command received from the Rust side. +fn handle_command( + interp: &rustpython_vm::Interpreter, + scope: &Scope, + async_state: &mut Option, + pending: &mut HashMap>>, + next_id: &mut usize, + cmd: PyCommand, +) { + interp.enter(|vm| { + match cmd.cmd_type { + CmdType::RunCode(code) => { + let result = vm + .run_code_string(scope.clone(), &code, "".to_owned()) + .map(|_| Value::Null) + .map_err(print_err_msg); + let _ = cmd.responder.send(result); + } + CmdType::EvalCode(code) => { + let result = eval::eval(vm, &code, scope.clone(), "") + .and_then(|obj| py_to_json(vm, &obj)) + .map_err(print_err_msg); + let _ = cmd.responder.send(result); + } + CmdType::RunFile(file) => { + let result = vm + .run_script(scope.clone(), file.to_str().unwrap()) + .map(|_| Value::Null) + .map_err(print_err_msg); + let _ = cmd.responder.send(result); + } + CmdType::ReadVariable(var_name) => { + let result = read_variable(vm, scope.clone(), &var_name) + .and_then(|obj| py_to_json(vm, &obj)) + .map_err(print_err_msg); + let _ = cmd.responder.send(result); + } + CmdType::CallFunction { name, args } => { + let result = call_function(vm, scope.clone(), &name, args) + .and_then(|obj| py_to_json(vm, &obj)) + .map_err(print_err_msg); + let _ = cmd.responder.send(result); + } + CmdType::CallAsyncFunction { name, args } => { + let id = *next_id; + *next_id += 1; + + // Initialize async infrastructure on first use. + if async_state.is_none() { + match setup_async_infrastructure(vm, scope) { + Ok(state) => *async_state = Some(state), + Err(e) => { + let _ = cmd.responder.send(Err(print_err_msg(e))); + return; + } + } } - CmdType::CallAsyncFunction { name, args } => { - // We need to clone the globals from the current scope to pass to the async task. - tokio::spawn(handle_call_async_function( - main_globals.clone(), - name.clone(), - args.clone(), - shared_sys_modules.clone(), - shared_sys_path.clone(), - cmd.responder, - )); - // The response is sent async, so we can return early. - return; + + let state = async_state.as_ref().unwrap(); + pending.insert(id, cmd.responder); // Must insert before calling async. + if let Err(e) = handle_call_async_function( + vm, + scope.clone(), + state, + id, + &name, + args, + ) { + if let Some(tx) = pending.remove(&id) { + let _ = tx.send(Err(e)); + } } - CmdType::Stop => { - receiver.close(); - return; + // Response will be sent asynchronously via the queue; return. + } + CmdType::Stop => { + // This case is handled in the select! loop to close the receiver. + } + } + }); +} + +/// Drains the Python-side result queue and completes any pending responders. +fn handle_notification( + interp: &rustpython_vm::Interpreter, + main_globals: &PyDictRef, + pending: &mut HashMap>>, +) { + interp.enter(|vm| { + if let Ok(rq) = main_globals.get_item("_result_queue", vm) { + if let Ok(get_nowait) = rq.get_attr("get_nowait", vm) { + while let Ok(item) = get_nowait.call(vec![], vm) { + if let Some(dict) = item.downcast_ref::() { + let id_obj = dict.get_item("id", vm).unwrap(); + let ok_obj = dict.get_item("ok", vm).unwrap(); + let payload_obj = dict.get_item("payload", vm).unwrap(); + let id = id_obj.clone().try_into_value::(vm).ok().unwrap_or(0) as usize; + if let Some(tx) = pending.remove(&id) { + let ok = ok_obj.clone().try_into_value::(vm).ok().unwrap_or(false); + let res = if ok { py_to_json(vm, &payload_obj).map_err(print_err_msg) } else { Err(payload_obj.str(vm).map(|s| s.to_string()).unwrap_or_else(|_| "".to_string())) }; + let _ = tx.send(res); + } + } } - }; - let response = result.map_err(|err| { - format!( - "Cannot apply cmd {:?}: {}", - cmd.cmd_type, - print_err_msg(err) - ) - }); - let _ = cmd.responder.send(response); - }); - } + } + } + }); } fn read_variable(vm: &VirtualMachine, scope: Scope, var_name: &str) -> PyResult { @@ -106,58 +266,66 @@ fn call_function(vm: &VirtualMachine, scope: Scope, name: &str, args: Vec func.call(py_args, vm) } -async fn handle_call_async_function( - globals: PyDictRef, - name: String, +fn handle_call_async_function( + vm: &VirtualMachine, + scope: Scope, + async_state: &AsyncPyState, + id: usize, + name: &str, args: Vec, - shared_sys_modules: PyDictRef, - shared_sys_path: PyRef, - responder: oneshot::Sender>, -) { - // Each async task runs in its own interpreter to allow for concurrency. - // They share the globals from the main interpreter's scope. - let name_clone = name.clone(); - let result = tokio::task::spawn_blocking(move || { - let mut settings = Settings::default(); - settings.path_list.push("Lib".to_owned()); - let interp = InterpreterConfig::new() - .settings(settings) - .init_stdlib() - .interpreter(); +) -> Result<(), String> { + // Build the coroutine and schedule it on the dedicated loop via + // asyncio.run_coroutine_threadsafe. Then attach a done-callback + // that will put the result into `_result_queue` with the id. + let func = read_variable(vm, scope.clone(), name) + .map_err(|e| format!("Cannot find function {}: {}", name, print_err_msg(e)))?; - interp.enter(|vm| { - // Use the globals from the main thread and the shared sys.modules. - vm.sys_module.set_attr("modules", shared_sys_modules.to_pyobject(vm), vm).unwrap(); - // Use the shared sys.path. - vm.sys_module.set_attr("path", shared_sys_path.to_pyobject(vm), vm).unwrap(); - // We create a new scope but use the globals from the main interpreter. - // This gives us access to functions defined there. - let scope = Scope::with_builtins(None, globals, vm); - let result: PyResult = (|| { - let asyncio_run = vm.import("asyncio", 0)?.get_attr("run", vm)?; + let py_args = args + .iter() + .map(|v| json_to_py(vm, v.clone())) + .collect::>(); + let coroutine = func + .call(py_args, vm) + .map_err(|err| format!("Cannot call function {}: {}", name, print_err_msg(err)))?; - let py_args: Vec = - args.into_iter().map(|v| json_to_py(vm, v)).collect(); + // call asyncio.run_coroutine_threadsafe(coroutine, loop) + let asyncio = vm + .import("asyncio", 0) + .map_err(|err| format!("Cannot import asyncio: {}", print_err_msg(err)))?; + let run_threadsafe = asyncio + .get_attr("run_coroutine_threadsafe", vm) + .map_err(|err| { + format!( + "asyncio.run_coroutine_threadsafe not available: {}", + print_err_msg(err) + ) + })?; - let coroutine = read_variable(vm, scope.clone(), &name)?.call(py_args, vm)?; + // Call run_coroutine_threadsafe(coroutine, loop_obj) + let loop_obj = async_state.loop_obj.clone(); + let fut = run_threadsafe + .call(vec![coroutine, loop_obj.clone().to_pyobject(vm)], vm) + .map_err(|err| format!("Failed to schedule coroutine: {}", print_err_msg(err)))?; - let result_obj = asyncio_run.call(vec![coroutine], vm)?; - py_to_json(vm, &result_obj) - })(); - result - }) - }) - .await - .unwrap(); // unwrap the JoinError + // Create callback using _make_callback(id) + let make_cb = async_state.make_callback_fn.clone(); + let cb = make_cb + .call(vec![vm.ctx.new_int(id as i64).to_pyobject(vm)], vm) + .map_err(|err| format!("Failed to create callback: {}", print_err_msg(err)))?; - let response = result.map_err(|err| { - format!( - "Cannot apply async function call '{}': {}", - name_clone, - print_err_msg(err) - ) - }); - let _ = responder.send(response); + // Attach the callback: fut.add_done_callback(cb) + if let Ok(add_done) = fut.get_attr("add_done_callback", vm) { + if let Err(err) = add_done.call(vec![cb], vm) { + return Err(format!( + "Failed to add done callback: {}", + print_err_msg(err) + )); + } + } else { + return Err("fut.add_done_callback not found".to_string()); + } + + Ok(()) } /// Converts a `serde_json::Value` to a `PyObjectRef`. From 63baf92765535590218bc14dcbac702a55ebc711 Mon Sep 17 00:00:00 2001 From: "marco.mengelkoch" Date: Sat, 18 Oct 2025 14:24:16 +0200 Subject: [PATCH 5/5] change to single event loop --- Cargo.toml | 2 +- src/pyo3_runner.rs | 284 +++++++++++++++++++++++++++++---------- src/rustpython_runner.rs | 39 +++--- 3 files changed, 239 insertions(+), 86 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9e45ad1..1c7d88f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ pyo3 = ["dep:pyo3"] rustpython = ["dep:rustpython-vm", "dep:rustpython", "dep:rustpython-stdlib"] [dependencies.pyo3] -version = "0.26.0" +version = "0.26" features = ["auto-initialize"] optional = true diff --git a/src/pyo3_runner.rs b/src/pyo3_runner.rs index 0bdbf2a..44ba38c 100644 --- a/src/pyo3_runner.rs +++ b/src/pyo3_runner.rs @@ -2,73 +2,167 @@ // © Copyright 2025, by Marco Mengelkoch // Licensed under MIT License, see License file for more details // git clone https://github.com/marcomq/async_py - use crate::{print_path_for_python, CmdType, PyCommand}; use pyo3::{ exceptions::PyKeyError, prelude::*, - types::{PyBool, PyDict, PyFloat, PyInt, PyList, PyString}, + types::{PyBool, PyCFunction, PyDict, PyFloat, PyInt, PyList, PyString, PyTuple}, IntoPyObjectExt, }; use serde_json::Value; +use std::collections::HashMap; use std::ffi::CString; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Notify}; + +/// Holds the Python objects related to the async infrastructure. +struct AsyncPyState { + loop_obj: Py, + result_queue: Py, + make_callback_fn: Py, +} /// The main loop for the Python thread. This function is spawned in a new /// thread and is responsible for all Python interaction. pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) { Python::initialize(); - let globals = Python::attach(|py| PyDict::new(py).unbind()); - while let Some(mut cmd) = receiver.recv().await { - Python::attach(|py| { - let globals = globals.bind(py); - let result = match std::mem::replace(&mut cmd.cmd_type, CmdType::Stop) { - CmdType::RunCode(code) => { - let c_code = CString::new(code).expect("CString::new failed"); - py.run(&c_code, Some(globals), None).map(|_| Value::Null) - } - CmdType::EvalCode(code) => { - let c_code = CString::new(code).expect("CString::new failed"); - py.eval(&c_code, Some(globals), None) - .and_then(|obj| py_any_to_json(&obj)) - } - CmdType::RunFile(file) => handle_run_file(py, globals, file), - CmdType::ReadVariable(var_name) => { - get_py_object(globals, &var_name).and_then(|obj| py_any_to_json(&obj)) + + // State for async operations + let mut pending: HashMap>> = HashMap::new(); + let mut next_id: usize = 1; + let mut async_state: Option = None; + + // Notifier to wake up the Rust select! loop from Python. + let notify = std::sync::Arc::new(Notify::new()); + + // Create globals and inject the notifier callback. + let globals = Python::attach(|py| -> PyResult> { + let globals = PyDict::new(py); + let rust_notify_fn = { + let notify = notify.clone(); + PyCFunction::new_closure(py, None, None, move |py, _| { + notify.notify_one(); + Ok::, PyErr>(py.py().None()) + })? + .unbind() + }; + globals.set_item("_rust_notify", rust_notify_fn)?; + Ok(globals.unbind()) + }) + .expect("Failed to initialize Python globals"); + + loop { + tokio::select! { + // Branch 1: Wait for a new command from the channel. + Some(cmd) = receiver.recv() => { + if let CmdType::Stop = cmd.cmd_type { + receiver.close(); + } else { + handle_command(&globals, &mut async_state, &mut pending, &mut next_id, cmd); } - CmdType::CallFunction { name, args } => { - handle_call_function(py, globals, name, args) + }, + // Branch 2: Wait for a notification from Python that a result is ready. + _ = notify.notified(), if !pending.is_empty() && async_state.is_some() => { + handle_notification(async_state.as_ref().unwrap(), &mut pending); + } + // If the receiver is closed and there are no more pending tasks, exit. + else => { + if receiver.is_closed() && pending.is_empty() { + break; } - CmdType::CallAsyncFunction { name, args } => { - let result: PyResult<_> = (|| { - let func = get_py_object(globals, &name)?; - check_func_callable(&func, &name)?; - Ok(func.unbind()) - })(); - - match result { - Ok(func) => { - py.detach(|| { - tokio::spawn(handle_call_async_function(func, args, cmd.responder)) - }); - return; // The response is sent async, so we can return early. + } + } + } +} + +/// Sets up the asyncio event loop and related infrastructure in Python. +fn setup_async_infrastructure(py: Python, globals: &Bound) -> PyResult { + let code = r#" +import asyncio, threading, queue, traceback +_loop = asyncio.new_event_loop() +def _run_loop(): + asyncio.set_event_loop(_loop) + _loop.run_forever() +_thread = threading.Thread(target=_run_loop, daemon=True) +_thread.start() +_result_queue = queue.Queue() +def _async_done_cb(fut, id): + try: + res = fut.result() + _result_queue.put({'id': id, 'ok': True, 'payload': res}) + except Exception as e: + tb = traceback.format_exception_only(type(e), e) + _result_queue.put({'id': id, 'ok': False, 'payload': ''.join(tb)}) + finally: + _rust_notify() +def _make_callback(id): + def _cb(fut): + _async_done_cb(fut, id) + return _cb +"#; + let c_code = CString::new(code).expect("CString::new failed"); + py.run(&c_code, Some(globals), None)?; + Ok(AsyncPyState { + loop_obj: globals.get_item("_loop")?.unwrap().unbind(), + result_queue: globals.get_item("_result_queue")?.unwrap().unbind(), + make_callback_fn: globals.get_item("_make_callback")?.unwrap().unbind(), + }) +} + +/// Processes a command received from the Rust side. +fn handle_command( + globals: &Py, + async_state: &mut Option, + pending: &mut HashMap>>, + next_id: &mut usize, + mut cmd: PyCommand, +) { + Python::attach(|py| { + let globals = globals.bind(py); + let result = match std::mem::replace(&mut cmd.cmd_type, CmdType::Stop) { + CmdType::RunCode(code) => { + let c_code = CString::new(code).expect("CString::new failed"); + py.run(&c_code, Some(globals), None).map(|_| Value::Null) + } + CmdType::EvalCode(code) => { + let c_code = CString::new(code).expect("CString::new failed"); + py.eval(&c_code, Some(globals), None) + .and_then(|obj| py_any_to_json(&obj)) + } + CmdType::RunFile(file) => handle_run_file(py, globals, file), + CmdType::ReadVariable(var_name) => { + get_py_object(globals, &var_name).and_then(|obj| py_any_to_json(&obj)) + } + CmdType::CallFunction { name, args } => handle_call_function(py, globals, name, args), + CmdType::CallAsyncFunction { name, args } => { + let id = *next_id; + *next_id += 1; + + // Initialize async infrastructure on first use. + if async_state.is_none() { + match setup_async_infrastructure(py, globals) { + Ok(state) => *async_state = Some(state), + Err(e) => { + let _ = cmd.responder.send(Err(e.to_string())); + return; } - Err(e) => Err(e), } } - CmdType::Stop => return receiver.close(), - }; - - // Convert PyErr to a string representation to avoid exposing it outside this module. - let response = match result { - Ok(value) => Ok(value), - Err(e) => Err(e.to_string()), - }; - let _ = cmd.responder.send(response); - }); - // After the loop, we can send a final confirmation for the Stop command if needed, - // but the current implementation in lib.rs handles the channel closing. - } + let state = async_state.as_ref().unwrap(); + pending.insert(id, cmd.responder); + + if let Err(e) = handle_call_async_function(py, globals, state, id, &name, args) { + if let Some(tx) = pending.remove(&id) { + let _ = tx.send(Err(e)); + } + } + return; // Response is sent async, so we return early. + } + CmdType::Stop => return, // Handled in the select! loop. + }; + + let response = result.map_err(|e| e.to_string()); + let _ = cmd.responder.send(response); + }); } /// Resolves a potentially dot-separated Python object name from the globals dictionary. @@ -103,7 +197,7 @@ fn check_func_callable(func: &Bound, name: &str) -> PyResult<()> { fn handle_run_file( py: Python, - globals: &pyo3::Bound<'_, PyDict>, + globals: &Bound<'_, PyDict>, file: std::path::PathBuf, ) -> PyResult { let code = format!( @@ -123,7 +217,7 @@ with open({}, 'r') as f: /// Handles the `CallFunction` command. fn handle_call_function( py: Python, - globals: &pyo3::Bound<'_, PyDict>, + globals: &Bound<'_, PyDict>, name: String, args: Vec, ) -> PyResult { @@ -142,30 +236,82 @@ fn vec_to_py_tuple<'py>( .into_iter() .map(|v| json_value_to_pyobject(*py, v)) .collect::>>()?; - pyo3::types::PyTuple::new(*py, py_args) + PyTuple::new(*py, py_args) } /// Handles the `CallAsyncFunction` command. -async fn handle_call_async_function( - func: Py, +fn handle_call_async_function( + py: Python, + globals: &Bound, + async_state: &AsyncPyState, + id: usize, + name: &str, args: Vec, - responder: oneshot::Sender>, +) -> Result<(), String> { + let func = get_py_object(globals, name) + .and_then(|f| check_func_callable(&f, name).map(|_| f)) + .map_err(|e| e.to_string())?; + + let t_args = vec_to_py_tuple(&py, args).map_err(|e| e.to_string())?; + let coroutine = func.call1(t_args).map_err(|e| e.to_string())?; + + let asyncio = py.import("asyncio").map_err(|e| e.to_string())?; + let run_threadsafe = asyncio + .getattr("run_coroutine_threadsafe") + .map_err(|e| e.to_string())?; + + let fut = run_threadsafe + .call1((coroutine, async_state.loop_obj.bind(py))) + .map_err(|e| e.to_string())?; + + let cb = async_state + .make_callback_fn + .bind(py) + .call1((id,)) + .map_err(|e| e.to_string())?; + + fut.call_method1("add_done_callback", (cb,)) + .map_err(|e| e.to_string())?; + + Ok(()) +} + +/// Drains the Python-side result queue and completes any pending responders. +fn handle_notification( + async_state: &AsyncPyState, + pending: &mut HashMap>>, ) { - let result = Python::attach(|py| { - // Note: This approach creates a new asyncio event loop for each async call, - // which can be inefficient for a high volume of calls. It ensures that each - // call is isolated but does not share an event loop for concurrent execution - // on the Python side. - let func = func.bind(py); - let t_args = vec_to_py_tuple(&py, args)?; - let coroutine = func.call1(t_args)?; - - let asyncio = py.import("asyncio")?; - let result = asyncio.call_method1("run", (coroutine,))?; - - py_any_to_json(&result) + Python::attach(|py| { + let get_nowait = match async_state.result_queue.bind(py).getattr("get_nowait") { + Ok(f) => f, + Err(_) => return, // Should not happen if setup is correct + }; + while let Ok(item) = get_nowait.call0() { + if let Ok(dict) = item.downcast::() { + let id = dict + .get_item("id") + .unwrap() + .unwrap() + .extract::() + .unwrap(); + if let Some(tx) = pending.remove(&id) { + let ok = dict + .get_item("ok") + .unwrap() + .unwrap() + .extract::() + .unwrap(); + let payload = dict.get_item("payload").unwrap().unwrap(); + let res = if ok { + py_any_to_json(&payload).map_err(|e| e.to_string()) + } else { + Err(payload.to_string()) + }; + let _ = tx.send(res); + } + } + } }); - let _ = responder.send(result.map_err(|e| e.to_string())); } /// Recursively converts a Python object to a `serde_json::Value`. diff --git a/src/rustpython_runner.rs b/src/rustpython_runner.rs index a922232..05a448d 100644 --- a/src/rustpython_runner.rs +++ b/src/rustpython_runner.rs @@ -100,10 +100,7 @@ pub(crate) async fn python_thread_main(mut receiver: mpsc::Receiver) /// Sets up the asyncio event loop and related infrastructure in the Python interpreter. /// This is called on-demand when the first async function is executed. -fn setup_async_infrastructure( - vm: &VirtualMachine, - scope: &Scope, -) -> PyResult { +fn setup_async_infrastructure(vm: &VirtualMachine, scope: &Scope) -> PyResult { // Initialize a dedicated asyncio loop, a result queue and helper callbacks // in the Python interpreter. The loop will be run in a separate Python // thread so that run_coroutine_threadsafe can schedule coroutines onto it. @@ -137,7 +134,10 @@ def _make_callback(id): let globals = &scope.globals; let loop_obj = globals.get_item("_loop", vm)?; let make_callback_fn = globals.get_item("_make_callback", vm)?; - Ok(AsyncPyState { loop_obj, make_callback_fn }) + Ok(AsyncPyState { + loop_obj, + make_callback_fn, + }) } /// Processes a command received from the Rust side. @@ -200,14 +200,9 @@ fn handle_command( let state = async_state.as_ref().unwrap(); pending.insert(id, cmd.responder); // Must insert before calling async. - if let Err(e) = handle_call_async_function( - vm, - scope.clone(), - state, - id, - &name, - args, - ) { + if let Err(e) = + handle_call_async_function(vm, scope.clone(), state, id, &name, args) + { if let Some(tx) = pending.remove(&id) { let _ = tx.send(Err(e)); } @@ -235,10 +230,22 @@ fn handle_notification( let id_obj = dict.get_item("id", vm).unwrap(); let ok_obj = dict.get_item("ok", vm).unwrap(); let payload_obj = dict.get_item("payload", vm).unwrap(); - let id = id_obj.clone().try_into_value::(vm).ok().unwrap_or(0) as usize; + let id = + id_obj.clone().try_into_value::(vm).ok().unwrap_or(0) as usize; if let Some(tx) = pending.remove(&id) { - let ok = ok_obj.clone().try_into_value::(vm).ok().unwrap_or(false); - let res = if ok { py_to_json(vm, &payload_obj).map_err(print_err_msg) } else { Err(payload_obj.str(vm).map(|s| s.to_string()).unwrap_or_else(|_| "".to_string())) }; + let ok = ok_obj + .clone() + .try_into_value::(vm) + .ok() + .unwrap_or(false); + let res = if ok { + py_to_json(vm, &payload_obj).map_err(print_err_msg) + } else { + Err(payload_obj + .str(vm) + .map(|s| s.to_string()) + .unwrap_or_else(|_| "".to_string())) + }; let _ = tx.send(res); } }