-
Notifications
You must be signed in to change notification settings - Fork 728
DLQ policy for MLP consumer #28573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DLQ policy for MLP consumer #28573
Conversation
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements a Dead Letter Queue (DLQ) policy for the MLP (Message Lock Processing) consumer, enabling messages that exceed the maximum processing attempts to be moved to a designated DLQ topic.
Key changes:
- Added support for configurable DLQ policies (MOVE, DELETE, UNSPECIFIED) allowing different handling of messages that reach maximum processing attempts
- Implemented
TDLQMoverActorto move failed messages to a separate DLQ topic - Enhanced storage serialization to track DLQ state and deleted messages
Reviewed Changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| ydb/library/services/services.proto | Added new service identifier for DLQ mover component |
| ydb/core/protos/pqconfig.proto | Added WAL field to track messages deleted from DLQ |
| ydb/core/persqueue/writer/common.h | Enhanced error logging with error reason details |
| ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h | Added DLQ policy configuration and changed message structure to ui32 fields |
| ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp | Implemented DLQ policy handling logic and message processing limits |
| ydb/core/persqueue/pqtablet/partition/mlp/mlp_dlq_mover.cpp | New actor implementation for moving messages to DLQ topic |
| ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp | Integrated DLQ mover into consumer workflow |
| ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp | Added test coverage for DLQ policies |
| ydb/core/persqueue/pqtablet/partition/mlp/mlp_dlq_mover_ut.cpp | New integration tests for DLQ moving functionality |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| if (!HasDataInProgress && RequireInflyMessageCount()) { | ||
| HasDataInProgress = true; | ||
| Send(TabletActorId, MakeEvHasData(SelfId(), PartitionId,Storage->GetLastOffset(), Config)); |
Copilot
AI
Nov 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing space after comma between 'PartitionId' and 'Storage->GetLastOffset()'. Should be PartitionId, Storage->GetLastOffset() for consistent formatting.
| Send(TabletActorId, MakeEvHasData(SelfId(), PartitionId,Storage->GetLastOffset(), Config)); | |
| Send(TabletActorId, MakeEvHasData(SelfId(), PartitionId, Storage->GetLastOffset(), Config)); |
|
🟢 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| #TIMEOUT(30) | ||
|
|
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Commented-out TIMEOUT directive should either be removed or uncommented. Leaving commented-out configuration in build files can cause confusion.
| #TIMEOUT(30) |
|
⚪ ⚪
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
| ui64 DeadlineDelta: 16 = 0; | ||
| ui64 HasMessageGroupId: 1 = false; | ||
| ui32 DeadlineDelta: 16 = 0; | ||
| ui32 HasMessageGroupId: 1 = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
а почему не
| ui32 HasMessageGroupId: 1 = false; | |
| bool HasMessageGroupId: 1 = false; |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
не, пусть остается ui32 т.к. тут битовая маска. так проще считать куда сколько бит потрачено. а сделаешь bool, а оно еще выравнивание поломает
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
⚪ Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
| ); | ||
| } | ||
|
|
||
| std::unique_ptr<TEvPersQueue::TEvHasDataInfo> MakeEvHasData( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Зачем такая подпись? Проще же не подписываться, а со стороны партиции EndOffset на каждый чих пушить в MLP консумер-актор.
|
|
||
| DLQMoverActorId = {}; | ||
| for (auto offset : moved) { | ||
| AFL_ENSURE(Storage->MarkDLQMoved(offset))("o", offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Вот так писать нельзя никогда
| read->SetTimeoutMs(0); | ||
|
|
||
| Send(TabletActorId, std::move(request), 0, ++Cookie); | ||
| SendToPQTablet(std::move(request)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Зачем тут через пайпу, это же локальный актор?
Changelog entry
...
Changelog category
Description for reviewers
...