diff --git a/bundled/componentize_py_async_support/__init__.py b/bundled/componentize_py_async_support/__init__.py index 2d65b71..28a1a45 100644 --- a/bundled/componentize_py_async_support/__init__.py +++ b/bundled/componentize_py_async_support/__init__.py @@ -26,6 +26,11 @@ class _FutureState: handles: list[asyncio.Handle] pending_count: int +class _ReturnCode: + COMPLETED = 0 + DROPPED = 1 + CANCELLED = 2 + class _CallbackCode: EXIT = 0 YIELD = 1 diff --git a/bundled/componentize_py_async_support/futures.py b/bundled/componentize_py_async_support/futures.py index c35ba0d..a50c16e 100644 --- a/bundled/componentize_py_async_support/futures.py +++ b/bundled/componentize_py_async_support/futures.py @@ -4,6 +4,7 @@ from typing import TypeVar, Generic, cast, Self, Any, Callable from types import TracebackType +from componentize_py_async_support import _ReturnCode T = TypeVar('T') @@ -13,7 +14,7 @@ def __init__(self, type_: int, handle: int): self.handle: int | None = handle self.finalizer = weakref.finalize(self, componentize_py_runtime.future_drop_readable, type_, handle) - async def read(self) -> T | None: + async def read(self) -> T: self.finalizer.detach() handle = self.handle self.handle = None @@ -57,15 +58,25 @@ def __init__(self, type_: int, handle: int, default: Callable[[], T]): self.default = default self.finalizer = weakref.finalize(self, write_default, type_, handle, default) - async def write(self, value: T) -> None: + async def write(self, value: T) -> bool: self.finalizer.detach() handle = self.handle self.handle = None if handle is not None: - await componentize_py_async_support.await_result( + code, _ = await componentize_py_async_support.await_result( componentize_py_runtime.future_write(self.type_, handle, value) ) componentize_py_runtime.future_drop_writable(self.type_, handle) + match code: + case _ReturnCode.COMPLETED: + return True + case _ReturnCode.DROPPED: + return False + case _ReturnCode.CANCELLED: + # todo + raise NotImplementedError + case _: + raise AssertionError else: raise AssertionError diff --git a/bundled/componentize_py_async_support/streams.py b/bundled/componentize_py_async_support/streams.py index 629c5a4..7650d62 100644 --- a/bundled/componentize_py_async_support/streams.py +++ b/bundled/componentize_py_async_support/streams.py @@ -4,11 +4,7 @@ from typing import TypeVar, Generic, Self, cast from types import TracebackType - -class _ReturnCode: - COMPLETED = 0 - DROPPED = 1 - CANCELLED = 2 +from componentize_py_async_support import _ReturnCode class ByteStreamReader: def __init__(self, type_: int, handle: int): diff --git a/bundled/componentize_py_runtime.pyi b/bundled/componentize_py_runtime.pyi index b670aad..5377e92 100644 --- a/bundled/componentize_py_runtime.pyi +++ b/bundled/componentize_py_runtime.pyi @@ -19,7 +19,7 @@ def promise_get_result(event: int, promise: int) -> Any: ... def future_read(ty: int, future: int) -> Result[Any, tuple[int, int]]: ... -def future_write(ty: int, future: int, value: Any) -> Result[None, tuple[int, int]]: ... +def future_write(ty: int, future: int, value: Any) -> Result[tuple[int, int], tuple[int, int]]: ... def future_drop_readable(ty: int, future: int) -> None: ... diff --git a/examples/cli/README.md b/examples/cli/README.md index 03bf829..e2a9144 100644 --- a/examples/cli/README.md +++ b/examples/cli/README.md @@ -9,15 +9,15 @@ run a Python-based component targetting the [wasi-cli] `command` world. ## Prerequisites -* `Wasmtime` 39.0.0 or later +* `Wasmtime` 38.0.0 or later * `componentize-py` 0.19.0 Below, we use [Rust](https://rustup.rs/)'s `cargo` to install `Wasmtime`. If you don't have `cargo`, you can download and install from -https://github.com/bytecodealliance/wasmtime/releases/tag/v39.0.0. +https://github.com/bytecodealliance/wasmtime/releases/tag/v38.0.0. ``` -cargo install --version 39.0.0 wasmtime-cli +cargo install --version 38.0.0 wasmtime-cli pip install componentize-py==0.19.0 ``` diff --git a/examples/http-p3/README.md b/examples/http-p3/README.md index 0ca960e..dabd7f6 100644 --- a/examples/http-p3/README.md +++ b/examples/http-p3/README.md @@ -10,15 +10,15 @@ run a Python-based component targetting version `0.3.0-rc-2025-09-16` of the ## Prerequisites -* `Wasmtime` 39.0.0 or later +* `Wasmtime` 38.0.0 or later * `componentize-py` 0.19.0 Below, we use [Rust](https://rustup.rs/)'s `cargo` to install `Wasmtime`. If you don't have `cargo`, you can download and install from -https://github.com/bytecodealliance/wasmtime/releases/tag/v39.0.0. +https://github.com/bytecodealliance/wasmtime/releases/tag/v38.0.0. ``` -cargo install --version 39.0.0 wasmtime-cli +cargo install --version 38.0.0 wasmtime-cli pip install componentize-py==0.19.0 ``` diff --git a/examples/http/README.md b/examples/http/README.md index 06e7938..26895a8 100644 --- a/examples/http/README.md +++ b/examples/http/README.md @@ -9,15 +9,15 @@ run a Python-based component targetting the [wasi-http] `proxy` world. ## Prerequisites -* `Wasmtime` 39.0.0 or later +* `Wasmtime` 38.0.0 or later * `componentize-py` 0.19.0 Below, we use [Rust](https://rustup.rs/)'s `cargo` to install `Wasmtime`. If you don't have `cargo`, you can download and install from -https://github.com/bytecodealliance/wasmtime/releases/tag/v39.0.0. +https://github.com/bytecodealliance/wasmtime/releases/tag/v38.0.0. ``` -cargo install --version 39.0.0 wasmtime-cli +cargo install --version 38.0.0 wasmtime-cli pip install componentize-py==0.19.0 ``` diff --git a/examples/matrix-math/README.md b/examples/matrix-math/README.md index 36dbfb0..b09e4c0 100644 --- a/examples/matrix-math/README.md +++ b/examples/matrix-math/README.md @@ -10,7 +10,7 @@ within a guest component. ## Prerequisites -* `wasmtime` 39.0.0 or later +* `wasmtime` 38.0.0 or later * `componentize-py` 0.19.0 * `NumPy`, built for WASI @@ -19,10 +19,10 @@ not yet publish WASI builds. Below, we use [Rust](https://rustup.rs/)'s `cargo` to install `Wasmtime`. If you don't have `cargo`, you can download and install from -https://github.com/bytecodealliance/wasmtime/releases/tag/v39.0.0. +https://github.com/bytecodealliance/wasmtime/releases/tag/v38.0.0. ``` -cargo install --version 39.0.0 wasmtime-cli +cargo install --version 38.0.0 wasmtime-cli pip install componentize-py==0.19.0 curl -OL https://github.com/dicej/wasi-wheels/releases/download/v0.0.2/numpy-wasi.tar.gz tar xf numpy-wasi.tar.gz diff --git a/examples/sandbox/README.md b/examples/sandbox/README.md index 3bd9e47..b50a478 100644 --- a/examples/sandbox/README.md +++ b/examples/sandbox/README.md @@ -7,11 +7,11 @@ sandboxed Python code snippets from within a Python app. ## Prerequisites -* `wasmtime-py` 39.0.0 or later +* `wasmtime-py` 38.0.0 or later * `componentize-py` 0.19.0 ``` -pip install componentize-py==0.19.0 wasmtime==39.0.0 +pip install componentize-py==0.19.0 wasmtime==38.0.0 ``` ## Running the demo diff --git a/examples/tcp/README.md b/examples/tcp/README.md index b9bb327..72ff412 100644 --- a/examples/tcp/README.md +++ b/examples/tcp/README.md @@ -10,15 +10,15 @@ making an outbound TCP request using `wasi-sockets`. ## Prerequisites -* `Wasmtime` 39.0.0 or later +* `Wasmtime` 38.0.0 or later * `componentize-py` 0.19.0 Below, we use [Rust](https://rustup.rs/)'s `cargo` to install `Wasmtime`. If you don't have `cargo`, you can download and install from -https://github.com/bytecodealliance/wasmtime/releases/tag/v39.0.0. +https://github.com/bytecodealliance/wasmtime/releases/tag/v38.0.0. ``` -cargo install --version 39.0.0 wasmtime-cli +cargo install --version 38.0.0 wasmtime-cli pip install componentize-py==0.19.0 ``` diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 3d65fb4..516d849 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -310,6 +310,7 @@ mod async_ { }, FutureWrite { _call: MyCall<'static>, + resources: Option>, }, } @@ -391,9 +392,16 @@ mod async_ { } call.stack.pop().unwrap_or(py.None()).into_bound(py) } - Promise::FutureWrite { .. } => { + Promise::FutureWrite { ref resources, .. } => { let count = event >> 4; let code = event & 0xF; + + if let (RETURN_CODE_DROPPED, Some(resources)) = (code, resources) { + for resource in resources { + resource.restore(py) + } + } + PyTuple::new(py, [code, count]).unwrap().into_any() } } @@ -804,11 +812,16 @@ mod async_ { } else { unsafe { call.defer_deallocate(buffer, layout) }; + call.resources = Some(Vec::new()); let code = unsafe { ty.lower(&mut call, buffer); ty.write()(handle, buffer.cast()) }; + let resources = call + .resources + .take() + .and_then(|v| if v.is_empty() { None } else { Some(v) }); Ok(if code == RETURN_CODE_BLOCKED { ERR_CONSTRUCTOR @@ -822,6 +835,7 @@ mod async_ { usize::try_from(handle).unwrap(), Box::into_raw(Box::new(async_::Promise::FutureWrite { _call: call, + resources, })) as usize, ], ) @@ -831,6 +845,13 @@ mod async_ { } else { let count = code >> 4; let code = code & 0xF; + + if let (RETURN_CODE_DROPPED, Some(resources)) = (code, &resources) { + for resource in resources { + resource.restore(py) + } + } + OK_CONSTRUCTOR .get() .unwrap() @@ -1330,8 +1351,10 @@ impl Interpreter for MyInterpreter { } fn resource_dtor(ty: wit::Resource, handle: usize) { + // We don't currently include a `drop` function as part of the abstract + // base class we generate for an exported resource, so there's nothing + // to do here. If/when that changes, we'll want to call `drop` here. _ = (ty, handle); - todo!() } } diff --git a/src/test/python_source/app.py b/src/test/python_source/app.py index 7160d79..a745596 100644 --- a/src/test/python_source/app.py +++ b/src/test/python_source/app.py @@ -5,12 +5,14 @@ import resource_alias1 import resource_borrow_in_record import componentize_py_async_support +import streams_and_futures as my_streams_and_futures from componentize_py_types import Result, Ok, Err from tests import exports, imports from tests.imports import resource_borrow_import from tests.imports import simple_import_and_export from tests.imports import simple_async_import_and_export +from tests.imports import host_thing_interface from tests.exports import resource_alias2 from tests.exports import streams_and_futures from typing import Tuple, List, Optional @@ -147,6 +149,42 @@ async def pipe_things(rx: StreamReader[streams_and_futures.Thing], tx: StreamWri # Write the things all at once. The host will read them only one at a time, # forcing us to re-take ownership of any unwritten items between writes. await tx.write_all(things) + +async def pipe_host_things(rx: StreamReader[host_thing_interface.HostThing], tx: StreamWriter[host_thing_interface.HostThing]): + # Read the things one at a time, forcing the host to re-take ownership of + # any unwritten items between writes. + things = [] + while not rx.writer_dropped: + things += await rx.read(1) + + # Write the things all at once. The host will read them only one at a time, + # forcing us to re-take ownership of any unwritten items between writes. + await tx.write_all(things) + +async def write_thing(thing: my_streams_and_futures.Thing, + tx1: FutureWriter[streams_and_futures.Thing], + tx2: FutureWriter[streams_and_futures.Thing]): + # The host will drop the first reader without reading, which should give us + # back ownership of `thing`. + wrote = await tx1.write(thing) + assert not wrote + # The host will read from the second reader, though. + wrote = await tx2.write(thing) + assert wrote + +async def write_host_thing(thing: host_thing_interface.HostThing, + tx1: FutureWriter[host_thing_interface.HostThing], + tx2: FutureWriter[host_thing_interface.HostThing]): + # The host will drop the first reader without reading, which should give us + # back ownership of `thing`. + wrote = await tx1.write(thing) + assert not wrote + # The host will read from the second reader, though. + wrote = await tx2.write(thing) + assert wrote + +def unreachable() -> str: + raise AssertionError class StreamsAndFutures(exports.StreamsAndFutures): async def echo_stream_u8(self, stream: ByteStreamReader) -> ByteStreamReader: @@ -155,8 +193,6 @@ async def echo_stream_u8(self, stream: ByteStreamReader) -> ByteStreamReader: return rx async def echo_future_string(self, future: FutureReader[str]) -> FutureReader[str]: - def unreachable() -> str: - raise AssertionError tx, rx = tests.string_future(unreachable) componentize_py_async_support.spawn(pipe_strings(future, tx)) return rx @@ -166,6 +202,23 @@ async def short_reads(self, stream: StreamReader[streams_and_futures.Thing]) -> componentize_py_async_support.spawn(pipe_things(stream, tx)) return rx + async def short_reads_host(self, stream: StreamReader[host_thing_interface.HostThing]) -> StreamReader[host_thing_interface.HostThing]: + tx, rx = tests.host_thing_interface_host_thing_stream() + componentize_py_async_support.spawn(pipe_host_things(stream, tx)) + return rx + + async def dropped_future_reader(self, value: str) -> tuple[FutureReader[streams_and_futures.Thing], FutureReader[streams_and_futures.Thing]]: + tx1, rx1 = tests.streams_and_futures_thing_future(unreachable) + tx2, rx2 = tests.streams_and_futures_thing_future(unreachable) + componentize_py_async_support.spawn(write_thing(my_streams_and_futures.Thing(value), tx1, tx2)) + return (rx1, rx2) + + async def dropped_future_reader_host(self, value: str) -> tuple[FutureReader[host_thing_interface.HostThing], FutureReader[host_thing_interface.HostThing]]: + tx1, rx1 = tests.host_thing_interface_host_thing_future(unreachable) + tx2, rx2 = tests.host_thing_interface_host_thing_future(unreachable) + componentize_py_async_support.spawn(write_host_thing(host_thing_interface.HostThing(value), tx1, tx2)) + return (rx1, rx2) + class Tests(tests.Tests): def test_resource_borrow_import(self, v: int) -> int: return resource_borrow_import.foo(resource_borrow_import.Thing(v + 1)) + 4 diff --git a/src/test/tests.rs b/src/test/tests.rs index 13a7944..8cb37d0 100644 --- a/src/test/tests.rs +++ b/src/test/tests.rs @@ -40,6 +40,7 @@ wasmtime::component::bindgen!({ "componentize-py:test/resource-floats.float": MyFloat, "resource-floats-imports.float": MyFloat, "componentize-py:test/resource-borrow-in-record.thing": ThingString, + "componentize-py:test/host-thing-interface.host-thing": ThingString, }, }); @@ -1188,9 +1189,12 @@ fn test_short_reads(delay: bool) -> Result<()> { things.push(thing.call_constructor(&mut store, string).await?); } - store + let received_things = store .run_concurrent(async |store| { let count = things.len(); + // Write the items all at once. The receiver will only read them + // one at a time, forcing us to retake ownership of the unwritten + // items between writes. let stream = store .with(|store| StreamReader::new(store, VecProducer::new(things, delay))); @@ -1199,6 +1203,8 @@ fn test_short_reads(delay: bool) -> Result<()> { let received_things = Arc::new(Mutex::new( Vec::::with_capacity(count), )); + // Read only one item at a time, forcing the sender to retake + // ownership of any unwritten items. store.with(|store| { stream.pipe(store, OneAtATime::new(received_things.clone(), delay)) }); @@ -1209,7 +1215,7 @@ fn test_short_reads(delay: bool) -> Result<()> { let mut received_strings = Vec::with_capacity(strings.len()); let received_things = mem::take(received_things.lock().unwrap().deref_mut()); - for it in received_things { + for &it in &received_things { received_strings.push(thing.call_get(store, it).await?.0); } @@ -1221,6 +1227,106 @@ fn test_short_reads(delay: bool) -> Result<()> { .collect::>() ); + anyhow::Ok(received_things) + }) + .await??; + + for it in received_things { + it.resource_drop_async::<()>(&mut store).await?; + } + + anyhow::Ok(()) + })?; + + Ok(()) + }) +} + +#[test] +fn short_reads_host() -> Result<()> { + test_short_reads_host(false) +} + +#[test] +fn short_reads_host_with_delay() -> Result<()> { + test_short_reads_host(true) +} + +fn test_short_reads_host(delay: bool) -> Result<()> { + use componentize_py::test::host_thing_interface::{ + Host, HostHostThing, HostHostThingWithStore, + }; + + impl HostHostThingWithStore for HasSelf { + async fn get( + accessor: &Accessor, + this: Resource, + ) -> Result { + accessor.with(|mut store| Ok(store.get().table.get(&this)?.0.clone())) + } + } + + impl HostHostThing for Ctx { + async fn new(&mut self, v: String) -> Result> { + Ok(self.ctx().table.push(ThingString(v))?) + } + + async fn drop(&mut self, this: Resource) -> Result<()> { + Ok(self.ctx().table.delete(this).map(|_| ())?) + } + } + + impl Host for Ctx {} + + TESTER.test(|world, store, runtime| { + runtime.block_on(async { + let instance = world.componentize_py_test_streams_and_futures(); + + let strings = ["a", "b", "c", "d", "e"]; + let mut things = Vec::with_capacity(strings.len()); + for string in strings { + things.push(store.data_mut().table.push(ThingString(string.into()))?); + } + + store + .run_concurrent(async |store| { + let count = things.len(); + // Write the items all at once. The receiver will only read them + // one at a time, forcing us to retake ownership of the unwritten + // items between writes. + let stream = store + .with(|store| StreamReader::new(store, VecProducer::new(things, delay))); + + let (stream, task) = instance.call_short_reads_host(store, stream).await?; + + let received_things = Arc::new(Mutex::new( + Vec::>::with_capacity(count), + )); + // Read only one item at a time, forcing the sender to retake + // ownership of any unwritten items. + store.with(|store| { + stream.pipe(store, OneAtATime::new(received_things.clone(), delay)) + }); + + task.block(store).await; + + assert_eq!(count, received_things.lock().unwrap().len()); + + let received_strings = store.with(|mut store| { + mem::take(received_things.lock().unwrap().deref_mut()) + .into_iter() + .map(|v| Ok(store.get().table.delete(v)?.0)) + .collect::>>() + })?; + + assert_eq!( + &strings[..], + &received_strings + .iter() + .map(|s| s.as_str()) + .collect::>() + ); + anyhow::Ok(()) }) .await? @@ -1229,3 +1335,108 @@ fn test_short_reads(delay: bool) -> Result<()> { Ok(()) }) } + +#[test] +fn dropped_future_reader() -> Result<()> { + test_dropped_future_reader(false) +} + +#[test] +fn dropped_future_reader_with_delay() -> Result<()> { + test_dropped_future_reader(true) +} + +fn test_dropped_future_reader(delay: bool) -> Result<()> { + TESTER.test(|world, mut store, runtime| { + runtime.block_on(async { + let instance = world.componentize_py_test_streams_and_futures(); + let thing = instance.thing(); + + let it = store + .run_concurrent(async |store| { + let expected = "Beware the Jubjub bird, and shun\n\tThe frumious Bandersnatch!"; + let ((mut rx1, rx2), task) = instance + .call_dropped_future_reader(store, expected.into()) + .await?; + // Close the future without reading the value. This will + // force the sender to retake ownership of the value it + // tried to write. + rx1.close_with(store); + + let received = Arc::new(Mutex::new(None::)); + store.with(|store| { + rx2.pipe(store, OptionConsumer::new(received.clone(), delay)) + }); + + task.block(store).await; + + let it = received.lock().unwrap().take().unwrap(); + + assert_eq!(expected, &thing.call_get(store, it).await?.0); + + anyhow::Ok(it) + }) + .await??; + + it.resource_drop_async::<()>(&mut store).await?; + + anyhow::Ok(()) + })?; + + Ok(()) + }) +} + +#[test] +fn dropped_future_reader_host() -> Result<()> { + test_dropped_future_reader_host(false) +} + +#[test] +fn dropped_future_reader_host_with_delay() -> Result<()> { + test_dropped_future_reader_host(true) +} + +fn test_dropped_future_reader_host(delay: bool) -> Result<()> { + TESTER.test(|world, store, runtime| { + runtime.block_on(async { + let instance = world.componentize_py_test_streams_and_futures(); + + store + .run_concurrent(async |store| { + let expected = "Beware the Jubjub bird, and shun\n\tThe frumious Bandersnatch!"; + let ((mut rx1, rx2), task) = instance + .call_dropped_future_reader_host(store, expected.into()) + .await?; + // Close the future without reading the value. This will + // force the sender to retake ownership of the value it + // tried to write. + rx1.close_with(store); + + let received = Arc::new(Mutex::new(None::>)); + store.with(|store| { + rx2.pipe(store, OptionConsumer::new(received.clone(), delay)) + }); + + task.block(store).await; + + let it = store.with(|mut store| { + anyhow::Ok( + store + .get() + .table + .delete(received.lock().unwrap().take().unwrap())? + .0, + ) + })?; + + assert_eq!(expected, &it); + + anyhow::Ok(it) + }) + .await? + })?; + + Ok(()) + }) +} diff --git a/src/test/wit/tests.wit b/src/test/wit/tests.wit index cf9ca8b..39b91f3 100644 --- a/src/test/wit/tests.wit +++ b/src/test/wit/tests.wit @@ -142,7 +142,16 @@ interface resource-borrow-in-record { test: func(a: list) -> list; } +interface host-thing-interface { + resource host-thing { + constructor(s: string); + get: async func() -> string; + } +} + interface streams-and-futures { + use host-thing-interface.{host-thing}; + resource thing { constructor(s: string); get: async func() -> string; @@ -151,6 +160,9 @@ interface streams-and-futures { echo-stream-u8: async func(s: stream) -> stream; echo-future-string: async func(f: future) -> future; short-reads: async func(s: stream) -> stream; + short-reads-host: async func(s: stream) -> stream; + dropped-future-reader: async func(value: string) -> tuple, future>; + dropped-future-reader-host: async func(value: string) -> tuple, future>; } world tests {