@@ -68,7 +68,7 @@ use std::{io, thread};
6868
6969use std:: sync:: atomic:: AtomicUsize ;
7070use std:: sync:: atomic:: Ordering ;
71- use std:: sync:: Mutex ;
71+ use std:: sync:: { Arc , Condvar , Mutex } ;
7272use take_mut:: take;
7373
7474use std:: panic:: { catch_unwind, AssertUnwindSafe } ;
@@ -245,6 +245,12 @@ where
245245 thread_name : Option < String > ,
246246}
247247
248+ struct SpawnedThreadInfo {
249+ sender : Sender < AsyncMsg > ,
250+ flush_lock : Arc < Mutex < FlushStatus > > ,
251+ flush_cond : Arc < Condvar > ,
252+ }
253+
248254impl < D > AsyncCoreBuilder < D >
249255where
250256 D : slog:: Drain < Err = slog:: Never , Ok = ( ) > + Send + ' static ,
@@ -287,25 +293,71 @@ where
287293 self
288294 }
289295
290- fn spawn_thread ( self ) -> ( thread:: JoinHandle < ( ) > , Sender < AsyncMsg > ) {
296+ fn spawn_thread ( self ) -> ( thread:: JoinHandle < ( ) > , SpawnedThreadInfo ) {
291297 let ( tx, rx) = crossbeam_channel:: bounded ( self . chan_size ) ;
292298 let mut builder = thread:: Builder :: new ( ) ;
293299 if let Some ( thread_name) = self . thread_name {
294300 builder = builder. name ( thread_name) ;
295301 }
302+ let flush_lock = Arc :: new ( Mutex :: new ( FlushStatus :: NotRequested ) ) ;
303+ let flush_cond = Arc :: new ( Condvar :: new ( ) ) ;
304+ let state = SpawnedThreadInfo {
305+ sender : tx,
306+ flush_lock : Arc :: clone ( & flush_lock) ,
307+ flush_cond : Arc :: clone ( & flush_cond) ,
308+ } ;
296309 let drain = self . drain ;
297310 let join = builder
298311 . spawn ( move || {
299312 let drain = AssertUnwindSafe ( & drain) ;
313+ let mut gave_flush_warning = false ;
300314 // catching possible unwinding panics which can occur in used inner Drain implementation
301315 if let Err ( panic_cause) = catch_unwind ( move || loop {
316+ let mut give_flush_warning = |x : & str | {
317+ if !gave_flush_warning {
318+ eprintln ! ( "slog-async: {}" , x) ;
319+ }
320+ gave_flush_warning = true ;
321+ } ;
302322 match rx. recv ( ) {
303323 Ok ( AsyncMsg :: Record ( r) ) => {
304324 if r. log_to ( & * drain) . is_err ( ) {
305325 eprintln ! ( "slog-async failed while writing" ) ;
306326 return ;
307327 }
308- }
328+ } ,
329+ Ok ( AsyncMsg :: Flush ) => {
330+ let status_lock = match flush_lock. lock ( ) {
331+ Err ( _e) => {
332+ give_flush_warning ( "fush lock poisoned" ) ;
333+ continue ;
334+ } ,
335+ Ok ( s) => s,
336+ } ;
337+ if !matches ! ( * status_lock, FlushStatus :: Pending ) {
338+ flush_cond. notify_all ( ) ;
339+ drop ( status_lock) ;
340+ continue ;
341+ }
342+ drop ( status_lock) ;
343+ let res_status = match drain. flush ( ) {
344+ Ok ( ( ) ) => FlushStatus :: Success ,
345+ Err ( e) => FlushStatus :: Failure ( e) ,
346+ } ;
347+ let mut status_lock = match flush_lock. lock ( ) {
348+ Err ( _e) => {
349+ give_flush_warning ( "fush lock poisoned" ) ;
350+ continue ;
351+ } ,
352+ Ok ( s) => s,
353+ } ;
354+ if !matches ! ( * status_lock, FlushStatus :: Pending ) {
355+ give_flush_warning ( "fush status corrupted" ) ;
356+ }
357+ * status_lock = res_status;
358+ flush_cond. notify_all ( ) ;
359+ drop ( status_lock) ;
360+ } ,
309361 Ok ( AsyncMsg :: Finish ) => return ,
310362 Err ( recv_error) => {
311363 eprintln ! ( "slog-async failed while receiving: {recv_error}" ) ;
@@ -318,7 +370,7 @@ where
318370 } )
319371 . unwrap ( ) ;
320372
321- ( join, tx )
373+ ( join, state )
322374 }
323375
324376 /// Build `AsyncCore`
@@ -329,12 +381,14 @@ where
329381 /// Build `AsyncCore`
330382 pub fn build_no_guard ( self ) -> AsyncCore {
331383 let blocking = self . blocking ;
332- let ( join, tx ) = self . spawn_thread ( ) ;
384+ let ( join, info ) = self . spawn_thread ( ) ;
333385
334386 AsyncCore {
335- ref_sender : tx ,
387+ ref_sender : info . sender ,
336388 tl_sender : thread_local:: ThreadLocal :: new ( ) ,
337389 join : Mutex :: new ( Some ( join) ) ,
390+ flush_lock : info. flush_lock ,
391+ flush_cond : info. flush_cond ,
338392 blocking,
339393 }
340394 }
@@ -344,18 +398,20 @@ where
344398 /// See `AsyncGuard` for more information.
345399 pub fn build_with_guard ( self ) -> ( AsyncCore , AsyncGuard ) {
346400 let blocking = self . blocking ;
347- let ( join, tx ) = self . spawn_thread ( ) ;
401+ let ( join, info ) = self . spawn_thread ( ) ;
348402
349403 (
350404 AsyncCore {
351- ref_sender : tx . clone ( ) ,
405+ ref_sender : info . sender . clone ( ) ,
352406 tl_sender : thread_local:: ThreadLocal :: new ( ) ,
353407 join : Mutex :: new ( None ) ,
408+ flush_lock : info. flush_lock ,
409+ flush_cond : info. flush_cond ,
354410 blocking,
355411 } ,
356412 AsyncGuard {
357413 join : Some ( join) ,
358- tx,
414+ tx : info . sender ,
359415 } ,
360416 )
361417 }
@@ -403,6 +459,14 @@ impl Drop for AsyncGuard {
403459 }
404460}
405461
462+ #[ derive( Debug ) ]
463+ enum FlushStatus {
464+ NotRequested ,
465+ Pending ,
466+ Failure ( slog:: FlushError ) ,
467+ Success ,
468+ }
469+
406470/// Core of `Async` drain
407471///
408472/// See `Async` for documentation.
@@ -418,6 +482,8 @@ pub struct AsyncCore {
418482 tl_sender : thread_local:: ThreadLocal < Sender < AsyncMsg > > ,
419483 join : Mutex < Option < thread:: JoinHandle < ( ) > > > ,
420484 blocking : bool ,
485+ flush_lock : Arc < Mutex < FlushStatus > > ,
486+ flush_cond : Arc < Condvar > ,
421487}
422488
423489impl AsyncCore {
@@ -474,8 +540,52 @@ impl Drain for AsyncCore {
474540 ) -> AsyncResult < ( ) > {
475541 self . send ( AsyncRecord :: from ( record, logger_values) )
476542 }
477- }
478543
544+ fn flush ( & self ) -> Result < ( ) , slog:: FlushError > {
545+ fn handle_poison (
546+ mut e : std:: sync:: PoisonError <
547+ std:: sync:: MutexGuard < ' _ , FlushStatus > ,
548+ > ,
549+ ) -> std:: io:: Error {
550+ * * e. get_mut ( ) = FlushStatus :: NotRequested ; // cancel request, allowing retry
551+ std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "mutex poisoned" )
552+ }
553+ let sender = self . get_sender ( ) . map_err ( |_e| {
554+ std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "mutex poisoned" )
555+ } ) ?;
556+ let mut status_lock = self . flush_lock . lock ( ) . map_err ( handle_poison) ?;
557+ while !matches ! ( * status_lock, FlushStatus :: NotRequested ) {
558+ // another flush is in progress, block until that one succeeds
559+ status_lock =
560+ self . flush_cond . wait ( status_lock) . map_err ( handle_poison) ?;
561+ }
562+ assert ! (
563+ matches!( * status_lock, FlushStatus :: NotRequested ) ,
564+ "{:?}" ,
565+ * status_lock
566+ ) ;
567+ match sender. send ( AsyncMsg :: Flush ) {
568+ Ok ( ( ) ) => { }
569+ Err ( _) => {
570+ return Err ( std:: io:: Error :: new (
571+ std:: io:: ErrorKind :: Other ,
572+ "channel disconnected" ,
573+ )
574+ . into ( ) ) ;
575+ }
576+ }
577+ * status_lock = FlushStatus :: Pending ;
578+ while matches ! ( * status_lock, FlushStatus :: Pending ) {
579+ status_lock =
580+ self . flush_cond . wait ( status_lock) . map_err ( handle_poison) ?;
581+ }
582+ match std:: mem:: replace ( & mut * status_lock, FlushStatus :: NotRequested ) {
583+ FlushStatus :: NotRequested | FlushStatus :: Pending => unreachable ! ( ) ,
584+ FlushStatus :: Failure ( cause) => Err ( cause) ,
585+ FlushStatus :: Success => Ok ( ( ) ) ,
586+ }
587+ }
588+ }
479589/// Serialized record.
480590pub struct AsyncRecord {
481591 msg : String ,
@@ -545,6 +655,7 @@ impl AsyncRecord {
545655enum AsyncMsg {
546656 Record ( AsyncRecord ) ,
547657 Finish ,
658+ Flush ,
548659}
549660
550661impl Drop for AsyncCore {
@@ -796,6 +907,10 @@ impl Drain for Async {
796907
797908 Ok ( ( ) )
798909 }
910+
911+ fn flush ( & self ) -> Result < ( ) , slog:: FlushError > {
912+ self . core . flush ( )
913+ }
799914}
800915
801916impl Drop for Async {
@@ -806,7 +921,6 @@ impl Drop for Async {
806921
807922// }}}
808923
809-
810924#[ cfg( test) ]
811925mod test {
812926 use super :: * ;
@@ -815,25 +929,45 @@ mod test {
815929 #[ test]
816930 fn integration_test ( ) {
817931 let ( mock_drain, mock_drain_rx) = MockDrain :: new ( ) ;
818- let async_drain = AsyncBuilder :: new ( mock_drain)
819- . build ( ) ;
820- let slog = slog:: Logger :: root ( async_drain. fuse ( ) , o ! ( "field1" => "value1" ) ) ;
932+ let async_drain = AsyncBuilder :: new ( mock_drain) . build ( ) ;
933+ let slog =
934+ slog:: Logger :: root ( async_drain. fuse ( ) , o ! ( "field1" => "value1" ) ) ;
821935
822936 info ! ( slog, "Message 1" ; "field2" => "value2" ) ;
823937 warn ! ( slog, "Message 2" ; "field3" => "value3" ) ;
824- assert_eq ! ( mock_drain_rx. recv( ) . unwrap( ) , r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"# ) ;
825- assert_eq ! ( mock_drain_rx. recv( ) . unwrap( ) , r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"# ) ;
938+ assert_eq ! (
939+ mock_drain_rx. recv( ) . unwrap( ) ,
940+ * r#"INFO Message 1: [("field1", "value1"), ("field2", "value2")]"#
941+ ) ;
942+ assert_eq ! (
943+ mock_drain_rx. recv( ) . unwrap( ) ,
944+ * r#"WARN Message 2: [("field1", "value1"), ("field3", "value3")]"#
945+ ) ;
946+ slog. flush ( ) . unwrap ( ) ;
947+ assert_eq ! ( mock_drain_rx. recv( ) . unwrap( ) , MockMsg :: Flush ) ;
948+ }
949+
950+ #[ derive( Debug , PartialEq ) ]
951+ enum MockMsg {
952+ Log ( String ) ,
953+ Flush ,
954+ }
955+ impl PartialEq < str > for MockMsg {
956+ fn eq ( & self , other : & str ) -> bool {
957+ match self {
958+ MockMsg :: Log ( ref msg) => msg == other,
959+ _ => false ,
960+ }
961+ }
826962 }
827-
828-
829963 /// Test-helper drain
830964 #[ derive( Debug ) ]
831965 struct MockDrain {
832- tx : mpsc:: Sender < String > ,
966+ tx : mpsc:: Sender < MockMsg > ,
833967 }
834968
835969 impl MockDrain {
836- fn new ( ) -> ( Self , mpsc:: Receiver < String > ) {
970+ fn new ( ) -> ( Self , mpsc:: Receiver < MockMsg > ) {
837971 let ( tx, rx) = mpsc:: channel ( ) ;
838972 ( Self { tx } , rx)
839973 }
@@ -843,14 +977,23 @@ mod test {
843977 type Ok = ( ) ;
844978 type Err = slog:: Never ;
845979
846- fn log ( & self , record : & Record , logger_kv : & OwnedKVList ) -> Result < Self :: Ok , Self :: Err > {
980+ fn log (
981+ & self ,
982+ record : & Record ,
983+ logger_kv : & OwnedKVList ,
984+ ) -> Result < Self :: Ok , Self :: Err > {
847985 let mut serializer = MockSerializer :: default ( ) ;
848986 logger_kv. serialize ( record, & mut serializer) . unwrap ( ) ;
849987 record. kv ( ) . serialize ( record, & mut serializer) . unwrap ( ) ;
850988 let level = record. level ( ) . as_short_str ( ) ;
851989 let msg = record. msg ( ) . to_string ( ) ;
852990 let entry = format ! ( "{} {}: {:?}" , level, msg, serializer. kvs) ;
853- self . tx . send ( entry) . unwrap ( ) ;
991+ self . tx . send ( MockMsg :: Log ( entry) ) . unwrap ( ) ;
992+ Ok ( ( ) )
993+ }
994+
995+ fn flush ( & self ) -> Result < ( ) , slog:: FlushError > {
996+ self . tx . send ( MockMsg :: Flush ) . unwrap ( ) ;
854997 Ok ( ( ) )
855998 }
856999 }
@@ -861,7 +1004,11 @@ mod test {
8611004 }
8621005
8631006 impl slog:: Serializer for MockSerializer {
864- fn emit_arguments ( & mut self , key : Key , val : & fmt:: Arguments ) -> Result < ( ) , slog:: Error > {
1007+ fn emit_arguments (
1008+ & mut self ,
1009+ key : Key ,
1010+ val : & fmt:: Arguments ,
1011+ ) -> Result < ( ) , slog:: Error > {
8651012 self . kvs . push ( ( key. to_string ( ) , val. to_string ( ) ) ) ;
8661013 Ok ( ( ) )
8671014 }
0 commit comments