Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions ydb/core/persqueue/pqtablet/partition/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,9 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
if (userInfo.Offset >= static_cast<i64>(GetEndOffset())) {
result.LastCommittedMessage.CreateTimestamp = now;
result.LastCommittedMessage.WriteTimestamp = now;
} else if (userInfo.ActualTimestamps) {
result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp;
result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp;
} else {
auto timestamp = GetWriteTimeEstimate(userInfo.Offset);
result.LastCommittedMessage.CreateTimestamp = timestamp;
Expand Down Expand Up @@ -932,7 +935,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
NKikimrPQ::TClientInfo* clientInfo = result.MutableLagsInfo();
clientInfo->SetClientId(userInfo.User);

auto snapshot = CreateSnapshot(userInfo);
const auto snapshot = CreateSnapshot(userInfo);

auto write = clientInfo->MutableWritePosition();
write->SetOffset(userInfo.Offset);
Expand Down Expand Up @@ -962,7 +965,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
}

if (ev->Get()->GetStatForAllConsumers) { //fill lags
auto snapshot = CreateSnapshot(userInfo);
const auto snapshot = CreateSnapshot(userInfo);

auto* clientInfo = result.AddConsumerResult();
clientInfo->SetConsumer(userInfo.User);
Expand Down Expand Up @@ -1103,7 +1106,7 @@ void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActor
result.SetEndOffset(GetEndOffset());
result.SetResponseTimestamp(ctx.Now().MilliSeconds());
for (auto&& pr : UsersInfoStorage->GetAll()) {
auto snapshot = CreateSnapshot(pr.second);
const auto snapshot = CreateSnapshot(pr.second);

const TUserInfo& userInfo(pr.second);
NKikimrPQ::TClientInfo& clientInfo = *result.AddClientInfo();
Expand Down Expand Up @@ -1889,7 +1892,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
bool haveChanges = false;
auto snapshot = CreateSnapshot(userInfo);
const auto snapshot = CreateSnapshot(userInfo);

auto ts = snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds();
if (ts < MIN_TIMESTAMP_MS) {
Expand Down
25 changes: 18 additions & 7 deletions ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
return setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER);
};

std::deque<TString> messagesTextQueue;
auto write = [&](size_t seqNo) {
TTopicClient client(setup.MakeDriver());

Expand All @@ -49,9 +50,11 @@ Y_UNIT_TEST_SUITE(WithSDK) {

TString msgTxt = TStringBuilder() << "message_" << seqNo;
TWriteMessage msg(msgTxt);
msg.CreateTimestamp(TInstant::Now() - TDuration::Seconds(10 - seqNo));
constexpr size_t maxSeqNo = 10;
Y_ASSERT(seqNo <= maxSeqNo);
msg.CreateTimestamp(TInstant::Now() - TDuration::Seconds(maxSeqNo - seqNo));
UNIT_ASSERT(session->Write(std::move(msg)));

messagesTextQueue.push_back(msgTxt);
session->Close(TDuration::Seconds(5));
};

Expand All @@ -78,7 +81,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
Sleep(TDuration::Seconds(2));
write(7);
Sleep(TDuration::Seconds(2));
write(11);
write(10);

Cerr << ">>>>> Check describe for topic which contains messages, but consumer hasn`t read\n";
{
Expand All @@ -100,6 +103,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
}

UNIT_ASSERT(setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1).IsSuccess());
messagesTextQueue.pop_front();

Cerr << ">>>>> Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)\n";
{
Expand All @@ -115,7 +119,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(2), c->GetMaxCommittedTimeLag());
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
}
Expand All @@ -135,8 +139,15 @@ Y_UNIT_TEST_SUITE(WithSDK) {
Cerr << ">>>>> Event = " << e->index() << Endl << Flush;
}
if (e && std::holds_alternative<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(e.value())) {
// we must recive only one date event with second message
break;
for (const auto& message : std::get<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(e.value()).GetMessages()) {
UNIT_ASSERT(!messagesTextQueue.empty());
UNIT_ASSERT_VALUES_EQUAL(message.GetData(), messagesTextQueue.front());
messagesTextQueue.pop_front();
}
if (messagesTextQueue.empty()) {
// we must receive data events for all messages except the first one
break;
}
} else if (e && std::holds_alternative<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(e.value())) {
std::get<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(e.value()).Confirm();
}
Expand All @@ -160,7 +171,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
//UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(2), c->GetMaxCommittedTimeLag());
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3));
UNIT_ASSERT_VALUES_EQUAL(3, c->GetLastReadOffset());
}
Expand Down
Loading