Skip to content

Commit 2c2e57f

Browse files
authored
Merge pull request #960 from posit-dev/feature/read-srcref
Take charge of parsing and evaluating inputs
2 parents 8a05e0a + fbd76e2 commit 2c2e57f

File tree

37 files changed

+2492
-1145
lines changed

37 files changed

+2492
-1145
lines changed

crates/amalthea/src/error.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ pub enum Error {
4343
UnknownCommName(String),
4444
UnknownCommId(String),
4545
InvalidCommMessage(String, String, String),
46-
InvalidInputRequest(String),
4746
InvalidConsoleInput(String),
4847
Anyhow(anyhow::Error),
4948
ShellErrorReply(Exception),
@@ -196,9 +195,6 @@ impl fmt::Display for Error {
196195
msg, id, err
197196
)
198197
},
199-
Error::InvalidInputRequest(message) => {
200-
write!(f, "{message}")
201-
},
202198
Error::InvalidConsoleInput(message) => {
203199
write!(f, "{message}")
204200
},
@@ -228,6 +224,6 @@ impl<T: std::fmt::Debug> From<SendError<T>> for Error {
228224
macro_rules! anyhow {
229225
($($rest: expr),*) => {{
230226
let message = anyhow::anyhow!($($rest, )*);
231-
crate::error::Error::Anyhow(message)
227+
$crate::error::Error::Anyhow(message)
232228
}}
233229
}

crates/amalthea/src/fixtures/dummy_frontend.rs

Lines changed: 178 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use crate::wire::jupyter_message::JupyterMessage;
2121
use crate::wire::jupyter_message::Message;
2222
use crate::wire::jupyter_message::ProtocolMessage;
2323
use crate::wire::jupyter_message::Status;
24+
use crate::wire::shutdown_reply::ShutdownReply;
25+
use crate::wire::shutdown_request::ShutdownRequest;
2426
use crate::wire::status::ExecutionState;
2527
use crate::wire::stream::Stream;
2628
use crate::wire::wire_message::WireMessage;
@@ -36,7 +38,7 @@ pub struct DummyConnection {
3638
}
3739

3840
pub struct DummyFrontend {
39-
pub _control_socket: Socket,
41+
pub control_socket: Socket,
4042
pub shell_socket: Socket,
4143
pub iopub_socket: Socket,
4244
pub stdin_socket: Socket,
@@ -132,7 +134,7 @@ impl DummyFrontend {
132134
// the Jupyter specification, these must share a ZeroMQ identity.
133135
let shell_id = rand::thread_rng().gen::<[u8; 16]>();
134136

135-
let _control_socket = Socket::new(
137+
let control_socket = Socket::new(
136138
connection.session.clone(),
137139
connection.ctx.clone(),
138140
String::from("Control"),
@@ -198,7 +200,7 @@ impl DummyFrontend {
198200
});
199201

200202
Self {
201-
_control_socket,
203+
control_socket,
202204
shell_socket,
203205
iopub_socket,
204206
stdin_socket,
@@ -207,12 +209,22 @@ impl DummyFrontend {
207209
}
208210
}
209211

212+
/// Sends a Jupyter message on the Control socket; returns the ID of the newly
213+
/// created message
214+
pub fn send_control<T: ProtocolMessage>(&self, msg: T) -> String {
215+
Self::send(&self.control_socket, &self.session, msg)
216+
}
217+
210218
/// Sends a Jupyter message on the Shell socket; returns the ID of the newly
211219
/// created message
212220
pub fn send_shell<T: ProtocolMessage>(&self, msg: T) -> String {
213221
Self::send(&self.shell_socket, &self.session, msg)
214222
}
215223

224+
pub fn send_shutdown_request(&self, restart: bool) -> String {
225+
self.send_control(ShutdownRequest { restart })
226+
}
227+
216228
pub fn send_execute_request(&self, code: &str, options: ExecuteRequestOptions) -> String {
217229
self.send_shell(ExecuteRequest {
218230
code: String::from(code),
@@ -224,6 +236,77 @@ impl DummyFrontend {
224236
})
225237
}
226238

239+
/// Sends an execute request and handles the standard message flow:
240+
/// busy -> execute_input -> idle -> execute_reply.
241+
/// Asserts that the input code matches and returns the execution count.
242+
#[track_caller]
243+
pub fn execute_request_invisibly(&self, code: &str) -> u32 {
244+
self.send_execute_request(code, ExecuteRequestOptions::default());
245+
self.recv_iopub_busy();
246+
247+
let input = self.recv_iopub_execute_input();
248+
assert_eq!(input.code, code);
249+
250+
self.recv_iopub_idle();
251+
252+
let execution_count = self.recv_shell_execute_reply();
253+
assert_eq!(execution_count, input.execution_count);
254+
255+
execution_count
256+
}
257+
258+
/// Sends an execute request and handles the standard message flow with a result:
259+
/// busy -> execute_input -> execute_result -> idle -> execute_reply.
260+
/// Asserts that the input code matches and passes the result to the callback.
261+
/// Returns the execution count.
262+
#[track_caller]
263+
pub fn execute_request<F>(&self, code: &str, result_check: F) -> u32
264+
where
265+
F: FnOnce(String),
266+
{
267+
self.send_execute_request(code, ExecuteRequestOptions::default());
268+
self.recv_iopub_busy();
269+
270+
let input = self.recv_iopub_execute_input();
271+
assert_eq!(input.code, code);
272+
273+
let result = self.recv_iopub_execute_result();
274+
result_check(result);
275+
276+
self.recv_iopub_idle();
277+
278+
let execution_count = self.recv_shell_execute_reply();
279+
assert_eq!(execution_count, input.execution_count);
280+
281+
execution_count
282+
}
283+
284+
/// Sends an execute request that produces an error and handles the standard message flow:
285+
/// busy -> execute_input -> execute_error -> idle -> execute_reply_exception.
286+
/// Passes the error message to the callback for custom assertions.
287+
/// Returns the execution count.
288+
#[track_caller]
289+
pub fn execute_request_error<F>(&self, code: &str, error_check: F) -> u32
290+
where
291+
F: FnOnce(String),
292+
{
293+
self.send_execute_request(code, ExecuteRequestOptions::default());
294+
self.recv_iopub_busy();
295+
296+
let input = self.recv_iopub_execute_input();
297+
assert_eq!(input.code, code);
298+
299+
let error_msg = self.recv_iopub_execute_error();
300+
error_check(error_msg);
301+
302+
self.recv_iopub_idle();
303+
304+
let execution_count = self.recv_shell_execute_reply_exception();
305+
assert_eq!(execution_count, input.execution_count);
306+
307+
execution_count
308+
}
309+
227310
/// Sends a Jupyter message on the Stdin socket
228311
pub fn send_stdin<T: ProtocolMessage>(&self, msg: T) {
229312
Self::send(&self.stdin_socket, &self.session, msg);
@@ -236,6 +319,7 @@ impl DummyFrontend {
236319
id
237320
}
238321

322+
#[track_caller]
239323
pub fn recv(socket: &Socket) -> Message {
240324
// It's important to wait with a timeout because the kernel thread might have
241325
// panicked, preventing it from sending the expected message. The tests would then
@@ -246,28 +330,48 @@ impl DummyFrontend {
246330
//
247331
// Note that the panic hook will still have run to record the panic, so we'll get
248332
// expected panic information in the test output.
333+
//
334+
// If you're debugging tests, you'll need to bump this timeout to a large value.
249335
if socket.poll_incoming(10000).unwrap() {
250336
return Message::read_from_socket(socket).unwrap();
251337
}
252338

253339
panic!("Timeout while expecting message on socket {}", socket.name);
254340
}
255341

342+
/// Receives a Jupyter message from the Control socket
343+
#[track_caller]
344+
pub fn recv_control(&self) -> Message {
345+
Self::recv(&self.control_socket)
346+
}
347+
256348
/// Receives a Jupyter message from the Shell socket
349+
#[track_caller]
257350
pub fn recv_shell(&self) -> Message {
258351
Self::recv(&self.shell_socket)
259352
}
260353

261354
/// Receives a Jupyter message from the IOPub socket
355+
#[track_caller]
262356
pub fn recv_iopub(&self) -> Message {
263357
Self::recv(&self.iopub_socket)
264358
}
265359

266360
/// Receives a Jupyter message from the Stdin socket
361+
#[track_caller]
267362
pub fn recv_stdin(&self) -> Message {
268363
Self::recv(&self.stdin_socket)
269364
}
270365

366+
/// Receive from Control and assert `ShutdownReply` message.
367+
#[track_caller]
368+
pub fn recv_control_shutdown_reply(&self) -> ShutdownReply {
369+
let message = self.recv_control();
370+
assert_matches!(message, Message::ShutdownReply(message) => {
371+
message.content
372+
})
373+
}
374+
271375
/// Receive from Shell and assert `ExecuteReply` message.
272376
/// Returns `execution_count`.
273377
#[track_caller]
@@ -349,30 +453,63 @@ impl DummyFrontend {
349453
assert_matches!(msg, Message::UpdateDisplayData(_))
350454
}
351455

456+
/// Receive from IOPub Stream
457+
///
458+
/// Stdout and Stderr Stream messages are buffered, so to reliably test
459+
/// against them we have to collect the messages in batches on the receiving
460+
/// end and compare against an expected message.
461+
///
462+
/// The comparison is done with an assertive closure: we'll wait for more
463+
/// output as long as the closure panics.
464+
///
465+
/// Because closures can't track callers yet, the `recv_iopub_stream()`
466+
/// variant is more ergonomic and should be preferred.
467+
/// See <https://github.com/rust-lang/rust/issues/87417> for tracking issue.
352468
#[track_caller]
353-
pub fn recv_iopub_stream_stdout(&self, expect: &str) {
354-
self.recv_iopub_stream(expect, Stream::Stdout)
469+
fn recv_iopub_stream_with<F>(&self, stream: Stream, mut f: F)
470+
where
471+
F: FnMut(&str),
472+
{
473+
let mut out = String::new();
474+
475+
loop {
476+
let msg = self.recv_iopub();
477+
let piece = assert_matches!(msg, Message::Stream(data) => {
478+
assert_eq!(data.content.name, stream);
479+
data.content.text
480+
});
481+
out.push_str(&piece);
482+
483+
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
484+
f(&out);
485+
})) {
486+
Ok(_) => break,
487+
Err(_) => continue,
488+
};
489+
}
355490
}
356491

357492
#[track_caller]
358-
pub fn recv_iopub_stream_stderr(&self, expect: &str) {
359-
self.recv_iopub_stream(expect, Stream::Stderr)
493+
pub fn recv_iopub_stream_stdout_with<F>(&self, f: F)
494+
where
495+
F: FnMut(&str),
496+
{
497+
self.recv_iopub_stream_with(Stream::Stdout, f)
360498
}
361499

362500
#[track_caller]
363-
pub fn recv_iopub_comm_close(&self) -> String {
364-
let msg = self.recv_iopub();
365-
366-
assert_matches!(msg, Message::CommClose(data) => {
367-
data.content.comm_id
368-
})
501+
pub fn recv_iopub_stream_stderr_with<F>(&self, f: F)
502+
where
503+
F: FnMut(&str),
504+
{
505+
self.recv_iopub_stream_with(Stream::Stderr, f)
369506
}
370507

371508
/// Receive from IOPub Stream
372509
///
373-
/// Stdout and Stderr Stream messages are buffered, so to reliably test against them
374-
/// we have to collect the messages in batches on the receiving end and compare against
375-
/// an expected message.
510+
/// This variant compares the stream against its expected _last_ output.
511+
/// We can't use `recv_iopub_stream_with()` here because closures
512+
/// can't track callers.
376513
#[track_caller]
377514
fn recv_iopub_stream(&self, expect: &str, stream: Stream) {
378515
let mut out = String::new();
@@ -381,30 +518,45 @@ impl DummyFrontend {
381518
// Receive a piece of stream output (with a timeout)
382519
let msg = self.recv_iopub();
383520

384-
// Assert its type
385521
let piece = assert_matches!(msg, Message::Stream(data) => {
386522
assert_eq!(data.content.name, stream);
387523
data.content.text
388524
});
389525

390-
// Add to what we've already collected
391526
out += piece.as_str();
392527

393-
if out == expect {
394-
// Done, found the entire `expect` string
395-
return;
396-
}
397-
398-
if !expect.starts_with(out.as_str()) {
399-
// Something is wrong, message doesn't match up
400-
panic!("Expected IOPub stream of '{expect}'. Actual stream of '{out}'.");
528+
if out.ends_with(expect) {
529+
break;
401530
}
402531

403532
// We have a prefix of `expect`, but not the whole message yet.
404533
// Wait on the next IOPub Stream message.
405534
}
406535
}
407536

537+
/// Receives stdout stream output until the collected output ends with
538+
/// `expect`. Note: The comparison uses `ends_with`, not full equality.
539+
#[track_caller]
540+
pub fn recv_iopub_stream_stdout(&self, expect: &str) {
541+
self.recv_iopub_stream(expect, Stream::Stdout)
542+
}
543+
544+
/// Receives stderr stream output until the collected output ends with
545+
/// `expect`. Note: The comparison uses `ends_with`, not full equality.
546+
#[track_caller]
547+
pub fn recv_iopub_stream_stderr(&self, expect: &str) {
548+
self.recv_iopub_stream(expect, Stream::Stderr)
549+
}
550+
551+
#[track_caller]
552+
pub fn recv_iopub_comm_close(&self) -> String {
553+
let msg = self.recv_iopub();
554+
555+
assert_matches!(msg, Message::CommClose(data) => {
556+
data.content.comm_id
557+
})
558+
}
559+
408560
/// Receive from IOPub and assert ExecuteResult message. Returns compulsory
409561
/// `evalue` field.
410562
#[track_caller]

crates/amalthea/src/socket/control.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ impl Control {
102102
H: FnOnce(JupyterMessage<T>) -> Result<(), Error>,
103103
{
104104
// Enter the kernel-busy state in preparation for handling the message.
105+
// The protocol specification is vague about status messages for
106+
// Control, we mostly emit them for compatibility with ipykernel:
107+
// https://github.com/ipython/ipykernel/pull/585. These status messages
108+
// can be discriminated from those on Shell by examining the parent
109+
// header.
105110
if let Err(err) = self.send_state(req.clone(), ExecutionState::Busy) {
106111
warn!("Failed to change kernel status to busy: {err}");
107112
}

crates/amalthea/src/wire/jupyter_message.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::wire::is_complete_reply::IsCompleteReply;
4646
use crate::wire::is_complete_request::IsCompleteRequest;
4747
use crate::wire::kernel_info_request::KernelInfoRequest;
4848
use crate::wire::originator::Originator;
49+
use crate::wire::shutdown_reply::ShutdownReply;
4950
use crate::wire::shutdown_request::ShutdownRequest;
5051
use crate::wire::status::KernelStatus;
5152
use crate::wire::wire_message::WireMessage;
@@ -101,6 +102,7 @@ pub enum Message {
101102
// Control
102103
InterruptReply(JupyterMessage<InterruptReply>),
103104
InterruptRequest(JupyterMessage<InterruptRequest>),
105+
ShutdownReply(JupyterMessage<ShutdownReply>),
104106
ShutdownRequest(JupyterMessage<ShutdownRequest>),
105107
// Registration
106108
HandshakeRequest(JupyterMessage<HandshakeRequest>),
@@ -163,6 +165,7 @@ impl TryFrom<&Message> for WireMessage {
163165
Message::IsCompleteRequest(msg) => WireMessage::try_from(msg),
164166
Message::KernelInfoReply(msg) => WireMessage::try_from(msg),
165167
Message::KernelInfoRequest(msg) => WireMessage::try_from(msg),
168+
Message::ShutdownReply(msg) => WireMessage::try_from(msg),
166169
Message::ShutdownRequest(msg) => WireMessage::try_from(msg),
167170
Message::Status(msg) => WireMessage::try_from(msg),
168171
Message::CommInfoReply(msg) => WireMessage::try_from(msg),
@@ -245,6 +248,9 @@ impl TryFrom<&WireMessage> for Message {
245248
if kind == UpdateDisplayData::message_type() {
246249
return Ok(Message::UpdateDisplayData(JupyterMessage::try_from(msg)?));
247250
}
251+
if kind == ShutdownReply::message_type() {
252+
return Ok(Message::ShutdownReply(JupyterMessage::try_from(msg)?));
253+
}
248254
if kind == ShutdownRequest::message_type() {
249255
return Ok(Message::ShutdownRequest(JupyterMessage::try_from(msg)?));
250256
}

0 commit comments

Comments
 (0)