@@ -69,6 +69,12 @@ struct alignas(64) Packet {
   Payload payload;
 };

+// TODO: This should be configured by the server and passed in. The general
+// rule of thumb is that you should have at least as many ports as there are
+// possible concurrent work items on the GPU to mitigate the lack of forward
+// progress guarantees on the GPU.
+constexpr uint64_t default_port_count = 64;
+
 /// A common process used to synchronize communication between a client and a
 /// server. The process contains an inbox and an outbox used for signaling
 /// ownership of the shared buffer between both sides.
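To make the TODO's rule of thumb concrete, here is a small host-side sketch; the helper and its parameter are hypothetical stand-ins for whatever occupancy figure the runtime can actually report, and are not part of this patch:

    // Hypothetical helper: `max_concurrent_work_items` would come from a
    // runtime device query, not from anything in this header.
    uint64_t choose_port_count(uint64_t max_concurrent_work_items) {
      // Per the TODO's rule of thumb: at least one port per concurrent work
      // item, falling back to the compile-time default if the query is empty.
      return max_concurrent_work_items ? max_concurrent_work_items
                                       : default_port_count;
    }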
@@ -96,22 +102,31 @@ template <bool InvertInbox> struct Process {
   LIBC_INLINE Process &operator=(const Process &) = default;
   LIBC_INLINE ~Process() = default;

+  uint64_t port_count;
   uint32_t lane_size;
   cpp::Atomic<uint32_t> *lock;
   cpp::Atomic<uint32_t> *inbox;
   cpp::Atomic<uint32_t> *outbox;
-  Packet *buffer;
+  Packet *packet;

   /// Initialize the communication channels.
-  LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
-                         void *outbox, void *buffer) {
-    *this = {
-        lane_size,
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
-        reinterpret_cast<Packet *>(buffer),
-    };
+  LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size, void *lock,
+                         void *inbox, void *outbox, void *packet) {
+    *this = {port_count,
+             lane_size,
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
+             reinterpret_cast<Packet *>(packet)};
+  }
+
+  /// The length of the packet is flexible because the server needs to look up
+  /// the lane size at runtime. This helper indexes at the proper offset.
+  LIBC_INLINE Packet &get_packet(uint64_t index) {
+    return *reinterpret_cast<Packet *>(
+        reinterpret_cast<uint8_t *>(packet) +
+        index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
+                         alignof(Packet)));
   }
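The stride used by get_packet also fixes how large the shared buffer has to be, so the side that allocates it presumably computes the same quantity. A sketch of that arithmetic, using only names from this header:

    // One variably-sized packet, padded out to Packet's alignment.
    uint64_t packet_stride =
        align_up(sizeof(Header) + lane_size * sizeof(Buffer), alignof(Packet));
    // The shared buffer then needs room for one such packet per port.
    uint64_t shared_buffer_size = port_count * packet_stride;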

   /// Inverting the bits loaded from the inbox in exactly one of the pair of
@@ -190,25 +205,25 @@ template <bool InvertInbox> struct Process {

   /// Invokes a function on every active buffer across the total lane size.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+      fn(&packet.payload.slot[gpu::get_lane_id()]);
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i]);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i]);
     }
   }

   /// Alternate version that also provides the index of the current lane.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+      fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i], i);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i], i);
     }
   }
 };
@@ -234,7 +249,7 @@ template <bool T> struct Port {
   template <typename A> LIBC_INLINE void recv_n(A alloc);

   LIBC_INLINE uint16_t get_opcode() const {
-    return process.buffer[index].header.opcode;
+    return process.get_packet(index).header.opcode;
   }

   LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -281,7 +296,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
   }

   // Apply the \p fill function to initialize the buffer and release the memory.
-  process.invoke_rpc(fill, index);
+  process.invoke_rpc(fill, process.get_packet(index));
   out = !out;
   atomic_thread_fence(cpp::MemoryOrder::RELEASE);
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -299,7 +314,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
   atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);

   // Apply the \p use function to read the memory out of the buffer.
-  process.invoke_rpc(use, index);
+  process.invoke_rpc(use, process.get_packet(index));
   out = !out;
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
 }
@@ -340,7 +355,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
       inline_memcpy(buffer->data, ptr + idx, len);
     });
   }
-  gpu::sync_lane(process.buffer[index].header.mask);
+  gpu::sync_lane(process.get_packet(index).header.mask);
 }

 /// Receives an arbitrarily sized data buffer across the shared channel in
@@ -396,32 +411,34 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
 /// participating thread.
 [[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
 Client::try_open(uint16_t opcode) {
-  constexpr uint64_t index = 0;
-  const uint64_t lane_mask = gpu::get_lane_mask();
-
-  // Attempt to acquire the lock on this index.
-  if (!try_lock(lane_mask, index))
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
-  // Once we acquire the index we need to check if we are in a valid sending
-  // state.
-  if (buffer_unavailable(in, out)) {
-    unlock(lane_mask, index);
-    return cpp::nullopt;
-  }
+  // Perform a naive linear scan for a port that can be opened to send data.
+  for (uint64_t index = 0; index < port_count; ++index) {
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    if (!try_lock(lane_mask, index))
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // Once we acquire the index we need to check if we are in a valid sending
+    // state.
+    if (buffer_unavailable(in, out)) {
+      unlock(lane_mask, index);
+      continue;
+    }

-  if (is_first_lane(lane_mask)) {
-    buffer[index].header.opcode = opcode;
-    buffer[index].header.mask = lane_mask;
+    if (is_first_lane(lane_mask)) {
+      get_packet(index).header.opcode = opcode;
+      get_packet(index).header.mask = lane_mask;
+    }
+    gpu::sync_lane(lane_mask);
+    return Port(*this, lane_mask, index, out);
   }
-  gpu::sync_lane(lane_mask);
-  return Port(*this, lane_mask, index, out);
+  return cpp::nullopt;
 }
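As a usage illustration of the client-side Port API touched above (open, send, close), here is a hedged sketch of a device-side caller; the client object, the opcode constant, and the payload value are assumptions for the example, not names from this patch:

    // Sketch: open a port for some opcode, fill the lane's buffer, and close.
    Client::Port port = client.open(MY_OPCODE);
    port.send([](Buffer *buffer) {
      buffer->data[0] = 42; // Each active lane fills its own slot.
    });
    port.close();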

 LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
@@ -436,33 +453,36 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
 /// port if it has a pending receive operation
 [[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
 Server::try_open() {
-  constexpr uint64_t index = 0;
-  const uint64_t lane_mask = gpu::get_lane_mask();
-
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
-  // The server is passive; if there is no work pending, don't bother
-  // opening a port.
-  if (buffer_unavailable(in, out))
-    return cpp::nullopt;
-
-  // Attempt to acquire the lock on this index.
-  if (!try_lock(lane_mask, index))
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  in = load_inbox(index);
-  out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+  // Perform a naive linear scan for a port that has a pending request.
+  for (uint64_t index = 0; index < port_count; ++index) {
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // The server is passive; if there is no work pending, don't bother
+    // opening a port.
+    if (buffer_unavailable(in, out))
+      continue;
+
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    if (!try_lock(lane_mask, index))
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    in = load_inbox(index);
+    out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    if (buffer_unavailable(in, out)) {
+      unlock(lane_mask, index);
+      continue;
+    }

-  if (buffer_unavailable(in, out)) {
-    unlock(lane_mask, index);
-    return cpp::nullopt;
+    return Port(*this, lane_mask, index, out);
   }
-
-  return Port(*this, lane_mask, index, out);
+  return cpp::nullopt;
 }
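For orientation, a hedged sketch of how a host-side handler might drive this server; the server object, the opcode dispatch, and the idle behaviour are assumptions for illustration, not part of this header:

    // Sketch: poll every port, service whatever request is pending, then close.
    for (;;) {
      auto port = server.try_open();
      if (!port)
        continue; // Nothing pending on any port; keep polling.
      switch (port->get_opcode()) {
      default:
        // Consume the request so the client can make progress.
        port->recv([](Buffer *) {});
        break;
      }
      port->close();
    }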

 LIBC_INLINE Server::Port Server::open() {