Skip to content

Commit 62adb82

Browse files
authored
Merge pull request #131 from labthings/test-mjpeg-stream-2
Test MJPEGStream
2 parents 1f10cc4 + ca41f6c commit 62adb82

File tree

4 files changed

+133
-7
lines changed

4 files changed

+133
-7
lines changed

dev-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ orjson==3.10.15
8181
# via fastapi
8282
packaging==24.2
8383
# via pytest
84+
pillow==11.3.0
85+
# via labthings-fastapi (pyproject.toml)
8486
pluggy==1.5.0
8587
# via pytest
8688
pydantic==2.10.6

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dev = [
3030
"mypy>=1.6.1, <2",
3131
"ruff>=0.1.3",
3232
"types-jsonschema",
33+
"Pillow",
3334
]
3435

3536
[project.urls]

src/labthings_fastapi/outputs/mjpeg_stream.py

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ def __init__(self, gen: AsyncGenerator[bytes, None], status_code: int = 200):
4141
4242
This response is initialised with an async generator that yields `bytes`
4343
objects, each of which is a JPEG file. We add the --frame markers and mime
44-
types that enable it to work in an `img` tag.
44+
types that mark it as an MJPEG stream. This is sufficient to enable it to
45+
work in an `img` tag, with the `src` set to the MJPEG stream's endpoint.
46+
47+
It expects an async generator that supplies individual JPEGs to be streamed,
48+
such as the one provided by `.MJPEGStream`.
4549
4650
NB the ``status_code`` argument is used by FastAPI to set the status code of
4751
the response in OpenAPI.
@@ -63,6 +67,24 @@ async def mjpeg_async_generator(self) -> AsyncGenerator[bytes, None]:
6367

6468

6569
class MJPEGStream:
70+
"""Manage streaming images over HTTP as an MJPEG stream
71+
72+
An MJPEGStream object handles accepting images (already in
73+
JPEG format) and streaming them to HTTP clients as a multipart
74+
response.
75+
76+
The minimum needed to make the stream work is to periodically
77+
call `add_frame` with JPEG image data.
78+
79+
To add a stream to a `.Thing`, use the `.MJPEGStreamDescriptor`
80+
which will handle creating an `MJPEGStream` object on first access,
81+
and will also add it to the HTTP API.
82+
83+
The MJPEG stream buffers the last few frames (10 by default) and
84+
also has a hook to notify the size of each frame as it is added.
85+
The latter is used by OpenFlexure's autofocus routine.
86+
"""
87+
6688
def __init__(self, ringbuffer_size: int = 10):
6789
self._lock = threading.Lock()
6890
self.condition = anyio.Condition()
@@ -85,10 +107,11 @@ def reset(self, ringbuffer_size: Optional[int] = None):
85107
]
86108
self.last_frame_i = -1
87109

88-
def stop(self):
110+
def stop(self, portal: BlockingPortal):
89111
"""Stop the stream"""
90112
with self._lock:
91113
self._streaming = False
114+
portal.start_task_soon(self.notify_stream_stopped)
92115

93116
async def ringbuffer_entry(self, i: int) -> RingbufferEntry:
94117
"""Return the ith frame acquired by the camera
@@ -117,9 +140,13 @@ async def buffer_for_reading(self, i: int) -> AsyncIterator[bytes]:
117140
yield entry.frame
118141

119142
async def next_frame(self) -> int:
120-
"""Wait for the next frame, and return its index"""
143+
"""Wait for the next frame, and return its index
144+
145+
:raises StopAsyncIteration: if the stream has stopped."""
121146
async with self.condition:
122147
await self.condition.wait()
148+
if not self._streaming:
149+
raise StopAsyncIteration()
123150
return self.last_frame_i
124151

125152
async def grab_frame(self) -> bytes:
@@ -148,6 +175,8 @@ async def frame_async_generator(self) -> AsyncGenerator[bytes, None]:
148175
i = await self.next_frame()
149176
async with self.buffer_for_reading(i) as frame:
150177
yield frame
178+
except StopAsyncIteration:
179+
break
151180
except Exception as e:
152181
logging.error(f"Error in stream: {e}, stream stopped")
153182
return
@@ -156,7 +185,7 @@ async def mjpeg_stream_response(self) -> MJPEGStreamResponse:
156185
"""Return a StreamingResponse that streams an MJPEG stream"""
157186
return MJPEGStreamResponse(self.frame_async_generator())
158187

159-
def add_frame(self, frame: bytes, portal: BlockingPortal):
188+
def add_frame(self, frame: bytes, portal: BlockingPortal) -> None:
160189
"""Return the next buffer in the ringbuffer to write to
161190
162191
:param frame: The frame to add
@@ -174,15 +203,31 @@ def add_frame(self, frame: bytes, portal: BlockingPortal):
174203
entry.index = self.last_frame_i + 1
175204
portal.start_task_soon(self.notify_new_frame, entry.index)
176205

177-
async def notify_new_frame(self, i):
178-
"""Notify any waiting tasks that a new frame is available"""
206+
async def notify_new_frame(self, i: int) -> None:
207+
"""Notify any waiting tasks that a new frame is available.
208+
209+
:param i: The number of the frame (which counts up since the server starts)
210+
"""
179211
async with self.condition:
180212
self.last_frame_i = i
181213
self.condition.notify_all()
182214

215+
async def notify_stream_stopped(self) -> None:
216+
"""Raise an exception in any waiting tasks to signal the stream has stopped."""
217+
assert self._streaming is False
218+
async with self.condition:
219+
self.condition.notify_all()
220+
183221

184222
class MJPEGStreamDescriptor:
185-
"""A descriptor that returns a MJPEGStream object when accessed"""
223+
"""A descriptor that returns a MJPEGStream object when accessed
224+
225+
If this descriptor is added to a `.Thing`, it will create an `.MJPEGStream`
226+
object when it is first accessed. It will also add two HTTP endpoints,
227+
one with the name of the descriptor serving the MJPEG stream, and another
228+
with `/viewer` appended, which serves a basic HTML page that views the stream.
229+
230+
"""
186231

187232
def __init__(self, **kwargs):
188233
self._kwargs = kwargs

tests/test_mjpeg_stream.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import io
2+
import threading
3+
import time
4+
from PIL import Image
5+
from fastapi.testclient import TestClient
6+
import labthings_fastapi as lt
7+
8+
9+
class Telly(lt.Thing):
10+
_stream_thread: threading.Thread
11+
_streaming: bool = False
12+
framerate: float = 1000
13+
frame_limit: int = 3
14+
15+
stream = lt.outputs.MJPEGStreamDescriptor()
16+
17+
def __enter__(self):
18+
self._streaming = True
19+
self._stream_thread = threading.Thread(target=self._make_images)
20+
self._stream_thread.start()
21+
22+
def __exit__(self, exc_t, exc_v, exc_tb):
23+
self._streaming = False
24+
self._stream_thread.join()
25+
26+
def _make_images(self):
27+
"""Stream a series of solid colours"""
28+
colours = ["#F00", "#0F0", "#00F"]
29+
jpegs = []
30+
for c in colours:
31+
image = Image.new("RGB", (10, 10), c)
32+
dest = io.BytesIO()
33+
image.save(dest, "jpeg")
34+
jpegs.append(dest.getvalue())
35+
36+
i = 0
37+
while self._streaming and (i < self.frame_limit or self.frame_limit < 0):
38+
self.stream.add_frame(
39+
jpegs[i % len(jpegs)], self._labthings_blocking_portal
40+
)
41+
time.sleep(1 / self.framerate)
42+
i = i + 1
43+
self.stream.stop(self._labthings_blocking_portal)
44+
self._streaming = False
45+
46+
47+
def test_mjpeg_stream():
48+
"""Verify the MJPEG stream contains at least one frame marker.
49+
50+
A limitation of the TestClient is that it can't actually stream.
51+
This means that all of the frames sent by our test Thing will
52+
arrive in a single packet.
53+
54+
For now, we just check it starts with the frame separator,
55+
but it might be possible in the future to check there are three
56+
images there.
57+
"""
58+
server = lt.ThingServer()
59+
telly = Telly()
60+
server.add_thing(telly, "telly")
61+
with TestClient(server.app) as client:
62+
with client.stream("GET", "/telly/stream") as stream:
63+
stream.raise_for_status()
64+
received = 0
65+
for b in stream.iter_bytes():
66+
received += 1
67+
assert b.startswith(b"--frame")
68+
69+
70+
if __name__ == "__main__":
71+
import uvicorn
72+
73+
server = lt.ThingServer()
74+
telly = Telly()
75+
telly.framerate = 6
76+
telly.frame_limit = -1
77+
server.add_thing(telly, "telly")
78+
uvicorn.run(server.app, port=5000)

0 commit comments

Comments
 (0)