Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ class Context : public std::enable_shared_from_this<Context> {
friend class Endpoint;
friend class Connection;
friend class RegisteredMemory;
friend class SemaphoreStub;
};

/// Block of memory that has been registered to a Context.
Expand Down
3 changes: 3 additions & 0 deletions include/mscclpp/gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using CUmemGenericAllocationHandle = hipMemGenericAllocationHandle_t;
using CUmemAllocationProp = hipMemAllocationProp;
using CUmemAccessDesc = hipMemAccessDesc;
using CUmemAllocationHandleType = hipMemAllocationHandleType;
using CUmemAllocationGranularity_flags = hipMemAllocationGranularity_flags;

constexpr auto cudaErrorPeerAccessAlreadyEnabled = hipErrorPeerAccessAlreadyEnabled;
constexpr auto cudaErrorContextIsDestroyed = hipErrorContextIsDestroyed;
Expand All @@ -44,6 +45,7 @@ constexpr auto CU_MEM_ALLOCATION_TYPE_PINNED = hipMemAllocationTypePinned;
constexpr auto CU_MEM_LOCATION_TYPE_DEVICE = hipMemLocationTypeDevice;
constexpr auto CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR = hipMemHandleTypePosixFileDescriptor;
constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWrite;
constexpr auto CU_MEM_ALLOC_GRANULARITY_MINIMUM = hipMemAllocationGranularityMinimum;

#ifndef CUDA_SUCCESS
#define CUDA_SUCCESS hipSuccess
Expand Down Expand Up @@ -106,6 +108,7 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
#define cuMemRetainAllocationHandle(...) hipMemRetainAllocationHandle(__VA_ARGS__)
#define cuMemExportToShareableHandle(...) hipMemExportToShareableHandle(__VA_ARGS__)
#define cuMemImportFromShareableHandle(...) hipMemImportFromShareableHandle(__VA_ARGS__)
#define cuMemGetAllocationGranularity(...) hipMemGetAllocationGranularity(__VA_ARGS__)

#else

Expand Down
1 change: 1 addition & 0 deletions include/mscclpp/gpu_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ auto gpuCallocPhysicalUnique(size_t nelems = 1, size_t gran = 0, size_t align =
}

size_t getMulticastGranularity(size_t size, CUmulticastGranularity_flags granFlag);
size_t getCuAllocationGranularity(CUmemAllocationGranularity_flags granFlag);

#endif // CUDA_NVLS_API_AVAILABLE

Expand Down
7 changes: 7 additions & 0 deletions src/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ IbCtx *Context::Impl::getIbContext(Transport ibTransport) {
return it->second.get();
}

std::shared_ptr<uint64_t> Context::Impl::getToken() {
if (!tokenPool_) {
tokenPool_ = std::make_shared<TokenPool>(maxNumTokens_);
}
return tokenPool_->getToken();
}

MSCCLPP_API_CPP Context::Context() : pimpl_(std::make_unique<Impl>()) {}

MSCCLPP_API_CPP Context::~Context() = default;
Expand Down
14 changes: 14 additions & 0 deletions src/gpu_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ void* gpuCallocUncached(size_t bytes) {
#endif // defined(__HIP_PLATFORM_AMD__)

#if (CUDA_NVLS_API_AVAILABLE)
size_t getCuAllocationGranularity(CUmemAllocationGranularity_flags granFlag) {
size_t gran = 0;
int deviceId = -1;
MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));

CUmemAllocationProp prop = {};
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
prop.location.id = deviceId;
prop.requestedHandleTypes = (CUmemAllocationHandleType)(CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR | CU_MEM_HANDLE_TYPE_FABRIC);
cuMemGetAllocationGranularity(&gran, &prop, granFlag);
return gran;
}

size_t getMulticastGranularity(size_t size, CUmulticastGranularity_flags granFlag) {
size_t gran = 0;
int numDevices = 0;
Expand Down
4 changes: 4 additions & 0 deletions src/include/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ class CudaIpcStream {
int deviceId() const { return deviceId_; }
};

class TokenPool;
struct Context::Impl {
std::unordered_map<Transport, std::unique_ptr<IbCtx>> ibContexts_;
std::vector<std::shared_ptr<CudaIpcStream>> ipcStreams_;
std::shared_ptr<TokenPool> tokenPool_;
const size_t maxNumTokens_ = 1 << 15; // 32K tokens

Impl();

IbCtx *getIbContext(Transport ibTransport);
std::shared_ptr<uint64_t> getToken();
};

} // namespace mscclpp
Expand Down
13 changes: 13 additions & 0 deletions src/include/utils_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ struct PairHash {
}
};

class TokenPool : public std::enable_shared_from_this<TokenPool> {
public:
TokenPool(size_t nTokens);
std::shared_ptr<uint64_t> getToken();

private:
size_t nToken_;
uint64_t* baseAddr_;
uint64_t tailMask_;
std::shared_ptr<uint64_t> tokens_;
std::vector<uint64_t> allocationMap_;
};

} // namespace mscclpp

#endif
17 changes: 10 additions & 7 deletions src/semaphore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ struct SemaphoreStub::Impl {

Impl(const std::vector<char>& data);

std::shared_ptr<uint64_t> gpuCallocToken(std::shared_ptr<Context> context);

std::shared_ptr<Connection> connection_;
std::shared_ptr<uint64_t> token_;
RegisteredMemory idMemory_;
Device device_;
};

static std::shared_ptr<uint64_t> gpuCallocToken() {
// #if (CUDA_NVLS_API_AVAILABLE)
// if (isNvlsSupported()) {
// return detail::gpuCallocPhysicalShared<uint64_t>(1, 0);
// }
// #endif // CUDA_NVLS_API_AVAILABLE
std::shared_ptr<uint64_t> SemaphoreStub::Impl::gpuCallocToken(std::shared_ptr<Context> context) {
// NVLS is not supported in ROCm/HIP
#if (CUDA_NVLS_API_AVAILABLE)
if (isNvlsSupported()) {
return context->pimpl_->getToken();
}
#endif // CUDA_NVLS_API_AVAILABLE
#if defined(MSCCLPP_DEVICE_HIP)
return detail::gpuCallocUncachedShared<uint64_t>();
#else // !defined(MSCCLPP_DEVICE_HIP)
Expand All @@ -49,7 +52,7 @@ SemaphoreStub::Impl::Impl(std::shared_ptr<Connection> connection) : connection_(
throw Error("Local GPU ID is not provided", ErrorCode::InvalidUsage);
}
MSCCLPP_CUDATHROW(cudaSetDevice(localDevice.id));
token_ = gpuCallocToken();
token_ = gpuCallocToken(connection_->context());
} else {
throw Error("Unsupported local device type", ErrorCode::InvalidUsage);
}
Expand Down
39 changes: 39 additions & 0 deletions src/utils_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <memory>
#include <mscclpp/env.hpp>
#include <mscclpp/errors.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <sstream>
#include <string>

Expand Down Expand Up @@ -232,4 +233,42 @@ void getRandomData(void* buffer, size_t bytes) {
}
}

TokenPool::TokenPool(size_t nToken) : nToken_(nToken) {
#if (CUDA_NVLS_API_AVAILABLE)
tokens_ = detail::gpuCallocPhysicalShared<uint64_t>(
nToken, detail::getCuAllocationGranularity(CU_MEM_ALLOC_GRANULARITY_MINIMUM));
MSCCLPP_CUTHROW(cuMemGetAddressRange((CUdeviceptr*)(&baseAddr_), NULL, (CUdeviceptr)tokens_.get()));
size_t nElems = (nToken + (UINT64_WIDTH - 1)) / UINT64_WIDTH;
allocationMap_.resize(nElems, 0);
tailMask_ = (nToken % UINT64_WIDTH) ? ((1UL << (nToken % UINT64_WIDTH)) - 1) : ~0UL;
#else
throw Error("TokenPool only available on GPUs with NVLS support", ErrorCode::InvalidUsage);
#endif
}

std::shared_ptr<uint64_t> TokenPool::getToken() {
auto deleter = [self = shared_from_this()](uint64_t* token) {
size_t index = (token - self->baseAddr_) / UINT64_WIDTH;
size_t bit = (token - self->baseAddr_) % UINT64_WIDTH;
uint64_t mask = 1UL << bit;
if ((self->allocationMap_[index] & mask) == 0) {
WARN("TokenPool tried to free a token that was not allocated");
return;
}
self->allocationMap_[index] &= ~mask;
};

size_t size = allocationMap_.size();
for (size_t i = 0; i < size; i++) {
uint64_t mask = (i + 1 == size) ? tailMask_ : ~0ULL;
uint64_t holes = (~allocationMap_[i]) & mask;
if (!holes) continue;
size_t bit = __builtin_ctzll(holes);
allocationMap_[i] |= (1UL << bit);
INFO(MSCCLPP_ALLOC, "TokenPool allocated token at addr %p", baseAddr_ + i * UINT64_WIDTH + bit);
return std::shared_ptr<uint64_t>(baseAddr_ + i * UINT64_WIDTH + bit, deleter);
}
throw Error("TokenPool is exhausted", ErrorCode::InternalError);
}

} // namespace mscclpp
Loading