33 changes: 22 additions & 11 deletions internal/pool/pool.go
@@ -114,6 +114,9 @@ type Options struct {
// DialerRetryTimeout is the backoff duration between retry attempts.
// Default: 100ms
DialerRetryTimeout time.Duration

// Optional logger for connection pool operations.
Logger *internal.Logging
}

type lastDialErrorWrap struct {
@@ -218,7 +221,7 @@ func (p *ConnPool) checkMinIdleConns() {
p.idleConnsLen.Add(-1)

p.freeTurn()
internal.Logger.Printf(context.Background(), "addIdleConn panic: %+v", err)
p.logf(context.Background(), "addIdleConn panic: %+v", err)
}
}()

@@ -373,7 +376,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
return cn, nil
}

internal.Logger.Printf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
p.logf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
// All retries failed - handle error tracking
p.setLastDialError(lastErr)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
@@ -446,7 +449,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {

for {
if attempts >= getAttempts {
internal.Logger.Printf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
p.logf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
break
}
attempts++
@@ -473,12 +476,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
if hookManager != nil {
acceptConn, err := hookManager.ProcessOnGet(ctx, cn, false)
if err != nil {
internal.Logger.Printf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
p.logf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
_ = p.CloseConn(cn)
continue
}
if !acceptConn {
internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
p.logf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
p.Put(ctx, cn)
cn = nil
continue
@@ -504,7 +507,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
// this should not happen with a new connection, but we handle it gracefully
if err != nil || !acceptConn {
// Failed to process connection, discard it
internal.Logger.Printf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
p.logf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
_ = p.CloseConn(newcn)
return nil, err
}
@@ -605,7 +608,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {

// If we exhausted all attempts without finding a usable connection, return nil
if attempts > 1 && attempts >= maxAttempts && int32(attempts) >= p.poolSize.Load() {
internal.Logger.Printf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
p.logf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
return nil, nil
}

@@ -622,7 +625,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
// Peek at the reply type to check if it's a push notification
if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush {
// Not a push notification or error peeking, remove connection
internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it")
p.logf(ctx, "Conn has unread data (not push notification), removing it")
p.Remove(ctx, cn, err)
}
// It's a push notification, allow pooling (client will handle it)
@@ -635,7 +638,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if hookManager != nil {
shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn)
if err != nil {
internal.Logger.Printf(ctx, "Connection hook error: %v", err)
p.logf(ctx, "Connection hook error: %v", err)
p.Remove(ctx, cn, err)
return
}
@@ -737,7 +740,7 @@ func (p *ConnPool) removeConn(cn *Conn) {
// this can be idle conn
for idx, ic := range p.idleConns {
if ic.GetID() == cid {
internal.Logger.Printf(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
p.logf(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
p.idleConns = append(p.idleConns[:idx], p.idleConns[idx+1:]...)
p.idleConnsLen.Add(-1)
break
@@ -853,7 +856,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
// For RESP3 connections with push notifications, we allow some buffered data
// The client will process these notifications before using the connection
internal.Logger.Printf(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
p.logf(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
return true // Connection is healthy, client will handle notifications
}
return false // Unexpected data, not push notifications, connection is unhealthy
@@ -863,3 +866,11 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
}
return true
}

func (p *ConnPool) logf(ctx context.Context, format string, args ...any) {
logger := internal.Logger
if p.cfg.Logger != nil {
logger = *p.cfg.Logger
}
logger.Printf(ctx, format, args...)
}
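
The new logf helper applies the same fallback in each touched file: if the pool's Options carry a custom logger, use it; otherwise fall back to the package-level internal.Logger. Below is a minimal, self-contained sketch of that pattern, using stand-in types (Logging, poolConfig, connPool) rather than the library's internal ones:

```go
package main

import (
	"context"
	"fmt"
)

// Logging mirrors the Printf-style interface the pool expects (illustrative stand-in).
type Logging interface {
	Printf(ctx context.Context, format string, args ...any)
}

type stdLogger struct{ prefix string }

func (l stdLogger) Printf(ctx context.Context, format string, args ...any) {
	fmt.Printf(l.prefix+format+"\n", args...)
}

// Package-level fallback, analogous to internal.Logger.
var globalLogger Logging = stdLogger{prefix: "[global] "}

type poolConfig struct {
	Logger *Logging // optional per-pool logger; nil means "use the global one"
}

type connPool struct{ cfg poolConfig }

// logf resolves the logger per call: the configured logger if present, the global one otherwise.
func (p *connPool) logf(ctx context.Context, format string, args ...any) {
	logger := globalLogger
	if p.cfg.Logger != nil {
		logger = *p.cfg.Logger
	}
	logger.Printf(ctx, format, args...)
}

func main() {
	ctx := context.Background()

	// No custom logger configured: messages go to the global fallback.
	(&connPool{}).logf(ctx, "dial failed after %d attempts", 3)

	// Custom logger configured: messages go to it instead.
	var custom Logging = stdLogger{prefix: "[pool-1] "}
	(&connPool{cfg: poolConfig{Logger: &custom}}).logf(ctx, "conn[%d] removed", 42)
}
```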
26 changes: 21 additions & 5 deletions maintnotifications/circuit_breaker.go
@@ -103,7 +103,7 @@ func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.requests.Store(0)
cb.successes.Store(0)
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.CircuitBreakerTransitioningToHalfOpen(cb.endpoint))
cb.logf(context.Background(), logs.CircuitBreakerTransitioningToHalfOpen(cb.endpoint))
}
// Fall through to half-open logic
} else {
@@ -145,15 +145,15 @@ func (cb *CircuitBreaker) recordFailure() {
if failures >= int64(cb.failureThreshold) {
if cb.state.CompareAndSwap(int32(CircuitBreakerClosed), int32(CircuitBreakerOpen)) {
if internal.LogLevel.WarnOrAbove() {
internal.Logger.Printf(context.Background(), logs.CircuitBreakerOpened(cb.endpoint, failures))
cb.logf(context.Background(), logs.CircuitBreakerOpened(cb.endpoint, failures))
}
}
}
case CircuitBreakerHalfOpen:
// Any failure in half-open state immediately opens the circuit
if cb.state.CompareAndSwap(int32(CircuitBreakerHalfOpen), int32(CircuitBreakerOpen)) {
if internal.LogLevel.WarnOrAbove() {
internal.Logger.Printf(context.Background(), logs.CircuitBreakerReopened(cb.endpoint))
cb.logf(context.Background(), logs.CircuitBreakerReopened(cb.endpoint))
}
}
}
@@ -177,7 +177,7 @@ func (cb *CircuitBreaker) recordSuccess() {
if cb.state.CompareAndSwap(int32(CircuitBreakerHalfOpen), int32(CircuitBreakerClosed)) {
cb.failures.Store(0)
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.CircuitBreakerClosed(cb.endpoint, successes))
cb.logf(context.Background(), logs.CircuitBreakerClosed(cb.endpoint, successes))
}
}
}
@@ -202,6 +202,14 @@ func (cb *CircuitBreaker) GetStats() CircuitBreakerStats {
}
}

func (cb *CircuitBreaker) logf(ctx context.Context, format string, args ...interface{}) {
logger := internal.Logger
if cb.config != nil && cb.config.Logger != nil {
logger = *cb.config.Logger
}
logger.Printf(ctx, format, args...)
}

// CircuitBreakerStats provides statistics about a circuit breaker
type CircuitBreakerStats struct {
Endpoint string
@@ -326,7 +334,7 @@ func (cbm *CircuitBreakerManager) cleanup() {

// Log cleanup results
if len(toDelete) > 0 && internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.CircuitBreakerCleanup(len(toDelete), count))
cbm.logf(context.Background(), logs.CircuitBreakerCleanup(len(toDelete), count))
}

cbm.lastCleanup.Store(now.Unix())
@@ -351,3 +359,11 @@ func (cbm *CircuitBreakerManager) Reset() {
return true
})
}

func (cbm *CircuitBreakerManager) logf(ctx context.Context, format string, args ...interface{}) {
logger := internal.Logger
if cbm.config != nil && cbm.config.Logger != nil {
logger = *cbm.config.Logger
}
logger.Printf(ctx, format, args...)
}
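
The existing internal.LogLevel guards (InfoOrAbove, WarnOrAbove) remain wrapped around each logf call, so a custom logger is only invoked, and the message only built by the logs helpers, when the configured level permits it. A short illustrative sketch of that guard-then-delegate shape; the LogLevel type and message builder here are hypothetical stand-ins, not the library's API:

```go
package main

import (
	"context"
	"fmt"
)

// LogLevel is an illustrative stand-in for the package's global log level.
type LogLevel int

const (
	LevelWarn LogLevel = iota
	LevelInfo
)

func (l LogLevel) WarnOrAbove() bool { return l >= LevelWarn }
func (l LogLevel) InfoOrAbove() bool { return l >= LevelInfo }

var currentLevel = LevelWarn

// circuitBreakerOpened builds the message only when actually called.
func circuitBreakerOpened(endpoint string, failures int64) string {
	return fmt.Sprintf("circuit breaker for %s opened after %d failures", endpoint, failures)
}

type breaker struct {
	endpoint string
	logf     func(ctx context.Context, format string, args ...any)
}

// onOpened logs the closed->open transition, but only if warnings are enabled,
// so the message string is never built when the level filters it out.
func (cb *breaker) onOpened(ctx context.Context, failures int64) {
	if currentLevel.WarnOrAbove() {
		cb.logf(ctx, circuitBreakerOpened(cb.endpoint, failures))
	}
}

func main() {
	cb := &breaker{
		endpoint: "redis-1:6379",
		logf: func(ctx context.Context, format string, args ...any) {
			fmt.Printf(format+"\n", args...)
		},
	}
	cb.onOpened(context.Background(), 5)
}
```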
5 changes: 5 additions & 0 deletions maintnotifications/config.go
@@ -128,6 +128,9 @@ type Config struct {
// After this many retries, the connection will be removed from the pool.
// Default: 3
MaxHandoffRetries int

// Logger is an optional custom logger for maintenance notifications.
Logger *internal.Logging
}

func (c *Config) IsEnabled() bool {
@@ -341,6 +344,8 @@ func (c *Config) Clone() *Config {

// Configuration fields
MaxHandoffRetries: c.MaxHandoffRetries,

Logger: c.Logger,
}
}

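Clone() copies the new Logger field by pointer, so a cloned Config writes to the same logger instance as the original rather than getting its own copy. A small illustrative sketch of that shallow-copy behaviour, using a stand-in Logging interface in place of the internal one:

```go
package main

import (
	"context"
	"fmt"
)

// Logging is a stand-in for the Printf-style interface the config points at.
type Logging interface {
	Printf(ctx context.Context, format string, args ...any)
}

type printLogger struct{}

func (printLogger) Printf(ctx context.Context, format string, args ...any) {
	fmt.Printf(format+"\n", args...)
}

type config struct {
	MaxHandoffRetries int
	Logger            *Logging // optional; nil falls back to the package-level logger
}

// Clone is a shallow copy: the Logger pointer is shared, not duplicated.
func (c *config) Clone() *config {
	return &config{
		MaxHandoffRetries: c.MaxHandoffRetries,
		Logger:            c.Logger,
	}
}

func main() {
	var l Logging = printLogger{}
	original := &config{MaxHandoffRetries: 3, Logger: &l}
	clone := original.Clone()

	fmt.Println(original.Logger == clone.Logger) // true: same logger instance
	(*clone.Logger).Printf(context.Background(), "max handoff retries: %d", clone.MaxHandoffRetries)
}
```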
44 changes: 26 additions & 18 deletions maintnotifications/handoff_worker.go
@@ -121,7 +121,7 @@ func (hwm *handoffWorkerManager) onDemandWorker() {
defer func() {
// Handle panics to ensure proper cleanup
if r := recover(); r != nil {
internal.Logger.Printf(context.Background(), logs.WorkerPanicRecovered(r))
hwm.logf(context.Background(), logs.WorkerPanicRecovered(r))
}

// Decrement active worker count when exiting
@@ -146,21 +146,21 @@ func (hwm *handoffWorkerManager) onDemandWorker() {
select {
case <-hwm.shutdown:
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.WorkerExitingDueToShutdown())
hwm.logf(context.Background(), logs.WorkerExitingDueToShutdown())
}
return
case <-timer.C:
// Worker has been idle for too long, exit to save resources
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.WorkerExitingDueToInactivityTimeout(hwm.workerTimeout))
hwm.logf(context.Background(), logs.WorkerExitingDueToInactivityTimeout(hwm.workerTimeout))
}
return
case request := <-hwm.handoffQueue:
// Check for shutdown before processing
select {
case <-hwm.shutdown:
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.WorkerExitingDueToShutdownWhileProcessing())
hwm.logf(context.Background(), logs.WorkerExitingDueToShutdownWhileProcessing())
}
// Clean up the request before exiting
hwm.pending.Delete(request.ConnID)
@@ -178,7 +178,7 @@ func (hwm *handoffWorkerManager) processHandoffRequest(request HandoffRequest) {
// Remove from pending map
defer hwm.pending.Delete(request.Conn.GetID())
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.HandoffStarted(request.Conn.GetID(), request.Endpoint))
hwm.logf(context.Background(), logs.HandoffStarted(request.Conn.GetID(), request.Endpoint))
}

// Create a context with handoff timeout from config
@@ -226,12 +226,12 @@ func (hwm *handoffWorkerManager) processHandoffRequest(request HandoffRequest) {
if hwm.config != nil {
maxRetries = hwm.config.MaxHandoffRetries
}
internal.Logger.Printf(context.Background(), logs.HandoffFailed(request.ConnID, request.Endpoint, currentRetries, maxRetries, err))
hwm.logf(context.Background(), logs.HandoffFailed(request.ConnID, request.Endpoint, currentRetries, maxRetries, err))
}
time.AfterFunc(afterTime, func() {
if err := hwm.queueHandoff(request.Conn); err != nil {
if internal.LogLevel.WarnOrAbove() {
internal.Logger.Printf(context.Background(), logs.CannotQueueHandoffForRetry(err))
hwm.logf(context.Background(), logs.CannotQueueHandoffForRetry(err))
}
hwm.closeConnFromRequest(context.Background(), request, err)
}
@@ -259,7 +259,7 @@ func (hwm *handoffWorkerManager) queueHandoff(conn *pool.Conn) error {
// if shouldHandoff is false and retries is 0, then we are not retrying and not do a handoff
if !shouldHandoff && conn.HandoffRetries() == 0 {
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.ConnectionNotMarkedForHandoff(conn.GetID()))
hwm.logf(context.Background(), logs.ConnectionNotMarkedForHandoff(conn.GetID()))
}
return errors.New(logs.ConnectionNotMarkedForHandoffError(conn.GetID()))
}
@@ -302,7 +302,7 @@ func (hwm *handoffWorkerManager) queueHandoff(conn *pool.Conn) error {
queueLen := len(hwm.handoffQueue)
queueCap := cap(hwm.handoffQueue)
if internal.LogLevel.WarnOrAbove() {
internal.Logger.Printf(context.Background(), logs.HandoffQueueFull(queueLen, queueCap))
hwm.logf(context.Background(), logs.HandoffQueueFull(queueLen, queueCap))
}
}
}
@@ -356,7 +356,7 @@ func (hwm *handoffWorkerManager) performConnectionHandoff(ctx context.Context, c

// Check if circuit breaker is open before attempting handoff
if circuitBreaker.IsOpen() {
internal.Logger.Printf(ctx, logs.CircuitBreakerOpen(connID, newEndpoint))
hwm.logf(ctx, logs.CircuitBreakerOpen(connID, newEndpoint))
return false, ErrCircuitBreakerOpen // Don't retry when circuit breaker is open
}

@@ -385,15 +385,15 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
connID uint64,
) (shouldRetry bool, err error) {
retries := conn.IncrementAndGetHandoffRetries(1)
internal.Logger.Printf(ctx, logs.HandoffRetryAttempt(connID, retries, newEndpoint, conn.RemoteAddr().String()))
hwm.logf(ctx, logs.HandoffRetryAttempt(connID, retries, newEndpoint, conn.RemoteAddr().String()))
maxRetries := 3 // Default fallback
if hwm.config != nil {
maxRetries = hwm.config.MaxHandoffRetries
}

if retries > maxRetries {
if internal.LogLevel.WarnOrAbove() {
internal.Logger.Printf(ctx, logs.ReachedMaxHandoffRetries(connID, newEndpoint, maxRetries))
hwm.logf(ctx, logs.ReachedMaxHandoffRetries(connID, newEndpoint, maxRetries))
}
// won't retry on ErrMaxHandoffRetriesReached
return false, ErrMaxHandoffRetriesReached
@@ -405,7 +405,7 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
// Create new connection to the new endpoint
newNetConn, err := endpointDialer(ctx)
if err != nil {
internal.Logger.Printf(ctx, logs.FailedToDialNewEndpoint(connID, newEndpoint, err))
hwm.logf(ctx, logs.FailedToDialNewEndpoint(connID, newEndpoint, err))
// will retry
// Maybe a network error - retry after a delay
return true, err
@@ -425,7 +425,7 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
conn.SetRelaxedTimeoutWithDeadline(relaxedTimeout, relaxedTimeout, deadline)

if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(context.Background(), logs.ApplyingRelaxedTimeoutDueToPostHandoff(connID, relaxedTimeout, deadline.Format("15:04:05.000")))
hwm.logf(context.Background(), logs.ApplyingRelaxedTimeoutDueToPostHandoff(connID, relaxedTimeout, deadline.Format("15:04:05.000")))
}
}

@@ -447,7 +447,7 @@ func (hwm *handoffWorkerManager) performHandoffInternal(
// - clear the handoff state (shouldHandoff, endpoint, seqID)
// - reset the handoff retries to 0
conn.ClearHandoffState()
internal.Logger.Printf(ctx, logs.HandoffSucceeded(connID, newEndpoint))
hwm.logf(ctx, logs.HandoffSucceeded(connID, newEndpoint))

// successfully completed the handoff, no retry needed and no error
return false, nil
@@ -478,15 +478,23 @@ func (hwm *handoffWorkerManager) closeConnFromRequest(ctx context.Context, reque
if pooler != nil {
pooler.Remove(ctx, conn, err)
if internal.LogLevel.WarnOrAbove() {
internal.Logger.Printf(ctx, logs.RemovingConnectionFromPool(conn.GetID(), err))
hwm.logf(ctx, logs.RemovingConnectionFromPool(conn.GetID(), err))
}
} else {
err := conn.Close() // Close the connection if no pool provided
if err != nil {
internal.Logger.Printf(ctx, "redis: failed to close connection: %v", err)
hwm.logf(ctx, "redis: failed to close connection: %v", err)
}
if internal.LogLevel.WarnOrAbove() {
internal.Logger.Printf(ctx, logs.NoPoolProvidedCannotRemove(conn.GetID(), err))
hwm.logf(ctx, logs.NoPoolProvidedCannotRemove(conn.GetID(), err))
}
}
}

func (hwm *handoffWorkerManager) logf(ctx context.Context, format string, args ...interface{}) {
logger := internal.Logger
if hwm.config != nil && hwm.config.Logger != nil {
logger = *hwm.config.Logger
}
logger.Printf(ctx, format, args...)
}
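
For context on where the worker's logf calls sit: when a handoff attempt fails but is still retryable, the worker logs the failure, re-queues the connection after a delay with time.AfterFunc, and only gives up (logging again and discarding the connection) if re-queueing itself fails. A stripped-down sketch of that shape; the request, queue, and worker types below are hypothetical simplifications of the real ones:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type handoffRequest struct {
	ConnID   uint64
	Endpoint string
}

type worker struct {
	queue chan handoffRequest
	logf  func(ctx context.Context, format string, args ...any)
}

// queueRetry re-enqueues a failed handoff; a full queue means the retry is dropped.
func (w *worker) queueRetry(req handoffRequest) error {
	select {
	case w.queue <- req:
		return nil
	default:
		return errors.New("handoff queue full")
	}
}

// retryLater mirrors the worker's failure path: log the failed attempt, then
// re-queue after a backoff; if re-queueing fails, log that too and give up
// (the real worker removes the connection from the pool at that point).
func (w *worker) retryLater(req handoffRequest, attempt, maxRetries int, cause error, backoff time.Duration) {
	ctx := context.Background()
	w.logf(ctx, "handoff of conn[%d] to %s failed (attempt %d/%d): %v",
		req.ConnID, req.Endpoint, attempt, maxRetries, cause)

	time.AfterFunc(backoff, func() {
		if err := w.queueRetry(req); err != nil {
			w.logf(ctx, "cannot queue handoff retry for conn[%d]: %v", req.ConnID, err)
		}
	})
}

func main() {
	w := &worker{
		queue: make(chan handoffRequest, 1),
		logf: func(ctx context.Context, format string, args ...any) {
			fmt.Printf(format+"\n", args...)
		},
	}
	w.retryLater(handoffRequest{ConnID: 7, Endpoint: "redis-2:6379"}, 1, 3, errors.New("dial timeout"), 50*time.Millisecond)
	time.Sleep(100 * time.Millisecond) // allow the AfterFunc callback to run before exiting
}
```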