From b63da6435f5986d84bd94d4225749c0d27afbf50 Mon Sep 17 00:00:00 2001 From: Tahiya Date: Tue, 30 Sep 2025 10:58:36 -0400 Subject: [PATCH 1/2] DATA-4662 - Export all capture metadata fields in dataset file (#5330) --- cli/dataset.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/cli/dataset.go b/cli/dataset.go index 8af186eb0b4..4687f658a01 100644 --- a/cli/dataset.go +++ b/cli/dataset.go @@ -273,8 +273,11 @@ type ImageMetadata struct { ClassificationAnnotations []Annotation `json:"classification_annotations"` BBoxAnnotations []BBoxAnnotation `json:"bounding_box_annotations"` Timestamp string `json:"timestamp"` - PartID string `json:"part_id"` - ComponentName string `json:"component_name"` + BinaryDataID string `json:"binary_data_id,omitempty"` + OrganizationID string `json:"organization_id,omitempty"` + LocationID string `json:"location_id,omitempty"` + PartID string `json:"part_id,omitempty"` + ComponentName string `json:"component_name,omitempty"` } // BBoxAnnotation holds the information associated with each bounding box. @@ -332,13 +335,17 @@ func binaryDataToJSONLines(ctx context.Context, client datapb.DataServiceClient, fileName += ext } + captureMD := datum.GetMetadata().GetCaptureMetadata() jsonl = ImageMetadata{ ImagePath: fileName, ClassificationAnnotations: annotations, BBoxAnnotations: bboxAnnotations, - PartID: datum.GetMetadata().GetCaptureMetadata().GetPartId(), - ComponentName: datum.GetMetadata().GetCaptureMetadata().GetComponentName(), Timestamp: datum.GetMetadata().GetTimeRequested().AsTime().String(), + BinaryDataID: datum.GetMetadata().GetBinaryDataId(), + OrganizationID: captureMD.GetOrganizationId(), + LocationID: captureMD.GetLocationId(), + PartID: captureMD.GetPartId(), + ComponentName: captureMD.GetComponentName(), } line, err := json.Marshal(jsonl) From 7fcbbc080b83416012b4cfb3479d4166604cfe13 Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Wed, 29 Oct 2025 15:03:55 -0400 Subject: [PATCH 2/2] Dump a ton of information when hitting inflight request limits. --- robot/web/request_counter.go | 170 ++++++++++++++++++++++++++++++++++- 1 file changed, 166 insertions(+), 4 deletions(-) diff --git a/robot/web/request_counter.go b/robot/web/request_counter.go index ed0712d657c..a9ad52f00ae 100644 --- a/robot/web/request_counter.go +++ b/robot/web/request_counter.go @@ -2,6 +2,7 @@ package web import ( "context" + "encoding/json" "fmt" "os" "strconv" @@ -10,6 +11,7 @@ import ( "testing" "time" + "github.com/viamrobotics/webrtc/v3" googlegrpc "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -19,6 +21,7 @@ import ( "go.viam.com/rdk/resource" rutils "go.viam.com/rdk/utils" "go.viam.com/rdk/utils/ssync" + "go.viam.com/utils/rpc" ) // RequestLimitExceededError is an error returned when a request is rejected @@ -58,12 +61,19 @@ type requestStats struct { dataSent atomic.Int64 } +type pcStats struct { + inFlight ssync.Map[string, *atomic.Int64] +} + // RequestCounter is used to track and limit incoming requests. It instruments // every unary and streaming request coming in from both external clients and // internal modules. type RequestCounter struct { logger logging.Logger + errors atomic.Int64 + pcCounters ssync.Map[*webrtc.PeerConnection, *pcStats] + // requestKeyToStats maps individual API calls for each resource to a set of // metrics. E.g: `motor-foo.IsPowered` and `motor-foo.GoFor` would each have // their own set of stats. @@ -82,8 +92,19 @@ type RequestCounter struct { } // decrInFlight decrements the in flight request counter for a given resource. -func (rc *RequestCounter) decrInFlight(resource string) { +func (rc *RequestCounter) decrInFlight(resource string, pc *webrtc.PeerConnection) { rc.ensureInFlightCounterForResource(resource).Add(-1) + + stats, ok := rc.pcCounters.Load(pc) + if !ok { + panic("not ok") + } + + oldCnt, ok := stats.inFlight.Load(resource) + if !ok { + panic("not ok") + } + oldCnt.Add(-1) } func (rc *RequestCounter) preRequestIncrement(key string) { @@ -135,23 +156,152 @@ func (rc *RequestCounter) Stats() any { return ret } +func (rc *RequestCounter) dumpPCs(pc *webrtc.PeerConnection) { + myConnName := "module conn" + if pc != nil { + for statsKey := range pc.GetStats() { + if strings.HasPrefix(statsKey, "PeerConnection-") { + myConnName = statsKey + break + } + } + } + + errId := rc.errors.Add(1) + rc.pcCounters.Range(func(key *webrtc.PeerConnection, val *pcStats) bool { + if key == nil { + output := make(map[string]int64) + val.inFlight.Range(func(apiName string, cnt *atomic.Int64) bool { + output[apiName] = cnt.Load() + return true + }) + + jsonLog, err := json.MarshalIndent(output, fmt.Sprintf("ErrorID: %v", errId), " ") + if err != nil { + panic(err) + } + + rc.logger.Infof("DBG. Too many in-flight dump. RC: %p MyConn: %v. Other module connection. ErrID: %v\n%v", + rc, myConnName, errId, string(jsonLog)) + return true + } + + pcStats := key.GetStats() + + var pcStat string + // JSON-ified stat blob + var candPair map[string]any + // map of cand name to JSON-ified stat blob + cands := make(map[string]map[string]any) + var localCand map[string]any + var remoteCand map[string]any + for statKey, statVal := range pcStats { + // statVal is an opaque blob. JSON-ify it. + // + // Searching for: + // 1. statKey: PeerConnection-1761759940137764346 + // 2. statVal.Type: "candidate-pair" + // 3. statKey: .LocalCandidateID + // 4. statKey: .RemoteCandidateID + if strings.HasPrefix(statKey, "PeerConnection-") { + pcStat = statKey + } else if strings.HasPrefix(statKey, "candidate:") { + statValJSON, err := json.Marshal(statVal) + if err != nil { + panic(err) + } + + statValMap := make(map[string]any) + err = json.Unmarshal(statValJSON, &statValMap) + if err != nil { + panic(err) + } + + if statValMap["type"].(string) == "candidate-pair" { + candPair = statValMap + } else { + cands[statKey] = statValMap + } + } + } + + if pcStat == "" { + panic("no peer connection") + } + + if candPair == nil { + panic("no candidate pair bad") + } + + for candName, candStats := range cands { + if candName == candPair["localCandidateId"].(string) { + localCand = candStats + } else if candName == candPair["remoteCandidateId"].(string) { + remoteCand = candStats + } + } + + if localCand == nil || remoteCand == nil { + panic(fmt.Sprintf("Nil candidates. Local: %v Remote: %v", localCand, remoteCand)) + } + + seconds, millis := + int64(localCand["timestamp"].(float64))/1000, + int64(localCand["timestamp"].(float64))%1000 + connectTime := time.Unix(seconds, millis) + + logOutput := struct { + PeerConn string + ConnectTime string + SinceConnect string + LocalIP string + RemoteIP string + APICnt map[string]int64 + }{ + PeerConn: pcStat, + ConnectTime: fmt.Sprintf("%v", connectTime), + SinceConnect: fmt.Sprintf("%v", time.Since(connectTime)), + LocalIP: fmt.Sprintf("%v", localCand["ip"]), + RemoteIP: fmt.Sprintf("%v", remoteCand["ip"]), + APICnt: make(map[string]int64), + } + val.inFlight.Range(func(apiName string, cnt *atomic.Int64) bool { + logOutput.APICnt[apiName] = cnt.Load() + return true + }) + + jsonLog, err := json.MarshalIndent(logOutput, fmt.Sprintf("ErrorID: %v", errId), " ") + if err != nil { + panic(err) + } + + rc.logger.Infof("DBG. Too many in-flight dump. RC: %p MyConn: %v. ErrorID: %v\n%v", + rc, myConnName, errId, string(jsonLog)) + return true + }) +} + // UnaryInterceptor returns an incoming server interceptor that will pull method information and // optionally resource information to bump the request counters. func (rc *RequestCounter) UnaryInterceptor( ctx context.Context, req any, info *googlegrpc.UnaryServerInfo, handler googlegrpc.UnaryHandler, ) (resp any, err error) { apiMethod := extractViamAPI(info.FullMethod) + pc, _ := rpc.ContextPeerConnection(ctx) + if resource := buildResourceLimitKey(req, apiMethod); resource != "" { - if ok := rc.incrInFlight(resource); !ok { + if ok := rc.incrInFlight(resource, pc); !ok { rc.logger.Warnw("Request limit exceeded for resource", "method", apiMethod.full, "resource", resource) + rc.dumpPCs(pc) return nil, &RequestLimitExceededError{ resource: resource, limit: rc.inFlightLimit, } } - defer rc.decrInFlight(resource) + defer rc.decrInFlight(resource, pc) } + requestCounterKey := buildRCKey(req, apiMethod) // Storing in FTDC: `web.motor-name.MotorService/IsMoving: `. if apiMethod.shortPath != "" { @@ -208,18 +358,30 @@ func (rc *RequestCounter) ensureInFlightCounterForResource(resource string) *ato if !ok { counter, _ = rc.inFlightRequests.LoadOrStore(resource, &atomic.Int64{}) } + return counter } // incrInFlight attempts to increment the in flight request counter for a given // resource. It returns true if it was successful and false if an additional // request would exceed the configured limit. -func (rc *RequestCounter) incrInFlight(resource string) bool { +func (rc *RequestCounter) incrInFlight(resource string, pc *webrtc.PeerConnection) bool { counter := rc.ensureInFlightCounterForResource(resource) if newCount := counter.Add(1); newCount > rc.inFlightLimit { counter.Add(-1) return false } + + stats, ok := rc.pcCounters.Load(pc) + if !ok { + stats, _ = rc.pcCounters.LoadOrStore(pc, &pcStats{}) + } + pcCnt, ok := stats.inFlight.Load(resource) + if !ok { + pcCnt, _ = stats.inFlight.LoadOrStore(resource, &atomic.Int64{}) + } + pcCnt.Add(1) + return true }