-
Notifications
You must be signed in to change notification settings - Fork 379
Closed
Description
When running cargo lambda watch with a streaming response like:
use lambda_runtime::{
service_fn,
streaming::{channel, Body, Response},
tracing, Error, LambdaEvent,
};
use serde_json::Value;
async fn stream(_event: LambdaEvent<Value>) -> Result<Response<Body>, Error> {
let (mut tx, rx) = channel();
tokio::spawn(async move {
for i in 0..10 {
if let Err(err) = tx.send_data(format!("{}\n", i).into()).await {
tracing::error!("failed to send data: {}", err);
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await
}
});
Ok(rx.into())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();
lambda_runtime::run(service_fn(stream)).await?;
Ok(())
}If the client the making the request terminates it prematurely:
$ curl -N localhost:9000
0
1
2
3
^C
The thread accepting the request panics:
thread 'tokio-runtime-worker' panicked at /Users/robherley/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/lambda_runtime-0.14.4/src/requests.rs:120:51:
called `Result::unwrap()` on an `Err` value: Error { inner: ChannelClosed }
This is due to the unwrapping of send_data:
| tx.send_data(chunk).await.unwrap(); |
Which can raise the ChannelClosed:
aws-lambda-rust-runtime/lambda-runtime-api-client/src/body/sender.rs
Lines 64 to 69 in 084a7c8
| pub async fn send_data(&mut self, chunk: Bytes) -> Result<(), Error> { | |
| self.ready().await?; | |
| self.data_tx | |
| .try_send(Ok(chunk)) | |
| .map_err(|_| Error::new(SenderError::ChannelClosed)) | |
| } |
I'm not quite sure what the desired behavior is, but it should be more graceful than crashing all the tasks in that shared thread.
After some quick testing I believe this only happens locally. When deployed (using a function URL) it doesn't appear to panic and continues to finish the task, which might be due to how "real" client connections are abstracted.
Metadata
Metadata
Assignees
Labels
No labels