Skip to content

Commit cf31349

Browse files
committed
rxrpc: Move call state changes from recvmsg to I/O thread
jira LE-1907 Rebuild_History Non-Buildable kernel-rt-5.14.0-284.30.1.rt14.315.el9_2 commit-author David Howells <dhowells@redhat.com> commit 93368b6 Empty-Commit: Cherry-Pick Conflicts during history rebuild. Will be included in final tarball splat. Ref for failed cherry-pick at: ciq/ciq_backports/kernel-rt-5.14.0-284.30.1.rt14.315.el9_2/93368b6b.failed Move the call state changes that are made in rxrpc_recvmsg() to the I/O thread. This means that, thenceforth, only the I/O thread does this and the call state lock can be removed. This requires the Rx phase to be ended when the last packet is received, not when it is processed. Since this now changes the rxrpc call state to SUCCEEDED before we've consumed all the data from it, rxrpc_kernel_check_life() mustn't say the call is dead until the recvmsg queue is empty (unless the call has failed). Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org (cherry picked from commit 93368b6) Signed-off-by: Jonathan Maple <jmaple@ciq.com> # Conflicts: # net/rxrpc/af_rxrpc.c # net/rxrpc/ar-internal.h # net/rxrpc/recvmsg.c
1 parent 69c9e3f commit cf31349

File tree

1 file changed

+363
-0
lines changed

1 file changed

+363
-0
lines changed
Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
rxrpc: Move call state changes from recvmsg to I/O thread
2+
3+
jira LE-1907
4+
Rebuild_History Non-Buildable kernel-rt-5.14.0-284.30.1.rt14.315.el9_2
5+
commit-author David Howells <dhowells@redhat.com>
6+
commit 93368b6bd58ac49d804fdc9ab041a6dc89ebf1cc
7+
Empty-Commit: Cherry-Pick Conflicts during history rebuild.
8+
Will be included in final tarball splat. Ref for failed cherry-pick at:
9+
ciq/ciq_backports/kernel-rt-5.14.0-284.30.1.rt14.315.el9_2/93368b6b.failed
10+
11+
Move the call state changes that are made in rxrpc_recvmsg() to the I/O
12+
thread. This means that, thenceforth, only the I/O thread does this and
13+
the call state lock can be removed.
14+
15+
This requires the Rx phase to be ended when the last packet is received,
16+
not when it is processed.
17+
18+
Since this now changes the rxrpc call state to SUCCEEDED before we've
19+
consumed all the data from it, rxrpc_kernel_check_life() mustn't say the
20+
call is dead until the recvmsg queue is empty (unless the call has failed).
21+
22+
Signed-off-by: David Howells <dhowells@redhat.com>
23+
cc: Marc Dionne <marc.dionne@auristor.com>
24+
cc: linux-afs@lists.infradead.org
25+
(cherry picked from commit 93368b6bd58ac49d804fdc9ab041a6dc89ebf1cc)
26+
Signed-off-by: Jonathan Maple <jmaple@ciq.com>
27+
28+
# Conflicts:
29+
# net/rxrpc/af_rxrpc.c
30+
# net/rxrpc/ar-internal.h
31+
# net/rxrpc/recvmsg.c
32+
diff --cc net/rxrpc/af_rxrpc.c
33+
index 0f4d34f420f0,cf200e4e0eae..000000000000
34+
--- a/net/rxrpc/af_rxrpc.c
35+
+++ b/net/rxrpc/af_rxrpc.c
36+
@@@ -380,7 -379,11 +380,15 @@@ EXPORT_SYMBOL(rxrpc_kernel_end_call)
37+
bool rxrpc_kernel_check_life(const struct socket *sock,
38+
const struct rxrpc_call *call)
39+
{
40+
++<<<<<<< HEAD
41+
+ return call->state != RXRPC_CALL_COMPLETE;
42+
++=======
43+
+ if (!rxrpc_call_is_complete(call))
44+
+ return true;
45+
+ if (call->completion != RXRPC_CALL_SUCCEEDED)
46+
+ return false;
47+
+ return !skb_queue_empty(&call->recvmsg_queue);
48+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
49+
}
50+
EXPORT_SYMBOL(rxrpc_kernel_check_life);
51+
52+
diff --cc net/rxrpc/ar-internal.h
53+
index 46ce41afb431,861273439736..000000000000
54+
--- a/net/rxrpc/ar-internal.h
55+
+++ b/net/rxrpc/ar-internal.h
56+
@@@ -506,8 -544,9 +506,14 @@@ enum rxrpc_call_flag
57+
RXRPC_CALL_DISCONNECTED, /* The call has been disconnected */
58+
RXRPC_CALL_KERNEL, /* The call was made by the kernel */
59+
RXRPC_CALL_UPGRADE, /* Service upgrade was requested for the call */
60+
++<<<<<<< HEAD
61+
+ RXRPC_CALL_DELAY_ACK_PENDING, /* DELAY ACK generation is pending */
62+
+ RXRPC_CALL_IDLE_ACK_PENDING, /* IDLE ACK generation is pending */
63+
++=======
64+
+ RXRPC_CALL_EXCLUSIVE, /* The call uses a once-only connection */
65+
+ RXRPC_CALL_RX_IS_IDLE, /* recvmsg() is idle - send an ACK */
66+
+ RXRPC_CALL_RECVMSG_READ_ALL, /* recvmsg() read all of the received data */
67+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
68+
};
69+
70+
/*
71+
diff --cc net/rxrpc/recvmsg.c
72+
index c84d2b620396,dd54ceee7bcc..000000000000
73+
--- a/net/rxrpc/recvmsg.c
74+
+++ b/net/rxrpc/recvmsg.c
75+
@@@ -180,41 -101,6 +180,44 @@@ static int rxrpc_recvmsg_term(struct rx
76+
}
77+
78+
/*
79+
++<<<<<<< HEAD
80+
+ * End the packet reception phase.
81+
+ */
82+
+static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
83+
+{
84+
+ rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
85+
+
86+
+ _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
87+
+
88+
+ trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
89+
+
90+
+ if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
91+
+ rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
92+
+
93+
+ write_lock_bh(&call->state_lock);
94+
+
95+
+ switch (call->state) {
96+
+ case RXRPC_CALL_CLIENT_RECV_REPLY:
97+
+ __rxrpc_call_completed(call);
98+
+ write_unlock_bh(&call->state_lock);
99+
+ break;
100+
+
101+
+ case RXRPC_CALL_SERVER_RECV_REQUEST:
102+
+ call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
103+
+ call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
104+
+ write_unlock_bh(&call->state_lock);
105+
+ rxrpc_propose_delay_ACK(call, serial,
106+
+ rxrpc_propose_ack_processing_op);
107+
+ break;
108+
+ default:
109+
+ write_unlock_bh(&call->state_lock);
110+
+ break;
111+
+ }
112+
+}
113+
+
114+
+/*
115+
++=======
116+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
117+
* Discard a packet we've used up and advance the Rx window by one.
118+
*/
119+
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
120+
@@@ -291,7 -174,13 +294,17 @@@ static int rxrpc_recvmsg_data(struct so
121+
rx_pkt_offset = call->rx_pkt_offset;
122+
rx_pkt_len = call->rx_pkt_len;
123+
124+
++<<<<<<< HEAD
125+
+ if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
126+
++=======
127+
+ if (rxrpc_call_has_failed(call)) {
128+
+ seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
129+
+ ret = -EIO;
130+
+ goto done;
131+
+ }
132+
+
133+
+ if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
134+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
135+
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
136+
ret = 1;
137+
goto done;
138+
@@@ -323,7 -211,8 +335,12 @@@
139+
ret = ret2;
140+
goto out;
141+
}
142+
++<<<<<<< HEAD
143+
+ rxrpc_transmit_ack_packets(call->peer->local);
144+
++=======
145+
+ rx_pkt_offset = sp->offset;
146+
+ rx_pkt_len = sp->len;
147+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
148+
} else {
149+
trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
150+
rx_pkt_offset, rx_pkt_len, 0);
151+
@@@ -498,37 -387,36 +515,65 @@@ try_again
152+
msg->msg_namelen = len;
153+
}
154+
155+
++<<<<<<< HEAD
156+
+ switch (READ_ONCE(call->state)) {
157+
+ case RXRPC_CALL_CLIENT_RECV_REPLY:
158+
+ case RXRPC_CALL_SERVER_RECV_REQUEST:
159+
+ case RXRPC_CALL_SERVER_ACK_REQUEST:
160+
+ ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
161+
+ flags, &copied);
162+
+ if (ret == -EAGAIN)
163+
+ ret = 0;
164+
+
165+
+ rxrpc_transmit_ack_packets(call->peer->local);
166+
+ if (!skb_queue_empty(&call->recvmsg_queue))
167+
+ rxrpc_notify_socket(call);
168+
+ break;
169+
+ default:
170+
++=======
171+
+ ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
172+
+ flags, &copied);
173+
+ if (ret == -EAGAIN)
174+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
175+
ret = 0;
176+
- break;
177+
- }
178+
-
179+
+ if (ret == -EIO)
180+
+ goto call_failed;
181+
if (ret < 0)
182+
goto error_unlock_call;
183+
184+
++<<<<<<< HEAD
185+
+ if (call->state == RXRPC_CALL_COMPLETE) {
186+
+ ret = rxrpc_recvmsg_term(call, msg);
187+
+ if (ret < 0)
188+
+ goto error_unlock_call;
189+
+ if (!(flags & MSG_PEEK))
190+
+ rxrpc_release_call(rx, call);
191+
+ msg->msg_flags |= MSG_EOR;
192+
+ ret = 1;
193+
+ }
194+
++=======
195+
+ if (rxrpc_call_is_complete(call) &&
196+
+ skb_queue_empty(&call->recvmsg_queue))
197+
+ goto call_complete;
198+
+ if (rxrpc_call_has_failed(call))
199+
+ goto call_failed;
200+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
201+
202+
+ rxrpc_notify_socket(call);
203+
+ goto not_yet_complete;
204+
+
205+
+ call_failed:
206+
+ rxrpc_purge_queue(&call->recvmsg_queue);
207+
+ call_complete:
208+
+ ret = rxrpc_recvmsg_term(call, msg);
209+
+ if (ret < 0)
210+
+ goto error_unlock_call;
211+
+ if (!(flags & MSG_PEEK))
212+
+ rxrpc_release_call(rx, call);
213+
+ msg->msg_flags |= MSG_EOR;
214+
+ ret = 1;
215+
+
216+
+ not_yet_complete:
217+
if (ret == 0)
218+
msg->msg_flags |= MSG_MORE;
219+
else
220+
@@@ -599,38 -483,23 +640,51 @@@ int rxrpc_kernel_recv_data(struct socke
221+
222+
mutex_lock(&call->user_mutex);
223+
224+
++<<<<<<< HEAD
225+
+ switch (READ_ONCE(call->state)) {
226+
+ case RXRPC_CALL_CLIENT_RECV_REPLY:
227+
+ case RXRPC_CALL_SERVER_RECV_REQUEST:
228+
+ case RXRPC_CALL_SERVER_ACK_REQUEST:
229+
+ ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
230+
+ *_len, 0, &offset);
231+
+ *_len -= offset;
232+
+ if (ret < 0)
233+
+ goto out;
234+
+
235+
+ /* We can only reach here with a partially full buffer if we
236+
+ * have reached the end of the data. We must otherwise have a
237+
+ * full buffer or have been given -EAGAIN.
238+
+ */
239+
+ if (ret == 1) {
240+
+ if (iov_iter_count(iter) > 0)
241+
+ goto short_data;
242+
+ if (!want_more)
243+
+ goto read_phase_complete;
244+
+ ret = 0;
245+
+ goto out;
246+
+ }
247+
+
248+
+ if (!want_more)
249+
+ goto excess_data;
250+
++=======
251+
+ ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
252+
+ *_len -= offset;
253+
+ if (ret == -EIO)
254+
+ goto call_failed;
255+
+ if (ret < 0)
256+
++>>>>>>> 93368b6bd58a (rxrpc: Move call state changes from recvmsg to I/O thread)
257+
goto out;
258+
259+
- case RXRPC_CALL_COMPLETE:
260+
- goto call_complete;
261+
-
262+
- default:
263+
- ret = -EINPROGRESS;
264+
+ /* We can only reach here with a partially full buffer if we have
265+
+ * reached the end of the data. We must otherwise have a full buffer
266+
+ * or have been given -EAGAIN.
267+
+ */
268+
+ if (ret == 1) {
269+
+ if (iov_iter_count(iter) > 0)
270+
+ goto short_data;
271+
+ if (!want_more)
272+
+ goto read_phase_complete;
273+
+ ret = 0;
274+
goto out;
275+
}
276+
277+
@@@ -649,10 -523,12 +707,10 @@@ short_data
278+
ret = -EBADMSG;
279+
goto out;
280+
excess_data:
281+
- trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
282+
- call->cid, call->call_id, call->rx_consumed,
283+
- 0, -EMSGSIZE);
284+
+ trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
285+
ret = -EMSGSIZE;
286+
goto out;
287+
- call_complete:
288+
+ call_failed:
289+
*_abort = call->abort_code;
290+
ret = call->error;
291+
if (call->completion == RXRPC_CALL_SUCCEEDED) {
292+
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
293+
index eccc3cd0cb70..f7aede2ca31c 100644
294+
--- a/fs/afs/rxrpc.c
295+
+++ b/fs/afs/rxrpc.c
296+
@@ -900,6 +900,7 @@ int afs_extract_data(struct afs_call *call, bool want_more)
297+
ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
298+
&call->iov_len, want_more, &remote_abort,
299+
&call->service_id);
300+
+ trace_afs_receive_data(call, call->iter, want_more, ret);
301+
if (ret == 0 || ret == -EAGAIN)
302+
return ret;
303+
304+
* Unmerged path net/rxrpc/af_rxrpc.c
305+
* Unmerged path net/rxrpc/ar-internal.h
306+
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
307+
index b5326e160685..3cfd84506d42 100644
308+
--- a/net/rxrpc/input.c
309+
+++ b/net/rxrpc/input.c
310+
@@ -313,6 +313,41 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
311+
return rxrpc_end_tx_phase(call, true, "ETD");
312+
}
313+
314+
+/*
315+
+ * End the packet reception phase.
316+
+ */
317+
+static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
318+
+{
319+
+ rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
320+
+
321+
+ _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
322+
+
323+
+ trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
324+
+
325+
+ if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
326+
+ rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
327+
+
328+
+ write_lock(&call->state_lock);
329+
+
330+
+ switch (call->state) {
331+
+ case RXRPC_CALL_CLIENT_RECV_REPLY:
332+
+ __rxrpc_call_completed(call);
333+
+ write_unlock(&call->state_lock);
334+
+ break;
335+
+
336+
+ case RXRPC_CALL_SERVER_RECV_REQUEST:
337+
+ call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
338+
+ call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
339+
+ write_unlock(&call->state_lock);
340+
+ rxrpc_propose_delay_ACK(call, serial,
341+
+ rxrpc_propose_ack_processing_op);
342+
+ break;
343+
+ default:
344+
+ write_unlock(&call->state_lock);
345+
+ break;
346+
+ }
347+
+}
348+
+
349+
static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
350+
rxrpc_seq_t window, rxrpc_seq_t wtop)
351+
{
352+
@@ -331,8 +366,9 @@ static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
353+
354+
__skb_queue_tail(&call->recvmsg_queue, skb);
355+
rxrpc_input_update_ack_window(call, window, wtop);
356+
-
357+
trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
358+
+ if (last)
359+
+ rxrpc_end_rx_phase(call, sp->hdr.serial);
360+
}
361+
362+
/*
363+
* Unmerged path net/rxrpc/recvmsg.c

0 commit comments

Comments
 (0)