@@ -3546,30 +3546,6 @@ class UserTypeDoesNotExist(Exception):
35463546 pass
35473547
35483548
3549- class _ControlReconnectionHandler (_ReconnectionHandler ):
3550- """
3551- Internal
3552- """
3553-
3554- def __init__ (self , control_connection , * args , ** kwargs ):
3555- _ReconnectionHandler .__init__ (self , * args , ** kwargs )
3556- self .control_connection = weakref .proxy (control_connection )
3557-
3558- def try_reconnect (self ):
3559- return self .control_connection ._reconnect_internal ()
3560-
3561- def on_reconnection (self , connection ):
3562- self .control_connection ._set_new_connection (connection )
3563-
3564- def on_exception (self , exc , next_delay ):
3565- # TODO only overridden to add logging, so add logging
3566- if isinstance (exc , AuthenticationFailed ):
3567- return False
3568- else :
3569- log .debug ("Error trying to reconnect control connection: %r" , exc )
3570- return True
3571-
3572-
35733549def _watch_callback (obj_weakref , method_name , * args , ** kwargs ):
35743550 """
35753551 A callback handler for the ControlConnection that tolerates
@@ -3662,6 +3638,7 @@ def __init__(self, cluster, timeout,
36623638
36633639 self ._reconnection_handler = None
36643640 self ._reconnection_lock = RLock ()
3641+ self ._reconnection_pending = False
36653642
36663643 self ._event_schedule_times = {}
36673644
@@ -3695,6 +3672,8 @@ def _connect_host_in_lbp(self):
36953672 )
36963673
36973674 for host in lbp .make_query_plan ():
3675+ if self ._is_shutdown :
3676+ break
36983677 try :
36993678 return (self ._try_connect (host ), None )
37003679 except ConnectionException as exc :
@@ -3818,44 +3797,47 @@ def reconnect(self):
38183797 if self ._is_shutdown :
38193798 return
38203799
3800+ with self ._reconnection_lock :
3801+ if self ._reconnection_pending :
3802+ return
3803+ self ._reconnection_pending = True
3804+
38213805 self ._submit (self ._reconnect )
38223806
3823- def _reconnect (self ):
3807+ def _reconnect (self , schedule = None ):
38243808 log .debug ("[control connection] Attempting to reconnect" )
3809+ if self ._is_shutdown :
3810+ return
3811+
38253812 try :
38263813 self ._set_new_connection (self ._reconnect_internal ())
3814+ self ._reconnection_pending = False
3815+ return
38273816 except NoHostAvailable :
3828- # make a retry schedule (which includes backoff)
3829- schedule = self ._cluster .reconnection_policy .new_schedule ()
3817+ log .debug ("[control connection] Reconnection plan is exhausted, scheduling new reconnection attempt" )
3818+ except Exception as ex :
3819+ log .debug ("[control connection] Unexpected exception during reconnect, scheduling new reconnection attempt: %s" , ex )
38303820
3831- with self ._reconnection_lock :
3821+ if schedule is None :
3822+ schedule = self ._cluster .reconnection_policy .new_schedule ()
38323823
3833- # cancel existing reconnection attempts
3834- if self ._reconnection_handler :
3835- self ._reconnection_handler .cancel ()
3824+ try :
3825+ next_delay = next (schedule )
3826+ except StopIteration :
3827+ # the schedule has been exhausted
3828+ schedule = self ._cluster .reconnection_policy .new_schedule ()
3829+ try :
3830+ next_delay = next (schedule )
3831+ except StopIteration :
3832+ next_delay = 0
38363833
3837- # when a connection is successfully made, _set_new_connection
3838- # will be called with the new connection and then our
3839- # _reconnection_handler will be cleared out
3840- self ._reconnection_handler = _ControlReconnectionHandler (
3841- self , self ._cluster .scheduler , schedule ,
3842- self ._get_and_set_reconnection_handler ,
3843- new_handler = None )
3844- self ._reconnection_handler .start ()
3845- except Exception :
3846- log .debug ("[control connection] error reconnecting" , exc_info = True )
3847- raise
3834+ if self ._is_shutdown :
3835+ return
38483836
3849- def _get_and_set_reconnection_handler (self , new_handler ):
3850- """
3851- Called by the _ControlReconnectionHandler when a new connection
3852- is successfully created. Clears out the _reconnection_handler on
3853- this ControlConnection.
3854- """
3855- with self ._reconnection_lock :
3856- old = self ._reconnection_handler
3857- self ._reconnection_handler = new_handler
3858- return old
3837+ if next_delay == 0 :
3838+ self ._submit (self ._reconnect )
3839+ else :
3840+ self ._cluster .scheduler .schedule (next_delay , partial (self ._reconnect , schedule ))
38593841
38603842 def _submit (self , * args , ** kwargs ):
38613843 try :
@@ -3866,11 +3848,6 @@ def _submit(self, *args, **kwargs):
38663848 return None
38673849
38683850 def shutdown (self ):
3869- # stop trying to reconnect (if we are)
3870- with self ._reconnection_lock :
3871- if self ._reconnection_handler :
3872- self ._reconnection_handler .cancel ()
3873-
38743851 with self ._lock :
38753852 if self ._is_shutdown :
38763853 return
0 commit comments