summaryrefslogtreecommitdiff
path: root/libc
diff options
context:
space:
mode:
Diffstat (limited to 'libc')
-rw-r--r--libc/src/__support/RPC/rpc.h162
-rw-r--r--libc/src/__support/RPC/rpc_util.h5
-rw-r--r--libc/startup/gpu/amdgpu/start.cpp5
-rw-r--r--libc/startup/gpu/nvptx/start.cpp5
-rw-r--r--libc/utils/gpu/loader/Server.h72
-rw-r--r--libc/utils/gpu/loader/amdgpu/Loader.cpp15
-rw-r--r--libc/utils/gpu/loader/nvptx/Loader.cpp18
7 files changed, 160 insertions, 122 deletions
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 170f74830b14..5395c4e57332 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -69,6 +69,12 @@ struct alignas(64) Packet {
Payload payload;
};
+// TODO: This should be configured by the server and passed in. The general rule
+// of thumb is that you should have at least as many ports as possible
concurrent work items on the GPU to mitigate the lack of forward
+// progress guarantees on the GPU.
+constexpr uint64_t default_port_count = 64;
+
/// A common process used to synchronize communication between a client and a
/// server. The process contains an inbox and an outbox used for signaling
/// ownership of the shared buffer between both sides.
@@ -96,22 +102,31 @@ template <bool InvertInbox> struct Process {
LIBC_INLINE Process &operator=(const Process &) = default;
LIBC_INLINE ~Process() = default;
+ uint64_t port_count;
uint32_t lane_size;
cpp::Atomic<uint32_t> *lock;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
- Packet *buffer;
+ Packet *packet;
/// Initialize the communication channels.
- LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
- void *outbox, void *buffer) {
- *this = {
- lane_size,
- reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
- reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
- reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
- reinterpret_cast<Packet *>(buffer),
- };
+ LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size, void *lock,
+ void *inbox, void *outbox, void *packet) {
+ *this = {port_count,
+ lane_size,
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
+ reinterpret_cast<Packet *>(packet)};
+ }
+
+ /// The length of the packet is flexible because the server needs to look up
+ /// the lane size at runtime. This helper indexes at the proper offset.
+ LIBC_INLINE Packet &get_packet(uint64_t index) {
+ return *reinterpret_cast<Packet *>(
+ reinterpret_cast<uint8_t *>(packet) +
+ index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
+ alignof(Packet)));
}
/// Inverting the bits loaded from the inbox in exactly one of the pair of
@@ -190,25 +205,25 @@ template <bool InvertInbox> struct Process {
/// Invokes a function across every active buffer across the total lane size.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
- uint32_t index) {
+ Packet &packet) {
if constexpr (is_process_gpu()) {
- fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+ fn(&packet.payload.slot[gpu::get_lane_id()]);
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
- if (buffer[index].header.mask & 1ul << i)
- fn(&buffer[index].payload.slot[i]);
+ if (packet.header.mask & 1ul << i)
+ fn(&packet.payload.slot[i]);
}
}
/// Alternate version that also provides the index of the current lane.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
- uint32_t index) {
+ Packet &packet) {
if constexpr (is_process_gpu()) {
- fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+ fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
- if (buffer[index].header.mask & 1ul << i)
- fn(&buffer[index].payload.slot[i], i);
+ if (packet.header.mask & 1ul << i)
+ fn(&packet.payload.slot[i], i);
}
}
};
@@ -234,7 +249,7 @@ template <bool T> struct Port {
template <typename A> LIBC_INLINE void recv_n(A alloc);
LIBC_INLINE uint16_t get_opcode() const {
- return process.buffer[index].header.opcode;
+ return process.get_packet(index).header.opcode;
}
LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -281,7 +296,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
}
// Apply the \p fill function to initialize the buffer and release the memory.
- process.invoke_rpc(fill, index);
+ process.invoke_rpc(fill, process.get_packet(index));
out = !out;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -299,7 +314,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
// Apply the \p use function to read the memory out of the buffer.
- process.invoke_rpc(use, index);
+ process.invoke_rpc(use, process.get_packet(index));
out = !out;
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
}
@@ -340,7 +355,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
inline_memcpy(buffer->data, ptr + idx, len);
});
}
- gpu::sync_lane(process.buffer[index].header.mask);
+ gpu::sync_lane(process.get_packet(index).header.mask);
}
/// Receives an arbitrarily sized data buffer across the shared channel in
@@ -396,32 +411,34 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
/// participating thread.
[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
Client::try_open(uint16_t opcode) {
- constexpr uint64_t index = 0;
- const uint64_t lane_mask = gpu::get_lane_mask();
-
- // Attempt to acquire the lock on this index.
- if (!try_lock(lane_mask, index))
- return cpp::nullopt;
-
- // The mailbox state must be read with the lock held.
- atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
- uint32_t in = load_inbox(index);
- uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
- // Once we acquire the index we need to check if we are in a valid sending
- // state.
- if (buffer_unavailable(in, out)) {
- unlock(lane_mask, index);
- return cpp::nullopt;
- }
+ // Perform a naive linear scan for a port that can be opened to send data.
+ for (uint64_t index = 0; index < port_count; ++index) {
+ // Attempt to acquire the lock on this index.
+ uint64_t lane_mask = gpu::get_lane_mask();
+ if (!try_lock(lane_mask, index))
+ continue;
+
+ // The mailbox state must be read with the lock held.
+ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+ uint32_t in = load_inbox(index);
+ uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ // Once we acquire the index we need to check if we are in a valid sending
+ // state.
+ if (buffer_unavailable(in, out)) {
+ unlock(lane_mask, index);
+ continue;
+ }
- if (is_first_lane(lane_mask)) {
- buffer[index].header.opcode = opcode;
- buffer[index].header.mask = lane_mask;
+ if (is_first_lane(lane_mask)) {
+ get_packet(index).header.opcode = opcode;
+ get_packet(index).header.mask = lane_mask;
+ }
+ gpu::sync_lane(lane_mask);
+ return Port(*this, lane_mask, index, out);
}
- gpu::sync_lane(lane_mask);
- return Port(*this, lane_mask, index, out);
+ return cpp::nullopt;
}
LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
@@ -436,33 +453,36 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
/// port if it has a pending receive operation
[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
Server::try_open() {
- constexpr uint64_t index = 0;
- const uint64_t lane_mask = gpu::get_lane_mask();
-
- uint32_t in = load_inbox(index);
- uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
- // The server is passive, if there is no work pending don't bother
- // opening a port.
- if (buffer_unavailable(in, out))
- return cpp::nullopt;
-
- // Attempt to acquire the lock on this index.
- if (!try_lock(lane_mask, index))
- return cpp::nullopt;
-
- // The mailbox state must be read with the lock held.
- atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
- in = load_inbox(index);
- out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+ // Perform a naive linear scan for a port that has a pending request.
+ for (uint64_t index = 0; index < port_count; ++index) {
+ uint32_t in = load_inbox(index);
+ uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ // The server is passive, if there is no work pending don't bother
+ // opening a port.
+ if (buffer_unavailable(in, out))
+ continue;
+
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    if (!try_lock(lane_mask, index))
+ continue;
+
+ // The mailbox state must be read with the lock held.
+ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+ in = load_inbox(index);
+ out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ if (buffer_unavailable(in, out)) {
+ unlock(lane_mask, index);
+ continue;
+ }
- if (buffer_unavailable(in, out)) {
- unlock(lane_mask, index);
- return cpp::nullopt;
+ return Port(*this, lane_mask, index, out);
}
-
- return Port(*this, lane_mask, index, out);
+ return cpp::nullopt;
}
LIBC_INLINE Server::Port Server::open() {
diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
index 224723ef20c9..c6282e40c903 100644
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -49,6 +49,11 @@ LIBC_INLINE constexpr bool is_process_gpu() {
#endif
}
+/// Return \p val aligned "upwards" according to \p align.
+template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
+ return ((val + V(align) - 1) / V(align)) * V(align);
+}
+
} // namespace rpc
} // namespace __llvm_libc
diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp
index d1dfc7b8c11d..84adb3b97527 100644
--- a/libc/startup/gpu/amdgpu/start.cpp
+++ b/libc/startup/gpu/amdgpu/start.cpp
@@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);
namespace __llvm_libc {
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};
extern "C" uintptr_t __init_array_start[];
extern "C" uintptr_t __init_array_end[];
@@ -43,7 +43,8 @@ extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void
_begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
// We need to set up the RPC client first in case any of the constructors
// require it.
- __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+ __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
+ __llvm_libc::gpu::get_lane_size(),
&__llvm_libc::lock, in, out, buffer);
// We want the fini array callbacks to be run after other atexit
diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp
index 83453ae1e47a..1d366dc829df 100644
--- a/libc/startup/gpu/nvptx/start.cpp
+++ b/libc/startup/gpu/nvptx/start.cpp
@@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);
namespace __llvm_libc {
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};
extern "C" {
// Nvidia's 'nvlink' linker does not provide these symbols. We instead need
@@ -47,7 +47,8 @@ extern "C" [[gnu::visibility("protected"), clang::nvptx_kernel]] void
_begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
// We need to set up the RPC client first in case any of the constructors
// require it.
- __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+ __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
+ __llvm_libc::gpu::get_lane_size(),
&__llvm_libc::lock, in, out, buffer);
// We want the fini array callbacks to be run after other atexit
diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
index 6ffb32955c89..f77bf256618a 100644
--- a/libc/utils/gpu/loader/Server.h
+++ b/libc/utils/gpu/loader/Server.h
@@ -19,47 +19,51 @@
static __llvm_libc::rpc::Server server;
-static __llvm_libc::cpp::Atomic<uint32_t> lock;
+static __llvm_libc::cpp::Atomic<uint32_t>
+ lock[__llvm_libc::rpc::default_port_count] = {0};
/// Queries the RPC client at least once and performs server-side work if there
/// are any active requests.
void handle_server() {
- auto port = server.try_open();
- if (!port)
- return;
+ // Continue servicing the client until there is no work left and we return.
+ for (;;) {
+ auto port = server.try_open();
+ if (!port)
+ return;
- switch (port->get_opcode()) {
- case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
- uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
- char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
- port->recv_n([&](uint64_t size, uint32_t id) {
- str_size[id] = size;
- strs[id] = new char[size];
- return strs[id];
- });
- for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
- if (strs[i]) {
- fwrite(strs[i], str_size[i], 1, stderr);
- delete[] strs[i];
+ switch (port->get_opcode()) {
+ case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
+ uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
+ char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
+ port->recv_n([&](uint64_t size, uint32_t id) {
+ str_size[id] = size;
+ strs[id] = new char[size];
+ return strs[id];
+ });
+ for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
+ if (strs[i]) {
+ fwrite(strs[i], str_size[i], 1, stderr);
+ delete[] strs[i];
+ }
}
+ break;
}
- break;
- }
- case __llvm_libc::rpc::Opcode::EXIT: {
- port->recv([](__llvm_libc::rpc::Buffer *buffer) {
- exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
- });
- break;
- }
- case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
- port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
- reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
- });
- break;
- }
- default:
- port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+ case __llvm_libc::rpc::Opcode::EXIT: {
+ port->recv([](__llvm_libc::rpc::Buffer *buffer) {
+ exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
+ });
+ break;
+ }
+ case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
+ port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
+ reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
+ });
+ break;
+ }
+ default:
+ port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+ }
+ port->close();
}
- port->close();
}
#endif
diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp
index ee12d6d63ffb..07fa1ae7fe16 100644
--- a/libc/utils/gpu/loader/amdgpu/Loader.cpp
+++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp
@@ -330,6 +330,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
hsa_amd_memory_fill(dev_ret, 0, sizeof(int));
// Allocate finegrained memory for the RPC server and client to share.
+ uint64_t port_size = __llvm_libc::rpc::default_port_count;
uint32_t wavefront_size = 0;
if (hsa_status_t err = hsa_agent_get_info(
dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size))
@@ -338,18 +339,19 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
void *server_outbox;
void *buffer;
if (hsa_status_t err = hsa_amd_memory_pool_allocate(
- finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<int>),
+ finegrained_pool, port_size * sizeof(__llvm_libc::cpp::Atomic<int>),
/*flags=*/0, &server_inbox))
handle_error(err);
if (hsa_status_t err = hsa_amd_memory_pool_allocate(
- finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<int>),
+ finegrained_pool, port_size * sizeof(__llvm_libc::cpp::Atomic<int>),
/*flags=*/0, &server_outbox))
handle_error(err);
if (hsa_status_t err = hsa_amd_memory_pool_allocate(
finegrained_pool,
- align_up(sizeof(__llvm_libc::rpc::Header) +
- (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
- alignof(__llvm_libc::rpc::Packet)),
+ port_size *
+ align_up(sizeof(__llvm_libc::rpc::Header) +
+ (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
+ alignof(__llvm_libc::rpc::Packet)),
/*flags=*/0, &buffer))
handle_error(err);
hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox);
@@ -357,7 +359,8 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
hsa_amd_agents_allow_access(1, &dev_agent, nullptr, buffer);
// Initialize the RPC server's buffer for host-device communication.
- server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer);
+ server.reset(port_size, wavefront_size, &lock, server_inbox, server_outbox,
+ buffer);
// Obtain a queue with the minimum (power of two) size, used to send commands
// to the HSA runtime and launch execution on the device.
diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp
index ca18da939f4c..314f5a8055fb 100644
--- a/libc/utils/gpu/loader/nvptx/Loader.cpp
+++ b/libc/utils/gpu/loader/nvptx/Loader.cpp
@@ -246,18 +246,22 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
if (CUresult err = cuMemsetD32(dev_ret, 0, 1))
handle_error(err);
+ uint64_t port_size = __llvm_libc::rpc::default_port_count;
uint32_t warp_size = 32;
- void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
- void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
- void *buffer =
- allocator(align_up(sizeof(__llvm_libc::rpc::Header) +
- (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
- alignof(__llvm_libc::rpc::Packet)));
+ void *server_inbox =
+ allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<int>));
+ void *server_outbox =
+ allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<int>));
+ void *buffer = allocator(
+ port_size * align_up(sizeof(__llvm_libc::rpc::Header) +
+ (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
+ alignof(__llvm_libc::rpc::Packet)));
if (!server_inbox || !server_outbox || !buffer)
handle_error("Failed to allocate memory the RPC client / server.");
// Initialize the RPC server's buffer for host-device communication.
- server.reset(warp_size, &lock, server_inbox, server_outbox, buffer);
+ server.reset(port_size, warp_size, &lock, server_inbox, server_outbox,
+ buffer);
LaunchParameters single_threaded_params = {1, 1, 1, 1, 1, 1};
// Call the kernel to