Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions containers/aggregator/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# ------------------------
# Stage 1: Build Go aggregator
# ------------------------
FROM golang:1.23-alpine AS build

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go build -o aggregator .

# ------------------------
# Stage 2: Minimal runtime
# ------------------------
FROM alpine:latest

WORKDIR /root/
# Copy the compiled binary from build stage
COPY --from=build /app/aggregator .

# Expose internal port
EXPOSE 5000

# Run the aggregator
CMD ["./aggregator"]
156 changes: 75 additions & 81 deletions actor.go → containers/aggregator/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package main
import (
"aggregator/auth"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"io"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"net/http"
"strings"
"time"
)

var serverMux *http.ServeMux
Expand All @@ -30,25 +32,39 @@ type Actor struct {
// TODO This needs to be more generic and extensible
func createActor(pipelineDescription string) (Actor, error) {
id := uuid.New().String()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

namespace := "aggregator-ns"

podScafolding := &v1.Pod{
// ----------------------------
// 1. Pod spec
// ----------------------------
image := os.Getenv("TRANSFORMATION")
if image == "" {
return Actor{}, fmt.Errorf("TRANSFORMATION env var is not set")
}

podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: id,
Labels: map[string]string{
"app": id, // Important for service selector!
"app": id,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "transformation",
Image: "incremunica",
Image: image,
ImagePullPolicy: v1.PullNever,
Env: []v1.EnvVar{
{Name: "PIPELINE_DESCRIPTION", Value: fmt.Sprintf("%v", pipelineDescription)},
{Name: "HTTP_PROXY", Value: "http://uma-proxy-service.default.svc.cluster.local:8080"},
{Name: "HTTPS_PROXY", Value: "http://uma-proxy-service.default.svc.cluster.local:8443"},
{Name: "PIPELINE_DESCRIPTION", Value: pipelineDescription},
{Name: "SSL_CERT_FILE", Value: "/key-pair/uma-proxy.crt"},
{Name: "HTTP_PROXY", Value: "http://uma-proxy.uma-proxy-ns.svc.cluster.local:8080"},
{Name: "HTTPS_PROXY", Value: "http://uma-proxy.uma-proxy-ns.svc.cluster.local:8443"},
{Name: "http_proxy", Value: "http://uma-proxy.uma-proxy-ns.svc.cluster.local:8080"},
{Name: "https_proxy", Value: "http://uma-proxy.uma-proxy-ns.svc.cluster.local:8443"},
{Name: "LOG_LEVEL", Value: LogLevel.String()},
},
Ports: []v1.ContainerPort{
Expand Down Expand Up @@ -77,84 +93,73 @@ func createActor(pipelineDescription string) (Actor, error) {
},
}

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
pod, err := Clientset.CoreV1().Pods("default").Create(ctx, podScafolding, metav1.CreateOptions{})
// ----------------------------
// 2. Create Pod
// ----------------------------
pod, err := Clientset.CoreV1().Pods(namespace).Create(ctx, podSpec, metav1.CreateOptions{})
if err != nil {
return Actor{}, fmt.Errorf("failed to create pod in namespace %s: %w", namespace, err)
}

// ----------------------------
// 3. Service spec
// ----------------------------
serviceName := "id-" + id + "-service"
svc := &v1.Service{
serviceSpec := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
},
Spec: v1.ServiceSpec{
Type: v1.ServiceTypeNodePort, // Or NodePort if you want external access
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"app": id, // Matches pod's label
"app": id,
},
Ports: []v1.ServicePort{
{
Port: 80, // The port your client will use
TargetPort: intstr.FromInt(8080), // The port inside the pod
Port: 80,
TargetPort: intstr.FromInt(8080),
},
},
},
}

_, err = Clientset.CoreV1().Services("default").Create(ctx, svc, metav1.CreateOptions{})

_, err = Clientset.CoreV1().Services(namespace).Create(ctx, serviceSpec, metav1.CreateOptions{})
if err != nil {
return Actor{}, fmt.Errorf("failed to create pod: %v", err)
return Actor{}, fmt.Errorf("failed to create service in namespace %s: %w", namespace, err)
}

watcher, _ := Clientset.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{
// ----------------------------
// 4. Wait for Pod Running
// ----------------------------
watcher, err := Clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", id),
})

nodes, err := Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
logrus.WithFields(logrus.Fields{"err": err}).Error("Failed to list nodes")
return Actor{}, err
}

nodeIp := ""
for _, node := range nodes.Items {
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeExternalIP || addr.Type == v1.NodeInternalIP {
nodeIp = addr.Address
break
}
}
}

svc, err = Clientset.CoreV1().Services("default").Get(context.Background(), serviceName, metav1.GetOptions{})
if err != nil {
return Actor{}, err
}
defer watcher.Stop()

nodePort := 0
for _, port := range svc.Spec.Ports {
if svc.Spec.Type == v1.ServiceTypeNodePort {
nodePort = int(port.NodePort)
}
}

if nodePort == 0 {
return Actor{}, fmt.Errorf("no NodePort found for service %s", serviceName)
}

podLoop:
for event := range watcher.ResultChan() {
pod := event.Object.(*v1.Pod)
if pod.Status.Phase == v1.PodRunning {
break
} else if pod.Status.Phase == v1.PodFailed {
return Actor{}, fmt.Errorf("pod failed to start: %v", pod.Status.Reason)
p, ok := event.Object.(*v1.Pod)
if !ok {
continue
}
switch p.Status.Phase {
case v1.PodRunning:
break podLoop
case v1.PodFailed:
return Actor{}, fmt.Errorf("pod failed: %v", p.Status.Reason)
}
}

logrus.WithFields(logrus.Fields{"url": fmt.Sprintf("http://%s:%d", nodeIp, nodePort)}).Info("Pod is running")

var handleAllRequests = func(w http.ResponseWriter, r *http.Request) {
// -----------------------------
// 5. Register handler in aggregator
// -----------------------------
var requestHandler = func(w http.ResponseWriter, r *http.Request) {
logrus.WithFields(logrus.Fields{"actor_id": id, "path": r.URL.Path, "method": r.Method}).Info("Received request for actor")
if !auth.AuthorizeRequest(w, r, nil) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}

Expand All @@ -163,32 +168,23 @@ func createActor(pipelineDescription string) (Actor, error) {
subPath := strings.TrimPrefix(r.URL.Path, actorPrefix)

// Construct the target URL with the subpath and query parameters
targetURL := fmt.Sprintf("http://%s:%d/%s", nodeIp, nodePort, subPath)
serviceURL := fmt.Sprintf("http://%s.%s.svc.cluster.local:80/%s", serviceName, namespace, subPath)
if r.URL.RawQuery != "" {
targetURL += "?" + r.URL.RawQuery
serviceURL += "?" + r.URL.RawQuery
}

// Create a new request with the same method, headers, and body
req, err := http.NewRequest(r.Method, targetURL, r.Body)
req, err := http.NewRequest(r.Method, serviceURL, r.Body)
if err != nil {
logrus.WithFields(logrus.Fields{"err": err}).Error("Error creating request")
http.Error(w, "Failed to create request", http.StatusInternalServerError)
http.Error(w, "failed to create request", http.StatusInternalServerError)
return
}
req.Header = r.Header.Clone()

// Copy headers from original request
for name, values := range r.Header {
for _, value := range values {
req.Header.Add(name, value)
}
}

// Make the request to the pod
client := &http.Client{}
resp, err := client.Do(req)
// Make the request to the service
resp, err := http.DefaultClient.Do(req)
if err != nil {
logrus.WithFields(logrus.Fields{"err": err, "target_url": req.URL.String()}).Error("Error reaching pod service")
http.Error(w, "Failed to reach pod service", http.StatusInternalServerError)
fmt.Println("Failed to reach actor service:", err.Error())
http.Error(w, "failed to reach actor service", http.StatusBadGateway)
return
}
defer resp.Body.Close()
Expand All @@ -206,8 +202,8 @@ func createActor(pipelineDescription string) (Actor, error) {
// Check if this is a streaming response (like SSE)
contentType := resp.Header.Get("Content-Type")
isSSE := strings.Contains(contentType, "text/event-stream")

logrus.WithFields(logrus.Fields{"content_type": contentType, "is_sse": isSSE}).Debug("Content-Type response")

if isSSE {
// Handle Server-Sent Events streaming
flusher, ok := w.(http.Flusher)
Expand Down Expand Up @@ -237,10 +233,8 @@ func createActor(pipelineDescription string) (Actor, error) {
}
}

serverMux.HandleFunc("/"+id+"/", handleAllRequests)

// Also handle exact match without trailing slash for backwards compatibility
serverMux.HandleFunc("/"+id, handleAllRequests)
serverMux.HandleFunc("/"+id+"/", requestHandler)
serverMux.HandleFunc("/"+id, requestHandler)

actor := Actor{
Id: id,
Expand Down
Loading