Skip to content

Commit 69c9e3f

Browse files
committed
rxrpc: Move call state changes from sendmsg to I/O thread
jira LE-1907 Rebuild_History Non-Buildable kernel-rt-5.14.0-284.30.1.rt14.315.el9_2 commit-author David Howells <dhowells@redhat.com> commit 2d68942 Empty-Commit: Cherry-Pick Conflicts during history rebuild. Will be included in final tarball splat. Ref for failed cherry-pick at: ciq/ciq_backports/kernel-rt-5.14.0-284.30.1.rt14.315.el9_2/2d689424.failed Move all the call state changes that are made in rxrpc_sendmsg() to the I/O thread. This is a step towards removing the call state lock. This requires the switch to the RXRPC_CALL_CLIENT_AWAIT_REPLY and RXRPC_CALL_SERVER_SEND_REPLY states to be done when the last packet is decanted from ->tx_sendmsg to ->tx_buffer in the I/O thread, not when it is added to ->tx_sendmsg by sendmsg(). Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org (cherry picked from commit 2d68942) Signed-off-by: Jonathan Maple <jmaple@ciq.com> # Conflicts: # net/rxrpc/call_event.c # net/rxrpc/sendmsg.c
1 parent ba67555 commit 69c9e3f

File tree

1 file changed

+320
-0
lines changed

1 file changed

+320
-0
lines changed
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
rxrpc: Move call state changes from sendmsg to I/O thread
2+
3+
jira LE-1907
4+
Rebuild_History Non-Buildable kernel-rt-5.14.0-284.30.1.rt14.315.el9_2
5+
commit-author David Howells <dhowells@redhat.com>
6+
commit 2d689424b6184535890c251f937ccf815fde9cd2
7+
Empty-Commit: Cherry-Pick Conflicts during history rebuild.
8+
Will be included in final tarball splat. Ref for failed cherry-pick at:
9+
ciq/ciq_backports/kernel-rt-5.14.0-284.30.1.rt14.315.el9_2/2d689424.failed
10+
11+
Move all the call state changes that are made in rxrpc_sendmsg() to the I/O
12+
thread. This is a step towards removing the call state lock.
13+
14+
This requires the switch to the RXRPC_CALL_CLIENT_AWAIT_REPLY and
15+
RXRPC_CALL_SERVER_SEND_REPLY states to be done when the last packet is
16+
decanted from ->tx_sendmsg to ->tx_buffer in the I/O thread, not when it is
17+
added to ->tx_sendmsg by sendmsg().
18+
19+
Signed-off-by: David Howells <dhowells@redhat.com>
20+
cc: Marc Dionne <marc.dionne@auristor.com>
21+
cc: linux-afs@lists.infradead.org
22+
(cherry picked from commit 2d689424b6184535890c251f937ccf815fde9cd2)
23+
Signed-off-by: Jonathan Maple <jmaple@ciq.com>
24+
25+
# Conflicts:
26+
# net/rxrpc/call_event.c
27+
# net/rxrpc/sendmsg.c
28+
diff --cc net/rxrpc/call_event.c
29+
index a95f4604cb29,2e3c01060d59..000000000000
30+
--- a/net/rxrpc/call_event.c
31+
+++ b/net/rxrpc/call_event.c
32+
@@@ -291,6 -251,131 +291,134 @@@ out
33+
_leave("");
34+
}
35+
36+
++<<<<<<< HEAD
37+
++=======
38+
+ /*
39+
+ * Start transmitting the reply to a service. This cancels the need to ACK the
40+
+ * request if we haven't yet done so.
41+
+ */
42+
+ static void rxrpc_begin_service_reply(struct rxrpc_call *call)
43+
+ {
44+
+ unsigned long now;
45+
+
46+
+ write_lock(&call->state_lock);
47+
+
48+
+ if (call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
49+
+ now = jiffies;
50+
+ call->state = RXRPC_CALL_SERVER_SEND_REPLY;
51+
+ WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
52+
+ if (call->ackr_reason == RXRPC_ACK_DELAY)
53+
+ call->ackr_reason = 0;
54+
+ trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
55+
+ }
56+
+
57+
+ write_unlock(&call->state_lock);
58+
+ }
59+
+
60+
+ /*
61+
+ * Close the transmission phase. After this point there is no more data to be
62+
+ * transmitted in the call.
63+
+ */
64+
+ static void rxrpc_close_tx_phase(struct rxrpc_call *call)
65+
+ {
66+
+ _debug("________awaiting reply/ACK__________");
67+
+
68+
+ write_lock(&call->state_lock);
69+
+ switch (call->state) {
70+
+ case RXRPC_CALL_CLIENT_SEND_REQUEST:
71+
+ call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
72+
+ break;
73+
+ case RXRPC_CALL_SERVER_SEND_REPLY:
74+
+ call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
75+
+ break;
76+
+ default:
77+
+ break;
78+
+ }
79+
+ write_unlock(&call->state_lock);
80+
+ }
81+
+
82+
+ static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
83+
+ {
84+
+ unsigned int winsize = min_t(unsigned int, call->tx_winsize,
85+
+ call->cong_cwnd + call->cong_extra);
86+
+ rxrpc_seq_t window = call->acks_hard_ack, wtop = window + winsize;
87+
+ rxrpc_seq_t tx_top = call->tx_top;
88+
+ int space;
89+
+
90+
+ space = wtop - tx_top;
91+
+ return space > 0;
92+
+ }
93+
+
94+
+ /*
95+
+ * Decant some if the sendmsg prepared queue into the transmission buffer.
96+
+ */
97+
+ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
98+
+ {
99+
+ struct rxrpc_txbuf *txb;
100+
+
101+
+ if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
102+
+ if (list_empty(&call->tx_sendmsg))
103+
+ return;
104+
+ rxrpc_expose_client_call(call);
105+
+ }
106+
+
107+
+ while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
108+
+ struct rxrpc_txbuf, call_link))) {
109+
+ spin_lock(&call->tx_lock);
110+
+ list_del(&txb->call_link);
111+
+ spin_unlock(&call->tx_lock);
112+
+
113+
+ call->tx_top = txb->seq;
114+
+ list_add_tail(&txb->call_link, &call->tx_buffer);
115+
+
116+
+ if (txb->wire.flags & RXRPC_LAST_PACKET)
117+
+ rxrpc_close_tx_phase(call);
118+
+
119+
+ rxrpc_transmit_one(call, txb);
120+
+
121+
+ if (!rxrpc_tx_window_has_space(call))
122+
+ break;
123+
+ }
124+
+ }
125+
+
126+
+ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
127+
+ {
128+
+ switch (call->state) {
129+
+ case RXRPC_CALL_SERVER_ACK_REQUEST:
130+
+ if (list_empty(&call->tx_sendmsg))
131+
+ return;
132+
+ rxrpc_begin_service_reply(call);
133+
+ fallthrough;
134+
+
135+
+ case RXRPC_CALL_SERVER_SEND_REPLY:
136+
+ case RXRPC_CALL_CLIENT_SEND_REQUEST:
137+
+ if (!rxrpc_tx_window_has_space(call))
138+
+ return;
139+
+ if (list_empty(&call->tx_sendmsg)) {
140+
+ rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
141+
+ return;
142+
+ }
143+
+ rxrpc_decant_prepared_tx(call);
144+
+ break;
145+
+ default:
146+
+ return;
147+
+ }
148+
+ }
149+
+
150+
+ /*
151+
+ * Ping the other end to fill our RTT cache and to retrieve the rwind
152+
+ * and MTU parameters.
153+
+ */
154+
+ static void rxrpc_send_initial_ping(struct rxrpc_call *call)
155+
+ {
156+
+ if (call->peer->rtt_count < 3 ||
157+
+ ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
158+
+ ktime_get_real()))
159+
+ rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
160+
+ rxrpc_propose_ack_ping_for_params);
161+
+ }
162+
+
163+
++>>>>>>> 2d689424b618 (rxrpc: Move call state changes from sendmsg to I/O thread)
164+
/*
165+
* Handle retransmission and deferred ACK/abort generation.
166+
*/
167+
diff --cc net/rxrpc/sendmsg.c
168+
index 45c09f0de6fe,0428528abbf4..000000000000
169+
--- a/net/rxrpc/sendmsg.c
170+
+++ b/net/rxrpc/sendmsg.c
171+
@@@ -204,10 -189,8 +204,9 @@@ static void rxrpc_queue_packet(struct r
172+
struct rxrpc_txbuf *txb,
173+
rxrpc_notify_end_tx_t notify_end_tx)
174+
{
175+
- unsigned long now;
176+
rxrpc_seq_t seq = txb->seq;
177+
- bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
178+
+ bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
179+
+ int ret;
180+
181+
rxrpc_inc_stat(call->rxnet, stat_tx_data);
182+
183+
@@@ -230,58 -206,17 +229,72 @@@
184+
else
185+
trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
186+
187+
++<<<<<<< HEAD
188+
+ if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
189+
+ _debug("________awaiting reply/ACK__________");
190+
+ write_lock_bh(&call->state_lock);
191+
+ switch (call->state) {
192+
+ case RXRPC_CALL_CLIENT_SEND_REQUEST:
193+
+ call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
194+
+ rxrpc_notify_end_tx(rx, call, notify_end_tx);
195+
+ break;
196+
+ case RXRPC_CALL_SERVER_ACK_REQUEST:
197+
+ call->state = RXRPC_CALL_SERVER_SEND_REPLY;
198+
+ now = jiffies;
199+
+ WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
200+
+ if (call->ackr_reason == RXRPC_ACK_DELAY)
201+
+ call->ackr_reason = 0;
202+
+ trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
203+
+ if (!last)
204+
+ break;
205+
+ fallthrough;
206+
+ case RXRPC_CALL_SERVER_SEND_REPLY:
207+
+ call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
208+
+ rxrpc_notify_end_tx(rx, call, notify_end_tx);
209+
+ break;
210+
+ default:
211+
+ break;
212+
+ }
213+
+ write_unlock_bh(&call->state_lock);
214+
+ }
215+
+
216+
+ if (seq == 1 && rxrpc_is_client_call(call))
217+
+ rxrpc_expose_client_call(call);
218+
+
219+
+ ret = rxrpc_send_data_packet(call, txb);
220+
+ if (ret < 0) {
221+
+ switch (ret) {
222+
+ case -ENETUNREACH:
223+
+ case -EHOSTUNREACH:
224+
+ case -ECONNREFUSED:
225+
+ rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
226+
+ 0, ret);
227+
+ goto out;
228+
+ }
229+
+ } else {
230+
+ unsigned long now = jiffies;
231+
+ unsigned long resend_at = now + call->peer->rto_j;
232+
+
233+
+ WRITE_ONCE(call->resend_at, resend_at);
234+
+ rxrpc_reduce_call_timer(call, resend_at, now,
235+
+ rxrpc_timer_set_for_send);
236+
+ }
237+
+
238+
+out:
239+
+ rxrpc_put_txbuf(txb, rxrpc_txbuf_put_trans);
240+
++=======
241+
+ /* Add the packet to the call's output buffer */
242+
+ spin_lock(&call->tx_lock);
243+
+ poke = list_empty(&call->tx_sendmsg);
244+
+ list_add_tail(&txb->call_link, &call->tx_sendmsg);
245+
+ call->tx_prepared = seq;
246+
+ if (last)
247+
+ rxrpc_notify_end_tx(rx, call, notify_end_tx);
248+
+ spin_unlock(&call->tx_lock);
249+
+
250+
+ if (poke)
251+
+ rxrpc_poke_call(call, rxrpc_call_poke_start);
252+
++>>>>>>> 2d689424b618 (rxrpc: Move call state changes from sendmsg to I/O thread)
253+
}
254+
255+
/*
256+
@@@ -716,11 -649,7 +733,15 @@@ int rxrpc_do_sendmsg(struct rxrpc_sock
257+
break;
258+
}
259+
260+
++<<<<<<< HEAD
261+
+ state = READ_ONCE(call->state);
262+
+ _debug("CALL %d USR %lx ST %d on CONN %p",
263+
+ call->debug_id, call->user_call_ID, state, call->conn);
264+
+
265+
+ if (state >= RXRPC_CALL_COMPLETE) {
266+
++=======
267+
+ if (rxrpc_call_is_complete(call)) {
268+
++>>>>>>> 2d689424b618 (rxrpc: Move call state changes from sendmsg to I/O thread)
269+
/* it's too late for this call */
270+
ret = -ESHUTDOWN;
271+
} else if (p.command == RXRPC_CMD_SEND_ABORT) {
272+
@@@ -776,24 -705,10 +797,31 @@@ int rxrpc_kernel_send_data(struct socke
273+
_debug("CALL %d USR %lx ST %d on CONN %p",
274+
call->debug_id, call->user_call_ID, call->state, call->conn);
275+
276+
++<<<<<<< HEAD
277+
+ switch (READ_ONCE(call->state)) {
278+
+ case RXRPC_CALL_CLIENT_SEND_REQUEST:
279+
+ case RXRPC_CALL_SERVER_ACK_REQUEST:
280+
+ case RXRPC_CALL_SERVER_SEND_REPLY:
281+
+ ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
282+
+ notify_end_tx, &dropped_lock);
283+
+ break;
284+
+ case RXRPC_CALL_COMPLETE:
285+
+ read_lock_bh(&call->state_lock);
286+
+ ret = call->error;
287+
+ read_unlock_bh(&call->state_lock);
288+
+ break;
289+
+ default:
290+
+ /* Request phase complete for this client call */
291+
+ trace_rxrpc_rx_eproto(call, 0, tracepoint_string("late_send"));
292+
+ ret = -EPROTO;
293+
+ break;
294+
+ }
295+
++=======
296+
+ ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
297+
+ notify_end_tx, &dropped_lock);
298+
+ if (ret == -ESHUTDOWN)
299+
+ ret = call->error;
300+
++>>>>>>> 2d689424b618 (rxrpc: Move call state changes from sendmsg to I/O thread)
301+
302+
if (!dropped_lock)
303+
mutex_unlock(&call->user_mutex);
304+
diff --git a/Documentation/networking/rxrpc.rst b/Documentation/networking/rxrpc.rst
305+
index 39494a6ea739..e1af54424192 100644
306+
--- a/Documentation/networking/rxrpc.rst
307+
+++ b/Documentation/networking/rxrpc.rst
308+
@@ -880,8 +880,8 @@ The kernel interface functions are as follows:
309+
310+
notify_end_rx can be NULL or it can be used to specify a function to be
311+
called when the call changes state to end the Tx phase. This function is
312+
- called with the call-state spinlock held to prevent any reply or final ACK
313+
- from being delivered first.
314+
+ called with a spinlock held to prevent the last DATA packet from being
315+
+ transmitted until the function returns.
316+
317+
(#) Receive data from a call::
318+
319+
* Unmerged path net/rxrpc/call_event.c
320+
* Unmerged path net/rxrpc/sendmsg.c

0 commit comments

Comments
 (0)