From 62dc74d683578fbc3112a5feb888f5f5cfc4ffa3 Mon Sep 17 00:00:00 2001 From: Ayushi Sharma Date: Wed, 5 Nov 2025 13:44:33 +0530 Subject: [PATCH 1/4] feat(dagger): add support for tolerations and node affinity for dagger --- modules/dagger/driver.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go index 9c738f0..8dca1f2 100644 --- a/modules/dagger/driver.go +++ b/modules/dagger/driver.go @@ -104,6 +104,8 @@ type driverConf struct { // timeout value for a kube deployment run KubeDeployTimeout int `json:"kube_deploy_timeout_seconds"` + + NodeAffinityMatchExpressions kubernetes.NodeAffinityMatchExpressions `json:"node_affinity_match_expressions"` } type Output struct { @@ -218,6 +220,35 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, formatted := fmt.Sprintf("[%s]", strings.Join(programArgs, ",")) encodedProgramArgs := base64.StdEncoding.EncodeToString([]byte(formatted)) + tolerationKey := fmt.Sprintf("dagger_%s", conf.EnvVariables["SINK_TYPE"]) + tolerations := []map[string]any{} + + for _, t := range kubeOut.Tolerations[tolerationKey] { + tolerations = append(tolerations, map[string]any{ + "key": t.Key, + "value": t.Value, + "effect": t.Effect, + "operator": t.Operator, + }) + } + + requiredDuringSchedulingIgnoredDuringExecution := []kubernetes.Preference{} + preferredDuringSchedulingIgnoredDuringExecution := []kubernetes.WeightedPreference{} + + affinityKey := fmt.Sprintf("dagger_%s", conf.EnvVariables["SINK_TYPE"]) + if affinity, ok := kubeOut.Affinities[affinityKey]; ok { + requiredDuringSchedulingIgnoredDuringExecution = affinity.RequiredDuringSchedulingIgnoredDuringExecution + preferredDuringSchedulingIgnoredDuringExecution = affinity.PreferredDuringSchedulingIgnoredDuringExecution + } + + if dd.conf.NodeAffinityMatchExpressions.RequiredDuringSchedulingIgnoredDuringExecution != nil { + requiredDuringSchedulingIgnoredDuringExecution = dd.conf.NodeAffinityMatchExpressions.RequiredDuringSchedulingIgnoredDuringExecution + } + + if dd.conf.NodeAffinityMatchExpressions.PreferredDuringSchedulingIgnoredDuringExecution != nil { + preferredDuringSchedulingIgnoredDuringExecution = dd.conf.NodeAffinityMatchExpressions.PreferredDuringSchedulingIgnoredDuringExecution + } + rc.Values = map[string]any{ labelsConfKey: modules.CloneAndMergeMaps(deploymentLabels, entropyLabels), "image": imageRepository, @@ -250,6 +281,11 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, "dagger_k8s_ha_url": conf.DaggerK8sHAURL, "cloud_provider": conf.CloudProvider, "fs_oss_endpoint": conf.FSOSSEndpoint, + "tolerations": tolerations, + "nodeAffinityMatchExpressions": map[string]any{ + "requiredDuringSchedulingIgnoredDuringExecution": requiredDuringSchedulingIgnoredDuringExecution, + "preferredDuringSchedulingIgnoredDuringExecution": preferredDuringSchedulingIgnoredDuringExecution, + }, } return rc, nil From 2e701df5bac2c13e01b6cc655b12710257bf4727 Mon Sep 17 00:00:00 2001 From: Ayushi Sharma Date: Mon, 10 Nov 2025 12:23:25 +0530 Subject: [PATCH 2/4] feat: test dagger chart with existing autoscaler tolerations --- modules/dagger/driver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go index 8dca1f2..5cd9466 100644 --- a/modules/dagger/driver.go +++ b/modules/dagger/driver.go @@ -220,7 +220,7 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, formatted := fmt.Sprintf("[%s]", strings.Join(programArgs, ",")) encodedProgramArgs := base64.StdEncoding.EncodeToString([]byte(formatted)) - tolerationKey := fmt.Sprintf("dagger_%s", conf.EnvVariables["SINK_TYPE"]) + tolerationKey := "firehose_autoscaler" tolerations := []map[string]any{} for _, t := range kubeOut.Tolerations[tolerationKey] { @@ -235,7 +235,7 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, requiredDuringSchedulingIgnoredDuringExecution := []kubernetes.Preference{} preferredDuringSchedulingIgnoredDuringExecution := []kubernetes.WeightedPreference{} - affinityKey := fmt.Sprintf("dagger_%s", conf.EnvVariables["SINK_TYPE"]) + affinityKey := "firehose_autoscaler" if affinity, ok := kubeOut.Affinities[affinityKey]; ok { requiredDuringSchedulingIgnoredDuringExecution = affinity.RequiredDuringSchedulingIgnoredDuringExecution preferredDuringSchedulingIgnoredDuringExecution = affinity.PreferredDuringSchedulingIgnoredDuringExecution From 86ec77f6b837f41c09b84f7b0aaed0c2e89c1536 Mon Sep 17 00:00:00 2001 From: Ayushi Sharma Date: Thu, 13 Nov 2025 09:19:48 +0530 Subject: [PATCH 3/4] add debug logs --- modules/dagger/driver.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go index 5cd9466..fcfaec1 100644 --- a/modules/dagger/driver.go +++ b/modules/dagger/driver.go @@ -288,6 +288,8 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, }, } + fmt.Println("Helm Release Values:", rc.Values) + return rc, nil } From 72a6ddd505683d419f75c55c75089f8001e1f0c7 Mon Sep 17 00:00:00 2001 From: Ayushi Sharma Date: Thu, 13 Nov 2025 10:48:22 +0530 Subject: [PATCH 4/4] feat: centralize node affinity interface conversion logic --- modules/dagger/driver.go | 15 ++++++----- modules/firehose/driver.go | 49 ++---------------------------------- modules/kubernetes/output.go | 46 +++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 53 deletions(-) diff --git a/modules/dagger/driver.go b/modules/dagger/driver.go index fcfaec1..3c96235 100644 --- a/modules/dagger/driver.go +++ b/modules/dagger/driver.go @@ -49,6 +49,8 @@ const ( const defaultKey = "default" +const daggerTaintKey = "dagger" + var defaultDriverConf = driverConf{ Namespace: map[string]string{ defaultKey: "dagger", @@ -220,7 +222,7 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, formatted := fmt.Sprintf("[%s]", strings.Join(programArgs, ",")) encodedProgramArgs := base64.StdEncoding.EncodeToString([]byte(formatted)) - tolerationKey := "firehose_autoscaler" + tolerationKey := daggerTaintKey tolerations := []map[string]any{} for _, t := range kubeOut.Tolerations[tolerationKey] { @@ -235,7 +237,7 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, requiredDuringSchedulingIgnoredDuringExecution := []kubernetes.Preference{} preferredDuringSchedulingIgnoredDuringExecution := []kubernetes.WeightedPreference{} - affinityKey := "firehose_autoscaler" + affinityKey := daggerTaintKey if affinity, ok := kubeOut.Affinities[affinityKey]; ok { requiredDuringSchedulingIgnoredDuringExecution = affinity.RequiredDuringSchedulingIgnoredDuringExecution preferredDuringSchedulingIgnoredDuringExecution = affinity.PreferredDuringSchedulingIgnoredDuringExecution @@ -249,6 +251,9 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, preferredDuringSchedulingIgnoredDuringExecution = dd.conf.NodeAffinityMatchExpressions.PreferredDuringSchedulingIgnoredDuringExecution } + requiredDuringSchedulingIgnoredDuringExecutionInterface := kubernetes.PreferenceSliceToInterfaceSlice(requiredDuringSchedulingIgnoredDuringExecution) + preferredDuringSchedulingIgnoredDuringExecutionInterface := kubernetes.WeightedPreferencesToInterfaceSlice(preferredDuringSchedulingIgnoredDuringExecution) + rc.Values = map[string]any{ labelsConfKey: modules.CloneAndMergeMaps(deploymentLabels, entropyLabels), "image": imageRepository, @@ -283,13 +288,11 @@ func (dd *daggerDriver) getHelmRelease(res resource.Resource, conf Config, "fs_oss_endpoint": conf.FSOSSEndpoint, "tolerations": tolerations, "nodeAffinityMatchExpressions": map[string]any{ - "requiredDuringSchedulingIgnoredDuringExecution": requiredDuringSchedulingIgnoredDuringExecution, - "preferredDuringSchedulingIgnoredDuringExecution": preferredDuringSchedulingIgnoredDuringExecution, + "requiredDuringSchedulingIgnoredDuringExecution": requiredDuringSchedulingIgnoredDuringExecutionInterface, + "preferredDuringSchedulingIgnoredDuringExecution": preferredDuringSchedulingIgnoredDuringExecutionInterface, }, } - fmt.Println("Helm Release Values:", rc.Values) - return rc, nil } diff --git a/modules/firehose/driver.go b/modules/firehose/driver.go index 133c05e..de28b98 100644 --- a/modules/firehose/driver.go +++ b/modules/firehose/driver.go @@ -16,7 +16,6 @@ import ( "github.com/goto/entropy/pkg/errors" "github.com/goto/entropy/pkg/helm" "github.com/goto/entropy/pkg/kube" - "github.com/mitchellh/mapstructure" ) const ( @@ -351,8 +350,8 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, imageRepository = conf.ChartValues.ImageRepository } - requiredDuringSchedulingIgnoredDuringExecutionInterface := preferenceSliceToInterfaceSlice(requiredDuringSchedulingIgnoredDuringExecution) - preferredDuringSchedulingIgnoredDuringExecutionInterface := weightedPreferencesToInterfaceSlice(preferredDuringSchedulingIgnoredDuringExecution) + requiredDuringSchedulingIgnoredDuringExecutionInterface := kubernetes.PreferenceSliceToInterfaceSlice(requiredDuringSchedulingIgnoredDuringExecution) + preferredDuringSchedulingIgnoredDuringExecutionInterface := kubernetes.WeightedPreferencesToInterfaceSlice(preferredDuringSchedulingIgnoredDuringExecution) rc.Values = map[string]any{ labelsConfKey: modules.CloneAndMergeMaps(deploymentLabels, entropyLabels), @@ -518,47 +517,3 @@ func renderTplOfMapStringAny(labelsTpl map[string]any, labelsValues map[string]s return labelsTpl, nil } - -func preferenceSliceToInterfaceSlice(prefs []kubernetes.Preference) []map[string]interface{} { - result := make([]map[string]interface{}, len(prefs)) - - for i, pref := range prefs { - var prefMap map[string]interface{} - if err := mapstructure.Decode(pref, &prefMap); err != nil { - continue - } - - lowercaseMap := make(map[string]interface{}) - for k, v := range prefMap { - lowercaseMap[strings.ToLower(k)] = v - } - result[i] = lowercaseMap - } - - return result -} - -func weightedPreferencesToInterfaceSlice(weightedPrefs []kubernetes.WeightedPreference) []map[string]interface{} { - result := make([]map[string]interface{}, len(weightedPrefs)) - - for i, wp := range weightedPrefs { - var wpMap map[string]interface{} - if err := mapstructure.Decode(wp, &wpMap); err != nil { - continue - } - - lowercaseMap := make(map[string]interface{}) - for k, v := range wpMap { - // Special handling for the preference field - if k == "Preference" && v != nil { - // Convert the nested Preference slice - lowercaseMap["preference"] = preferenceSliceToInterfaceSlice(wp.Preference) - } else { - lowercaseMap[strings.ToLower(k)] = v - } - } - result[i] = lowercaseMap - } - - return result -} diff --git a/modules/kubernetes/output.go b/modules/kubernetes/output.go index 70d5cc4..7c43e42 100644 --- a/modules/kubernetes/output.go +++ b/modules/kubernetes/output.go @@ -2,10 +2,12 @@ package kubernetes import ( "encoding/json" + "strings" "k8s.io/apimachinery/pkg/version" "github.com/goto/entropy/pkg/kube" + "github.com/mitchellh/mapstructure" ) type Output struct { @@ -47,3 +49,47 @@ func (out Output) JSON() []byte { } return b } + +func PreferenceSliceToInterfaceSlice(prefs []Preference) []map[string]interface{} { + result := make([]map[string]interface{}, len(prefs)) + + for i, pref := range prefs { + var prefMap map[string]interface{} + if err := mapstructure.Decode(pref, &prefMap); err != nil { + continue + } + + lowercaseMap := make(map[string]interface{}) + for k, v := range prefMap { + lowercaseMap[strings.ToLower(k)] = v + } + result[i] = lowercaseMap + } + + return result +} + +func WeightedPreferencesToInterfaceSlice(weightedPrefs []WeightedPreference) []map[string]interface{} { + result := make([]map[string]interface{}, len(weightedPrefs)) + + for i, wp := range weightedPrefs { + var wpMap map[string]interface{} + if err := mapstructure.Decode(wp, &wpMap); err != nil { + continue + } + + lowercaseMap := make(map[string]interface{}) + for k, v := range wpMap { + // Special handling for the preference field + if k == "Preference" && v != nil { + // Convert the nested Preference slice + lowercaseMap["preference"] = PreferenceSliceToInterfaceSlice(wp.Preference) + } else { + lowercaseMap[strings.ToLower(k)] = v + } + } + result[i] = lowercaseMap + } + + return result +}