@@ -122,11 +122,11 @@ impl RabbitmqClient {
122122 }
123123
124124 // Receive messages from a queue
125- pub async fn await_message (
125+ // Receive messages from a queue with no timeout
126+ pub async fn await_message_no_timeout (
126127 & self ,
127128 queue_name : & str ,
128129 message_id : String ,
129- timeout : Duration ,
130130 ack_on_success : bool ,
131131 ) -> Result < Message , RabbitMqError > {
132132 let mut consumer = {
@@ -149,53 +149,64 @@ impl RabbitmqClient {
149149
150150 debug ! ( "Starting to consume from {}" , queue_name) ;
151151
152- // Create a future for the next message
153- let receive_future = async {
154- while let Some ( delivery_result) = consumer. next ( ) . await {
155- let delivery = match delivery_result {
156- Ok ( del) => del,
157- Err ( _) => return Err ( RabbitMqError :: DeserializationError ) ,
158- } ;
159- let data = & delivery. data ;
160- let message_str = match std:: str:: from_utf8 ( & data) {
161- Ok ( str) => str,
162- Err ( _) => {
163- return Err ( RabbitMqError :: DeserializationError ) ;
164- }
165- } ;
166-
167- debug ! ( "Received message: {}" , message_str) ;
168-
169- // Parse the message
170- let message = match serde_json:: from_str :: < Message > ( message_str) {
171- Ok ( m) => m,
172- Err ( e) => {
173- log:: error!( "Failed to parse message: {}" , e) ;
174- return Err ( RabbitMqError :: DeserializationError ) ;
175- }
176- } ;
177-
178- if message. message_id == message_id {
179- if ack_on_success {
180- delivery
181- . ack ( lapin:: options:: BasicAckOptions :: default ( ) )
182- . await
183- . expect ( "Failed to acknowledge message" ) ;
184- }
185-
186- return Ok ( message) ;
152+ while let Some ( delivery_result) = consumer. next ( ) . await {
153+ let delivery = match delivery_result {
154+ Ok ( del) => del,
155+ Err ( _) => return Err ( RabbitMqError :: DeserializationError ) ,
156+ } ;
157+ let data = & delivery. data ;
158+ let message_str = match std:: str:: from_utf8 ( & data) {
159+ Ok ( str) => str,
160+ Err ( _) => {
161+ return Err ( RabbitMqError :: DeserializationError ) ;
162+ }
163+ } ;
164+
165+ debug ! ( "Received message: {}" , message_str) ;
166+
167+ // Parse the message
168+ let message = match serde_json:: from_str :: < Message > ( message_str) {
169+ Ok ( m) => m,
170+ Err ( e) => {
171+ log:: error!( "Failed to parse message: {}" , e) ;
172+ return Err ( RabbitMqError :: DeserializationError ) ;
187173 }
174+ } ;
175+
176+ if message. message_id == message_id {
177+ if ack_on_success {
178+ delivery
179+ . ack ( lapin:: options:: BasicAckOptions :: default ( ) )
180+ . await
181+ . expect ( "Failed to acknowledge message" ) ;
182+ }
183+
184+ return Ok ( message) ;
188185 }
189- Err ( RabbitMqError :: DeserializationError )
190- } ;
186+ }
187+ Err ( RabbitMqError :: DeserializationError )
188+ }
191189
192- // Set a timeout of 10 seconds
193- match tokio:: time:: timeout ( timeout, receive_future) . await {
190+ // Receive messages from a queue with timeout
191+ pub async fn await_message (
192+ & self ,
193+ queue_name : & str ,
194+ message_id : String ,
195+ timeout : Duration ,
196+ ack_on_success : bool ,
197+ ) -> Result < Message , RabbitMqError > {
198+ // Set a timeout
199+ match tokio:: time:: timeout (
200+ timeout,
201+ self . await_message_no_timeout ( queue_name, message_id, ack_on_success) ,
202+ )
203+ . await
204+ {
194205 Ok ( result) => result,
195206 Err ( _) => {
196207 debug ! (
197208 "Timeout waiting for message after {} seconds" ,
198- timeout. as_millis ( ) / 1000
209+ timeout. as_secs ( )
199210 ) ;
200211 Err ( RabbitMqError :: TimeoutError )
201212 }
0 commit comments