Skip to content

Commit 964ce99

Browse files
committed
fix: pipelining
1 parent 20e21a3 commit 964ce99

File tree

11 files changed

+790
-221
lines changed

11 files changed

+790
-221
lines changed

src/arduino/app_peripherals/camera/base_camera.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,20 @@ class BaseCamera(ABC):
2323
providing a unified API regardless of the underlying camera protocol or type.
2424
"""
2525

26-
def __init__(self, resolution: Optional[Tuple[int, int]] = None, fps: int = 10,
27-
transformer: Optional[Callable[[np.ndarray], np.ndarray]] = None, **kwargs):
26+
def __init__(self, resolution: Optional[Tuple[int, int]] = (640, 480), fps: int = 10,
27+
adjuster: Optional[Callable[[np.ndarray], np.ndarray]] = None, **kwargs):
2828
"""
2929
Initialize the camera base.
3030
3131
Args:
3232
resolution (tuple, optional): Resolution as (width, height). None uses default resolution.
3333
fps (int): Frames per second for the camera.
34-
transformer (callable, optional): Function to transform frames that takes a numpy array and returns a numpy array. Default: None
34+
adjuster (callable, optional): Function pipeline to adjust frames that takes a numpy array and returns a numpy array. Default: None
3535
**kwargs: Additional camera-specific parameters.
3636
"""
3737
self.resolution = resolution
3838
self.fps = fps
39-
self.transformer = transformer
39+
self.adjuster = adjuster
4040
self._is_started = False
4141
self._cap_lock = threading.Lock()
4242
self._last_capture_time = time.monotonic()
@@ -100,13 +100,11 @@ def _extract_frame(self) -> Optional[np.ndarray]:
100100

101101
self._last_capture_time = time.monotonic()
102102

103-
if self.transformer is None:
104-
return frame
105-
106-
try:
107-
frame = frame | self.transformer
108-
except Exception as e:
109-
raise CameraTransformError(f"Frame transformation failed ({self.transformer}): {e}")
103+
if self.adjuster is not None:
104+
try:
105+
frame = self.adjuster(frame)
106+
except Exception as e:
107+
raise CameraTransformError(f"Frame transformation failed ({self.adjuster}): {e}")
110108

111109
return frame
112110

src/arduino/app_peripherals/camera/camera.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __new__(cls, source: Union[str, int] = 0, **kwargs) -> BaseCamera:
3232
resolution (tuple, optional): Frame resolution as (width, height).
3333
Default: None (auto)
3434
fps (int, optional): Target frames per second. Default: 10
35-
transformer (callable, optional): Function to transform frames that takes a
35+
adjuster (callable, optional): Function pipeline to adjust frames that takes a
3636
numpy array and returns a numpy array. Default: None
3737
V4L Camera Parameters:
3838
device_index (int, optional): V4L device index override

src/arduino/app_peripherals/camera/examples/websocket_camera_proxy.py

Lines changed: 43 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424

2525
import os
2626

27+
import numpy as np
28+
2729
from arduino.app_peripherals.camera import Camera
30+
from arduino.app_utils.image.image_editor import ImageEditor
2831

2932
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
3033

@@ -53,48 +56,37 @@ async def connect_output_tcp(output_host: str, output_port: int):
5356
"""Connect to the output TCP server."""
5457
global output_writer, output_reader
5558

56-
logger.info(f"Connecting to TCP server at {output_host}:{output_port}...")
59+
logger.info(f"Connecting to output server at {output_host}:{output_port}...")
5760

5861
try:
5962
output_reader, output_writer = await asyncio.open_connection(
6063
output_host, output_port
6164
)
62-
logger.info("TCP connection established successfully")
63-
64-
return True
65+
logger.info("Connected successfully to output server")
6566

6667
except Exception as e:
67-
logger.error(f"Failed to connect to TCP server: {e}")
68-
return False
68+
raise Exception(f"Failed to connect to output server: {e}")
6969

7070

71-
async def forward_frame(frame, quality: int):
71+
async def forward_frame(frame: np.ndarray):
7272
"""Forward a frame to the output TCP server as raw JPEG."""
7373
global output_writer
7474

7575
if not output_writer or output_writer.is_closing():
7676
return
7777

7878
try:
79-
# Frame is already a PIL.Image.Image in JPEG format
80-
# Convert PIL image to bytes
81-
import io
82-
img_bytes = io.BytesIO()
83-
frame.save(img_bytes, format='JPEG', quality=quality)
84-
frame_data = img_bytes.getvalue()
85-
86-
# Send raw JPEG binary data
87-
output_writer.write(frame_data)
79+
output_writer.write(frame.tobytes())
8880
await output_writer.drain()
8981

9082
except ConnectionResetError:
91-
logger.warning("TCP connection reset while forwarding frame")
83+
logger.warning("Output connection reset while forwarding frame")
9284
output_writer = None
9385
except Exception as e:
9486
logger.error(f"Error forwarding frame: {e}")
9587

9688

97-
async def camera_loop(fps: int, quality: int):
89+
async def camera_loop(fps: int):
9890
"""Main camera capture and forwarding loop."""
9991
global running, camera
10092

@@ -111,21 +103,26 @@ async def camera_loop(fps: int, quality: int):
111103
try:
112104
# Read frame from WebSocketCamera
113105
frame = camera.capture()
114-
115-
if frame is not None:
116-
# Rate limiting
117-
current_time = time.time()
118-
time_since_last = current_time - last_frame_time
119-
if time_since_last < frame_interval:
120-
await asyncio.sleep(frame_interval - time_since_last)
121-
122-
last_frame_time = time.time()
123-
124-
# Forward frame if output TCP connection is available
125-
await forward_frame(frame, quality)
126-
else:
106+
# frame = ImageEditor.compress_to_jpeg(frame, 80.1)
107+
if frame is None:
127108
# No frame available, small delay to avoid busy waiting
128109
await asyncio.sleep(0.01)
110+
continue
111+
112+
# Rate limiting
113+
current_time = time.time()
114+
time_since_last = current_time - last_frame_time
115+
if time_since_last < frame_interval:
116+
await asyncio.sleep(frame_interval - time_since_last)
117+
118+
last_frame_time = time.time()
119+
120+
if output_writer is None or output_writer.is_closing():
121+
# Output connection is not available, give room to the other tasks
122+
await asyncio.sleep(0.01)
123+
else:
124+
# Forward frame if output connection is available
125+
await forward_frame(frame)
129126

130127
except Exception as e:
131128
logger.error(f"Error in camera loop: {e}")
@@ -138,18 +135,16 @@ async def maintain_output_connection(output_host: str, output_port: int, reconne
138135

139136
while running:
140137
try:
141-
# Establish connection
142-
if await connect_output_tcp(output_host, output_port):
143-
logger.info("TCP connection established, maintaining...")
138+
await connect_output_tcp(output_host, output_port)
139+
140+
# Keep monitoring
141+
while running and output_writer and not output_writer.is_closing():
142+
await asyncio.sleep(1.0)
144143

145-
# Keep connection alive
146-
while running and output_writer and not output_writer.is_closing():
147-
await asyncio.sleep(1.0)
148-
149-
logger.info("TCP connection lost")
144+
logger.info("Lost connection to output server")
150145

151146
except Exception as e:
152-
logger.error(f"TCP connection error: {e}")
147+
logger.error(e)
153148
finally:
154149
# Clean up connection
155150
if output_writer:
@@ -163,7 +158,7 @@ async def maintain_output_connection(output_host: str, output_port: int, reconne
163158

164159
# Wait before reconnecting
165160
if running:
166-
logger.info(f"Reconnecting to TCP server in {reconnect_delay} seconds...")
161+
logger.info(f"Reconnecting to output server in {reconnect_delay} seconds...")
167162
await asyncio.sleep(reconnect_delay)
168163

169164

@@ -172,10 +167,12 @@ async def main():
172167
global running, camera
173168

174169
parser = argparse.ArgumentParser(description="WebSocket Camera Proxy")
170+
parser.add_argument("--input-host", default="localhost",
171+
help="WebSocketCamera input host (default: localhost)")
175172
parser.add_argument("--input-port", type=int, default=8080,
176173
help="WebSocketCamera input port (default: 8080)")
177-
parser.add_argument("--output-host", default="0.0.0.0",
178-
help="Output TCP server host (default: 0.0.0.0)")
174+
parser.add_argument("--output-host", default="localhost",
175+
help="Output TCP server host (default: localhost)")
179176
parser.add_argument("--output-port", type=int, default=5000,
180177
help="Output TCP server port (default: 5000)")
181178
parser.add_argument("--fps", type=int, default=30,
@@ -204,11 +201,12 @@ async def main():
204201
logger.info(f"Target FPS: {args.fps}")
205202

206203
from arduino.app_utils.image.image_editor import compressed_to_jpeg
207-
camera = Camera("ws://0.0.0.0:5000", transformer=compressed_to_jpeg(80))
204+
camera = Camera(f"ws://{args.input_host}:{args.input_port}", adjuster=compressed_to_jpeg(80))
205+
# camera = Camera(f"ws://{args.input_host}:{args.input_port}")
208206

209207
try:
210208
# Start camera input and output connection tasks
211-
camera_task = asyncio.create_task(camera_loop(args.fps, args.quality))
209+
camera_task = asyncio.create_task(camera_loop(args.fps))
212210
connection_task = asyncio.create_task(maintain_output_connection(args.output_host, args.output_port, reconnect_delay))
213211

214212
# Run both tasks concurrently

src/arduino/app_peripherals/camera/examples/websocket_client_streamer.py

Lines changed: 17 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import sys
1414
import time
1515

16+
from arduino.app_peripherals.camera import Camera
17+
from arduino.app_utils.image.image_editor import ImageEditor
18+
1619
logging.basicConfig(
1720
level=logging.INFO,
1821
format='%(asctime)s - %(levelname)s - %(message)s'
@@ -82,44 +85,33 @@ async def stop(self):
8285
logger.warning(f"Error closing WebSocket: {e}")
8386

8487
if self.camera:
85-
self.camera.release()
86-
logger.info("Camera released")
88+
self.camera.stop()
89+
logger.info("Camera stopped")
8790

8891
logger.info("Webcam streamer stopped")
8992

9093
async def _camera_loop(self):
9194
"""Main camera capture loop."""
9295
logger.info(f"Opening camera {self.camera_id}...")
93-
self.camera = cv2.VideoCapture(self.camera_id)
96+
self.camera = Camera(self.camera_id, resolution=(FRAME_WIDTH, FRAME_HEIGHT), fps=self.fps)
97+
self.camera.start()
9498

95-
if not self.camera.isOpened():
99+
if not self.camera.is_started():
96100
logger.error(f"Failed to open camera {self.camera_id}")
97101
return
98102

99-
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
100-
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
101-
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
102-
103-
# Verify the resolution was set correctly
104-
actual_width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
105-
actual_height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
106-
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
107-
108-
if actual_width != FRAME_WIDTH or actual_height != FRAME_HEIGHT:
109-
logger.warning(f"Camera resolution mismatch! Requested {FRAME_WIDTH}x{FRAME_HEIGHT}, got {actual_width}x{actual_height}")
110-
111103
logger.info("Camera opened successfully")
112104

113105
last_frame_time = time.time()
114106

115107
while self.running:
116108
try:
117-
ret, frame = self.camera.read()
118-
if not ret:
109+
frame = self.camera.capture()
110+
if frame is None:
119111
logger.warning("Failed to capture frame")
120112
await asyncio.sleep(0.1)
121113
continue
122-
114+
123115
# Rate limiting to enforce frame rate
124116
current_time = time.time()
125117
time_since_last = current_time - last_frame_time
@@ -135,6 +127,8 @@ async def _camera_loop(self):
135127
logger.warning("WebSocket connection lost during frame send")
136128
self.websocket = None
137129

130+
await asyncio.sleep(0.001)
131+
138132
except Exception as e:
139133
logger.error(f"Error in camera loop: {e}")
140134
await asyncio.sleep(1.0)
@@ -191,9 +185,6 @@ async def _handle_websocket_messages(self):
191185
logger.info(f"Server goodbye: {data.get('message', 'Disconnecting')}")
192186
break
193187

194-
elif data.get("status") == "dropping_frames":
195-
logger.warning(f"Server warning: {data.get('message', 'Dropping frames!')}")
196-
197188
elif data.get("error"):
198189
logger.warning(f"Server error: {data.get('message', 'Unknown error')}")
199190
if data.get("code") == 1000: # Server busy
@@ -216,36 +207,18 @@ async def _send_frame(self, frame):
216207
try:
217208
if self.server_frame_format == "binary":
218209
# Encode frame as JPEG and send binary data
219-
encode_params = [cv2.IMWRITE_JPEG_QUALITY, self.quality]
220-
success, encoded_frame = cv2.imencode('.jpg', frame, encode_params)
221-
222-
if not success:
223-
logger.warning("Failed to encode frame")
224-
return
225-
210+
encoded_frame = ImageEditor.compress_to_jpeg(frame)
226211
await self.websocket.send(encoded_frame.tobytes())
227212

228213
elif self.server_frame_format == "base64":
229214
# Encode frame as JPEG and send base64 data
230-
encode_params = [cv2.IMWRITE_JPEG_QUALITY, self.quality]
231-
success, encoded_frame = cv2.imencode('.jpg', frame, encode_params)
232-
233-
if not success:
234-
logger.warning("Failed to encode frame")
235-
return
236-
215+
encoded_frame = ImageEditor.compress_to_jpeg(frame)
237216
frame_b64 = base64.b64encode(encoded_frame.tobytes()).decode('utf-8')
238217
await self.websocket.send(frame_b64)
239218

240219
elif self.server_frame_format == "json":
241220
# Encode frame as JPEG, base64 encode and wrap in JSON
242-
encode_params = [cv2.IMWRITE_JPEG_QUALITY, self.quality]
243-
success, encoded_frame = cv2.imencode('.jpg', frame, encode_params)
244-
245-
if not success:
246-
logger.warning("Failed to encode frame")
247-
return
248-
221+
encoded_frame = ImageEditor.compress_to_jpeg(frame)
249222
frame_b64 = base64.b64encode(encoded_frame.tobytes()).decode('utf-8')
250223
message = json.dumps({"image": frame_b64})
251224
await self.websocket.send(message)
@@ -269,7 +242,7 @@ def signal_handler(signum, frame):
269242
async def main():
270243
"""Main function."""
271244
parser = argparse.ArgumentParser(description="WebSocket Camera Client Streamer")
272-
parser.add_argument("--host", default="127.0.0.1", help="WebSocket server host (default: 127.0.0.1)")
245+
parser.add_argument("--host", default="localhost", help="WebSocket server host (default: localhost)")
273246
parser.add_argument("--port", type=int, default=8080, help="WebSocket server port (default: 8080)")
274247
parser.add_argument("--camera", type=int, default=0, help="Camera device ID (default: 0)")
275248
parser.add_argument("--fps", type=int, default=30, help="Target FPS (default: 30)")

src/arduino/app_peripherals/camera/v4l_camera.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,17 @@ def _open_camera(self) -> None:
129129
f"Camera {self.camera_id} resolution set to {actual_width}x{actual_height} "
130130
f"instead of requested {self.resolution[0]}x{self.resolution[1]}"
131131
)
132+
self.resolution = (actual_width, actual_height)
133+
134+
if self.fps:
135+
self._cap.set(cv2.CAP_PROP_FPS, self.fps)
136+
137+
actual_fps = int(self._cap.get(cv2.CAP_PROP_FPS))
138+
if actual_fps != self.fps:
139+
logger.warning(
140+
f"Camera {self.camera_id} FPS set to {actual_fps} instead of requested {self.fps}"
141+
)
142+
self.fps = actual_fps
132143

133144
logger.info(f"Opened V4L camera {self.camera_id}")
134145

0 commit comments

Comments
 (0)