summaryrefslogtreecommitdiff
path: root/libc
diff options
context:
space:
mode:
authorJoseph Huber <jhuber6@vols.utk.edu>2023-05-01 12:10:04 -0500
committerJoseph Huber <jhuber6@vols.utk.edu>2023-05-05 10:12:19 -0500
commitaea866c12cb428eb5fe062ffa910a63daff62b01 (patch)
tree9bf301a8c3c7f66949f2a36399ba2a5580b7b195 /libc
parenta1be6f0290effb26ff5e0d7a43ab20931f1353a2 (diff)
downloadllvm-aea866c12cb428eb5fe062ffa910a63daff62b01.tar.gz
[libc] Support concurrent RPC port access on the GPU
Previously we used a single port to implement the RPC. This was sufficient for single threaded tests but can potentially cause deadlocks when using multiple threads. The reason for this is that GPUs make no forward progress guarantees. Therefore one group of threads waiting on another group of threads can spin forever because there is no guarantee that the other threads will continue executing. The typical workaround for this is to allocate enough memory that a sufficiently large number of work groups can make progress. As long as this number is somewhat close to the amount of total concurrency we can obtain reliable execution around a shared resource. This patch enables using multiple ports by widening the arrays to a predetermined size and indexes into them. Empty ports are currently obtained via a trivial linker scan. This should be imporoved in the future for performance reasons. Portions of D148191 were applied to achieve parallel support. Depends on D149581 Reviewed By: JonChesterfield Differential Revision: https://reviews.llvm.org/D149598
Diffstat (limited to 'libc')
-rw-r--r--libc/src/__support/RPC/rpc.h162
-rw-r--r--libc/src/__support/RPC/rpc_util.h5
-rw-r--r--libc/startup/gpu/amdgpu/start.cpp5
-rw-r--r--libc/startup/gpu/nvptx/start.cpp5
-rw-r--r--libc/utils/gpu/loader/Server.h72
-rw-r--r--libc/utils/gpu/loader/amdgpu/Loader.cpp15
-rw-r--r--libc/utils/gpu/loader/nvptx/Loader.cpp18
7 files changed, 160 insertions, 122 deletions
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 170f74830b14..5395c4e57332 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -69,6 +69,12 @@ struct alignas(64) Packet {
Payload payload;
};
+// TODO: This should be configured by the server and passed in. The general rule
+// of thumb is that you should have at least as many ports as possible
+// concurrent work items on the GPU to mitigate the lack offorward
+// progress guarantees on the GPU.
+constexpr uint64_t default_port_count = 64;
+
/// A common process used to synchronize communication between a client and a
/// server. The process contains an inbox and an outbox used for signaling
/// ownership of the shared buffer between both sides.
@@ -96,22 +102,31 @@ template <bool InvertInbox> struct Process {
LIBC_INLINE Process &operator=(const Process &) = default;
LIBC_INLINE ~Process() = default;
+ uint64_t port_count;
uint32_t lane_size;
cpp::Atomic<uint32_t> *lock;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
- Packet *buffer;
+ Packet *packet;
/// Initialize the communication channels.
- LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
- void *outbox, void *buffer) {
- *this = {
- lane_size,
- reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
- reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
- reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
- reinterpret_cast<Packet *>(buffer),
- };
+ LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size, void *lock,
+ void *inbox, void *outbox, void *packet) {
+ *this = {port_count,
+ lane_size,
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
+ reinterpret_cast<Packet *>(packet)};
+ }
+
+ /// The length of the packet is flexible because the server needs to look up
+ /// the lane size at runtime. This helper indexes at the proper offset.
+ LIBC_INLINE Packet &get_packet(uint64_t index) {
+ return *reinterpret_cast<Packet *>(
+ reinterpret_cast<uint8_t *>(packet) +
+ index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
+ alignof(Packet)));
}
/// Inverting the bits loaded from the inbox in exactly one of the pair of
@@ -190,25 +205,25 @@ template <bool InvertInbox> struct Process {
/// Invokes a function accross every active buffer across the total lane size.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
- uint32_t index) {
+ Packet &packet) {
if constexpr (is_process_gpu()) {
- fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+ fn(&packet.payload.slot[gpu::get_lane_id()]);
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
- if (buffer[index].header.mask & 1ul << i)
- fn(&buffer[index].payload.slot[i]);
+ if (packet.header.mask & 1ul << i)
+ fn(&packet.payload.slot[i]);
}
}
/// Alternate version that also provides the index of the current lane.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
- uint32_t index) {
+ Packet &packet) {
if constexpr (is_process_gpu()) {
- fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+ fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
- if (buffer[index].header.mask & 1ul << i)
- fn(&buffer[index].payload.slot[i], i);
+ if (packet.header.mask & 1ul << i)
+ fn(&packet.payload.slot[i], i);
}
}
};
@@ -234,7 +249,7 @@ template <bool T> struct Port {
template <typename A> LIBC_INLINE void recv_n(A alloc);
LIBC_INLINE uint16_t get_opcode() const {
- return process.buffer[index].header.opcode;
+ return process.get_packet(index).header.opcode;
}
LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -281,7 +296,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
}
// Apply the \p fill function to initialize the buffer and release the memory.
- process.invoke_rpc(fill, index);
+ process.invoke_rpc(fill, process.get_packet(index));
out = !out;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -299,7 +314,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
// Apply the \p use function to read the memory out of the buffer.
- process.invoke_rpc(use, index);
+ process.invoke_rpc(use, process.get_packet(index));
out = !out;
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
}
@@ -340,7 +355,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
inline_memcpy(buffer->data, ptr + idx, len);
});
}
- gpu::sync_lane(process.buffer[index].header.mask);
+ gpu::sync_lane(process.get_packet(index).header.mask);
}
/// Receives an arbitrarily sized data buffer across the shared channel in
@@ -396,32 +411,34 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
/// participating thread.
[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
Client::try_open(uint16_t opcode) {
- constexpr uint64_t index = 0;
- const uint64_t lane_mask = gpu::get_lane_mask();
-
- // Attempt to acquire the lock on this index.
- if (!try_lock(lane_mask, index))
- return cpp::nullopt;
-
- // The mailbox state must be read with the lock held.
- atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
- uint32_t in = load_inbox(index);
- uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
- // Once we acquire the index we need to check if we are in a valid sending
- // state.
- if (buffer_unavailable(in, out)) {
- unlock(lane_mask, index);
- return cpp::nullopt;
- }
+ // Perform a naive linear scan for a port that can be opened to send data.
+ for (uint64_t index = 0; index < port_count; ++index) {
+ // Attempt to acquire the lock on this index.
+ uint64_t lane_mask = gpu::get_lane_mask();
+ if (!try_lock(lane_mask, index))
+ continue;
+
+ // The mailbox state must be read with the lock held.
+ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+ uint32_t in = load_inbox(index);
+ uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ // Once we acquire the index we need to check if we are in a valid sending
+ // state.
+ if (buffer_unavailable(in, out)) {
+ unlock(lane_mask, index);
+ continue;
+ }
- if (is_first_lane(lane_mask)) {
- buffer[index].header.opcode = opcode;
- buffer[index].header.mask = lane_mask;
+ if (is_first_lane(lane_mask)) {
+ get_packet(index).header.opcode = opcode;
+ get_packet(index).header.mask = lane_mask;
+ }
+ gpu::sync_lane(lane_mask);
+ return Port(*this, lane_mask, index, out);
}
- gpu::sync_lane(lane_mask);
- return Port(*this, lane_mask, index, out);
+ return cpp::nullopt;
}
LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
@@ -436,33 +453,36 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
/// port if it has a pending receive operation
[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
Server::try_open() {
- constexpr uint64_t index = 0;
- const uint64_t lane_mask = gpu::get_lane_mask();
-
- uint32_t in = load_inbox(index);
- uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
- // The server is passive, if there is no work pending don't bother
- // opening a port.
- if (buffer_unavailable(in, out))
- return cpp::nullopt;
-
- // Attempt to acquire the lock on this index.
- if (!try_lock(lane_mask, index))
- return cpp::nullopt;
-
- // The mailbox state must be read with the lock held.
- atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
- in = load_inbox(index);
- out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+ // Perform a naive linear scan for a port that has a pending request.
+ for (uint64_t index = 0; index < port_count; ++index) {
+ uint32_t in = load_inbox(index);
+ uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ // The server is passive, if there is no work pending don't bother
+ // opening a port.
+ if (buffer_unavailable(in, out))
+ continue;
+
+ // Attempt to acquire the lock on this index.
+ uint64_t lane_mask = gpu::get_lane_mask();
+ // Attempt to acquire the lock on this index.
+ if (!try_lock(lane_mask, index))
+ continue;
+
+ // The mailbox state must be read with the lock held.
+ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+ in = load_inbox(index);
+ out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ if (buffer_unavailable(in, out)) {
+ unlock(lane_mask, index);
+ continue;
+ }
- if (buffer_unavailable(in, out)) {
- unlock(lane_mask, index);
- return cpp::nullopt;
+ return Port(*this, lane_mask, index, out);
}
-
- return Port(*this, lane_mask, index, out);
+ return cpp::nullopt;
}
LIBC_INLINE Server::Port Server::open() {
diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
index 224723ef20c9..c6282e40c903 100644
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -49,6 +49,11 @@ LIBC_INLINE constexpr bool is_process_gpu() {
#endif
}
+/// Return \p val aligned "upwards" according to \p align.
+template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
+ return ((val + V(align) - 1) / V(align)) * V(align);
+}
+
} // namespace rpc
} // namespace __llvm_libc
diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp
index d1dfc7b8c11d..84adb3b97527 100644
--- a/libc/startup/gpu/amdgpu/start.cpp
+++ b/libc/startup/gpu/amdgpu/start.cpp
@@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);
namespace __llvm_libc {
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};
extern "C" uintptr_t __init_array_start[];
extern "C" uintptr_t __init_array_end[];
@@ -43,7 +43,8 @@ extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void
_begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
// We need to set up the RPC client first in case any of the constructors
// require it.
- __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+ __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
+ __llvm_libc::gpu::get_lane_size(),
&__llvm_libc::lock, in, out, buffer);
// We want the fini array callbacks to be run after other atexit
diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp
index 83453ae1e47a..1d366dc829df 100644
--- a/libc/startup/gpu/nvptx/start.cpp
+++ b/libc/startup/gpu/nvptx/start.cpp
@@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);
namespace __llvm_libc {
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};
extern "C" {
// Nvidia's 'nvlink' linker does not provide these symbols. We instead need
@@ -47,7 +47,8 @@ extern "C" [[gnu::visibility("protected"), clang::nvptx_kernel]] void
_begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
// We need to set up the RPC client first in case any of the constructors
// require it.
- __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+ __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
+ __llvm_libc::gpu::get_lane_size(),
&__llvm_libc::lock, in, out, buffer);
// We want the fini array callbacks to be run after other atexit
diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
index 6ffb32955c89..f77bf256618a 100644
--- a/libc/utils/gpu/loader/Server.h
+++ b/libc/utils/gpu/loader/Server.h
@@ -19,47 +19,51 @@
static __llvm_libc::rpc::Server server;
-static __llvm_libc::cpp::Atomic<uint32_t> lock;
+static __llvm_libc::cpp::Atomic<uint32_t>
+ lock[__llvm_libc::rpc::default_port_count] = {0};
/// Queries the RPC client at least once and performs server-side work if there
/// are any active requests.
void handle_server() {
- auto port = server.try_open();
- if (!port)
- return;
+ // Continue servicing the client until there is no work left and we return.
+ for (;;) {
+ auto port = server.try_open();
+ if (!port)
+ return;
- switch (port->get_opcode()) {
- case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
- uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
- char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
- port->recv_n([&](uint64_t size, uint32_t id) {
- str_size[id] = size;
- strs[id] = new char[size];
- return strs[id];
- });
- for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
- if (strs[i]) {
- fwrite(strs[i], str_size[i], 1, stderr);
- delete[] strs[i];
+ switch (port->get_opcode()) {
+ case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
+ uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
+ char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
+ port->recv_n([&](uint64_t size, uint32_t id) {
+ str_size[id] = size;
+ strs[id] = new char[size];
+ return strs[id];
+ });
+ for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
+ if (strs[i]) {
+ fwrite(strs[i], str_size[i], 1, stderr);
+ delete[] strs[i];
+ }
}
+ break;
}
- break;
- }
- case __llvm_libc::rpc::Opcode::EXIT: {
- port->recv([](__llvm_libc::rpc::Buffer *buffer) {
- exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
- });
- break;
- }
- case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
- port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
- reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
- });
- break;
- }
- default:
- port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+ case __llvm_libc::rpc::Opcode::EXIT: {
+ port->recv([](__llvm_libc::rpc::Buffer *buffer) {
+ exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
+ });
+ break;
+ }
+ case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
+ port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
+ reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
+ });
+ break;
+ }
+ default:
+ port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+ }
+ port->close();
}
- port->close();
}
#endif
diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp
index ee12d6d63ffb..07fa1ae7fe16 100644
--- a/libc/utils/gpu/loader/amdgpu/Loader.cpp
+++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp
@@ -330,6 +330,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
hsa_amd_memory_fill(dev_ret, 0, sizeof(int));
// Allocate finegrained memory for the RPC server and client to share.
+ uint64_t port_size = __llvm_libc::rpc::default_port_count;
uint32_t wavefront_size = 0;
if (hsa_status_t err = hsa_agent_get_info(
dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size))
@@ -338,18 +339,19 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
void *server_outbox;
void *buffer;
if (hsa_status_t err = hsa_amd_memory_pool_allocate(
- finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<int>),
+ finegrained_pool, port_size * sizeof(__llvm_libc::cpp::Atomic<int>),
/*flags=*/0, &server_inbox))
handle_error(err);
if (hsa_status_t err = hsa_amd_memory_pool_allocate(
- finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<int>),
+ finegrained_pool, port_size * sizeof(__llvm_libc::cpp::Atomic<int>),
/*flags=*/0, &server_outbox))
handle_error(err);
if (hsa_status_t err = hsa_amd_memory_pool_allocate(
finegrained_pool,
- align_up(sizeof(__llvm_libc::rpc::Header) +
- (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
- alignof(__llvm_libc::rpc::Packet)),
+ port_size *
+ align_up(sizeof(__llvm_libc::rpc::Header) +
+ (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
+ alignof(__llvm_libc::rpc::Packet)),
/*flags=*/0, &buffer))
handle_error(err);
hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox);
@@ -357,7 +359,8 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
hsa_amd_agents_allow_access(1, &dev_agent, nullptr, buffer);
// Initialize the RPC server's buffer for host-device communication.
- server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer);
+ server.reset(port_size, wavefront_size, &lock, server_inbox, server_outbox,
+ buffer);
// Obtain a queue with the minimum (power of two) size, used to send commands
// to the HSA runtime and launch execution on the device.
diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp
index ca18da939f4c..314f5a8055fb 100644
--- a/libc/utils/gpu/loader/nvptx/Loader.cpp
+++ b/libc/utils/gpu/loader/nvptx/Loader.cpp
@@ -246,18 +246,22 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
if (CUresult err = cuMemsetD32(dev_ret, 0, 1))
handle_error(err);
+ uint64_t port_size = __llvm_libc::rpc::default_port_count;
uint32_t warp_size = 32;
- void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
- void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
- void *buffer =
- allocator(align_up(sizeof(__llvm_libc::rpc::Header) +
- (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
- alignof(__llvm_libc::rpc::Packet)));
+ void *server_inbox =
+ allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<int>));
+ void *server_outbox =
+ allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<int>));
+ void *buffer = allocator(
+ port_size * align_up(sizeof(__llvm_libc::rpc::Header) +
+ (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
+ alignof(__llvm_libc::rpc::Packet)));
if (!server_inbox || !server_outbox || !buffer)
handle_error("Failed to allocate memory the RPC client / server.");
// Initialize the RPC server's buffer for host-device communication.
- server.reset(warp_size, &lock, server_inbox, server_outbox, buffer);
+ server.reset(port_size, warp_size, &lock, server_inbox, server_outbox,
+ buffer);
LaunchParameters single_threaded_params = {1, 1, 1, 1, 1, 1};
// Call the kernel to