From dadd575e4eb39f9d8e6d22215a397740303e6344 Mon Sep 17 00:00:00 2001 From: Alden Hsu Date: Mon, 3 Nov 2025 19:35:40 +0000 Subject: [PATCH 1/5] add streamServerMu to guard streamServer creation/deletion --- robot/web/web.go | 21 +++++++++++---------- robot/web/web_c.go | 13 ++++++++++++- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/robot/web/web.go b/robot/web/web.go index 1b366856748..affe565c722 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -108,16 +108,17 @@ type webService struct { tcpModServer rpc.Server // Will be nil on non-cgo builds. - streamServer *webstream.Server - opts options - addr string - modAddrs config.ParentSockAddrs - logger logging.Logger - cancelCtx context.Context - cancelFunc func() - isRunning bool - webWorkers sync.WaitGroup - modWorkers sync.WaitGroup + streamServerMu sync.Mutex + streamServer *webstream.Server + opts options + addr string + modAddrs config.ParentSockAddrs + logger logging.Logger + cancelCtx context.Context + cancelFunc func() + isRunning bool + webWorkers sync.WaitGroup + modWorkers sync.WaitGroup requestCounter RequestCounter modPeerConnTracker *grpc.ModPeerConnTracker diff --git a/robot/web/web_c.go b/robot/web/web_c.go index 5d0ed193ca7..4d00802760f 100644 --- a/robot/web/web_c.go +++ b/robot/web/web_c.go @@ -23,10 +23,19 @@ func (svc *webService) Reconfigure(ctx context.Context, _ resource.Dependencies, if !svc.isRunning { return nil } - return svc.streamServer.AddNewStreams(svc.cancelCtx) + + svc.streamServerMu.Lock() + defer svc.streamServerMu.Unlock() + // May be nil if this is being run after closeStreamServer. 
+ if svc.streamServer != nil { + return svc.streamServer.AddNewStreams(svc.cancelCtx) + } + return nil } func (svc *webService) closeStreamServer() { + svc.streamServerMu.Lock() + defer svc.streamServerMu.Unlock() if err := svc.streamServer.Close(); err != nil { svc.logger.Errorw("error closing stream server", "error", err) } @@ -41,6 +50,8 @@ func (svc *webService) closeStreamServer() { func (svc *webService) initStreamServer(ctx context.Context, srv rpc.Server) error { // The webService depends on the stream server in addition to modules. We relax expectations on // what will be started first and allow for any order. + svc.streamServerMu.Lock() + defer svc.streamServerMu.Unlock() if svc.streamServer == nil { var streamConfig gostream.StreamConfig if svc.opts.streamConfig != nil { From c67f0a24daaffadd0178704fc6ec782149f3685b Mon Sep 17 00:00:00 2001 From: Alden Hsu Date: Thu, 6 Nov 2025 15:15:55 +0000 Subject: [PATCH 2/5] rename streamServerMu to streamServerInitCloseMu; add comment explaining usage --- robot/web/web.go | 24 +++++++++++++----------- robot/web/web_c.go | 12 ++++++------ 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/robot/web/web.go b/robot/web/web.go index affe565c722..e9e8bdda0a5 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -108,17 +108,19 @@ type webService struct { tcpModServer rpc.Server // Will be nil on non-cgo builds. - streamServerMu sync.Mutex - streamServer *webstream.Server - opts options - addr string - modAddrs config.ParentSockAddrs - logger logging.Logger - cancelCtx context.Context - cancelFunc func() - isRunning bool - webWorkers sync.WaitGroup - modWorkers sync.WaitGroup + streamServer *webstream.Server + // streamServerInitCloseMu synchronizes concurrent access to streamServer, particularly instance management: (re)creating & destroying. + // it differs from streamServer.mu which is used to guard internal operations for a single instance. 
+ streamServerInitCloseMu sync.Mutex + opts options + addr string + modAddrs config.ParentSockAddrs + logger logging.Logger + cancelCtx context.Context + cancelFunc func() + isRunning bool + webWorkers sync.WaitGroup + modWorkers sync.WaitGroup requestCounter RequestCounter modPeerConnTracker *grpc.ModPeerConnTracker diff --git a/robot/web/web_c.go b/robot/web/web_c.go index 4d00802760f..72edf0b6584 100644 --- a/robot/web/web_c.go +++ b/robot/web/web_c.go @@ -24,8 +24,8 @@ func (svc *webService) Reconfigure(ctx context.Context, _ resource.Dependencies, return nil } - svc.streamServerMu.Lock() - defer svc.streamServerMu.Unlock() + svc.streamServerInitCloseMu.Lock() + defer svc.streamServerInitCloseMu.Unlock() // May be nil if this is being run after closeStreamServer. if svc.streamServer != nil { return svc.streamServer.AddNewStreams(svc.cancelCtx) @@ -34,8 +34,8 @@ func (svc *webService) Reconfigure(ctx context.Context, _ resource.Dependencies, } func (svc *webService) closeStreamServer() { - svc.streamServerMu.Lock() - defer svc.streamServerMu.Unlock() + svc.streamServerInitCloseMu.Lock() + defer svc.streamServerInitCloseMu.Unlock() if err := svc.streamServer.Close(); err != nil { svc.logger.Errorw("error closing stream server", "error", err) } @@ -50,8 +50,8 @@ func (svc *webService) closeStreamServer() { func (svc *webService) initStreamServer(ctx context.Context, srv rpc.Server) error { // The webService depends on the stream server in addition to modules. We relax expectations on // what will be started first and allow for any order. 
- svc.streamServerMu.Lock() - defer svc.streamServerMu.Unlock() + svc.streamServerInitCloseMu.Lock() + defer svc.streamServerInitCloseMu.Unlock() if svc.streamServer == nil { var streamConfig gostream.StreamConfig if svc.opts.streamConfig != nil { From f6e6fe019adfb532793b68c4434d0b79532f821f Mon Sep 17 00:00:00 2001 From: Alden Hsu Date: Fri, 14 Nov 2025 10:30:53 -0500 Subject: [PATCH 3/5] Revert "rename streamServerMu to streamServerInitCloseMu; add comment explaining usage" This reverts commit c67f0a24daaffadd0178704fc6ec782149f3685b. --- robot/web/web.go | 24 +++++++++++------------- robot/web/web_c.go | 12 ++++++------ 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/robot/web/web.go b/robot/web/web.go index e9e8bdda0a5..affe565c722 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -108,19 +108,17 @@ type webService struct { tcpModServer rpc.Server // Will be nil on non-cgo builds. - streamServer *webstream.Server - // streamServerInitCloseMu synchronizes concurrent access to streamServer, particularly instance management: (re)creating & destroying. - // it differs from streamServer.mu which is used to guard internal operations for a single instance. 
- streamServerInitCloseMu sync.Mutex - opts options - addr string - modAddrs config.ParentSockAddrs - logger logging.Logger - cancelCtx context.Context - cancelFunc func() - isRunning bool - webWorkers sync.WaitGroup - modWorkers sync.WaitGroup + streamServerMu sync.Mutex + streamServer *webstream.Server + opts options + addr string + modAddrs config.ParentSockAddrs + logger logging.Logger + cancelCtx context.Context + cancelFunc func() + isRunning bool + webWorkers sync.WaitGroup + modWorkers sync.WaitGroup requestCounter RequestCounter modPeerConnTracker *grpc.ModPeerConnTracker diff --git a/robot/web/web_c.go b/robot/web/web_c.go index 72edf0b6584..4d00802760f 100644 --- a/robot/web/web_c.go +++ b/robot/web/web_c.go @@ -24,8 +24,8 @@ func (svc *webService) Reconfigure(ctx context.Context, _ resource.Dependencies, return nil } - svc.streamServerInitCloseMu.Lock() - defer svc.streamServerInitCloseMu.Unlock() + svc.streamServerMu.Lock() + defer svc.streamServerMu.Unlock() // May be nil if this is being run after closeStreamServer. if svc.streamServer != nil { return svc.streamServer.AddNewStreams(svc.cancelCtx) @@ -34,8 +34,8 @@ func (svc *webService) Reconfigure(ctx context.Context, _ resource.Dependencies, } func (svc *webService) closeStreamServer() { - svc.streamServerInitCloseMu.Lock() - defer svc.streamServerInitCloseMu.Unlock() + svc.streamServerMu.Lock() + defer svc.streamServerMu.Unlock() if err := svc.streamServer.Close(); err != nil { svc.logger.Errorw("error closing stream server", "error", err) } @@ -50,8 +50,8 @@ func (svc *webService) closeStreamServer() { func (svc *webService) initStreamServer(ctx context.Context, srv rpc.Server) error { // The webService depends on the stream server in addition to modules. We relax expectations on // what will be started first and allow for any order. 
- svc.streamServerInitCloseMu.Lock() - defer svc.streamServerInitCloseMu.Unlock() + svc.streamServerMu.Lock() + defer svc.streamServerMu.Unlock() if svc.streamServer == nil { var streamConfig gostream.StreamConfig if svc.opts.streamConfig != nil { From c9c2cc2f211b8fea1dfa3596d2d0042aed38c3ca Mon Sep 17 00:00:00 2001 From: Alden Hsu Date: Fri, 14 Nov 2025 10:30:53 -0500 Subject: [PATCH 4/5] Revert "add streamServerMu to guard streamServer creation/deletion" This reverts commit dadd575e4eb39f9d8e6d22215a397740303e6344. --- robot/web/web.go | 21 ++++++++++----------- robot/web/web_c.go | 13 +------------ 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/robot/web/web.go b/robot/web/web.go index affe565c722..1b366856748 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -108,17 +108,16 @@ type webService struct { tcpModServer rpc.Server // Will be nil on non-cgo builds. - streamServerMu sync.Mutex - streamServer *webstream.Server - opts options - addr string - modAddrs config.ParentSockAddrs - logger logging.Logger - cancelCtx context.Context - cancelFunc func() - isRunning bool - webWorkers sync.WaitGroup - modWorkers sync.WaitGroup + streamServer *webstream.Server + opts options + addr string + modAddrs config.ParentSockAddrs + logger logging.Logger + cancelCtx context.Context + cancelFunc func() + isRunning bool + webWorkers sync.WaitGroup + modWorkers sync.WaitGroup requestCounter RequestCounter modPeerConnTracker *grpc.ModPeerConnTracker diff --git a/robot/web/web_c.go b/robot/web/web_c.go index 4d00802760f..5d0ed193ca7 100644 --- a/robot/web/web_c.go +++ b/robot/web/web_c.go @@ -23,19 +23,10 @@ func (svc *webService) Reconfigure(ctx context.Context, _ resource.Dependencies, if !svc.isRunning { return nil } - - svc.streamServerMu.Lock() - defer svc.streamServerMu.Unlock() - // May be nil if this is being run after closeStreamServer. 
- if svc.streamServer != nil { - return svc.streamServer.AddNewStreams(svc.cancelCtx) - } - return nil + return svc.streamServer.AddNewStreams(svc.cancelCtx) } func (svc *webService) closeStreamServer() { - svc.streamServerMu.Lock() - defer svc.streamServerMu.Unlock() if err := svc.streamServer.Close(); err != nil { svc.logger.Errorw("error closing stream server", "error", err) } @@ -50,8 +41,6 @@ func (svc *webService) closeStreamServer() { func (svc *webService) initStreamServer(ctx context.Context, srv rpc.Server) error { // The webService depends on the stream server in addition to modules. We relax expectations on // what will be started first and allow for any order. - svc.streamServerMu.Lock() - defer svc.streamServerMu.Unlock() if svc.streamServer == nil { var streamConfig gostream.StreamConfig if svc.opts.streamConfig != nil { From 4c2d2da1295398a4749553a9b46971d2753a2e7d Mon Sep 17 00:00:00 2001 From: Alden Hsu Date: Thu, 13 Nov 2025 11:30:29 -0500 Subject: [PATCH 5/5] move closeStreamServer to stopWeb --- robot/web/web.go | 2 +- robot/web/web_c.go | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/robot/web/web.go b/robot/web/web.go index 1b366856748..be3a79fc4ed 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -360,6 +360,7 @@ func (svc *webService) stopWeb() { if svc.cancelFunc != nil { svc.cancelFunc() } + svc.closeStreamServer() svc.isRunning = false svc.webWorkers.Wait() } @@ -484,7 +485,6 @@ func (svc *webService) runWeb(ctx context.Context, options weboptions.Options) ( svc.logger.Errorw("error stopping rpc server", "error", err) } }() - svc.closeStreamServer() }) svc.webWorkers.Add(1) utils.PanicCapturingGo(func() { diff --git a/robot/web/web_c.go b/robot/web/web_c.go index 5d0ed193ca7..33a53fa3668 100644 --- a/robot/web/web_c.go +++ b/robot/web/web_c.go @@ -27,15 +27,18 @@ func (svc *webService) Reconfigure(ctx context.Context, _ resource.Dependencies, } func (svc *webService) closeStreamServer() { - 
if err := svc.streamServer.Close(); err != nil { - svc.logger.Errorw("error closing stream server", "error", err) - } + // closeStreamServer is called by svc.stopWeb, which is called by both Stop and Close in the shutdown process. + if svc.streamServer != nil { + if err := svc.streamServer.Close(); err != nil { + svc.logger.Errorw("error closing stream server", "error", err) + } - // RSDK-10570: Nil out the stream server such that we recreate it on a `runWeb` call. Recreating - // the stream server is important for passing in a fresh `svc.cancelCtx` that's in an alive - // state. The stream server checks that context, for example, when handling the AddStream API - // call. - svc.streamServer = nil + // RSDK-10570: Nil out the stream server such that we recreate it on a `runWeb` call. Recreating + // the stream server is important for passing in a fresh `svc.cancelCtx` that's in an alive + // state. The stream server checks that context, for example, when handling the AddStream API + // call. + svc.streamServer = nil + } } func (svc *webService) initStreamServer(ctx context.Context, srv rpc.Server) error {