|
1 | 1 | use std::io::Cursor; |
2 | 2 |
|
3 | 3 | use bao_tree::ChunkRanges; |
| 4 | +use bytes::BytesMut; |
4 | 5 | use iroh_io::AsyncSliceReaderExt; |
5 | 6 |
|
6 | 7 | use crate::{ |
7 | 8 | store::{ |
8 | 9 | bao_file::test_support::{ |
9 | 10 | decode_response_into_batch, make_wire_data, random_test_data, simulate_remote, validate, |
10 | 11 | }, |
11 | | - Map as _, MapEntryMut, MapMut, ReadableStore, Store as _, |
| 12 | + Map as _, MapEntry, MapEntryMut, MapMut, ReadableStore, Store as _, ValidateProgress, |
12 | 13 | }, |
13 | | - util::raw_outboard, |
| 14 | + util::{progress::AsyncChannelProgressSender, raw_outboard}, |
14 | 15 | IROH_BLOCK_SIZE, |
15 | 16 | }; |
16 | 17 |
|
@@ -809,3 +810,34 @@ async fn actor_store_smoke() { |
809 | 810 | db.sync().await.unwrap(); |
810 | 811 | db.dump().await.unwrap(); |
811 | 812 | } |
| 813 | + |
| 814 | +#[tokio::test] |
| 815 | +async fn verifiable_stream_smoke() -> testresult::TestResult { |
| 816 | + let db1 = crate::store::mem::Store::new(); |
| 817 | + let db2 = crate::store::mem::Store::new(); |
| 818 | + |
| 819 | + const SIZE: usize = 16 * 1024 * 1024; |
| 820 | + let data = random_test_data(SIZE); |
| 821 | + let tag = db1.import_bytes(Bytes::from(data), BlobFormat::Raw).await?; |
| 822 | + let mut buffer = BytesMut::with_capacity(SIZE + 1024 * 1024); |
| 823 | + let entry = db1.get(tag.hash()).await?.expect("We just wrote this hash"); |
| 824 | + |
| 825 | + entry.write_verifiable_stream(0, &mut buffer).await?; |
| 826 | + |
| 827 | + db2.import_verifiable_stream(*tag.hash(), SIZE as u64, 0, buffer.freeze()) |
| 828 | + .await?; |
| 829 | + |
| 830 | + let (tx, rx) = async_channel::bounded(128); |
| 831 | + let handle = tokio::spawn(async move { |
| 832 | + while let Ok(progress) = rx.recv().await { |
| 833 | + if let ValidateProgress::Abort(err) = progress { |
| 834 | + panic!("Got an error: {err}"); |
| 835 | + } |
| 836 | + } |
| 837 | + }); |
| 838 | + db2.validate(false, AsyncChannelProgressSender::new(tx).boxed()) |
| 839 | + .await?; |
| 840 | + handle.await?; |
| 841 | + |
| 842 | + Ok(()) |
| 843 | +} |
0 commit comments