From c1f388b659e8a0b61e7d198fe2cd00925a019fb2 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 12 Aug 2025 18:04:54 +0530 Subject: [PATCH 1/9] Fix error propagation rule for Python's C API --- src/confluent_kafka/src/Consumer.c | 1 + src/confluent_kafka/src/Producer.c | 1 + src/confluent_kafka/src/confluent_kafka.c | 22 +++++++++++++- src/confluent_kafka/src/confluent_kafka.h | 3 ++ tests/test_Consumer.py | 35 +++++++++++++++++++++++ tests/test_Producer.py | 32 +++++++++++++++++++++ 6 files changed, 93 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 80db86d78..33f712096 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1586,6 +1586,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, if (result) Py_DECREF(result); else { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); CallState_crash(cs); rd_kafka_yield(rk); } diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 524d24615..52fe5ad0d 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -174,6 +174,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, if (result) Py_DECREF(result); else { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); CallState_crash(cs); rd_kafka_yield(rk); } diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 201f6dbdd..550d97374 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1766,6 +1766,8 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) if (result) Py_DECREF(result); else { + + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); crash: CallState_crash(cs); rd_kafka_yield(h->rk); @@ -1821,6 +1823,8 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker goto done; } + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + /** * Stop callback dispatcher, return err to application * fall-through to unlock GIL @@ -1850,6 +1854,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { if (result) Py_DECREF(result); else { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); CallState_crash(cs); rd_kafka_yield(h->rk); } @@ -1885,6 +1890,7 @@ static void log_cb (const rd_kafka_t *rk, int level, if (result) Py_DECREF(result); else { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); CallState_crash(cs); rd_kafka_yield(h->rk); } @@ -2583,6 +2589,9 @@ void CallState_begin (Handle *h, CallState *cs) { cs->thread_state = PyEval_SaveThread(); assert(cs->thread_state != NULL); cs->crashed = 0; + cs->exception_type = NULL; + cs->exception_value = NULL; + cs->exception_traceback = NULL; #ifdef WITH_PY_TSS PyThread_tss_set(&h->tlskey, cs); #else @@ -2603,9 +2612,20 @@ int CallState_end (Handle *h, CallState *cs) { PyEval_RestoreThread(cs->thread_state); - if (PyErr_CheckSignals() == -1 || cs->crashed) + if (PyErr_CheckSignals() == -1) return 0; + if (cs->crashed) { + /* Restore the saved exception if we have one */ + if (cs->exception_type) { + PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback); + cs->exception_type = NULL; + cs->exception_value = NULL; + cs->exception_traceback = NULL; + } + return 0; + } + return 1; } diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 7fbedf281..7caae33fc 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -270,6 +270,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg); typedef struct { PyThreadState *thread_state; int crashed; /* Callback crashed */ + PyObject *exception_type; /* Stored exception type */ + PyObject *exception_value; /* Stored exception value */ + PyObject *exception_traceback; /* Stored exception traceback */ } CallState; /** diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 782b320f1..fbf1f59ac 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -314,3 +314,38 @@ def test_consumer_without_groupid(): with pytest.raises(ValueError) as ex: TestConsumer({'bootstrap.servers': "mybroker:9092"}) assert ex.match('group.id must be set') + + +def test_callback_exception_no_system_error(): + + exception_raised = [] + + def error_cb_that_raises(error): + """Error callback that raises an exception""" + exception_raised.append(error) + raise RuntimeError("Test exception from error_cb") + + # Create consumer with error callback that raises exception + consumer = TestConsumer({ + 'group.id': 'test-callback-systemerror-fix', + 'bootstrap.servers': 'nonexistent-broker:9092', # Will trigger error + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'error_cb': error_cb_that_raises + }) + + consumer.subscribe(['test-topic']) + + # This should trigger the error callback due to connection failure + # Before fix: Would get RuntimeError + SystemError + # After fix: Should only get RuntimeError (no SystemError) + with pytest.raises(RuntimeError) as exc_info: + consumer.consume(timeout=0.1) + + # Verify we got the expected exception message + assert "Test exception from error_cb" in str(exc_info.value) + + # Verify the error callback was actually called + assert len(exception_raised) > 0 + + consumer.close() diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 2eab2d881..9d481df94 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -973,3 +973,35 @@ def test_produce_batch_api_compatibility(): except ImportError: # AvroProducer not available - skip this part pytest.skip("AvroProducer not available") + +def test_callback_exception_no_system_error(): + delivery_reports = [] + + def delivery_cb_that_raises(err, msg): + """Delivery report callback that raises an exception""" + delivery_reports.append((err, msg)) + raise RuntimeError("Test exception from delivery_cb") + + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', # Will cause delivery failures + 'socket.timeout.ms': 100, + 'message.timeout.ms': 1000, + 'on_delivery': delivery_cb_that_raises + }) + + # Produce a message - this will trigger delivery report callback when it fails + producer.produce('test-topic', value='test-message') + + # Flush to ensure delivery reports are processed + # Before fix: Would get RuntimeError + SystemError + # After fix: Should only get RuntimeError (no SystemError) + with pytest.raises(RuntimeError) as exc_info: + producer.flush(timeout=1.0) + + # Verify we got an exception from our callback + assert "Test exception from delivery_cb" in str(exc_info.value) + + # Verify the delivery callback was actually called + assert len(delivery_reports) > 0 + + producer.close() From a61a32e40e7754b0cfc367bb69ea38a2ae3a2d1e Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Wed, 13 Aug 2025 18:39:53 +0530 Subject: [PATCH 2/9] Fix CI --- src/confluent_kafka/src/Consumer.c | 1 + src/confluent_kafka/src/Producer.c | 1 + tests/test_Producer.py | 6 ++---- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 33f712096..0f08609f9 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1586,6 +1586,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, if (result) Py_DECREF(result); else { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); CallState_crash(cs); rd_kafka_yield(rk); diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 52fe5ad0d..0c2cdefdf 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -174,6 +174,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, if (result) Py_DECREF(result); else { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); CallState_crash(cs); rd_kafka_yield(rk); diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 9d481df94..4a980df7c 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -985,7 +985,7 @@ def delivery_cb_that_raises(err, msg): producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', # Will cause delivery failures 'socket.timeout.ms': 100, - 'message.timeout.ms': 1000, + 'message.timeout.ms': 10, # Very short timeout to trigger delivery failure quickly 'on_delivery': delivery_cb_that_raises }) @@ -996,12 +996,10 @@ def delivery_cb_that_raises(err, msg): # Before fix: Would get RuntimeError + SystemError # After fix: Should only get RuntimeError (no SystemError) with pytest.raises(RuntimeError) as exc_info: - producer.flush(timeout=1.0) + producer.flush(timeout=2.0) # Longer timeout to ensure delivery callback fires # Verify we got an exception from our callback assert "Test exception from delivery_cb" in str(exc_info.value) # Verify the delivery callback was actually called assert len(delivery_reports) > 0 - - producer.close() From c406840c08539edb6d81915c381e57353db7e083 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Wed, 13 Aug 2025 19:00:20 +0530 Subject: [PATCH 3/9] Fix CI --- tests/test_Consumer.py | 14 +++++++------- tests/test_Producer.py | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index fbf1f59ac..87a1faab0 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -319,12 +319,12 @@ def test_consumer_without_groupid(): def test_callback_exception_no_system_error(): exception_raised = [] - + def error_cb_that_raises(error): """Error callback that raises an exception""" exception_raised.append(error) raise RuntimeError("Test exception from error_cb") - + # Create consumer with error callback that raises exception consumer = TestConsumer({ 'group.id': 'test-callback-systemerror-fix', @@ -333,19 +333,19 @@ def error_cb_that_raises(error): 'session.timeout.ms': 1000, 'error_cb': error_cb_that_raises }) - + consumer.subscribe(['test-topic']) - + # This should trigger the error callback due to connection failure # Before fix: Would get RuntimeError + SystemError # After fix: Should only get RuntimeError (no SystemError) with pytest.raises(RuntimeError) as exc_info: consumer.consume(timeout=0.1) - + # Verify we got the expected exception message assert "Test exception from error_cb" in str(exc_info.value) - + # Verify the error callback was actually called assert len(exception_raised) > 0 - + consumer.close() diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 4a980df7c..5f8ac5dee 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -976,30 +976,30 @@ def test_produce_batch_api_compatibility(): def test_callback_exception_no_system_error(): delivery_reports = [] - + def delivery_cb_that_raises(err, msg): """Delivery report callback that raises an exception""" delivery_reports.append((err, msg)) raise RuntimeError("Test exception from delivery_cb") - + producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', # Will cause delivery failures 'socket.timeout.ms': 100, 'message.timeout.ms': 10, # Very short timeout to trigger delivery failure quickly 'on_delivery': delivery_cb_that_raises }) - + # Produce a message - this will trigger delivery report callback when it fails producer.produce('test-topic', value='test-message') - + # Flush to ensure delivery reports are processed - # Before fix: Would get RuntimeError + SystemError + # Before fix: Would get RuntimeError + SystemError # After fix: Should only get RuntimeError (no SystemError) with pytest.raises(RuntimeError) as exc_info: producer.flush(timeout=2.0) # Longer timeout to ensure delivery callback fires - + # Verify we got an exception from our callback assert "Test exception from delivery_cb" in str(exc_info.value) - + # Verify the delivery callback was actually called assert len(delivery_reports) > 0 From 8e9205c7de15e711741a8a37f57b1034dbe91b14 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sat, 16 Aug 2025 12:50:34 +0530 Subject: [PATCH 4/9] Address PR review feedback - Add issue number #865 to test comments for better traceability - Move PyErr_Fetch into else condition for consistency in confluent_kafka.c - Add CHANGELOG entry documenting the fix for issue #865 --- CHANGELOG.md | 3 ++ src/confluent_kafka/src/confluent_kafka.c | 4 +- src/confluent_kafka/src/confluent_kafka.h | 49 +++++++++++++++++++++++ tests/test_Consumer.py | 2 +- tests/test_Producer.py | 2 +- 5 files changed, 56 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 77f0dcf85..ce2b6a63b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,9 @@ for a complete list of changes, enhancements, fixes and upgrade considerations. v2.11.0 is a feature release with the following enhancements: +### Fixes +- Fix error propagation rule for Python's C API to prevent SystemError when callbacks raise exceptions (#865) + confluent-kafka-python v2.11.0 is based on librdkafka v2.11.0, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for a complete list of changes, enhancements, fixes and upgrade considerations. diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 550d97374..a4ac72f24 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1821,10 +1821,10 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker /* throttle_cb executed successfully */ Py_DECREF(result); goto done; + } else { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); } - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); - /** * Stop callback dispatcher, return err to application * fall-through to unlock GIL diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 7caae33fc..7495919b2 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -275,6 +275,55 @@ typedef struct { PyObject *exception_traceback; /* Stored exception traceback */ } CallState; +/** + * @brief Compatibility layer for Python exception handling API changes. + * PyErr_Fetch was deprecated in Python 3.12 in favor of PyErr_GetRaisedException. + */ +#if PY_VERSION_HEX >= 0x030c0000 +/* Python 3.12+ - use new API */ +static inline void +CallState_fetch_exception(CallState *cs) { + PyObject *exc = PyErr_GetRaisedException(); + if (exc) { + cs->exception_type = (PyObject *)Py_TYPE(exc); + Py_INCREF(cs->exception_type); + cs->exception_value = exc; + cs->exception_traceback = PyException_GetTraceback(exc); + } else { + cs->exception_type = NULL; + cs->exception_value = NULL; + cs->exception_traceback = NULL; + } +} + +static inline void +CallState_restore_exception(CallState *cs) { + if (cs->exception_value) { + PyErr_SetRaisedException(cs->exception_value); + /* PyErr_SetRaisedException steals the reference, so clear our pointer */ + cs->exception_value = NULL; + Py_XDECREF(cs->exception_type); + cs->exception_type = NULL; + Py_XDECREF(cs->exception_traceback); + cs->exception_traceback = NULL; + } +} +#else +/* Python < 3.12 - use legacy API */ +static inline void +CallState_fetch_exception(CallState *cs) { + PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); +} + +static inline void +CallState_restore_exception(CallState *cs) { + PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback); + cs->exception_type = NULL; + cs->exception_value = NULL; + cs->exception_traceback = NULL; +} +#endif + /** * @brief Initialiase a CallState and unlock the GIL prior to a * possibly blocking external call. diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 87a1faab0..7ccc27302 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -337,7 +337,7 @@ def error_cb_that_raises(error): consumer.subscribe(['test-topic']) # This should trigger the error callback due to connection failure - # Before fix: Would get RuntimeError + SystemError + # Before fix: Would get RuntimeError + SystemError (Issue #865) # After fix: Should only get RuntimeError (no SystemError) with pytest.raises(RuntimeError) as exc_info: consumer.consume(timeout=0.1) diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 5f8ac5dee..3640fd24f 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -993,7 +993,7 @@ def delivery_cb_that_raises(err, msg): producer.produce('test-topic', value='test-message') # Flush to ensure delivery reports are processed - # Before fix: Would get RuntimeError + SystemError + # Before fix: Would get RuntimeError + SystemError (Issue #865) # After fix: Should only get RuntimeError (no SystemError) with pytest.raises(RuntimeError) as exc_info: producer.flush(timeout=2.0) # Longer timeout to ensure delivery callback fires From 4c1829affc5301f8d3f4c3d95fdbcbd973e96447 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Wed, 20 Aug 2025 01:56:24 +0530 Subject: [PATCH 5/9] Add wrapper for depreciated API --- src/confluent_kafka/src/Admin.c | 2 +- src/confluent_kafka/src/Consumer.c | 2 +- src/confluent_kafka/src/Producer.c | 2 +- src/confluent_kafka/src/confluent_kafka.c | 13 ++--- src/confluent_kafka/src/confluent_kafka.h | 64 +++++++++++++---------- 5 files changed, 44 insertions(+), 39 deletions(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index e9b636453..d4d97578d 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -5093,7 +5093,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, PyObject *trace = NULL; /* Fetch (and clear) currently raised exception */ - PyErr_Fetch(&exctype, &error, &trace); + cfl_exception_fetch(&exctype, &exc, &trace); Py_XDECREF(trace); } diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 0f08609f9..d57bef968 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1587,7 +1587,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, Py_DECREF(result); else { - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(rk); } diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 0c2cdefdf..70fc84500 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -175,7 +175,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, Py_DECREF(result); else { - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(rk); } diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index a4ac72f24..775b90205 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1767,7 +1767,7 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) Py_DECREF(result); else { - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + CallState_fetch_exception(cs); crash: CallState_crash(cs); rd_kafka_yield(h->rk); @@ -1822,7 +1822,7 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker Py_DECREF(result); goto done; } else { - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + CallState_fetch_exception(cs); } /** @@ -1854,7 +1854,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { if (result) Py_DECREF(result); else { - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(h->rk); } @@ -1890,7 +1890,7 @@ static void log_cb (const rd_kafka_t *rk, int level, if (result) Py_DECREF(result); else { - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(h->rk); } @@ -2618,10 +2618,7 @@ int CallState_end (Handle *h, CallState *cs) { if (cs->crashed) { /* Restore the saved exception if we have one */ if (cs->exception_type) { - PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback); - cs->exception_type = NULL; - cs->exception_value = NULL; - cs->exception_traceback = NULL; + CallState_restore_exception(cs); } return 0; } diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 7495919b2..c1825a0dc 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -277,52 +277,60 @@ typedef struct { /** * @brief Compatibility layer for Python exception handling API changes. - * PyErr_Fetch was deprecated in Python 3.12 in favor of PyErr_GetRaisedException. + * PyErr_Fetch/PyErr_Restore were deprecated in Python 3.12 in favor of + * PyErr_GetRaisedException/PyErr_SetRaisedException. */ -#if PY_VERSION_HEX >= 0x030c0000 -/* Python 3.12+ - use new API */ + +/* General-purpose compatibility wrappers */ static inline void -CallState_fetch_exception(CallState *cs) { +cfl_exception_fetch(PyObject **exc_type, PyObject **exc_value, PyObject **exc_traceback) { +#if PY_VERSION_HEX >= 0x030c0000 + /* Python 3.12+ - use new API */ PyObject *exc = PyErr_GetRaisedException(); if (exc) { - cs->exception_type = (PyObject *)Py_TYPE(exc); - Py_INCREF(cs->exception_type); - cs->exception_value = exc; - cs->exception_traceback = PyException_GetTraceback(exc); + *exc_type = (PyObject *)Py_TYPE(exc); + Py_INCREF(*exc_type); + *exc_value = exc; + *exc_traceback = PyException_GetTraceback(exc); } else { - cs->exception_type = NULL; - cs->exception_value = NULL; - cs->exception_traceback = NULL; + *exc_type = *exc_value = *exc_traceback = NULL; } +#else + /* Python < 3.12 - use legacy API */ + PyErr_Fetch(exc_type, exc_value, exc_traceback); +#endif } static inline void -CallState_restore_exception(CallState *cs) { - if (cs->exception_value) { - PyErr_SetRaisedException(cs->exception_value); - /* PyErr_SetRaisedException steals the reference, so clear our pointer */ - cs->exception_value = NULL; - Py_XDECREF(cs->exception_type); - cs->exception_type = NULL; - Py_XDECREF(cs->exception_traceback); - cs->exception_traceback = NULL; +cfl_exception_restore(PyObject *exc_type, PyObject *exc_value, PyObject *exc_traceback) { +#if PY_VERSION_HEX >= 0x030c0000 + /* Python 3.12+ - use new API */ + if (exc_value) { + PyErr_SetRaisedException(exc_value); + Py_XDECREF(exc_type); + Py_XDECREF(exc_traceback); } -} #else -/* Python < 3.12 - use legacy API */ + /* Python < 3.12 - use legacy API */ + PyErr_Restore(exc_type, exc_value, exc_traceback); +#endif +} + +/* CallState-specific convenience wrappers */ static inline void CallState_fetch_exception(CallState *cs) { - PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + cfl_exception_fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); } static inline void CallState_restore_exception(CallState *cs) { - PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback); - cs->exception_type = NULL; - cs->exception_value = NULL; - cs->exception_traceback = NULL; + if (cs->exception_type) { + cfl_exception_restore(cs->exception_type, cs->exception_value, cs->exception_traceback); + cs->exception_type = NULL; + cs->exception_value = NULL; + cs->exception_traceback = NULL; + } } -#endif /** * @brief Initialiase a CallState and unlock the GIL prior to a From 6b573b4b804c59a240cadc351349c3e659d78828 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Wed, 20 Aug 2025 02:01:23 +0530 Subject: [PATCH 6/9] minor --- src/confluent_kafka/src/confluent_kafka.c | 2 +- src/confluent_kafka/src/confluent_kafka.h | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 775b90205..3267278cf 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1854,7 +1854,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { if (result) Py_DECREF(result); else { - CallState_fetch_exception(cs); + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(h->rk); } diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index c1825a0dc..c0691807e 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -281,7 +281,6 @@ typedef struct { * PyErr_GetRaisedException/PyErr_SetRaisedException. */ -/* General-purpose compatibility wrappers */ static inline void cfl_exception_fetch(PyObject **exc_type, PyObject **exc_value, PyObject **exc_traceback) { #if PY_VERSION_HEX >= 0x030c0000 @@ -316,7 +315,6 @@ cfl_exception_restore(PyObject *exc_type, PyObject *exc_value, PyObject *exc_tra #endif } -/* CallState-specific convenience wrappers */ static inline void CallState_fetch_exception(CallState *cs) { cfl_exception_fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); From beab3c7eb3517fc536c17394f6a6db9dc928ee57 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 22 Aug 2025 18:39:46 +0530 Subject: [PATCH 7/9] Revert error and exec in fetch --- src/confluent_kafka/src/Admin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index d4d97578d..1aa2476ea 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -5093,7 +5093,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, PyObject *trace = NULL; /* Fetch (and clear) currently raised exception */ - cfl_exception_fetch(&exctype, &exc, &trace); + cfl_exception_fetch(&exctype, &error, &trace); Py_XDECREF(trace); } From 3e4c8d41650c21c05c144a5d4322886d4075f52c Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 24 Oct 2025 21:31:58 +0530 Subject: [PATCH 8/9] Address feedback on CallState params and adding more tests --- src/confluent_kafka/src/Admin.c | 29 ++-- src/confluent_kafka/src/Consumer.c | 2 +- src/confluent_kafka/src/Producer.c | 1 - src/confluent_kafka/src/confluent_kafka.c | 7 +- src/confluent_kafka/src/confluent_kafka.h | 33 ++--- tests/test_Admin.py | 86 ++++++++++++ tests/test_Consumer.py | 153 +++++++++++++++++++--- tests/test_Producer.py | 147 +++++++++++++++++++++ 8 files changed, 396 insertions(+), 62 deletions(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 1aa2476ea..4b8363006 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4717,7 +4717,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, PyGILState_STATE gstate; PyObject *error, *method, *ret; PyObject *result = NULL; - PyObject *exctype = NULL, *exc = NULL, *excargs = NULL; + PyObject *exc = NULL, *excargs = NULL; /* Acquire GIL */ gstate = PyGILState_Ensure(); @@ -5093,7 +5093,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, PyObject *trace = NULL; /* Fetch (and clear) currently raised exception */ - cfl_exception_fetch(&exctype, &error, &trace); + cfl_exception_fetch(&exc); Py_XDECREF(trace); } @@ -5124,22 +5124,17 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, * Pass an exception to future.set_exception(). */ - if (!exctype) { + if (!exc) { /* No previous exception raised, use KafkaException */ - exctype = KafkaException; - Py_INCREF(exctype); - } - - /* Create a new exception based on exception type and error. */ - excargs = PyTuple_New(1); - Py_INCREF(error); /* tuple's reference */ - PyTuple_SET_ITEM(excargs, 0, error); - exc = ((PyTypeObject *)exctype)->tp_new( - (PyTypeObject *)exctype, NULL, NULL); - exc->ob_type->tp_init(exc, excargs, NULL); - Py_DECREF(excargs); - Py_XDECREF(exctype); - Py_XDECREF(error); /* from error source above */ + excargs = PyTuple_New(1); + Py_INCREF(error); /* tuple's reference */ + PyTuple_SET_ITEM(excargs, 0, error); + exc = ((PyTypeObject *)KafkaException)->tp_new( + (PyTypeObject *)KafkaException, NULL, NULL); + exc->ob_type->tp_init(exc, excargs, NULL); + Py_DECREF(excargs); + Py_XDECREF(error); /* from error source above */ + } /* * Call future.set_exception(exc) diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index d57bef968..909d1ec3f 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -424,6 +424,7 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, if (result) Py_DECREF(result); else { + CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(rk); } @@ -1586,7 +1587,6 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, if (result) Py_DECREF(result); else { - CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(rk); diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 70fc84500..7b9d3f39a 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -174,7 +174,6 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, if (result) Py_DECREF(result); else { - CallState_fetch_exception(cs); CallState_crash(cs); rd_kafka_yield(rk); diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 3267278cf..d0d395507 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -1766,8 +1766,7 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) if (result) Py_DECREF(result); else { - - CallState_fetch_exception(cs); + CallState_fetch_exception(cs); crash: CallState_crash(cs); rd_kafka_yield(h->rk); @@ -2589,9 +2588,7 @@ void CallState_begin (Handle *h, CallState *cs) { cs->thread_state = PyEval_SaveThread(); assert(cs->thread_state != NULL); cs->crashed = 0; - cs->exception_type = NULL; cs->exception_value = NULL; - cs->exception_traceback = NULL; #ifdef WITH_PY_TSS PyThread_tss_set(&h->tlskey, cs); #else @@ -2617,7 +2614,7 @@ int CallState_end (Handle *h, CallState *cs) { if (cs->crashed) { /* Restore the saved exception if we have one */ - if (cs->exception_type) { + if (cs->exception_value) { CallState_restore_exception(cs); } return 0; diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index c0691807e..1b7b8880a 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -270,9 +270,7 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg); typedef struct { PyThreadState *thread_state; int crashed; /* Callback crashed */ - PyObject *exception_type; /* Stored exception type */ PyObject *exception_value; /* Stored exception value */ - PyObject *exception_traceback; /* Stored exception traceback */ } CallState; /** @@ -282,51 +280,42 @@ typedef struct { */ static inline void -cfl_exception_fetch(PyObject **exc_type, PyObject **exc_value, PyObject **exc_traceback) { +cfl_exception_fetch(PyObject **exc_value) { #if PY_VERSION_HEX >= 0x030c0000 /* Python 3.12+ - use new API */ - PyObject *exc = PyErr_GetRaisedException(); - if (exc) { - *exc_type = (PyObject *)Py_TYPE(exc); - Py_INCREF(*exc_type); - *exc_value = exc; - *exc_traceback = PyException_GetTraceback(exc); - } else { - *exc_type = *exc_value = *exc_traceback = NULL; - } + *exc_value = PyErr_GetRaisedException(); #else /* Python < 3.12 - use legacy API */ - PyErr_Fetch(exc_type, exc_value, exc_traceback); + PyObject *exc_type, *exc_traceback; + PyErr_Fetch(&exc_type, exc_value, &exc_traceback); + Py_XDECREF(exc_type); + Py_XDECREF(exc_traceback); #endif } static inline void -cfl_exception_restore(PyObject *exc_type, PyObject *exc_value, PyObject *exc_traceback) { +cfl_exception_restore(PyObject *exc_value) { #if PY_VERSION_HEX >= 0x030c0000 /* Python 3.12+ - use new API */ if (exc_value) { PyErr_SetRaisedException(exc_value); - Py_XDECREF(exc_type); - Py_XDECREF(exc_traceback); } #else /* Python < 3.12 - use legacy API */ - PyErr_Restore(exc_type, exc_value, exc_traceback); + PyErr_SetObject(PyExceptionInstance_Class(exc_value), exc_value); #endif } static inline void CallState_fetch_exception(CallState *cs) { - cfl_exception_fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); + cfl_exception_fetch(&cs->exception_value); } static inline void CallState_restore_exception(CallState *cs) { - if (cs->exception_type) { - cfl_exception_restore(cs->exception_type, cs->exception_value, cs->exception_traceback); - cs->exception_type = NULL; + if (cs->exception_value) { + cfl_exception_restore(cs->exception_value); cs->exception_value = NULL; - cs->exception_traceback = NULL; } } diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 85f775dcb..7398786ea 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1224,3 +1224,89 @@ def test_elect_leaders(): with pytest.raises(KafkaException): a.elect_leaders(correct_election_type, [correct_partitions])\ .result(timeout=1) + + +@pytest.mark.skipif(libversion()[1] < 0x000b0500, + reason="AdminAPI requires librdkafka >= v0.11.5") +def test_admin_callback_exception_no_system_error(): + """Test AdminClient callbacks exception handling with different exception types""" + + # Test error_cb with different exception types + def error_cb_kafka_exception(error): + raise KafkaException(KafkaError._FAIL, "KafkaException from error_cb") + + def error_cb_value_error(error): + raise ValueError("ValueError from error_cb") + + def error_cb_runtime_error(error): + raise RuntimeError("RuntimeError from error_cb") + + # Test error_cb with KafkaException + admin = AdminClient({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'error_cb': error_cb_kafka_exception + }) + + with pytest.raises(KafkaException) as exc_info: + admin.poll(timeout=0.2) + assert "KafkaException from error_cb" in str(exc_info.value) + + # Test error_cb with ValueError + admin = AdminClient({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'error_cb': error_cb_value_error + }) + + with pytest.raises(ValueError) as exc_info: + admin.poll(timeout=0.2) + assert "ValueError from error_cb" in str(exc_info.value) + + # Test error_cb with RuntimeError + admin = AdminClient({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'error_cb': error_cb_runtime_error + }) + + with pytest.raises(RuntimeError) as exc_info: + admin.poll(timeout=0.2) + assert "RuntimeError from error_cb" in str(exc_info.value) + + +@pytest.mark.skipif(libversion()[1] < 0x000b0500, + reason="AdminAPI requires librdkafka >= v0.11.5") +def test_admin_multiple_callbacks_different_error_types(): + """Test AdminClient with multiple callbacks configured with different error types to see which one gets triggered""" + + callbacks_called = [] + + def error_cb_that_raises_runtime(error): + callbacks_called.append('error_cb_runtime') + raise RuntimeError("RuntimeError from error_cb") + + def stats_cb_that_raises_value(stats_json): + callbacks_called.append('stats_cb_value') + raise ValueError("ValueError from stats_cb") + + def throttle_cb_that_raises_kafka(throttle_event): + callbacks_called.append('throttle_cb_kafka') + raise KafkaException(KafkaError._FAIL, "KafkaException from throttle_cb") + + admin = AdminClient({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'statistics.interval.ms': 100, # Enable stats callback + 'error_cb': error_cb_that_raises_runtime, + 'stats_cb': stats_cb_that_raises_value, + 'throttle_cb': throttle_cb_that_raises_kafka + }) + + # Test that error_cb callback raises an exception (it's triggered by connection failures) + with pytest.raises(RuntimeError) as exc_info: + admin.poll(timeout=0.2) + + # Verify that error_cb was called + assert len(callbacks_called) > 0 + assert 'error_cb_runtime' in callbacks_called \ No newline at end of file diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 7ccc27302..ebd3764cd 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -317,35 +317,156 @@ def test_consumer_without_groupid(): def test_callback_exception_no_system_error(): + """Test all consumer callbacks exception handling with separate assertions for each callback""" - exception_raised = [] - + # Test error_cb callback + error_called = [] + def error_cb_that_raises(error): """Error callback that raises an exception""" - exception_raised.append(error) + error_called.append(error) raise RuntimeError("Test exception from error_cb") - # Create consumer with error callback that raises exception - consumer = TestConsumer({ - 'group.id': 'test-callback-systemerror-fix', - 'bootstrap.servers': 'nonexistent-broker:9092', # Will trigger error + consumer1 = TestConsumer({ + 'group.id': 'test-error-callback', + 'bootstrap.servers': 'nonexistent-broker:9092', 'socket.timeout.ms': 100, 'session.timeout.ms': 1000, 'error_cb': error_cb_that_raises }) - consumer.subscribe(['test-topic']) + consumer1.subscribe(['test-topic']) - # This should trigger the error callback due to connection failure - # Before fix: Would get RuntimeError + SystemError (Issue #865) - # After fix: Should only get RuntimeError (no SystemError) + # Test error_cb callback with pytest.raises(RuntimeError) as exc_info: - consumer.consume(timeout=0.1) + consumer1.consume(timeout=0.1) - # Verify we got the expected exception message + # Verify error_cb was called and raised the expected exception assert "Test exception from error_cb" in str(exc_info.value) + assert len(error_called) > 0 + consumer1.close() + + # Test stats_cb callback + stats_called = [] + + def stats_cb_that_raises(stats_json): + """Stats callback that raises an exception""" + stats_called.append(stats_json) + raise RuntimeError("Test exception from stats_cb") + + consumer2 = TestConsumer({ + 'group.id': 'test-stats-callback', + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'statistics.interval.ms': 100, # Enable stats callback + 'stats_cb': stats_cb_that_raises + }) + + consumer2.subscribe(['test-topic']) + + # Test stats_cb callback + with pytest.raises(RuntimeError) as exc_info: + consumer2.consume(timeout=0.2) # Longer timeout to allow stats callback + + # Verify stats_cb was called and raised the expected exception + assert "Test exception from stats_cb" in str(exc_info.value) + assert len(stats_called) > 0 + consumer2.close() + + # Test throttle_cb callback (may not be triggered in this scenario) + throttle_called = [] + + def throttle_cb_that_raises(throttle_event): + """Throttle callback that raises an exception""" + throttle_called.append(throttle_event) + raise RuntimeError("Test exception from throttle_cb") + + consumer3 = TestConsumer({ + 'group.id': 'test-throttle-callback', + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'throttle_cb': throttle_cb_that_raises + }) + + consumer3.subscribe(['test-topic']) + + # Test throttle_cb callback - may not be triggered, so we'll just verify it doesn't crash + try: + consumer3.consume(timeout=0.1) + # If no exception is raised, that's also fine - throttle_cb may not be triggered + print("Throttle callback not triggered in this scenario") + except RuntimeError as exc_info: + # If throttle_cb was triggered and raised an exception, verify it + if "Test exception from throttle_cb" in str(exc_info.value): + assert len(throttle_called) > 0 + print("Throttle callback was triggered and raised exception") + + consumer3.close() + + +def test_error_callback_exception_different_error_types(): + """Test error callback with different exception types""" + + def error_cb_kafka_exception(error): + """Error callback that raises KafkaException""" + raise KafkaException(error) + + def error_cb_value_error(error): + """Error callback that raises ValueError""" + raise ValueError(f"Custom error: {error}") + + def error_cb_runtime_error(error): + """Error callback that raises RuntimeError""" + raise RuntimeError(f"Runtime error: {error}") + + # Test with KafkaException + consumer1 = TestConsumer({ + 'group.id': 'test-kafka-exception', + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'error_cb': error_cb_kafka_exception + }) + consumer1.subscribe(['test-topic']) + + with pytest.raises(KafkaException): + consumer1.consume(timeout=0.1) + consumer1.close() + + # Test with ValueError + consumer2 = TestConsumer({ + 'group.id': 'test-value-error', + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'error_cb': error_cb_value_error + }) + consumer2.subscribe(['test-topic']) + + with pytest.raises(ValueError) as exc_info: + consumer2.consume(timeout=0.1) + assert "Custom error:" in str(exc_info.value) + consumer2.close() + + # Test with RuntimeError + consumer3 = TestConsumer({ + 'group.id': 'test-runtime-error', + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'error_cb': error_cb_runtime_error + }) + consumer3.subscribe(['test-topic']) + + with pytest.raises(RuntimeError) as exc_info: + consumer3.consume(timeout=0.1) + assert "Runtime error:" in str(exc_info.value) + consumer3.close() + + + + - # Verify the error callback was actually called - assert len(exception_raised) > 0 - consumer.close() diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 3640fd24f..10ef629af 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -1003,3 +1003,150 @@ def delivery_cb_that_raises(err, msg): # Verify the delivery callback was actually called assert len(delivery_reports) > 0 + + +def test_core_callbacks_exception_different_types(): + """Test error_cb, throttle_cb, and stats_cb exception handling with different exception types""" + + # Test error_cb with different exception types + def error_cb_kafka_exception(error): + raise KafkaException(KafkaError._FAIL, "KafkaException from error_cb") + + def error_cb_value_error(error): + raise ValueError("ValueError from error_cb") + + def error_cb_runtime_error(error): + raise RuntimeError("RuntimeError from error_cb") + + # Test error_cb exceptions - these should be triggered by connection failure + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'error_cb': error_cb_kafka_exception + }) + + with pytest.raises(KafkaException) as exc_info: + producer.produce('test-topic', value='test-message') + producer.flush(timeout=2.0) + assert "KafkaException from error_cb" in str(exc_info.value) + + # Test error_cb with ValueError + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'error_cb': error_cb_value_error + }) + + with pytest.raises(ValueError) as exc_info: + producer.produce('test-topic', value='test-message') + producer.flush(timeout=2.0) + assert "ValueError from error_cb" in str(exc_info.value) + + # Test error_cb with RuntimeError + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'error_cb': error_cb_runtime_error + }) + + with pytest.raises(RuntimeError) as exc_info: + producer.produce('test-topic', value='test-message') + producer.flush(timeout=2.0) + assert "RuntimeError from error_cb" in str(exc_info.value) + + +def test_multiple_callbacks_exception_no_system_error(): + """Test multiple callbacks raising exceptions simultaneously""" + + callbacks_called = [] + + def error_cb_that_raises(error): + callbacks_called.append('error_cb') + raise RuntimeError("Test exception from error_cb") + + def throttle_cb_that_raises(throttle_event): + callbacks_called.append('throttle_cb') + raise RuntimeError("Test exception from throttle_cb") + + def stats_cb_that_raises(stats_json): + callbacks_called.append('stats_cb') + raise RuntimeError("Test exception from stats_cb") + + def delivery_cb_that_raises(err, msg): + callbacks_called.append('delivery_cb') + raise RuntimeError("Test exception from delivery_cb") + + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'statistics.interval.ms': 100, + 'error_cb': error_cb_that_raises, + 'throttle_cb': throttle_cb_that_raises, + 'stats_cb': stats_cb_that_raises, + 'on_delivery': delivery_cb_that_raises + }) + + # This should trigger multiple callbacks + with pytest.raises(RuntimeError) as exc_info: + producer.produce('test-topic', value='test-message') + producer.flush(timeout=2.0) + + # Should get one of the exceptions (not SystemError) + assert "Test exception from" in str(exc_info.value) + assert len(callbacks_called) > 0 + + +def test_delivery_callback_exception_different_message_types(): + """Test delivery callback exception with different message types""" + + def delivery_cb_that_raises(err, msg): + raise RuntimeError("Test exception from delivery_cb") + + # Test with string message + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'message.timeout.ms': 10, + 'on_delivery': delivery_cb_that_raises + }) + + with pytest.raises(RuntimeError) as exc_info: + producer.produce('test-topic', value='string-message') + producer.flush(timeout=2.0) + + assert "Test exception from delivery_cb" in str(exc_info.value) + + # Test with bytes message + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'message.timeout.ms': 10, + 'on_delivery': delivery_cb_that_raises + }) + + with pytest.raises(RuntimeError) as exc_info: + producer.produce('test-topic', value=b'bytes-message') + producer.flush(timeout=2.0) + + assert "Test exception from delivery_cb" in str(exc_info.value) + + +def test_callback_exception_with_producer_methods(): + """Test callback exception with different producer methods""" + + def delivery_cb_that_raises(err, msg): + raise RuntimeError("Test exception from delivery_cb") + + producer = Producer({ + 'bootstrap.servers': 'nonexistent-broker:9092', + 'socket.timeout.ms': 100, + 'message.timeout.ms': 10, + 'on_delivery': delivery_cb_that_raises + }) + + # Test with flush method - this should trigger the callback + with pytest.raises(RuntimeError) as exc_info: + producer.produce('test-topic', value='test-message') + producer.flush(timeout=2.0) + + assert "Test exception from delivery_cb" in str(exc_info.value) From 0798f7f52c5016d182b4c019e8615562fdbb4b4f Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 24 Oct 2025 21:50:18 +0530 Subject: [PATCH 9/9] Fix linter --- tests/test_Admin.py | 109 ++++++++++++++++++++--------------------- tests/test_Consumer.py | 32 +++++------- tests/test_Producer.py | 57 ++++++++++----------- 3 files changed, 95 insertions(+), 103 deletions(-) diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 7398786ea..e6168cf9f 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -658,20 +658,20 @@ def test_describe_topics_api(): # Wrong argument type for args in [ - [topic_names], - ["test-topic-1"], - [TopicCollection([3])], - [TopicCollection(["correct", 3])], - [TopicCollection([None])] - ]: + [topic_names], + ["test-topic-1"], + [TopicCollection([3])], + [TopicCollection(["correct", 3])], + [TopicCollection([None])] + ]: with pytest.raises(TypeError): a.describe_topics(*args, **kwargs) # Wrong argument value for args in [ - [TopicCollection([""])], - [TopicCollection(["correct", ""])] - ]: + [TopicCollection([""])], + [TopicCollection(["correct", ""])] + ]: with pytest.raises(ValueError): a.describe_topics(*args, **kwargs) @@ -1053,13 +1053,13 @@ def test_list_offsets_api(): # Wrong option types for kwargs in [ - { - "isolation_level": 10 - }, - { - "request_timeout": "test" - } - ]: + { + "isolation_level": 10 + }, + { + "request_timeout": "test" + } + ]: requests = { TopicPartition("topic1", 0, 10): OffsetSpec.earliest() } @@ -1068,10 +1068,10 @@ def test_list_offsets_api(): # Wrong option values for kwargs in [ - { - "request_timeout": -1 - } - ]: + { + "request_timeout": -1 + } + ]: requests = { TopicPartition("topic1", 0, 10): OffsetSpec.earliest() } @@ -1095,13 +1095,13 @@ def test_list_offsets_api(): # Invalid TopicPartition for requests in [ - { - TopicPartition("", 0, 10): OffsetSpec.earliest() - }, - { - TopicPartition("correct", -1, 10): OffsetSpec.earliest() - } - ]: + { + TopicPartition("", 0, 10): OffsetSpec.earliest() + }, + { + TopicPartition("correct", -1, 10): OffsetSpec.earliest() + } + ]: with pytest.raises(ValueError): a.list_offsets(requests, **kwargs) @@ -1131,33 +1131,33 @@ def test_list_offsets_api(): # Key isn't a TopicPartition for requests in [ - { - "not-topic-partition": OffsetSpec.latest() - }, - { - TopicPartition("topic1", 0, 10): OffsetSpec.latest(), - "not-topic-partition": OffsetSpec.latest() - }, - { - None: OffsetSpec.latest() - } - ]: + { + "not-topic-partition": OffsetSpec.latest() + }, + { + TopicPartition("topic1", 0, 10): OffsetSpec.latest(), + "not-topic-partition": OffsetSpec.latest() + }, + { + None: OffsetSpec.latest() + } + ]: with pytest.raises(TypeError): a.list_offsets(requests, **kwargs) # Value isn't a OffsetSpec for requests in [ - { - TopicPartition("topic1", 0, 10): "test" - }, - { - TopicPartition("topic1", 0, 10): OffsetSpec.latest(), - TopicPartition("topic1", 0, 10): "test" - }, - { - TopicPartition("topic1", 0, 10): None - } - ]: + { + TopicPartition("topic1", 0, 10): "test" + }, + { + TopicPartition("topic1", 0, 10): OffsetSpec.latest(), + TopicPartition("topic1", 0, 10): "test" + }, + { + TopicPartition("topic1", 0, 10): None + } + ]: with pytest.raises(TypeError): a.list_offsets(requests, **kwargs) @@ -1226,8 +1226,6 @@ def test_elect_leaders(): .result(timeout=1) -@pytest.mark.skipif(libversion()[1] < 0x000b0500, - reason="AdminAPI requires librdkafka >= v0.11.5") def test_admin_callback_exception_no_system_error(): """Test AdminClient callbacks exception handling with different exception types""" @@ -1275,10 +1273,9 @@ def error_cb_runtime_error(error): assert "RuntimeError from error_cb" in str(exc_info.value) -@pytest.mark.skipif(libversion()[1] < 0x000b0500, - reason="AdminAPI requires librdkafka >= v0.11.5") def test_admin_multiple_callbacks_different_error_types(): - """Test AdminClient with multiple callbacks configured with different error types to see which one gets triggered""" + """Test AdminClient with multiple callbacks configured with different error types +to see which one gets triggered""" callbacks_called = [] @@ -1304,9 +1301,9 @@ def throttle_cb_that_raises_kafka(throttle_event): }) # Test that error_cb callback raises an exception (it's triggered by connection failures) - with pytest.raises(RuntimeError) as exc_info: + with pytest.raises(RuntimeError): admin.poll(timeout=0.2) # Verify that error_cb was called assert len(callbacks_called) > 0 - assert 'error_cb_runtime' in callbacks_called \ No newline at end of file + assert 'error_cb_runtime' in callbacks_called diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index ebd3764cd..cb3e832ff 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -321,7 +321,7 @@ def test_callback_exception_no_system_error(): # Test error_cb callback error_called = [] - + def error_cb_that_raises(error): """Error callback that raises an exception""" error_called.append(error) @@ -348,7 +348,7 @@ def error_cb_that_raises(error): # Test stats_cb callback stats_called = [] - + def stats_cb_that_raises(stats_json): """Stats callback that raises an exception""" stats_called.append(stats_json) @@ -376,7 +376,7 @@ def stats_cb_that_raises(stats_json): # Test throttle_cb callback (may not be triggered in this scenario) throttle_called = [] - + def throttle_cb_that_raises(throttle_event): """Throttle callback that raises an exception""" throttle_called.append(throttle_event) @@ -402,25 +402,25 @@ def throttle_cb_that_raises(throttle_event): if "Test exception from throttle_cb" in str(exc_info.value): assert len(throttle_called) > 0 print("Throttle callback was triggered and raised exception") - + consumer3.close() def test_error_callback_exception_different_error_types(): """Test error callback with different exception types""" - + def error_cb_kafka_exception(error): """Error callback that raises KafkaException""" raise KafkaException(error) - + def error_cb_value_error(error): """Error callback that raises ValueError""" raise ValueError(f"Custom error: {error}") - + def error_cb_runtime_error(error): """Error callback that raises RuntimeError""" raise RuntimeError(f"Runtime error: {error}") - + # Test with KafkaException consumer1 = TestConsumer({ 'group.id': 'test-kafka-exception', @@ -430,11 +430,11 @@ def error_cb_runtime_error(error): 'error_cb': error_cb_kafka_exception }) consumer1.subscribe(['test-topic']) - + with pytest.raises(KafkaException): consumer1.consume(timeout=0.1) consumer1.close() - + # Test with ValueError consumer2 = TestConsumer({ 'group.id': 'test-value-error', @@ -444,12 +444,12 @@ def error_cb_runtime_error(error): 'error_cb': error_cb_value_error }) consumer2.subscribe(['test-topic']) - + with pytest.raises(ValueError) as exc_info: consumer2.consume(timeout=0.1) assert "Custom error:" in str(exc_info.value) consumer2.close() - + # Test with RuntimeError consumer3 = TestConsumer({ 'group.id': 'test-runtime-error', @@ -459,14 +459,8 @@ def error_cb_runtime_error(error): 'error_cb': error_cb_runtime_error }) consumer3.subscribe(['test-topic']) - + with pytest.raises(RuntimeError) as exc_info: consumer3.consume(timeout=0.1) assert "Runtime error:" in str(exc_info.value) consumer3.close() - - - - - - diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 10ef629af..afb784ebc 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -974,6 +974,7 @@ def test_produce_batch_api_compatibility(): # AvroProducer not available - skip this part pytest.skip("AvroProducer not available") + def test_callback_exception_no_system_error(): delivery_reports = [] @@ -1007,48 +1008,48 @@ def delivery_cb_that_raises(err, msg): def test_core_callbacks_exception_different_types(): """Test error_cb, throttle_cb, and stats_cb exception handling with different exception types""" - + # Test error_cb with different exception types def error_cb_kafka_exception(error): raise KafkaException(KafkaError._FAIL, "KafkaException from error_cb") - + def error_cb_value_error(error): raise ValueError("ValueError from error_cb") - + def error_cb_runtime_error(error): raise RuntimeError("RuntimeError from error_cb") - + # Test error_cb exceptions - these should be triggered by connection failure producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', 'socket.timeout.ms': 100, 'error_cb': error_cb_kafka_exception }) - + with pytest.raises(KafkaException) as exc_info: producer.produce('test-topic', value='test-message') producer.flush(timeout=2.0) assert "KafkaException from error_cb" in str(exc_info.value) - + # Test error_cb with ValueError producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', 'socket.timeout.ms': 100, 'error_cb': error_cb_value_error }) - + with pytest.raises(ValueError) as exc_info: producer.produce('test-topic', value='test-message') producer.flush(timeout=2.0) assert "ValueError from error_cb" in str(exc_info.value) - + # Test error_cb with RuntimeError producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', 'socket.timeout.ms': 100, 'error_cb': error_cb_runtime_error }) - + with pytest.raises(RuntimeError) as exc_info: producer.produce('test-topic', value='test-message') producer.flush(timeout=2.0) @@ -1057,25 +1058,25 @@ def error_cb_runtime_error(error): def test_multiple_callbacks_exception_no_system_error(): """Test multiple callbacks raising exceptions simultaneously""" - + callbacks_called = [] - + def error_cb_that_raises(error): callbacks_called.append('error_cb') raise RuntimeError("Test exception from error_cb") - + def throttle_cb_that_raises(throttle_event): callbacks_called.append('throttle_cb') raise RuntimeError("Test exception from throttle_cb") - + def stats_cb_that_raises(stats_json): callbacks_called.append('stats_cb') raise RuntimeError("Test exception from stats_cb") - + def delivery_cb_that_raises(err, msg): callbacks_called.append('delivery_cb') raise RuntimeError("Test exception from delivery_cb") - + producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', 'socket.timeout.ms': 100, @@ -1085,12 +1086,12 @@ def delivery_cb_that_raises(err, msg): 'stats_cb': stats_cb_that_raises, 'on_delivery': delivery_cb_that_raises }) - + # This should trigger multiple callbacks with pytest.raises(RuntimeError) as exc_info: producer.produce('test-topic', value='test-message') producer.flush(timeout=2.0) - + # Should get one of the exceptions (not SystemError) assert "Test exception from" in str(exc_info.value) assert len(callbacks_called) > 0 @@ -1098,10 +1099,10 @@ def delivery_cb_that_raises(err, msg): def test_delivery_callback_exception_different_message_types(): """Test delivery callback exception with different message types""" - + def delivery_cb_that_raises(err, msg): raise RuntimeError("Test exception from delivery_cb") - + # Test with string message producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', @@ -1109,13 +1110,13 @@ def delivery_cb_that_raises(err, msg): 'message.timeout.ms': 10, 'on_delivery': delivery_cb_that_raises }) - + with pytest.raises(RuntimeError) as exc_info: producer.produce('test-topic', value='string-message') producer.flush(timeout=2.0) - + assert "Test exception from delivery_cb" in str(exc_info.value) - + # Test with bytes message producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', @@ -1123,30 +1124,30 @@ def delivery_cb_that_raises(err, msg): 'message.timeout.ms': 10, 'on_delivery': delivery_cb_that_raises }) - + with pytest.raises(RuntimeError) as exc_info: producer.produce('test-topic', value=b'bytes-message') producer.flush(timeout=2.0) - + assert "Test exception from delivery_cb" in str(exc_info.value) def test_callback_exception_with_producer_methods(): """Test callback exception with different producer methods""" - + def delivery_cb_that_raises(err, msg): raise RuntimeError("Test exception from delivery_cb") - + producer = Producer({ 'bootstrap.servers': 'nonexistent-broker:9092', 'socket.timeout.ms': 100, 'message.timeout.ms': 10, 'on_delivery': delivery_cb_that_raises }) - + # Test with flush method - this should trigger the callback with pytest.raises(RuntimeError) as exc_info: producer.produce('test-topic', value='test-message') producer.flush(timeout=2.0) - + assert "Test exception from delivery_cb" in str(exc_info.value)