Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/library/actors/core/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ namespace NActors {
}
virtual ui32 Type() const = 0;
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
virtual std::optional<TRope> SerializeToRope(std::function<TRcBuf(ui32 size)>) const {
return std::nullopt;
}
virtual bool IsSerializable() const = 0;
virtual ui32 CalculateSerializedSizeCached() const {
return CalculateSerializedSize();
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/core/event_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace NActors {
size_t Tailroom = 0; // tailroom for the chunk
size_t Alignment = 0; // required alignment
bool IsInline = false; // if true, goes through ordinary channel
bool IsRdmaCapable = false; // if true, could go through RDMA
};

struct TEventSerializationInfo {
Expand Down
135 changes: 103 additions & 32 deletions ydb/library/actors/core/event_pb.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "event_pb.h"

#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
#include <ydb/library/actors/protos/interconnect.pb.h>

namespace NActors {
TString EventPBBaseToString(const TString& header, const TString& dbgStr) {
TString res;
Expand Down Expand Up @@ -264,26 +267,9 @@ namespace NActors {
return res;
}

bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector<TRope> &payload) {
// serialize payload first
template<typename TCb>
bool SerializeHeaderCommon(const TVector<TRope>& payload, TCb& append) {
if (payload) {
void *data;
int size = 0;
auto append = [&](const char *p, size_t len) {
while (len) {
if (size) {
const size_t numBytesToCopy = std::min<size_t>(size, len);
memcpy(data, p, numBytesToCopy);
data = static_cast<char*>(data) + numBytesToCopy;
size -= numBytesToCopy;
p += numBytesToCopy;
len -= numBytesToCopy;
} else if (!chunker->Next(&data, &size)) {
return false;
}
}
return true;
};
auto appendNumber = [&](size_t number) {
char buf[MaxNumberBytes];
return append(buf, SerializeNumber(number, buf));
Expand All @@ -299,19 +285,86 @@ namespace NActors {
return false;
}
}
if (size) {
chunker->BackUp(std::exchange(size, 0));
}

return true;
}

bool SerializePayloadCommon(const TVector<TRope> &payload, std::function<bool(TRope)> append) {
for (const TRope& rope : payload) {
if (!append(rope)) {
return false;
}
for (const TRope& rope : payload) {
if (!chunker->WriteRope(&rope)) {
}
return true;
}

bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector<TRope> &payload) {
// serialize payload first
void *data;
int size = 0;
auto append = [&](const char *p, size_t len) {
while (len) {
if (size) {
const size_t numBytesToCopy = std::min<size_t>(size, len);
memcpy(data, p, numBytesToCopy);
data = static_cast<char*>(data) + numBytesToCopy;
size -= numBytesToCopy;
p += numBytesToCopy;
len -= numBytesToCopy;
} else if (!chunker->Next(&data, &size)) {
return false;
}
}
return true;
};
if (!SerializeHeaderCommon(payload, append)) {
return false;
}
if (size) {
chunker->BackUp(std::exchange(size, 0));
}

auto appendRope = [&](TRope rope) {
if (!chunker->WriteRope(&rope)) {
return false;
}
return true;
};
if (!SerializePayloadCommon(payload, appendRope)) {
return false;
}

return true;
}

std::optional<TRope> SerializeToRopeImpl(std::function<TRcBuf(ui32 size)> alloc, const TVector<TRope> &payload) {
TRope result;
auto sz = CalculateSerializedHeaderSizeImpl(payload);
if (!sz) {
return result;
}
TRcBuf headerBuf = alloc(sz);
if (!headerBuf) {
return {};
}
char* data = headerBuf.GetDataMut();
auto append = [&data](const char *p, size_t len) {
std::memcpy(data, p, len);
data += len;
return true;
};
SerializeHeaderCommon(payload, append);
result.Insert(result.End(), headerBuf);

auto appendRope = [&](TRope rope) {
result.Insert(result.End(), std::move(rope));
return true;
};
SerializePayloadCommon(payload, appendRope);

return result;
}

void ParseExtendedFormatPayload(TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize)
{
Expand Down Expand Up @@ -361,23 +414,41 @@ namespace NActors {
}
}

ui32 CalculateSerializedSizeImpl(const TVector<TRope> &payload, ssize_t recordSize) {
ssize_t result = recordSize;
if (result >= 0 && payload) {
ui32 CalculateSerializedHeaderSizeImpl(const TVector<TRope> &payload) {
ui32 result = 0;
if (payload) {
++result; // marker
char buf[MaxNumberBytes];
result += SerializeNumber(payload.size(), buf);
size_t totalPayloadSize = 0;
for (const TRope& rope : payload) {
size_t ropeSize = rope.GetSize();
totalPayloadSize += ropeSize;
result += SerializeNumber(ropeSize, buf);
}
result += totalPayloadSize;
}
return result;
}

ui32 CalculateSerializedSizeImpl(const TVector<TRope> &payload, ssize_t recordSize) {
ssize_t result = recordSize;
if (result >= 0 && payload) {
result += CalculateSerializedHeaderSizeImpl(payload);
for (const TRope& rope : payload) {
result += rope.GetSize();
}
}
return result;
}

bool IsRdma(const TRope &rope) {
for (auto it = rope.Begin(); it != rope.End(); ++it) {
const TRcBuf& chunk = it.GetChunk();
if (NInterconnect::NRdma::TryExtractFromRcBuf(chunk).Empty()) {
return false;
}
}
return true;
}

TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize) {
TEventSerializationInfo info;
info.IsExtendedFormat = static_cast<bool>(payload);
Expand All @@ -389,14 +460,14 @@ namespace NActors {
for (const TRope& rope : payload) {
headerLen += SerializeNumber(rope.size(), temp);
}
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true});
info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true, true});
for (const TRope& rope : payload) {
info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false});
info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false, IsRdma(rope)});
}
}

const size_t byteSize = Max<ssize_t>(0, recordSize) + preserializedSize;
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true}); // protobuf itself
info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true, true}); // protobuf itself

#ifndef NDEBUG
size_t total = 0;
Expand Down
19 changes: 19 additions & 0 deletions ydb/library/actors/core/event_pb.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ namespace NActors {
~TCoroutineChunkSerializer();

void SetSerializingEvent(const IEventBase *event);
void DiscardEvent() { Event = nullptr; };
void Abort();
std::span<TChunk> FeedBuf(void* data, size_t size);
bool IsComplete() const {
Expand Down Expand Up @@ -150,6 +151,8 @@ namespace NActors {

void ParseExtendedFormatPayload(TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize);
bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector<TRope> &payload);
ui32 CalculateSerializedHeaderSizeImpl(const TVector<TRope> &payload);
std::optional<TRope> SerializeToRopeImpl(std::function<TRcBuf(ui32 size)> alloc, const TVector<TRope> &payload);
ui32 CalculateSerializedSizeImpl(const TVector<TRope> &payload, ssize_t recordSize);
TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize);

Expand Down Expand Up @@ -202,6 +205,22 @@ namespace NActors {
return CalculateSerializedSizeImpl(Payload, Record.ByteSize());
}

std::optional<TRope> SerializeToRope(std::function<TRcBuf(ui32 size)> alloc) const override {
std::optional<TRope> result = SerializeToRopeImpl(alloc, Payload);
if (!result) {
return {};
}
ui32 size = Record.ByteSizeLong();
TRcBuf recordsSerializedBuf = alloc(size);
if (!recordsSerializedBuf) {
return {};
}
bool serializationDone = Record.SerializePartialToArray(recordsSerializedBuf.GetDataMut(), size);
Y_ABORT_UNLESS(serializationDone);
result->Insert(result->End(), std::move(recordsSerializedBuf));
return result;
}

static TEv* Load(const TEventSerializedData *input) {
THolder<TEv> holder(new TEv());
TEventPBBase* ev = holder.Get();
Expand Down
8 changes: 5 additions & 3 deletions ydb/library/actors/interconnect/channel_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace NActors {
std::shared_ptr<IInterconnectMetrics> Metrics;
const ui32 MaxSerializedEventSize;
const TSessionParams Params;
std::shared_ptr<NInterconnect::NRdma::IMemPool> RdmaMemPool;

struct THeapItem {
TEventOutputChannel *Channel;
Expand All @@ -29,11 +30,12 @@ namespace NActors {
public:
TChannelScheduler(ui32 peerNodeId, const TChannelsConfig& predefinedChannels,
std::shared_ptr<IInterconnectMetrics> metrics, ui32 maxSerializedEventSize,
TSessionParams params)
TSessionParams params, std::shared_ptr<NInterconnect::NRdma::IMemPool> rdmaMemPool)
: PeerNodeId(peerNodeId)
, Metrics(std::move(metrics))
, MaxSerializedEventSize(maxSerializedEventSize)
, Params(std::move(params))
, RdmaMemPool(std::move(rdmaMemPool))
{
for (const auto& item : predefinedChannels) {
GetOutputChannel(item.first);
Expand Down Expand Up @@ -71,15 +73,15 @@ namespace NActors {
auto& res = ChannelArray[channel];
if (Y_UNLIKELY(!res)) {
res.emplace(channel, PeerNodeId, MaxSerializedEventSize, Metrics,
Params);
Params, RdmaMemPool);
}
return *res;
} else {
auto it = ChannelMap.find(channel);
if (Y_UNLIKELY(it == ChannelMap.end())) {
it = ChannelMap.emplace(std::piecewise_construct, std::forward_as_tuple(channel),
std::forward_as_tuple(channel, PeerNodeId, MaxSerializedEventSize,
Metrics, Params)).first;
Metrics, Params, RdmaMemPool)).first;
}
return it->second;
}
Expand Down
Loading
Loading