Merged
Commits
24 commits
1b0168d
perf(pool): replace hookManager RWMutex with atomic.Pointer and add p…
ndyakov Oct 25, 2025
78bcfbb
perf(pool): eliminate mutex overhead in state machine hot path
ndyakov Oct 25, 2025
374acc3
perf(pool): use predefined state slices to eliminate allocations in h…
ndyakov Oct 25, 2025
f08338e
perf(pool): optimize TryTransition to reduce atomic operations
ndyakov Oct 25, 2025
0773d52
perf(pool): add fast path for Get/Put to match master performance
ndyakov Oct 25, 2025
9481c4d
combine cas
ndyakov Oct 25, 2025
d43b973
fix linter
ndyakov Oct 25, 2025
d34f1e0
try faster approach
ndyakov Oct 25, 2025
c02fe3e
fast semaphore
ndyakov Oct 25, 2025
9ec5dae
better inlining for hot path
ndyakov Oct 25, 2025
f8feb0e
fix linter issues
ndyakov Oct 25, 2025
5ff5463
use new semaphore in auth as well
ndyakov Oct 25, 2025
0878735
linter should be happy now
ndyakov Oct 25, 2025
b2225f1
Merge branch 'ndyakov/state-machine-conn' into ndyakov/pool-performance
ndyakov Oct 25, 2025
f52ab34
add comments
ndyakov Oct 25, 2025
d64c4eb
Update internal/pool/conn_state.go
ndyakov Oct 25, 2025
8654d1a
address comment
ndyakov Oct 25, 2025
bec09a5
slight reordering
ndyakov Oct 26, 2025
55c502d
try to cache time if for non-critical calculation
ndyakov Oct 27, 2025
4a30663
fix wrong benchmark
ndyakov Oct 27, 2025
316aeb7
add concurrent test
ndyakov Oct 27, 2025
da5fe33
fix benchmark report
ndyakov Oct 27, 2025
471a828
add additional expect to check output
ndyakov Oct 27, 2025
9f3f8b7
comment and variable rename
ndyakov Oct 27, 2025
115 changes: 113 additions & 2 deletions hset_benchmark_test.go
@@ -3,6 +3,7 @@ package redis_test
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -100,7 +101,82 @@ func benchmarkHSETOperations(b *testing.B, rdb *redis.Client, ctx context.Contex
avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N)
b.ReportMetric(float64(avgTimePerOp), "ns/op")
// report average time in milliseconds from totalTimes
avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes))
sumTime := time.Duration(0)
for _, t := range totalTimes {
sumTime += t
}
avgTimePerOpMs := sumTime.Milliseconds() / int64(len(totalTimes))
b.ReportMetric(float64(avgTimePerOpMs), "ms")
}

// benchmarkHSETOperationsConcurrent performs the actual HSET benchmark for a given scale
func benchmarkHSETOperationsConcurrent(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) {
hashKey := fmt.Sprintf("benchmark_hash_%d", operations)

b.ResetTimer()
b.StartTimer()
totalTimes := []time.Duration{}

for i := 0; i < b.N; i++ {
b.StopTimer()
// Clean up the hash before each iteration
rdb.Del(ctx, hashKey)
b.StartTimer()

startTime := time.Now()
// Perform the specified number of HSET operations

wg := sync.WaitGroup{}
timesCh := make(chan time.Duration, operations)
errCh := make(chan error, operations)

for j := 0; j < operations; j++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
field := fmt.Sprintf("field_%d", j)
value := fmt.Sprintf("value_%d", j)

err := rdb.HSet(ctx, hashKey, field, value).Err()
if err != nil {
errCh <- err
return
}
timesCh <- time.Since(startTime)
}(j)
}

wg.Wait()
close(timesCh)
close(errCh)

// Check for errors
for err := range errCh {
b.Errorf("HSET operation failed: %v", err)
}

for d := range timesCh {
totalTimes = append(totalTimes, d)
}
}

// Stop the timer to calculate metrics
b.StopTimer()

// Report operations per second
opsPerSec := float64(operations*b.N) / b.Elapsed().Seconds()
b.ReportMetric(opsPerSec, "ops/sec")

// Report average time per operation
avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N)
b.ReportMetric(float64(avgTimePerOp), "ns/op")
// report average time in milliseconds from totalTimes

sumTime := time.Duration(0)
for _, t := range totalTimes {
sumTime += t
}
avgTimePerOpMs := sumTime.Milliseconds() / int64(len(totalTimes))
b.ReportMetric(float64(avgTimePerOpMs), "ms")
}

@@ -134,6 +210,37 @@ func BenchmarkHSETPipelined(b *testing.B) {
}
}

func BenchmarkHSET_Concurrent(b *testing.B) {
ctx := context.Background()

// Setup Redis client
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 0,
PoolSize: 100,
})
defer rdb.Close()

// Test connection
if err := rdb.Ping(ctx).Err(); err != nil {
b.Skipf("Redis server not available: %v", err)
}

// Clean up before and after tests
defer func() {
rdb.FlushDB(ctx)
}()

// Reduced scales to avoid overwhelming the system with too many concurrent goroutines
scales := []int{1, 10, 100, 1000}

for _, scale := range scales {
b.Run(fmt.Sprintf("HSET_%d_operations_concurrent", scale), func(b *testing.B) {
benchmarkHSETOperationsConcurrent(b, rdb, ctx, scale)
})
}
}
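For reference, these concurrent benchmarks assume a Redis server listening on localhost:6379 (as configured above) and can be run in isolation with the standard Go tooling, for example: go test -run '^$' -bench 'HSET_Concurrent' -benchmem .  (the -run '^$' filter skips the regular tests so only the benchmarks execute).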

// benchmarkHSETPipelined performs HSET benchmark using pipelining
func benchmarkHSETPipelined(b *testing.B, rdb *redis.Client, ctx context.Context, operations int) {
hashKey := fmt.Sprintf("benchmark_hash_pipelined_%d", operations)
@@ -177,7 +284,11 @@ func benchmarkHSETPipelined(b *testing.B, rdb *redis.Client, ctx context.Context
avgTimePerOp := b.Elapsed().Nanoseconds() / int64(operations*b.N)
b.ReportMetric(float64(avgTimePerOp), "ns/op")
// report average time in milliseconds from totalTimes
avgTimePerOpMs := totalTimes[0].Milliseconds() / int64(len(totalTimes))
sumTime := time.Duration(0)
for _, t := range totalTimes {
sumTime += t
}
avgTimePerOpMs := sumTime.Milliseconds() / int64(len(totalTimes))
b.ReportMetric(float64(avgTimePerOpMs), "ms")
}

19 changes: 7 additions & 12 deletions internal/auth/streaming/pool_hook.go
@@ -34,9 +34,10 @@ type ReAuthPoolHook struct {
shouldReAuth map[uint64]func(error)
shouldReAuthLock sync.RWMutex

// workers is a semaphore channel limiting concurrent re-auth operations
// workers is a semaphore limiting concurrent re-auth operations
// Initialized with poolSize tokens to prevent pool exhaustion
workers chan struct{}
// Uses FastSemaphore for consistency and better performance
workers *internal.FastSemaphore

// reAuthTimeout is the maximum time to wait for acquiring a connection for re-auth
reAuthTimeout time.Duration
@@ -59,16 +60,10 @@
// The poolSize parameter is used to initialize the worker semaphore, ensuring that
// re-auth operations don't exhaust the connection pool.
func NewReAuthPoolHook(poolSize int, reAuthTimeout time.Duration) *ReAuthPoolHook {
workers := make(chan struct{}, poolSize)
// Initialize the workers channel with tokens (semaphore pattern)
for i := 0; i < poolSize; i++ {
workers <- struct{}{}
}

return &ReAuthPoolHook{
shouldReAuth: make(map[uint64]func(error)),
scheduledReAuth: make(map[uint64]bool),
workers: workers,
workers: internal.NewFastSemaphore(int32(poolSize)),
reAuthTimeout: reAuthTimeout,
}
}
@@ -162,10 +157,10 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool,
r.scheduledLock.Unlock()
r.shouldReAuthLock.Unlock()
go func() {
<-r.workers
r.workers.AcquireBlocking()
// safety first
if conn == nil || (conn != nil && conn.IsClosed()) {
r.workers <- struct{}{}
r.workers.Release()
return
}
defer func() {
@@ -176,7 +171,7 @@
r.scheduledLock.Lock()
delete(r.scheduledReAuth, connID)
r.scheduledLock.Unlock()
r.workers <- struct{}{}
r.workers.Release()
}()

// Create timeout context for connection acquisition
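The internal.FastSemaphore type used above is introduced elsewhere in this PR (the "fast semaphore" commit) and its implementation is not part of this hunk. As an illustrative sketch only — assuming nothing beyond the standard library, and almost certainly differing from the real, performance-tuned implementation — the AcquireBlocking/Release surface it exposes behaves like a counting semaphore:

package internal

import "sync"

// fastSemaphore is a hypothetical stand-in for internal.FastSemaphore, shown
// only to illustrate the AcquireBlocking/Release contract relied on in
// pool_hook.go above; the real type added by this PR may be lock-free.
type fastSemaphore struct {
	mu     sync.Mutex
	cond   *sync.Cond
	tokens int32
}

func newFastSemaphore(n int32) *fastSemaphore {
	s := &fastSemaphore{tokens: n}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// AcquireBlocking waits until a token is available, then consumes it.
func (s *fastSemaphore) AcquireBlocking() {
	s.mu.Lock()
	for s.tokens == 0 {
		s.cond.Wait()
	}
	s.tokens--
	s.mu.Unlock()
}

// Release returns a token and wakes one waiter, if any.
func (s *fastSemaphore) Release() {
	s.mu.Lock()
	s.tokens++
	s.mu.Unlock()
	s.cond.Signal()
}
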
88 changes: 82 additions & 6 deletions internal/pool/conn.go
@@ -1,3 +1,4 @@
// Package pool implements the pool management
package pool

import (
@@ -17,6 +18,35 @@

var noDeadline = time.Time{}

// Global time cache updated every 50ms by background goroutine.
// This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout.
// Max staleness: 50ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds).
var globalTimeCache struct {
nowNs atomic.Int64
}

func init() {
// Initialize immediately
globalTimeCache.nowNs.Store(time.Now().UnixNano())

// Start background updater
go func() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()

for range ticker.C {
globalTimeCache.nowNs.Store(time.Now().UnixNano())
}
}()
}

// getCachedTimeNs returns the current time in nanoseconds from the global cache.
// This is updated every 50ms by a background goroutine, avoiding expensive syscalls.
// Max staleness: 50ms.
func getCachedTimeNs() int64 {
return globalTimeCache.nowNs.Load()
}

// Global atomic counter for connection IDs
var connIDCounter uint64

@@ -79,6 +109,7 @@ type Conn struct {
expiresAt time.Time

// maintenanceNotifications upgrade support: relaxed timeouts during migrations/failovers

// Using atomic operations for lock-free access to avoid mutex contention
relaxedReadTimeoutNs atomic.Int64 // time.Duration as nanoseconds
relaxedWriteTimeoutNs atomic.Int64 // time.Duration as nanoseconds
@@ -260,11 +291,13 @@ func (cn *Conn) CompareAndSwapUsed(old, new bool) bool {

if !old && new {
// Acquiring: IDLE → IN_USE
_, err := cn.stateMachine.TryTransition([]ConnState{StateIdle}, StateInUse)
// Use predefined slice to avoid allocation
_, err := cn.stateMachine.TryTransition(validFromCreatedOrIdle, StateInUse)
return err == nil
} else {
// Releasing: IN_USE → IDLE
_, err := cn.stateMachine.TryTransition([]ConnState{StateInUse}, StateIdle)
// Use predefined slice to avoid allocation
_, err := cn.stateMachine.TryTransition(validFromInUse, StateIdle)
return err == nil
}
}
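The validFrom* slices referenced here and in MarkQueuedForHandoff below are package-level variables defined in internal/pool/conn_state.go (added by the "predefined state slices" commit), which is not shown in this view. A plausible reconstruction of their shape, inferred from the names and call sites — the exact definitions in the PR may differ:

// Hypothetical reconstruction; see conn_state.go in this PR for the real
// definitions.
var (
	validFromCreatedOrIdle      = []ConnState{StateCreated, StateIdle}
	validFromInUse              = []ConnState{StateInUse}
	validFromCreatedInUseOrIdle = []ConnState{StateCreated, StateInUse, StateIdle}
)

Because these slices are allocated once at package initialization, each TryTransition call on the hot path no longer constructs a fresh []ConnState literal.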
@@ -454,7 +487,8 @@ func (cn *Conn) getEffectiveReadTimeout(normalTimeout time.Duration) time.Durati
return time.Duration(readTimeoutNs)
}

nowNs := time.Now().UnixNano()
// Use cached time to avoid expensive syscall (max 50ms staleness is acceptable for timeout checks)
nowNs := getCachedTimeNs()
// Check if deadline has passed
if nowNs < deadlineNs {
// Deadline is in the future, use relaxed timeout
@@ -487,7 +521,8 @@ func (cn *Conn) getEffectiveWriteTimeout(normalTimeout time.Duration) time.Durat
return time.Duration(writeTimeoutNs)
}

nowNs := time.Now().UnixNano()
// Use cached time to avoid expensive syscall (max 50ms staleness is acceptable for timeout checks)
nowNs := getCachedTimeNs()
// Check if deadline has passed
if nowNs < deadlineNs {
// Deadline is in the future, use relaxed timeout
@@ -632,7 +667,8 @@ func (cn *Conn) MarkQueuedForHandoff() error {
// The connection is typically in IN_USE state when OnPut is called (normal Put flow)
// But in some edge cases or tests, it might be in IDLE or CREATED state
// The pool will detect this state change and preserve it (not overwrite with IDLE)
finalState, err := cn.stateMachine.TryTransition([]ConnState{StateInUse, StateIdle, StateCreated}, StateUnusable)
// Use predefined slice to avoid allocation
finalState, err := cn.stateMachine.TryTransition(validFromCreatedInUseOrIdle, StateUnusable)
if err != nil {
// Check if already in UNUSABLE state (race condition or retry)
// ShouldHandoff should be false now, but check just in case
@@ -658,6 +694,42 @@
return cn.stateMachine
}

// TryAcquire attempts to acquire the connection for use.
// This is an optimized inline method for the hot path (Get operation).
//
// It tries to transition from IDLE -> IN_USE or CREATED -> IN_USE.
// Returns true if the connection was successfully acquired, false otherwise.
//
// Performance: This is faster than calling GetStateMachine() + TryTransitionFast()
//
// NOTE: We directly access cn.stateMachine.state here instead of using the state machine's
// methods. This breaks encapsulation but is necessary for performance.
// The IDLE->IN_USE and CREATED->IN_USE transitions don't need
// waiter notification, and benchmarks show 1-3% improvement. If the state machine ever
// needs to notify waiters on these transitions, update this to use TryTransitionFast().
func (cn *Conn) TryAcquire() bool {
// The || operator short-circuits, so only 1 CAS in the common case
return cn.stateMachine.state.CompareAndSwap(uint32(StateIdle), uint32(StateInUse)) ||
cn.stateMachine.state.CompareAndSwap(uint32(StateCreated), uint32(StateInUse))
}

// Release releases the connection back to the pool.
// This is an optimized inline method for the hot path (Put operation).
//
// It tries to transition from IN_USE -> IDLE.
// Returns true if the connection was successfully released, false otherwise.
//
// Performance: This is faster than calling GetStateMachine() + TryTransitionFast().
//
// NOTE: We directly access cn.stateMachine.state here instead of using the state machine's
// methods. This breaks encapsulation but is necessary for performance.
// If the state machine ever needs to notify waiters
// on this transition, update this to use TryTransitionFast().
func (cn *Conn) Release() bool {
// Inline the hot path - single CAS operation
return cn.stateMachine.state.CompareAndSwap(uint32(StateInUse), uint32(StateIdle))
}

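Illustrative only (not part of this hunk): the pool's Get/Put fast paths introduced in this PR are expected to pair TryAcquire and Release roughly as sketched below. popIdleConn, pushIdleConn and removeConn are hypothetical stand-ins for the real pool internals, not actual go-redis functions.

// acquireIdleConn sketches the Get-side fast path: keep popping idle
// connections until one can be acquired with a single CAS.
func acquireIdleConn(popIdleConn func() *Conn) *Conn {
	for {
		cn := popIdleConn()
		if cn == nil {
			return nil // no idle connection; caller falls back to the slow path
		}
		if cn.TryAcquire() { // IDLE/CREATED -> IN_USE, one CAS in the common case
			return cn
		}
		// Connection is in another state (e.g. UNUSABLE during handoff); skip it.
	}
}

// releaseConn sketches the Put-side fast path: only connections that cleanly
// transition IN_USE -> IDLE go back onto the idle list.
func releaseConn(cn *Conn, pushIdleConn func(*Conn), removeConn func(*Conn)) {
	if cn.Release() {
		pushIdleConn(cn)
		return
	}
	removeConn(cn) // unexpected state; do not pool the connection
}
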
// ClearHandoffState clears the handoff state after successful handoff.
// Makes the connection usable again.
func (cn *Conn) ClearHandoffState() {
@@ -800,8 +872,12 @@ func (cn *Conn) MaybeHasData() bool {
return false
}

// deadline computes the effective deadline time based on context and timeout.
// It updates the usedAt timestamp to now.
// Uses cached time to avoid expensive syscall (max 50ms staleness is acceptable for deadline calculation).
func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
tm := time.Now()
// Use cached time for deadline calculation (called 2x per command: read + write)
tm := time.Unix(0, getCachedTimeNs())
cn.SetUsedAt(tm)

if timeout > 0 {