-
Notifications
You must be signed in to change notification settings - Fork 2
SIANXSVC-826: Added direct-reply result backend #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
cb210b8 to
75088c5
Compare
|
Fixed a bug where messages were received in the wrong order and added a unit-test for it. |
PatrickTaibel
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've absolutely no clue if this is a good idea as I don't completely understand how celery backends work. It has potential as at least during my short test I can see ~50% performance improvements with 1 worker process.
What needs to be fixed is that, right now, this only works when no concurrent executions happen. I marked the lines were I got the greenlet exceptions but there might be more. It seems we need to lock this somehow but I'm not sure how and especially not how to achieve this within a Celery result backend.
celery_amqp_backend/backend.py
Outdated
| consumer.consume() | ||
|
|
||
| try: | ||
| consumer.connection.drain_events(timeout=0.5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call fails when multiple concurrent executions happen.
Exception: ConcurrentObjectUseError
Message: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x7fb6c0c1ebf0>>
Traceback:
File \"/usr/local/lib/python3.7/site-packages/celery/result.py\", line 231, in get
on_message=on_message,
File \"/usr/local/lib/python3.7/site-packages/celery/backends/base.py\", line 760, in wait_for_pending
no_ack=no_ack,
File \"/app/app/celery_amqp_backend/backend.py\", line 471, in wait_for
on_interval=on_interval
File \"/usr/local/lib/python3.7/site-packages/celery/backends/base.py\", line 783, in wait_for
meta = self.get_task_meta(task_id)
File \"/app/app/celery_amqp_backend/backend.py\", line 510, in get_task_meta
consumer.connection.drain_events(timeout=0.5)
File \"/usr/local/lib/python3.7/site-packages/kombu/connection.py\", line 316, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File \"/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py\", line 169, in drain_events
return connection.drain_events(**kwargs)
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 525, in drain_events
while not self.blocking_read(timeout):
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 530, in blocking_read
frame = self.transport.read_frame()
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 294, in read_frame
frame_header = read(7, True)
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 627, in _read
s = recv(n - len(rbuf))
File \"/usr/local/lib/python3.7/site-packages/gevent/_socketcommon.py\", line 663, in recv
self._wait(self._read_event)
File \"src/gevent/_hub_primitives.py\", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
celery_amqp_backend/backend.py
Outdated
| consumer = kombu.Consumer( | ||
| channel, | ||
| queues=[consumer_queue], | ||
| auto_declare=True, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call fails when multiple concurrent executions happen.
Exception: ConcurrentObjectUseError
Message: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x7fb6c0c1ebf0>>
Traceback:
File \"/usr/local/lib/python3.7/site-packages/celery/app/base.py\", line 787, in send_task
self.backend.on_task_call(P, task_id)
File \"/app/app/celery_amqp_backend/backend.py\", line 576, in on_task_call
producer.channel,
File \"/app/app/celery_amqp_backend/backend.py\", line 590, in _create_consumer
auto_declare=True,
File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 387, in __init__
self.revive(self.channel)
File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 409, in revive
self.declare()
File \"/usr/local/lib/python3.7/site-packages/kombu/messaging.py\", line 422, in declare
queue.declare()
File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 606, in declare
self._create_queue(nowait=nowait, channel=channel)
File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 615, in _create_queue
self.queue_declare(nowait=nowait, passive=False, channel=channel)
File \"/usr/local/lib/python3.7/site-packages/kombu/entity.py\", line 650, in queue_declare
nowait=nowait,
File \"/usr/local/lib/python3.7/site-packages/amqp/channel.py\", line 1163, in queue_declare
spec.Queue.DeclareOk, returns_tuple=True,
File \"/usr/local/lib/python3.7/site-packages/amqp/abstract_channel.py\", line 99, in wait
self.connection.drain_events(timeout=timeout)
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 525, in drain_events
while not self.blocking_read(timeout):
File \"/usr/local/lib/python3.7/site-packages/amqp/connection.py\", line 530, in blocking_read
frame = self.transport.read_frame()
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 294, in read_frame
frame_header = read(7, True)
File \"/usr/local/lib/python3.7/site-packages/amqp/transport.py\", line 627, in _read
s = recv(n - len(rbuf))
File \"/usr/local/lib/python3.7/site-packages/gevent/_socketcommon.py\", line 663, in recv
self._wait(self._read_event)
File \"src/gevent/_hub_primitives.py\", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
File \"src/gevent/_hub_primitives.py\", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
1f7ff5e to
3eb7708
Compare
3eb7708 to
51a2803
Compare
Codecov ReportBase: 86.66% // Head: 88.60% // Increases project coverage by
Additional details and impacted files@@ Coverage Diff @@
## main #5 +/- ##
==========================================
+ Coverage 86.66% 88.60% +1.93%
==========================================
Files 3 3
Lines 150 272 +122
==========================================
+ Hits 130 241 +111
- Misses 20 31 +11
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report at Codecov. |
| with version 5.0. Celery encourages you to use the newer `rpc://` result backend, as it does not create a new | ||
| result queue for each task and thus is faster in many circumstances. However, it's not always possible to switch | ||
| to the new `rpc://` result backend, as it does have restrictions as follows: | ||
| `celery-amqp-backend` contains two result backens for Celery. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: backens
Implemented changes:
DirectReplyAMQPBackendthat uses RabbitMQ direct-reply for resultsor groups)