Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions cli/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,11 @@ type ImageMetadata struct {
ClassificationAnnotations []Annotation `json:"classification_annotations"`
BBoxAnnotations []BBoxAnnotation `json:"bounding_box_annotations"`
Timestamp string `json:"timestamp"`
PartID string `json:"part_id"`
ComponentName string `json:"component_name"`
BinaryDataID string `json:"binary_data_id,omitempty"`
OrganizationID string `json:"organization_id,omitempty"`
LocationID string `json:"location_id,omitempty"`
PartID string `json:"part_id,omitempty"`
ComponentName string `json:"component_name,omitempty"`
}

// BBoxAnnotation holds the information associated with each bounding box.
Expand Down Expand Up @@ -332,13 +335,17 @@ func binaryDataToJSONLines(ctx context.Context, client datapb.DataServiceClient,
fileName += ext
}

captureMD := datum.GetMetadata().GetCaptureMetadata()
jsonl = ImageMetadata{
ImagePath: fileName,
ClassificationAnnotations: annotations,
BBoxAnnotations: bboxAnnotations,
PartID: datum.GetMetadata().GetCaptureMetadata().GetPartId(),
ComponentName: datum.GetMetadata().GetCaptureMetadata().GetComponentName(),
Timestamp: datum.GetMetadata().GetTimeRequested().AsTime().String(),
BinaryDataID: datum.GetMetadata().GetBinaryDataId(),
OrganizationID: captureMD.GetOrganizationId(),
LocationID: captureMD.GetLocationId(),
PartID: captureMD.GetPartId(),
ComponentName: captureMD.GetComponentName(),
}

line, err := json.Marshal(jsonl)
Expand Down
170 changes: 166 additions & 4 deletions robot/web/request_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package web

import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
Expand All @@ -10,6 +11,7 @@ import (
"testing"
"time"

"github.com/viamrobotics/webrtc/v3"
googlegrpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -19,6 +21,7 @@ import (
"go.viam.com/rdk/resource"
rutils "go.viam.com/rdk/utils"
"go.viam.com/rdk/utils/ssync"
"go.viam.com/utils/rpc"
)

// RequestLimitExceededError is an error returned when a request is rejected
Expand Down Expand Up @@ -58,12 +61,19 @@ type requestStats struct {
dataSent atomic.Int64
}

type pcStats struct {
inFlight ssync.Map[string, *atomic.Int64]
}

// RequestCounter is used to track and limit incoming requests. It instruments
// every unary and streaming request coming in from both external clients and
// internal modules.
type RequestCounter struct {
logger logging.Logger

errors atomic.Int64
pcCounters ssync.Map[*webrtc.PeerConnection, *pcStats]

// requestKeyToStats maps individual API calls for each resource to a set of
// metrics. E.g: `motor-foo.IsPowered` and `motor-foo.GoFor` would each have
// their own set of stats.
Expand All @@ -82,8 +92,19 @@ type RequestCounter struct {
}

// decrInFlight decrements the in flight request counter for a given resource.
func (rc *RequestCounter) decrInFlight(resource string) {
func (rc *RequestCounter) decrInFlight(resource string, pc *webrtc.PeerConnection) {
rc.ensureInFlightCounterForResource(resource).Add(-1)

stats, ok := rc.pcCounters.Load(pc)
if !ok {
panic("not ok")
}

oldCnt, ok := stats.inFlight.Load(resource)
if !ok {
panic("not ok")
}
oldCnt.Add(-1)
}

func (rc *RequestCounter) preRequestIncrement(key string) {
Expand Down Expand Up @@ -135,23 +156,152 @@ func (rc *RequestCounter) Stats() any {
return ret
}

func (rc *RequestCounter) dumpPCs(pc *webrtc.PeerConnection) {
myConnName := "module conn"
if pc != nil {
for statsKey := range pc.GetStats() {
if strings.HasPrefix(statsKey, "PeerConnection-") {
myConnName = statsKey
break
}
}
}

errId := rc.errors.Add(1)
rc.pcCounters.Range(func(key *webrtc.PeerConnection, val *pcStats) bool {
if key == nil {
output := make(map[string]int64)
val.inFlight.Range(func(apiName string, cnt *atomic.Int64) bool {
output[apiName] = cnt.Load()
return true
})

jsonLog, err := json.MarshalIndent(output, fmt.Sprintf("ErrorID: %v", errId), " ")
if err != nil {
panic(err)
}

rc.logger.Infof("DBG. Too many in-flight dump. RC: %p MyConn: %v. Other module connection. ErrID: %v\n%v",
rc, myConnName, errId, string(jsonLog))
return true
}

pcStats := key.GetStats()

var pcStat string
// JSON-ified stat blob
var candPair map[string]any
// map of cand name to JSON-ified stat blob
cands := make(map[string]map[string]any)
var localCand map[string]any
var remoteCand map[string]any
for statKey, statVal := range pcStats {
// statVal is an opaque blob. JSON-ify it.
//
// Searching for:
// 1. statKey: PeerConnection-1761759940137764346
// 2. statVal.Type: "candidate-pair"
// 3. statKey: <pair>.LocalCandidateID
// 4. statKey: <pair>.RemoteCandidateID
if strings.HasPrefix(statKey, "PeerConnection-") {
pcStat = statKey
} else if strings.HasPrefix(statKey, "candidate:") {
statValJSON, err := json.Marshal(statVal)
if err != nil {
panic(err)
}

statValMap := make(map[string]any)
err = json.Unmarshal(statValJSON, &statValMap)
if err != nil {
panic(err)
}

if statValMap["type"].(string) == "candidate-pair" {
candPair = statValMap
} else {
cands[statKey] = statValMap
}
}
}

if pcStat == "" {
panic("no peer connection")
}

if candPair == nil {
panic("no candidate pair bad")
}

for candName, candStats := range cands {
if candName == candPair["localCandidateId"].(string) {
localCand = candStats
} else if candName == candPair["remoteCandidateId"].(string) {
remoteCand = candStats
}
}

if localCand == nil || remoteCand == nil {
panic(fmt.Sprintf("Nil candidates. Local: %v Remote: %v", localCand, remoteCand))
}

seconds, millis :=
int64(localCand["timestamp"].(float64))/1000,
int64(localCand["timestamp"].(float64))%1000
connectTime := time.Unix(seconds, millis)

logOutput := struct {
PeerConn string
ConnectTime string
SinceConnect string
LocalIP string
RemoteIP string
APICnt map[string]int64
}{
PeerConn: pcStat,
ConnectTime: fmt.Sprintf("%v", connectTime),
SinceConnect: fmt.Sprintf("%v", time.Since(connectTime)),
LocalIP: fmt.Sprintf("%v", localCand["ip"]),
RemoteIP: fmt.Sprintf("%v", remoteCand["ip"]),
APICnt: make(map[string]int64),
}
val.inFlight.Range(func(apiName string, cnt *atomic.Int64) bool {
logOutput.APICnt[apiName] = cnt.Load()
return true
})

jsonLog, err := json.MarshalIndent(logOutput, fmt.Sprintf("ErrorID: %v", errId), " ")
if err != nil {
panic(err)
}

rc.logger.Infof("DBG. Too many in-flight dump. RC: %p MyConn: %v. ErrorID: %v\n%v",
rc, myConnName, errId, string(jsonLog))
return true
})
}

// UnaryInterceptor returns an incoming server interceptor that will pull method information and
// optionally resource information to bump the request counters.
func (rc *RequestCounter) UnaryInterceptor(
ctx context.Context, req any, info *googlegrpc.UnaryServerInfo, handler googlegrpc.UnaryHandler,
) (resp any, err error) {
apiMethod := extractViamAPI(info.FullMethod)
pc, _ := rpc.ContextPeerConnection(ctx)

if resource := buildResourceLimitKey(req, apiMethod); resource != "" {
if ok := rc.incrInFlight(resource); !ok {
if ok := rc.incrInFlight(resource, pc); !ok {
rc.logger.Warnw("Request limit exceeded for resource",
"method", apiMethod.full, "resource", resource)
rc.dumpPCs(pc)
return nil, &RequestLimitExceededError{
resource: resource,
limit: rc.inFlightLimit,
}
}
defer rc.decrInFlight(resource)
defer rc.decrInFlight(resource, pc)
}

requestCounterKey := buildRCKey(req, apiMethod)
// Storing in FTDC: `web.motor-name.MotorService/IsMoving: <count>`.
if apiMethod.shortPath != "" {
Expand Down Expand Up @@ -208,18 +358,30 @@ func (rc *RequestCounter) ensureInFlightCounterForResource(resource string) *ato
if !ok {
counter, _ = rc.inFlightRequests.LoadOrStore(resource, &atomic.Int64{})
}

return counter
}

// incrInFlight attempts to increment the in flight request counter for a given
// resource. It returns true if it was successful and false if an additional
// request would exceed the configured limit.
func (rc *RequestCounter) incrInFlight(resource string) bool {
func (rc *RequestCounter) incrInFlight(resource string, pc *webrtc.PeerConnection) bool {
counter := rc.ensureInFlightCounterForResource(resource)
if newCount := counter.Add(1); newCount > rc.inFlightLimit {
counter.Add(-1)
return false
}

stats, ok := rc.pcCounters.Load(pc)
if !ok {
stats, _ = rc.pcCounters.LoadOrStore(pc, &pcStats{})
}
pcCnt, ok := stats.inFlight.Load(resource)
if !ok {
pcCnt, _ = stats.inFlight.LoadOrStore(resource, &atomic.Int64{})
}
pcCnt.Add(1)

return true
}

Expand Down
Loading