diff --git a/src/api/downloader.rs b/src/api/downloader.rs index 50db0fc2f..bf78bf793 100644 --- a/src/api/downloader.rs +++ b/src/api/downloader.rs @@ -34,7 +34,7 @@ pub struct Downloader { #[rpc_requests(message = SwarmMsg, alias = "Msg")] #[derive(Debug, Serialize, Deserialize)] enum SwarmProtocol { - #[rpc(tx = mpsc::Sender)] + #[rpc(tx = mpsc::Sender)] Download(DownloadRequest), } @@ -46,7 +46,7 @@ struct DownloaderActor { } #[derive(Debug, Serialize, Deserialize)] -pub enum DownloadProgessItem { +pub enum DownloadProgressItem { #[serde(skip)] Error(anyhow::Error), TryProvider { @@ -98,7 +98,7 @@ impl DownloaderActor { async fn handle_download(store: Store, pool: ConnectionPool, msg: DownloadMsg) { let DownloadMsg { inner, mut tx, .. } = msg; if let Err(cause) = handle_download_impl(store, pool, inner, &mut tx).await { - tx.send(DownloadProgessItem::Error(cause)).await.ok(); + tx.send(DownloadProgressItem::Error(cause)).await.ok(); } } @@ -106,7 +106,7 @@ async fn handle_download_impl( store: Store, pool: ConnectionPool, request: DownloadRequest, - tx: &mut mpsc::Sender, + tx: &mut mpsc::Sender, ) -> anyhow::Result<()> { match request.strategy { SplitStrategy::Split => handle_download_split_impl(store, pool, request, tx).await?, @@ -127,7 +127,7 @@ async fn handle_download_split_impl( store: Store, pool: ConnectionPool, request: DownloadRequest, - tx: &mut mpsc::Sender, + tx: &mut mpsc::Sender, ) -> anyhow::Result<()> { let providers = request.providers; let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?; @@ -140,7 +140,7 @@ async fn handle_download_split_impl( let progress_tx = progress_tx.clone(); async move { let hash = request.hash; - let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgessItem)>(16); + let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgressItem)>(16); progress_tx.send(rx).await.ok(); let sink = TokioMpscSenderSink(tx).with_map(move |x| (id, x)); let res = execute_get(&pool, Arc::new(request), &providers, &store, sink).await; @@ -154,12 +154,12 @@ async fn handle_download_split_impl( into_stream(progress_rx) .flat_map(into_stream) .map(move |(id, item)| match item { - DownloadProgessItem::Progress(offset) => { + DownloadProgressItem::Progress(offset) => { total += offset; if let Some(prev) = offsets.insert(id, offset) { total -= prev; } - DownloadProgessItem::Progress(total) + DownloadProgressItem::Progress(total) } x => x, }) @@ -174,7 +174,7 @@ async fn handle_download_split_impl( Some((_hash, Ok(()))) => { } Some((_hash, Err(_e))) => { - tx.send(DownloadProgessItem::DownloadError).await?; + tx.send(DownloadProgressItem::DownloadError).await?; } None => break, } @@ -298,19 +298,19 @@ impl<'de> Deserialize<'de> for DownloadRequest { pub type DownloadOptions = DownloadRequest; pub struct DownloadProgress { - fut: future::Boxed>>, + fut: future::Boxed>>, } impl DownloadProgress { - fn new(fut: future::Boxed>>) -> Self { + fn new(fut: future::Boxed>>) -> Self { Self { fut } } - pub async fn stream(self) -> irpc::Result + Unpin> { + pub async fn stream(self) -> irpc::Result + Unpin> { let rx = self.fut.await?; Ok(Box::pin(rx.into_stream().map(|item| match item { Ok(item) => item, - Err(e) => DownloadProgessItem::Error(e.into()), + Err(e) => DownloadProgressItem::Error(e.into()), }))) } @@ -320,8 +320,8 @@ impl DownloadProgress { tokio::pin!(stream); while let Some(item) = stream.next().await { match item? { - DownloadProgessItem::Error(e) => Err(e)?, - DownloadProgessItem::DownloadError => anyhow::bail!("Download error"), + DownloadProgressItem::Error(e) => Err(e)?, + DownloadProgressItem::DownloadError => anyhow::bail!("Download error"), _ => {} } } @@ -372,7 +372,7 @@ async fn split_request<'a>( providers: &Arc, pool: &ConnectionPool, store: &Store, - progress: impl Sink, + progress: impl Sink, ) -> anyhow::Result + Send + 'a>> { Ok(match request { FiniteRequest::Get(req) => { @@ -428,13 +428,13 @@ async fn execute_get( request: Arc, providers: &Arc, store: &Store, - mut progress: impl Sink, + mut progress: impl Sink, ) -> anyhow::Result<()> { let remote = store.remote(); let mut providers = providers.find_providers(request.content()); while let Some(provider) = providers.next().await { progress - .send(DownloadProgessItem::TryProvider { + .send(DownloadProgressItem::TryProvider { id: provider, request: request.clone(), }) @@ -447,7 +447,7 @@ async fn execute_get( let local_bytes = local.local_bytes(); let Ok(conn) = conn.await else { progress - .send(DownloadProgessItem::ProviderFailed { + .send(DownloadProgressItem::ProviderFailed { id: provider, request: request.clone(), }) @@ -458,13 +458,13 @@ async fn execute_get( .execute_get_sink( &conn, local.missing(), - (&mut progress).with_map(move |x| DownloadProgessItem::Progress(x + local_bytes)), + (&mut progress).with_map(move |x| DownloadProgressItem::Progress(x + local_bytes)), ) .await { Ok(_stats) => { progress - .send(DownloadProgessItem::PartComplete { + .send(DownloadProgressItem::PartComplete { request: request.clone(), }) .await?; @@ -472,7 +472,7 @@ async fn execute_get( } Err(_cause) => { progress - .send(DownloadProgessItem::ProviderFailed { + .send(DownloadProgressItem::ProviderFailed { id: provider, request: request.clone(), })