author    Joseph Huber <jhuber6@vols.utk.edu>  2023-05-01 12:10:04 -0500
committer Joseph Huber <jhuber6@vols.utk.edu>  2023-05-05 10:12:19 -0500
commit    aea866c12cb428eb5fe062ffa910a63daff62b01 (patch)
tree      9bf301a8c3c7f66949f2a36399ba2a5580b7b195 /libc/src
parent    a1be6f0290effb26ff5e0d7a43ab20931f1353a2 (diff)
download  llvm-aea866c12cb428eb5fe062ffa910a63daff62b01.tar.gz
[libc] Support concurrent RPC port access on the GPU
Previously we used a single port to implement the RPC. This was sufficient for single-threaded tests but can potentially cause deadlocks when using multiple threads. The reason for this is that GPUs make no forward progress guarantees. One group of threads waiting on another group of threads can therefore spin forever, because there is no guarantee that the other threads will continue executing. The typical workaround is to allocate enough memory that a sufficiently large number of work groups can make progress. As long as this number is reasonably close to the total amount of concurrency, we can obtain reliable execution around a shared resource.

This patch enables using multiple ports by widening the arrays to a predetermined size and indexing into them. Empty ports are currently found via a trivial linear scan. This should be improved in the future for performance reasons. Portions of D148191 were applied to achieve parallel support.

Depends on D149581

Reviewed By: JonChesterfield

Differential Revision: https://reviews.llvm.org/D149598
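The scheme is easiest to see in isolation: rather than every thread contending on one lock, each thread scans a fixed array of per-port locks and claims the first free slot, moving on instead of spinning whenever a slot is taken. A minimal standalone sketch in plain C++ (the names and the std::atomic types here are illustrative, not the patch's cpp::Atomic API):

    #include <atomic>
    #include <cstdint>
    #include <optional>

    constexpr uint64_t PORT_COUNT = 64; // mirrors default_port_count below

    std::atomic<uint32_t> locks[PORT_COUNT];

    // Claim the first free port, or give up. Never spin on a single slot:
    // with no forward progress guarantee, the current owner may never run
    // again while we wait on it.
    std::optional<uint64_t> try_claim_port() {
      for (uint64_t index = 0; index < PORT_COUNT; ++index) {
        uint32_t expected = 0;
        if (locks[index].compare_exchange_strong(expected, 1,
                                                 std::memory_order_acquire))
          return index;
      }
      return std::nullopt;
    }

    void release_port(uint64_t index) {
      locks[index].store(0, std::memory_order_release);
    }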
Diffstat (limited to 'libc/src')
-rw-r--r--  libc/src/__support/RPC/rpc.h      | 162
-rw-r--r--  libc/src/__support/RPC/rpc_util.h |   5
2 files changed, 96 insertions(+), 71 deletions(-)
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 170f74830b14..5395c4e57332 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -69,6 +69,12 @@ struct alignas(64) Packet {
Payload payload;
};
+// TODO: This should be configured by the server and passed in. The general
+// rule of thumb is that you should have at least as many ports as there are
+// concurrent work items on the GPU to mitigate the lack of forward progress
+// guarantees on the GPU.
+constexpr uint64_t default_port_count = 64;
+
/// A common process used to synchronize communication between a client and a
/// server. The process contains an inbox and an outbox used for signaling
/// ownership of the shared buffer between both sides.
@@ -96,22 +102,31 @@ template <bool InvertInbox> struct Process {
LIBC_INLINE Process &operator=(const Process &) = default;
LIBC_INLINE ~Process() = default;
+ uint64_t port_count;
uint32_t lane_size;
cpp::Atomic<uint32_t> *lock;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
- Packet *buffer;
+ Packet *packet;
/// Initialize the communication channels.
- LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
- void *outbox, void *buffer) {
- *this = {
- lane_size,
- reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
- reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
- reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
- reinterpret_cast<Packet *>(buffer),
- };
+ LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size, void *lock,
+ void *inbox, void *outbox, void *packet) {
+ *this = {port_count,
+ lane_size,
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
+ reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
+ reinterpret_cast<Packet *>(packet)};
+ }
+
+ /// The length of the packet is flexible because the server needs to look up
+ /// the lane size at runtime. This helper indexes at the proper offset.
+ LIBC_INLINE Packet &get_packet(uint64_t index) {
+ return *reinterpret_cast<Packet *>(
+ reinterpret_cast<uint8_t *>(packet) +
+ index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
+ alignof(Packet)));
}
/// Inverting the bits loaded from the inbox in exactly one of the pair of
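Since Packet's payload spans lane_size buffers that are only known at runtime on the server, packets cannot be indexed as a plain array; get_packet recomputes the stride as sizeof(Header) + lane_size * sizeof(Buffer), rounded up to alignof(Packet). A worked example with made-up sizes (the real values depend on the target):

    #include <cstdint>

    // Local constexpr copy of the align_up helper added in rpc_util.h.
    constexpr uint64_t align_up(uint64_t val, uint64_t align) {
      return ((val + align - 1) / align) * align;
    }

    // Hypothetical sizes, for illustration only.
    constexpr uint64_t header_size  = 16; // stand-in for sizeof(Header)
    constexpr uint64_t buffer_size  = 64; // stand-in for sizeof(Buffer)
    constexpr uint64_t packet_align = 64; // stand-in for alignof(Packet)
    constexpr uint64_t lane_size    = 32; // one warp/wavefront of lanes
    constexpr uint64_t port_count   = 64; // mirrors default_port_count

    // Stride of one flexible packet: a header plus one Buffer per lane,
    // padded so the next packet starts properly aligned.
    constexpr uint64_t stride =
        align_up(header_size + lane_size * buffer_size, packet_align);
    static_assert(stride == 2112, "2064 rounded up to a multiple of 64");

    // get_packet(i) resolves to base + i * stride, so the shared packet
    // region needs port_count * stride bytes in total.
    static_assert(port_count * stride == 135168, "64 ports of 2112 bytes");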
@@ -190,25 +205,25 @@ template <bool InvertInbox> struct Process {
/// Invokes a function on every active buffer across the total lane size.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
- uint32_t index) {
+ Packet &packet) {
if constexpr (is_process_gpu()) {
- fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+ fn(&packet.payload.slot[gpu::get_lane_id()]);
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
- if (buffer[index].header.mask & 1ul << i)
- fn(&buffer[index].payload.slot[i]);
+ if (packet.header.mask & 1ul << i)
+ fn(&packet.payload.slot[i]);
}
}
/// Alternate version that also provides the index of the current lane.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
- uint32_t index) {
+ Packet &packet) {
if constexpr (is_process_gpu()) {
- fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+ fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
- if (buffer[index].header.mask & 1ul << i)
- fn(&buffer[index].payload.slot[i], i);
+ if (packet.header.mask & 1ul << i)
+ fn(&packet.payload.slot[i], i);
}
}
};
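Both overloads implement the same dispatch rule: on the GPU each active hardware lane handles its own payload slot, while the CPU server walks the mask saved in the packet header and processes every active lane's slot itself. A stripped-down sketch of the server-side walk (the real loop advances by gpu::get_lane_size(); a step of one is assumed here):

    #include <cstdint>

    // Visit only the lanes whose bit is set in the mask recorded when the
    // port was opened; inactive lanes hold no valid slot data.
    template <typename Fn>
    void for_each_active_lane(uint64_t mask, uint32_t lane_size, Fn fn) {
      for (uint32_t i = 0; i < lane_size; ++i)
        if (mask & (1ul << i))
          fn(i); // as in the second overload, fn also gets the lane index
    }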
@@ -234,7 +249,7 @@ template <bool T> struct Port {
template <typename A> LIBC_INLINE void recv_n(A alloc);
LIBC_INLINE uint16_t get_opcode() const {
- return process.buffer[index].header.opcode;
+ return process.get_packet(index).header.opcode;
}
LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -281,7 +296,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
}
// Apply the \p fill function to initialize the buffer and release the memory.
- process.invoke_rpc(fill, index);
+ process.invoke_rpc(fill, process.get_packet(index));
out = !out;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -299,7 +314,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
// Apply the \p use function to read the memory out of the buffer.
- process.invoke_rpc(use, index);
+ process.invoke_rpc(use, process.get_packet(index));
out = !out;
process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
}
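send() and recv() share one discipline: touch the payload only while owning the buffer, then hand ownership over by toggling the outbox bit, with a fence ordering the payload access against the toggle becoming visible. A reduced sketch of the sending half, assuming a single word of payload in place of the packet:

    #include <atomic>
    #include <cstdint>

    std::atomic<uint32_t> outbox_bit{0};
    uint32_t payload; // stand-in for the shared packet

    void publish(uint32_t value, uint32_t &out) {
      payload = value;                                     // fill while owned
      out = !out;                                          // flip ownership bit
      std::atomic_thread_fence(std::memory_order_release); // fill before flip
      outbox_bit.store(out, std::memory_order_relaxed);    // hand over buffer
    }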
@@ -340,7 +355,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
inline_memcpy(buffer->data, ptr + idx, len);
});
}
- gpu::sync_lane(process.buffer[index].header.mask);
+ gpu::sync_lane(process.get_packet(index).header.mask);
}
/// Receives an arbitrarily sized data buffer across the shared channel in
@@ -396,32 +411,34 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
/// participating thread.
[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
Client::try_open(uint16_t opcode) {
- constexpr uint64_t index = 0;
- const uint64_t lane_mask = gpu::get_lane_mask();
-
- // Attempt to acquire the lock on this index.
- if (!try_lock(lane_mask, index))
- return cpp::nullopt;
-
- // The mailbox state must be read with the lock held.
- atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
- uint32_t in = load_inbox(index);
- uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
- // Once we acquire the index we need to check if we are in a valid sending
- // state.
- if (buffer_unavailable(in, out)) {
- unlock(lane_mask, index);
- return cpp::nullopt;
- }
+ // Perform a naive linear scan for a port that can be opened to send data.
+ for (uint64_t index = 0; index < port_count; ++index) {
+ // Attempt to acquire the lock on this index.
+ uint64_t lane_mask = gpu::get_lane_mask();
+ if (!try_lock(lane_mask, index))
+ continue;
+
+ // The mailbox state must be read with the lock held.
+ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+ uint32_t in = load_inbox(index);
+ uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ // Once we acquire the index we need to check if we are in a valid sending
+ // state.
+ if (buffer_unavailable(in, out)) {
+ unlock(lane_mask, index);
+ continue;
+ }
- if (is_first_lane(lane_mask)) {
- buffer[index].header.opcode = opcode;
- buffer[index].header.mask = lane_mask;
+ if (is_first_lane(lane_mask)) {
+ get_packet(index).header.opcode = opcode;
+ get_packet(index).header.mask = lane_mask;
+ }
+ gpu::sync_lane(lane_mask);
+ return Port(*this, lane_mask, index, out);
}
- gpu::sync_lane(lane_mask);
- return Port(*this, lane_mask, index, out);
+ return cpp::nullopt;
}
LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
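Note that try_open never blocks: it returns cpp::nullopt when every port is either locked or not in a valid sending state, leaving retry policy to the caller. The blocking open() wrapper presumably reduces to a loop of roughly this shape (assuming the sleep_briefly() backoff helper from rpc_util.h):

    for (;;) {
      if (cpp::optional<Client::Port> port = try_open(opcode))
        return cpp::move(port.value());
      sleep_briefly();
    }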
@@ -436,33 +453,36 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
/// port if it has a pending receive operation
[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
Server::try_open() {
- constexpr uint64_t index = 0;
- const uint64_t lane_mask = gpu::get_lane_mask();
-
- uint32_t in = load_inbox(index);
- uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
- // The server is passive, if there is no work pending don't bother
- // opening a port.
- if (buffer_unavailable(in, out))
- return cpp::nullopt;
-
- // Attempt to acquire the lock on this index.
- if (!try_lock(lane_mask, index))
- return cpp::nullopt;
-
- // The mailbox state must be read with the lock held.
- atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
- in = load_inbox(index);
- out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+ // Perform a naive linear scan for a port that has a pending request.
+ for (uint64_t index = 0; index < port_count; ++index) {
+ uint32_t in = load_inbox(index);
+ uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ // The server is passive; if there is no work pending, don't bother
+ // opening a port.
+ if (buffer_unavailable(in, out))
+ continue;
+
+ // Attempt to acquire the lock on this index.
+ uint64_t lane_mask = gpu::get_lane_mask();
+ if (!try_lock(lane_mask, index))
+ continue;
+
+ // The mailbox state must be read with the lock held.
+ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+ in = load_inbox(index);
+ out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+ if (buffer_unavailable(in, out)) {
+ unlock(lane_mask, index);
+ continue;
+ }
- if (buffer_unavailable(in, out)) {
- unlock(lane_mask, index);
- return cpp::nullopt;
+ return Port(*this, lane_mask, index, out);
}
-
- return Port(*this, lane_mask, index, out);
+ return cpp::nullopt;
}
LIBC_INLINE Server::Port Server::open() {
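The server scan is a double-checked pattern: an unlocked peek at the mailbox keeps idle ports cheap, but the state must be re-read under the lock because another server thread may have serviced the request in the meantime. The skeleton in standalone C++ (the pending test's polarity is simplified here; the real code routes it through buffer_unavailable() and the inbox inversion):

    #include <atomic>
    #include <cstdint>
    #include <optional>

    constexpr uint64_t PORT_COUNT = 64;
    std::atomic<uint32_t> locks[PORT_COUNT];
    std::atomic<uint32_t> inbox[PORT_COUNT];
    std::atomic<uint32_t> outbox[PORT_COUNT];

    bool work_pending(uint64_t i) { // simplified ownership test
      return inbox[i].load(std::memory_order_relaxed) !=
             outbox[i].load(std::memory_order_relaxed);
    }

    std::optional<uint64_t> scan() {
      for (uint64_t i = 0; i < PORT_COUNT; ++i) {
        if (!work_pending(i)) // unlocked peek: skip idle ports cheaply
          continue;
        uint32_t expected = 0;
        if (!locks[i].compare_exchange_strong(expected, 1,
                                              std::memory_order_acquire))
          continue; // lost the race for this port
        if (!work_pending(i)) { // re-check now that the lock is held
          locks[i].store(0, std::memory_order_release);
          continue;
        }
        return i; // lock held and request confirmed
      }
      return std::nullopt;
    }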
diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
index 224723ef20c9..c6282e40c903 100644
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -49,6 +49,11 @@ LIBC_INLINE constexpr bool is_process_gpu() {
#endif
}
+/// Return \p val aligned "upwards" according to \p align.
+template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
+ return ((val + V(align) - 1) / V(align)) * V(align);
+}
+
} // namespace rpc
} // namespace __llvm_libc
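The align_up formula rounds up for any positive alignment, power of two or not. A few spot checks, using a local constexpr copy since the helper itself is only marked LIBC_INLINE:

    #include <cstdint>

    constexpr uint64_t align_up_check(uint64_t val, uint64_t align) {
      return ((val + align - 1) / align) * align;
    }
    static_assert(align_up_check(0, 8) == 0, "aligned values are unchanged");
    static_assert(align_up_check(1, 8) == 8, "round up, never down");
    static_assert(align_up_check(2064, 64) == 2112,
                  "the packet stride computed by get_packet");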