From b704129e04de5c370a7cd4e08fb4d608ac131886 Mon Sep 17 00:00:00 2001 From: "jiasheng.yu" Date: Wed, 17 Sep 2025 18:08:15 +0800 Subject: [PATCH] fix: Fully replace pipelines on Writable.Pipeline update Previously only updated topics/transforms per pipeline ID. Now perform full replacement: remove pipelines not in config (unregister metrics), create/register new pipelines, and build full topics with base prefix. Signed-off-by: jiasheng.yu --- internal/app/configupdates.go | 12 ++++++++---- internal/runtime/runtime.go | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/internal/app/configupdates.go b/internal/app/configupdates.go index f30370b03..02363be79 100644 --- a/internal/app/configupdates.go +++ b/internal/app/configupdates.go @@ -151,11 +151,15 @@ func (processor *ConfigUpdateProcessor) processConfigChangedPipeline() { sdk.runtime.TargetType = sdk.targetType // Update the pipelines with their new transforms - for _, pipeline := range pipelines { - // TODO: Look at better way to apply pipeline updates - sdk.runtime.SetFunctionsPipelineTransforms(pipeline.Id, pipeline.Transforms) - sdk.runtime.SetFunctionsPipelineTopics(pipeline.Id, pipeline.Topics) + for i, pipline := range pipelines { + var fullTopics []string + for _, topic := range pipline.Topics { + fullTopics = append(fullTopics, coreCommon.BuildTopic(sdk.config.MessageBus.GetBaseTopicPrefix(), topic)) + } + pipline.Topics = fullTopics + pipelines[i] = pipline } + sdk.runtime.UpdateFunctionsPipelines(pipelines) sdk.LoggingClient().Info("Configurable Pipeline successfully reloaded from new configuration") } diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index fc2f20283..becf576a1 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -162,6 +162,28 @@ func (fpr *FunctionsPipelineRuntime) RemoveAllFunctionPipelines() { fpr.isBusyCopying.Unlock() } +func (fpr *FunctionsPipelineRuntime) UpdateFunctionsPipelines(piplines map[string]interfaces.FunctionPipeline) { + fpr.isBusyCopying.Lock() + defer fpr.isBusyCopying.Unlock() + + metricManager := bootstrapContainer.MetricsManagerFrom(fpr.dic.Get) + for _, curPipline := range fpr.pipelines { + if _, ok := piplines[curPipline.Id]; !ok { + fpr.unregisterPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, curPipline.Id) + fpr.unregisterPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, curPipline.Id) + fpr.unregisterPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, curPipline.Id) + delete(fpr.pipelines, curPipline.Id) + } + } + for _, toAdd := range piplines { + pipeline := NewFunctionPipeline(toAdd.Id, toAdd.Topics, toAdd.Transforms) + fpr.pipelines[toAdd.Id] = &pipeline + fpr.registerPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, pipeline.Id, pipeline.MessagesProcessed) + fpr.registerPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, pipeline.Id, pipeline.MessageProcessingTime) + fpr.registerPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, pipeline.Id, pipeline.ProcessingErrors) + } +} + // AddFunctionsPipeline is thread safe to set transforms func (fpr *FunctionsPipelineRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error { _, exists := fpr.pipelines[id]