Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ struct TEvPQ {
EvGetMLPConsumerStateRequest,
EvGetMLPConsumerStateResponse,
EvMLPConsumerUpdateConfig,
EvMLPDLQMoverResponse,
EvEnd
};

Expand Down Expand Up @@ -1593,6 +1594,20 @@ struct TEvPQ {
NKikimrPQ::TPQTabletConfig::TConsumer Config;
std::optional<TDuration> RetentionPeriod;
};

// Local event that reports the outcome of an attempt to move messages to the
// dead letter queue (DLQ).
// NOTE(review): presumably sent by the DLQ mover actor to the consumer actor
// that created it - confirm against the mover implementation.
struct TEvMLPDLQMoverResponse : TEventLocal<TEvMLPDLQMoverResponse, EvMLPDLQMoverResponse> {
    // Overall status of the move operation.
    Ydb::StatusIds::StatusCode Status;
    // Identifiers of the messages reported as moved.
    std::vector<ui64> MovedMessages;
    // Error text accompanying the status; empty when not supplied.
    TString ErrorDescription;

    // Takes ownership of the moved-message list and the error text.
    TEvMLPDLQMoverResponse(
        Ydb::StatusIds::StatusCode status,
        std::vector<ui64>&& movedMessages,
        TString&& errorDescription = "")
        : Status(status)
        , MovedMessages(std::move(movedMessages))
        , ErrorDescription(std::move(errorDescription))
    {
    }
};
};

} //NKikimr
1 change: 1 addition & 0 deletions ydb/core/persqueue/pqtablet/partition/mlp/mlp.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace NKikimr::NPQ::NMLP {

// MLP does not work when compaction by key is enabled!!! (otherwise it is unclear how to thin out the compacted values)
NActors::IActor* CreateConsumerActor(
const TString& database,
ui64 tabletId,
const NActors::TActorId& tabletActorId,
ui32 partitionId,
Expand Down
20 changes: 19 additions & 1 deletion ydb/core/persqueue/pqtablet/partition/mlp/mlp_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ std::unique_ptr<TEvPQ::TEvRead> MakeEvRead(
}

std::unique_ptr<TEvPQ::TEvSetClientInfo> MakeEvCommit(
const NKikimrPQ::TPQTabletConfig::TConsumer consumer,
const NKikimrPQ::TPQTabletConfig::TConsumer& consumer,
ui64 offset,
ui64 cookie
) {
Expand All @@ -48,6 +48,24 @@ std::unique_ptr<TEvPQ::TEvSetClientInfo> MakeEvCommit(
);
}

// Builds a TEvHasDataInfo request for the given partition and offset.
// The request carries the consumer name as the client id and the actor id
// passed in selfId as the sender.
std::unique_ptr<TEvPersQueue::TEvHasDataInfo> MakeEvHasData(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем такая подпись? Проще же не подписываться, а со стороны партиции EndOffset на каждый чих пушить в MLP консумер-актор.

const TActorId& selfId,
ui32 partitionId,
ui64 offset,
const NKikimrPQ::TPQTabletConfig::TConsumer& consumer
) {

auto result = std::make_unique<TEvPersQueue::TEvHasDataInfo>();
auto& record = result->Record;
record.SetPartition(partitionId);
record.SetOffset(offset);
// The deadline is fixed at 5 seconds, passed as a millisecond count.
record.SetDeadline(TDuration::Seconds(5).MilliSeconds());
// Write selfId into the Sender field of the record.
ActorIdToProto(selfId, record.MutableSender());
record.SetClientId(consumer.GetName());

return result;
}

bool IsSucess(const TEvPQ::TEvProxyResponse::TPtr& ev) {
return ev->Get()->Response->GetStatus() == NMsgBusProxy::MSTATUS_OK &&
ev->Get()->Response->GetErrorCode() == NPersQueue::NErrorCode::OK;
Expand Down
28 changes: 27 additions & 1 deletion ydb/core/persqueue/pqtablet/partition/mlp/mlp_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,39 @@ std::unique_ptr<TEvPQ::TEvRead> MakeEvRead(
);

// Creates a TEvSetClientInfo event for the given consumer and offset.
// The consumer config is taken by const reference to avoid copying the
// protobuf message on every call.
// NOTE(review): presumably the event commits `offset` for `consumer` - the
// definition body is not shown here, confirm there.
// cookie defaults to 0.
std::unique_ptr<TEvPQ::TEvSetClientInfo> MakeEvCommit(
    const NKikimrPQ::TPQTabletConfig::TConsumer& consumer,
    ui64 offset,
    ui64 cookie = 0
);

// Builds a TEvHasDataInfo request.
//   selfId      - actor id written into the request's Sender field
//   partitionId - partition the request refers to
//   offset      - offset written into the request
//   consumer    - consumer config; its name is used as the client id
// The request deadline is set to 5 seconds (in milliseconds) by the definition.
std::unique_ptr<TEvPersQueue::TEvHasDataInfo> MakeEvHasData(
const TActorId& selfId,
ui32 partitionId,
ui64 offset,
const NKikimrPQ::TPQTabletConfig::TConsumer& consumer
);

// Returns true when the proxy response has status MSTATUS_OK and error code OK.
bool IsSucess(const TEvPQ::TEvProxyResponse::TPtr& ev);
// Overload for TEvPersQueue::TEvResponse.
// NOTE(review): presumably applies the same success criteria - confirm against the definition.
bool IsSucess(const TEvPersQueue::TEvResponse::TPtr& ev);
// Returns the cookie associated with a proxy response.
// NOTE(review): the definition is not shown here - verify where the cookie is read from.
ui64 GetCookie(const TEvPQ::TEvProxyResponse::TPtr& ev);

// Factory for the message enricher actor.
//   tabletId     - id of the PQ tablet
//   partitionId  - partition the replies belong to
//   consumerName - name of the consumer
//   replies      - pending read results; the deque is moved into the call
// Returns a raw actor pointer.
// NOTE(review): the caller presumably takes ownership by registering the actor - confirm.
NActors::IActor* CreateMessageEnricher(ui64 tabletId,
const ui32 partitionId,
const TString& consumerName,
std::deque<TReadResult>&& replies);

// Parameters passed to CreateDLQMover.
// The struct stays an aggregate, so designated initialization keeps working.
// Scalar fields carry default initializers so that a default-constructed
// object never holds indeterminate values.
struct TDLQMoverSettings {
    // Actor that creates the mover.
    TActorId ParentActorId;
    // Database name.
    TString Database;
    // Id of the source PQ tablet.
    ui64 TabletId = 0;
    // Source partition id.
    ui32 PartitionId = 0;
    // Name of the consumer whose messages are moved.
    TString ConsumerName;
    // Generation of the consumer config.
    ui64 ConsumerGeneration = 0;
    // Topic that receives the dead-lettered messages.
    TString DestinationTopic;
    // Sequence number for the first moved message
    // (the consumer passes "already moved count + 1").
    ui64 FirstMessageSeqNo = 0;
    // Messages to move.
    std::deque<ui64> Messages;
};

// Factory for the DLQ mover actor; the settings object is moved into the call.
// Returns a raw actor pointer.
// NOTE(review): the caller presumably takes ownership by registering the actor - confirm.
NActors::IActor* CreateDLQMover(TDLQMoverSettings&& settings);

} // namespace NKikimr::NPQ::NMLP
105 changes: 95 additions & 10 deletions ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "mlp_consumer.h"
#include "mlp_message_enricher.h"
#include "mlp_storage.h"

#include <ydb/core/persqueue/common/key.h>
Expand Down Expand Up @@ -82,10 +81,11 @@ void AddReadWAL(std::unique_ptr<TEvKeyValue::TEvRequest>& request, ui32 partitio
readWAL->SetIncludeData(true);
}

TConsumerActor::TConsumerActor(ui64 tabletId, const TActorId& tabletActorId, ui32 partitionId,
TConsumerActor::TConsumerActor(const TString& database,ui64 tabletId, const TActorId& tabletActorId, ui32 partitionId,
const TActorId& partitionActorId, const NKikimrPQ::TPQTabletConfig_TConsumer& config,
std::optional<TDuration> retentionPeriod)
: TBaseTabletActor(tabletId, tabletActorId, NKikimrServices::EServiceKikimr::PQ_MLP_CONSUMER)
, Database(database)
, PartitionId(partitionId)
, PartitionActorId(partitionActorId)
, Config(config)
Expand Down Expand Up @@ -122,6 +122,10 @@ void TConsumerActor::PassAway() {
ReplyErrorAll(SelfId(), UnlockRequestsQueue);
ReplyErrorAll(SelfId(), ChangeMessageDeadlineRequestsQueue);

if (DLQMoverActorId) {
Send(DLQMoverActorId, new TEvents::TEvPoison());
}

TBase::PassAway();
}

Expand Down Expand Up @@ -202,6 +206,7 @@ void TConsumerActor::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev) {
LOG_D("Read snapshot");
HasSnapshot = true;
LastWALIndex = snapshot.GetWALIndex();
DLQMovedMessageCount = snapshot.GetMeta().GetDLQMovedMessages();
Storage->Initialize(snapshot);
} else {
LOG_W("Received snapshot from old consumer generation: " << Config.GetGeneration() << " vs " << snapshot.GetConfiguration().GetGeneration());
Expand Down Expand Up @@ -238,6 +243,7 @@ void TConsumerActor::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev) {
if (Config.GetGeneration() == wal.GetGeneration()) {
LOG_D("Read WAL " << w.key());
LastWALIndex = wal.GetWALIndex();
DLQMovedMessageCount = wal.GetDLQMovedMessages();
Storage->ApplyWAL(wal);
} else {
LOG_W("Received snapshot from old consumer generation: " << Config.GetGeneration() << " vs " << wal.GetGeneration());
Expand Down Expand Up @@ -311,7 +317,7 @@ void TConsumerActor::Handle(TEvKeyValue::TEvResponse::TPtr& ev) {

if (!PendingReadQueue.empty()) {
auto msgs = std::exchange(PendingReadQueue, {});
RegisterWithSameMailbox(new TMessageEnricherActor(TabletActorId, PartitionId, Config.GetName(), std::move(msgs)));
RegisterWithSameMailbox(CreateMessageEnricher(TabletId, PartitionId, Config.GetName(), std::move(msgs)));
}
ReplyOk<TEvPQ::TEvMLPCommitResponse>(SelfId(), PendingCommitQueue);
ReplyOk<TEvPQ::TEvMLPUnlockResponse>(SelfId(), PendingUnlockQueue);
Expand All @@ -337,6 +343,11 @@ void TConsumerActor::UpdateStorageConfig() {
Storage->SetKeepMessageOrder(Config.GetKeepMessageOrder());
Storage->SetMaxMessageProcessingCount(Config.GetMaxProcessingAttempts());
Storage->SetRetentionPeriod(RetentionPeriod);
if (Config.GetDeadLetterPolicyEnabled() && Config.GetDeadLetterPolicy() != NKikimrPQ::TPQTabletConfig::DEAD_LETTER_POLICY_UNSPECIFIED) {
Storage->SetDeadLetterPolicy(Config.GetDeadLetterPolicy());
} else {
Storage->SetDeadLetterPolicy(std::nullopt);
}
}

void TConsumerActor::Handle(TEvPQ::TEvMLPConsumerUpdateConfig::TPtr& ev) {
Expand Down Expand Up @@ -366,6 +377,10 @@ void TConsumerActor::Handle(TEvPQ::TEvGetMLPConsumerStateRequest::TPtr& ev) {
Send(ev->Sender, std::move(response), 0, ev->Cookie);
}

// Handles a delivery problem reported by the pipe cache.
// Marks the next request sent through SendToPQTablet as the "first" one again
// and forgets the outstanding HasData subscription.
void TConsumerActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) {
    FirstPipeCacheRequest = true;
    // A request that was in flight on the broken pipe will not be answered.
    // Without this reset HasDataInProgress would stay true and no new
    // HasData subscription would ever be sent.
    HasDataInProgress = false;
}

STFUNC(TConsumerActor::StateInit) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPQ::TEvMLPReadRequest, Queue);
Expand Down Expand Up @@ -395,7 +410,10 @@ STFUNC(TConsumerActor::StateWork) {
hFunc(TEvPQ::TEvGetMLPConsumerStateRequest, Handle);
hFunc(TEvKeyValue::TEvResponse, Handle);
hFunc(TEvPQ::TEvProxyResponse, Handle);
hFunc(TEvPersQueue::TEvHasDataInfoResponse, Handle);
hFunc(TEvPQ::TEvError, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(TEvPQ::TEvMLPDLQMoverResponse, Handle);
hFunc(TEvents::TEvWakeup, HandleOnWork);
sFunc(TEvents::TEvPoison, PassAway);
default:
Expand All @@ -414,7 +432,10 @@ STFUNC(TConsumerActor::StateWrite) {
hFunc(TEvPQ::TEvGetMLPConsumerStateRequest, Handle);
hFunc(TEvKeyValue::TEvResponse, Handle);
hFunc(TEvPQ::TEvProxyResponse, Handle);
hFunc(TEvPersQueue::TEvHasDataInfoResponse, Handle);
hFunc(TEvPQ::TEvError, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(TEvPQ::TEvMLPDLQMoverResponse, Handle);
hFunc(TEvents::TEvWakeup, Handle);
sFunc(TEvents::TEvPoison, PassAway);
default:
Expand Down Expand Up @@ -533,6 +554,7 @@ void TConsumerActor::Persist() {

NKikimrPQ::TMLPStorageWAL wal;
wal.SetWALIndex(LastWALIndex);
wal.SetDLQMovedMessages(DLQMovedMessageCount);
batch.SerializeTo(wal);

auto data = wal.SerializeAsString();
Expand All @@ -559,6 +581,7 @@ void TConsumerActor::Persist() {
Storage->SerializeTo(snapshot);

snapshot.SetWALIndex(LastWALIndex);
snapshot.MutableMeta()->SetDLQMovedMessages(DLQMovedMessageCount);

auto request = std::make_unique<TEvKeyValue::TEvRequest>();

Expand All @@ -583,6 +606,17 @@ void TConsumerActor::Persist() {
}
}

// Computes how many messages should be requested from the partition.
// The result is at least Storage->MinMessages (more when many messages are
// locked compared to the unprocessed ones) and never exceeds the number of
// free in-flight slots. Returns 0 when there are no free slots.
size_t TConsumerActor::RequiredToFetchMessageCount() const {
    const auto& metrics = Storage->GetMetrics();

    // Guard the subtraction below: if the in-flight count ever reached or
    // exceeded the limit, the unsigned difference would wrap around and the
    // final min() would no longer cap the result.
    if (metrics.InflyMessageCount >= Storage->MaxMessages) {
        return 0;
    }

    auto maxMessages = Storage->MinMessages;
    if (metrics.LockedMessageCount * 2 > metrics.UnprocessedMessageCount) {
        // Ask for enough messages to bring the unprocessed count up to
        // twice the locked count.
        maxMessages = std::max<size_t>(maxMessages, metrics.LockedMessageCount * 2 - metrics.UnprocessedMessageCount);
    }

    return std::min(maxMessages, Storage->MaxMessages - metrics.InflyMessageCount);
}

bool TConsumerActor::FetchMessagesIfNeeded() {
if (FetchInProgress) {
return false;
Expand All @@ -602,12 +636,7 @@ bool TConsumerActor::FetchMessagesIfNeeded() {

FetchInProgress = true;

auto maxMessages = Storage->MinMessages;
if (metrics.LockedMessageCount * 2 > metrics.UnprocessedMessageCount) {
maxMessages = std::max<size_t>(maxMessages, metrics.LockedMessageCount * 2 - metrics.UnprocessedMessageCount);
}
maxMessages = std::min(maxMessages, Storage->MaxMessages - metrics.InflyMessageCount);

auto maxMessages = RequiredToFetchMessageCount();
LOG_D("Fetching " << maxMessages << " messages from offset " << Storage->GetLastOffset() << " from " << PartitionActorId);
Send(PartitionActorId, MakeEvRead(SelfId(), Config.GetName(), Storage->GetLastOffset(), maxMessages, ++FetchCookie));

Expand Down Expand Up @@ -665,32 +694,88 @@ void TConsumerActor::Handle(TEvPQ::TEvProxyResponse::TPtr& ev) {
if (CurrentStateFunc() == &TConsumerActor::StateWork) {
ProcessEventQueue();
}

if (!HasDataInProgress && RequiredToFetchMessageCount()) {
HasDataInProgress = true;
auto request = MakeEvHasData(SelfId(), PartitionId, Storage->GetLastOffset(), Config);
LOG_D("Subscribing to data: " << request->Record.ShortDebugString());
SendToPQTablet(std::move(request));
}
}
}

// Handles the response to a HasData subscription and tries to fetch messages.
void TConsumerActor::Handle(TEvPersQueue::TEvHasDataInfoResponse::TPtr&) {
    LOG_D("Handle TEvPersQueue::TEvHasDataInfo");
    // The subscription has been answered. Clear the flag so that a new
    // subscription can be sent later; otherwise only one HasData request
    // would be issued over the actor's lifetime.
    HasDataInProgress = false;
    FetchMessagesIfNeeded();
}

// Any TEvError restarts the consumer actor; the error text goes into the
// restart reason.
void TConsumerActor::Handle(TEvPQ::TEvError::TPtr& ev) {
    const auto& error = ev->Get()->Error;
    Restart(TStringBuilder() << "Received error: " << error);
}

// Periodic tick handler used by the StateWork state function.
void TConsumerActor::HandleOnWork(TEvents::TEvWakeup::TPtr&) {
    // Pull new messages first, then serve queued requests.
    FetchMessagesIfNeeded();
    ProcessEventQueue();

    // Start a DLQ mover if there is something to move and none is running.
    MoveToDLQIfPossible();

    // Re-arm the timer for the next tick.
    Schedule(WakeupInterval, new TEvents::TEvWakeup());
}

void TConsumerActor::MoveToDLQIfPossible() {
if (!DLQMoverActorId && !Storage->GetDLQMessages().empty()) {
std::deque<ui64> messages(Storage->GetDLQMessages());
DLQMoverActorId = RegisterWithSameMailbox(CreateDLQMover({
.ParentActorId = SelfId(),
.Database = Database,
.TabletId = TabletId,
.PartitionId = PartitionId,
.ConsumerName = Config.GetName(),
.ConsumerGeneration = Config.GetGeneration(),
.DestinationTopic = Config.GetDeadLetterQueue(),
.FirstMessageSeqNo = DLQMovedMessageCount + 1,
.Messages = std::move(messages)
}));
}
}

// Handles the DLQ mover result: marks every reported message as moved in the
// storage and advances the moved-message counter.
void TConsumerActor::Handle(TEvPQ::TEvMLPDLQMoverResponse::TPtr& ev) {
LOG_D("Handle TEvPQ::TEvMLPDLQMoverResponse");

// A non-success status is only logged; messages listed in MovedMessages are
// still applied below.
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
LOG_W("Error moving messages to the DLQ: " << ev->Get()->ErrorDescription);
}

auto& moved = ev->Get()->MovedMessages;
LOG_D("Moved to the DLQ: " << JoinRange(", ", moved.begin(), moved.end()));

// Clear the mover id so that MoveToDLQIfPossible can start a new mover.
DLQMoverActorId = {};
for (auto offset : moved) {
// NOTE(review): the state-changing call sits inside the AFL_ENSURE check, and
// the check presumably fails hard when MarkDLQMoved returns false - confirm
// that this is intended (a reviewer objected to this line).
AFL_ENSURE(Storage->MarkDLQMoved(offset))("o", offset);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот так писать нельзя никогда

}

// The counter is the base for FirstMessageSeqNo of the next mover and is
// written to the WAL and the snapshot by Persist().
DLQMovedMessageCount += moved.size();
}

// Periodic tick handler used by the StateWrite state function: only the DLQ
// mover is kicked, no fetching or queue processing is done here.
void TConsumerActor::Handle(TEvents::TEvWakeup::TPtr&) {
    LOG_D("Handle TEvents::TEvWakeup");

    MoveToDLQIfPossible();

    // Re-arm the timer for the next tick.
    Schedule(WakeupInterval, new TEvents::TEvWakeup());
}

// Wraps the event into a TEvPipeCache::TEvForward addressed to TabletId and
// sends it to the per-node pipe cache with delivery tracking enabled.
void TConsumerActor::SendToPQTablet(std::unique_ptr<IEventBase> ev) {
    // Read the "first request" flag and clear it for subsequent sends.
    const auto firstRequest = std::exchange(FirstPipeCacheRequest, false);

    auto envelope = std::make_unique<TEvPipeCache::TEvForward>(ev.release(), TabletId, firstRequest, 1);
    const auto pipeCacheId = MakePipePerNodeCacheID(false);
    Send(pipeCacheId, envelope.release(), IEventHandle::FlagTrackDelivery);
}

// Factory for the MLP consumer actor. All arguments are forwarded to the
// TConsumerActor constructor unchanged.
//   database         - database name
//   tabletId         - id of the PQ tablet
//   tabletActorId    - actor id of the tablet
//   partitionId      - partition id
//   partitionActorId - actor id of the partition
//   config           - consumer configuration
//   reteintion       - optional retention period
// Returns a raw actor pointer.
// NOTE(review): the caller presumably takes ownership by registering the actor - confirm.
NActors::IActor* CreateConsumerActor(
    const TString& database,
    ui64 tabletId,
    const NActors::TActorId& tabletActorId,
    ui32 partitionId,
    const NActors::TActorId& partitionActorId,
    const NKikimrPQ::TPQTabletConfig_TConsumer& config,
    const std::optional<TDuration> reteintion) {
    return new TConsumerActor(database, tabletId, tabletActorId, partitionId, partitionActorId, config, reteintion);
}

}
Loading
Loading