Skip to content

Conversation

pythonjsgo
Copy link

@pythonjsgo pythonjsgo commented Oct 2, 2025

Description

This PR adds support for returning Promise<Observable<MessageEvent>> from SSE handlers, enabling async validation and custom HTTP status codes before establishing the SSE connection.

Motivation

Fixes #12260

Currently, SSE handlers can only return Observable<MessageEvent>, which means HTTP headers are sent immediately. This prevents implementing common patterns like:

  • Returning 404 when a resource doesn't exist
  • Returning 401 for authentication failures
  • Setting custom status codes based on async validation
  • Setting headers based on async data

Changes

Core Implementation

  1. RouterResponseController.sse() - Now accepts Promise<Observable> in addition to Observable
  2. SseStream.pipe() - Accepts optional statusCode parameter
  3. RouterExecutionContext - Updated to await async SSE handlers

Key Features

  • ✅ Full backward compatibility (existing code works without changes)
  • ✅ Automatic detection of custom status codes via response.status()
  • ✅ Works with both Express and Fastify
  • ✅ Zero breaking changes

Example Usage

Resource validation with 404 response

@Sse(':gameId/events')
async gameEvents(
  @Param('gameId') gameId: string,
  @Res() response: Response,
): Promise<Observable<MessageEvent>> {
  // Async validation
  const game = await this.gameService.findById(gameId);
  
  if (!game) {
    response.status(404);
    throw new NotFoundException('Game not found');
  }
  
  return this.gameService.getGameEvents(gameId);
}

Authentication with 401 response

@Sse()
async streamNotifications(
  @Headers('authorization') authHeader: string,
  @Res() response: Response,
): Promise<Observable<MessageEvent>> {
  const user = await this.authService.validateToken(authHeader);
  
  if (!user) {
    response.status(401);
    throw new UnauthorizedException();
  }
  
  return this.notificationService.getUserNotifications(user.id);
}

Testing

Comprehensive tests added:

  • ✅ Promise support
  • ✅ Custom status code extraction
  • ✅ Custom status code in SseStream.pipe()
  • ✅ Backward compatibility
  • ✅ All existing tests pass

Technical Details

  • Uses Promise.resolve() for efficient handling of both sync and async cases
  • Status code extracted from response object before piping
  • Minimal performance overhead for existing handlers
  • Type-safe implementation

Checklist

  • Code follows project style guidelines
  • Tests added and passing
  • Non-breaking change
  • Documentation in PR description

Adds support for returning Promise<Observable<MessageEvent>> from SSE
handlers, enabling async validation before sending HTTP headers.

This allows developers to:
- Perform async validation before establishing SSE connection
- Return custom HTTP status codes (404, 401, etc.)
- Set headers based on async data
- Implement proper resource validation patterns

Implementation details:
- RouterResponseController.sse() now accepts Promise<Observable>
- Custom status codes extracted from response.statusCode
- SseStream.pipe() accepts optional statusCode parameter
- Full backward compatibility maintained

Fixes nestjs#12260
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for returning Promise<Observable<MessageEvent>> from SSE handlers, enabling async validation and custom HTTP status code handling before establishing SSE connections.

Key Changes:

  • SSE handlers can now return Promise<Observable> in addition to Observable, allowing async validation before streaming
  • Custom HTTP status codes can be set via response.status() and are automatically applied to SSE connections
  • Full backward compatibility maintained with existing synchronous handlers

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
packages/core/router/router-response-controller.ts Made sse() method async, added support for Promise<Observable> input, and implemented custom status code extraction from response object
packages/core/router/router-execution-context.ts Updated SSE handler wrapper to await the async responseController.sse() call
packages/core/router/sse-stream.ts Added optional statusCode parameter to pipe() method options
packages/core/test/router/router-response-controller.spec.ts Added tests for Promise<Observable> support and custom status code handling
packages/core/test/router/sse-stream.spec.ts Added tests verifying custom and default status code behavior in pipe()

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@kamilmysliwiec
Copy link
Member

image

@coveralls
Copy link

Pull Request Test Coverage Report for Build a5f50a3a-aaea-4bd2-9ff9-c0e49284cc52

Details

  • 10 of 10 (100.0%) changed or added relevant lines in 3 files are covered.
  • 1 unchanged line in 1 file lost coverage.
  • Overall coverage decreased (-0.007%) to 88.68%

Files with Coverage Reduction New Missed Lines %
packages/core/router/sse-stream.ts 1 96.88%
Totals Coverage Status
Change from base Build c69bbfb3-fc2c-445b-b2ee-985a95974f70: -0.007%
Covered Lines: 7278
Relevant Lines: 8207

💛 - Coveralls

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow sending custom headers and status codes on a SSE controller when an async flow is required

3 participants