diff --git a/internal/events/webhooks/webhooks.go b/internal/events/webhooks/webhooks.go index 0bc1fd6dd5..b8f30fe38f 100644 --- a/internal/events/webhooks/webhooks.go +++ b/internal/events/webhooks/webhooks.go @@ -87,7 +87,7 @@ func (wh *WebHooks) Init(ctx context.Context, config config.Section) (err error) client := ffresty.NewWithConfig(ctx, *ffrestyConfig) *wh = WebHooks{ - ctx: log.WithLogField(ctx, "webhook", wh.connID), + ctx: log.WithLogField(ctx, "webhook", connID), capabilities: &events.Capabilities{ BatchDelivery: true, }, @@ -139,7 +139,7 @@ func (p *whPayload) firstData() fftypes.JSONObject { } func (wh *WebHooks) buildPayload(ctx context.Context, sub *core.Subscription, event *core.CombinedEventDataDelivery) *whPayload { - log.L(wh.ctx).Debugf("Webhook-> %s event %s on subscription %s", sub.Options.URL, event.Event.ID, sub.ID) + log.L(ctx).Debugf("Webhook-> %s event %s on subscription %s", sub.Options.URL, event.Event.ID, sub.ID) withData := sub.Options.WithData != nil && *sub.Options.WithData options := sub.Options.TransportOptions() p := &whPayload{ @@ -402,7 +402,7 @@ func (wh *WebHooks) attemptRequest(ctx context.Context, sub *core.Subscription, Status: resp.StatusCode(), Headers: fftypes.JSONObject{}, } - log.L(wh.ctx).Debugf("Webhook<- %s %s on subscription %s returned %d", req.method, req.url, sub.ID, res.Status) + log.L(ctx).Debugf("Webhook<- %s %s on subscription %s returned %d", req.method, req.url, sub.ID, res.Status) header := resp.Header() for h := range header { res.Headers[h] = header.Get(h) @@ -440,7 +440,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s if gwErr != nil { // Generate a bad-gateway error response - we always want to send something back, // rather than just causing timeouts - log.L(wh.ctx).Errorf("Failed to invoke webhook: %s", gwErr) + log.L(ctx).Errorf("Failed to invoke webhook: %s on subscription %s", gwErr, sub.ID) b, _ := json.Marshal(&fftypes.RESTError{ Error: gwErr.Error(), }) @@ -453,7 +453,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s } } b, _ := json.Marshal(&res) - log.L(wh.ctx).Tracef("Webhook response: %s", string(b)) + log.L(ctx).Tracef("Webhook response: %s for subscription %s", string(b), sub.ID) // For each event emit a response for _, combinedEvent := range events { @@ -465,7 +465,7 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s txType = fftypes.FFEnum(strings.ToLower(req.replyTx)) } if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { - log.L(wh.ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID) + log.L(ctx).Debugf("Sending reply message for %s CID=%s", event.ID, event.Message.Header.ID) cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ ID: event.ID, Rejected: false, @@ -501,12 +501,13 @@ func (wh *WebHooks) doDelivery(ctx context.Context, connID string, reply bool, s } func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, event *core.EventDelivery, data core.DataArray) error { + ctx = log.WithLogField(ctx, "webhook", wh.connID) reply := sub.Options.TransportOptions().GetBool("reply") if reply && event.Message != nil && event.Message.Header.CID != nil { // We cowardly refuse to dispatch a message that is itself a reply, as it's hard for users to // avoid loops - and there's no way for us to detect here if a user has configured correctly // to avoid a loop. - log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID) + log.L(ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID) if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ ID: event.ID, @@ -540,6 +541,7 @@ func (wh *WebHooks) DeliveryRequest(ctx context.Context, connID string, sub *cor } func (wh *WebHooks) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { + ctx = log.WithLogField(ctx, "webhook", wh.connID) reply := sub.Options.TransportOptions().GetBool("reply") if reply { nonReplyEvents := []*core.CombinedEventDataDelivery{} @@ -549,7 +551,7 @@ func (wh *WebHooks) BatchDeliveryRequest(ctx context.Context, connID string, sub // avoid loops - and there's no way for us to detect here if a user has configured correctly // to avoid a loop. if event.Message != nil && event.Message.Header.CID != nil { - log.L(wh.ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID) + log.L(ctx).Debugf("Webhook subscription with reply enabled called with reply event '%s'", event.ID) if cb, ok := wh.callbacks.handlers[sub.Namespace]; ok { cb.DeliveryResponse(connID, &core.EventDeliveryResponse{ ID: event.ID, diff --git a/internal/events/webhooks/webhooks_test.go b/internal/events/webhooks/webhooks_test.go index 958e21ba64..5e7ba82d10 100644 --- a/internal/events/webhooks/webhooks_test.go +++ b/internal/events/webhooks/webhooks_test.go @@ -40,10 +40,12 @@ import ( "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/fftls" "github.com/hyperledger/firefly-common/pkg/fftypes" + fflog "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/internal/coreconfig" "github.com/hyperledger/firefly/mocks/eventsmocks" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/events" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -470,14 +472,10 @@ func TestRequestWithBodyReplyEndToEndWithTLS(t *testing.T) { ctx, cancelCtx := context.WithCancel(context.Background()) go func() { - select { - case <-ctx.Done(): - shutdownContext, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - if err := server.Shutdown(shutdownContext); err != nil { - return - } - } + <-ctx.Done() + shutdownContext, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = server.Shutdown(shutdownContext) }() server.Handler = r @@ -1490,3 +1488,68 @@ func TestRequestWithBodyReplyEndToEndWithBatch(t *testing.T) { func TestFirstDataNeverNil(t *testing.T) { assert.NotNil(t, (&whPayload{}).firstData()) } + +// testHook captures logrus entries for assertions +type testHook struct{ entries []*logrus.Entry } + +func (h *testHook) Levels() []logrus.Level { return logrus.AllLevels } +func (h *testHook) Fire(e *logrus.Entry) error { + h.entries = append(h.entries, e) + return nil +} + +func TestLoggingContextPreserved(t *testing.T) { + wh, cancel := newTestWebHooks(t) + defer cancel() + + // Capture logs at debug level + logger := logrus.StandardLogger() + origHooks := logger.Hooks + hook := &testHook{} + logger.AddHook(hook) + logrus.SetLevel(logrus.DebugLevel) + defer logger.ReplaceHooks(origHooks) + + // Minimal HTTP server to exercise delivery path + r := mux.NewRouter() + r.HandleFunc("/ping", func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(200) + _, _ = res.Write([]byte(`ok`)) + }).Methods(http.MethodPost) + server := httptest.NewServer(r) + defer server.Close() + + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{Namespace: "ns1"}, + } + to := sub.Options.TransportOptions() + to["url"] = fmt.Sprintf("http://%s/ping", server.Listener.Addr()) + // Ensure we log via buildPayload/attemptRequest path + event := &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{Event: core.Event{ID: fftypes.NewUUID()}}, + Subscription: core.SubscriptionRef{ID: sub.ID}, + } + + parentCtx := fflog.WithLogField(context.Background(), "httpReq", "req-123") + + // Expect the DeliveryResponse callback invoked along the non-reply path + mcb := wh.callbacks.handlers["ns1"].(*eventsmocks.Callbacks) + mcb.On("DeliveryResponse", mock.Anything, mock.MatchedBy(func(resp *core.EventDeliveryResponse) bool { + return !resp.Rejected + })).Return(nil) + + err := wh.DeliveryRequest(parentCtx, mock.Anything, sub, event, nil) + assert.NoError(t, err) + + // Find any log entry with the preserved fields + found := false + for _, e := range hook.entries { + if e.Data["httpReq"] == "req-123" && e.Data["webhook"] != nil { + found = true + break + } + } + assert.True(t, found, "expected log entry with preserved httpReq and webhook fields") + + mcb.AssertExpectations(t) +}