22#
33# SPDX-License-Identifier: MPL-2.0
44
5- import threading
6- import time
7- import cv2
85import io
9- import os
10- import re
6+ import warnings
117from PIL import Image
8+ from arduino .app_peripherals .camera import Camera , CameraReadError as CRE , CameraOpenError as COE
9+ from arduino .app_utils .image .image_editor import compressed_to_png , letterboxed
1210from arduino .app_utils import Logger
1311
1412logger = Logger ("USB Camera" )
1513
14+ CameraReadError = CRE
1615
17- class CameraReadError (Exception ):
18- """Exception raised when the specified camera cannot be found."""
19-
20- pass
21-
22-
23- class CameraOpenError (Exception ):
24- """Exception raised when the camera cannot be opened."""
25-
26- pass
16+ CameraOpenError = COE
2717
2818
19+ @warnings .deprecated ("Use the Camera peripheral instead of this one" )
2920class USBCamera :
3021 """Represents an input peripheral for capturing images from a USB camera device.
3122 This class uses OpenCV to interface with the camera and capture images.
@@ -34,7 +25,7 @@ class USBCamera:
3425 def __init__ (
3526 self ,
3627 camera : int = 0 ,
37- resolution : tuple [int , int ] = ( None , None ) ,
28+ resolution : tuple [int , int ] = None ,
3829 fps : int = 10 ,
3930 compression : bool = False ,
4031 letterbox : bool = False ,
@@ -48,35 +39,26 @@ def __init__(
4839 compression (bool): Whether to compress the captured images. If True, images are compressed to PNG format.
4940 letterbox (bool): Whether to apply letterboxing to the captured images.
5041 """
51- video_devices = self ._get_video_devices_by_index ()
52- if camera in video_devices :
53- self .camera = int (video_devices [camera ])
54- else :
55- raise CameraOpenError (
56- f"Not available camera at index 0 { camera } . Verify the connected cameras and fi cameras are listed "
57- f"inside devices listed here: /dev/v4l/by-id"
58- )
59-
60- self .resolution = resolution
61- self .fps = fps
6242 self .compression = compression
63- self .letterbox = letterbox
64- self ._cap = None
65- self ._cap_lock = threading .Lock ()
66- self ._last_capture_time_monotonic = time .monotonic ()
67- if self .fps > 0 :
68- self .desired_interval = 1.0 / self .fps
69- else :
70- # Capture as fast as possible
71- self .desired_interval = 0
43+
44+ pipe = None
45+ if compression :
46+ pipe = compressed_to_png ()
47+ if letterbox :
48+ pipe = pipe | letterboxed () if pipe else letterboxed ()
49+
50+ self ._wrapped_camera = Camera (source = camera ,
51+ resolution = resolution ,
52+ fps = fps ,
53+ adjustments = pipe )
7254
7355 def capture (self ) -> Image .Image | None :
7456 """Captures a frame from the camera, blocking to respect the configured FPS.
7557
7658 Returns:
7759 PIL.Image.Image | None: The captured frame as a PIL Image, or None if no frame is available.
7860 """
79- image_bytes = self ._extract_frame ()
61+ image_bytes = self ._wrapped_camera . capture ()
8062 if image_bytes is None :
8163 return None
8264 try :
@@ -95,157 +77,18 @@ def capture_bytes(self) -> bytes | None:
9577 Returns:
9678 bytes | None: The captured frame as a bytes array, or None if no frame is available.
9779 """
98- frame = self ._extract_frame ()
80+ frame = self ._wrapped_camera . capture ()
9981 if frame is None :
10082 return None
10183 return frame .tobytes ()
10284
103- def _extract_frame (self ) -> cv2 .typing .MatLike | None :
104- # Without locking, 'elapsed_time' could be a stale value but this scenario is unlikely to be noticeable in
105- # practice, also its effects would disappear in the next capture. This optimization prevents us from calling
106- # time.sleep while holding a lock.
107- current_time_monotonic = time .monotonic ()
108- elapsed_time = current_time_monotonic - self ._last_capture_time_monotonic
109- if elapsed_time < self .desired_interval :
110- sleep_duration = self .desired_interval - elapsed_time
111- time .sleep (sleep_duration ) # Keep time.sleep out of the locked section!
112-
113- with self ._cap_lock :
114- if self ._cap is None :
115- return None
116-
117- ret , bgr_frame = self ._cap .read ()
118- if not ret :
119- raise CameraReadError (f"Failed to read from camera { self .camera } ." )
120- self ._last_capture_time_monotonic = time .monotonic ()
121- if bgr_frame is None :
122- # No frame available, skip this iteration
123- return None
124-
125- try :
126- if self .letterbox :
127- bgr_frame = self ._letterbox (bgr_frame )
128- if self .compression :
129- success , rgb_frame = cv2 .imencode (".png" , bgr_frame )
130- if success :
131- return rgb_frame
132- else :
133- return None
134- else :
135- return cv2 .cvtColor (bgr_frame , cv2 .COLOR_BGR2RGB )
136- except cv2 .error as e :
137- logger .exception (f"Error converting frame: { e } " )
138- return None
139-
140- def _letterbox (self , frame : cv2 .typing .MatLike ) -> cv2 .typing .MatLike :
141- """Applies letterboxing to the frame to make it square.
142-
143- Args:
144- frame (cv2.typing.MatLike): The input frame to be letterboxed (as cv2 supported format - numpy like).
145-
146- Returns:
147- cv2.typing.MatLike: The letterboxed frame (as cv2 supported format - numpy like).
148- """
149- h , w = frame .shape [:2 ]
150- if w != h :
151- # Letterbox: add padding to make it square (yolo colors)
152- size = max (h , w )
153- return cv2 .copyMakeBorder (
154- frame ,
155- top = (size - h ) // 2 ,
156- bottom = (size - h + 1 ) // 2 ,
157- left = (size - w ) // 2 ,
158- right = (size - w + 1 ) // 2 ,
159- borderType = cv2 .BORDER_CONSTANT ,
160- value = (114 , 114 , 114 ),
161- )
162- else :
163- return frame
164-
165- def _get_video_devices_by_index (self ):
166- """Reads symbolic links in /dev/v4l/by-id/, resolves them, and returns a
167- dictionary mapping the numeric index to the system /dev/videoX device.
168-
169- Returns:
170- dict[int, str]: a dict where keys are ordinal integer indices (e.g., 0, 1) and values are the
171- /dev/videoX device names (e.g., "0", "1").
172- """
173- devices_by_index = {}
174- directory_path = "/dev/v4l/by-id/"
175-
176- # Check if the directory exists
177- if not os .path .exists (directory_path ):
178- logger .error (f"Error: Directory '{ directory_path } ' not found." )
179- return devices_by_index
180-
181- try :
182- # List all entries in the directory
183- entries = os .listdir (directory_path )
184-
185- for entry in entries :
186- full_path = os .path .join (directory_path , entry )
187-
188- # Check if the entry is a symbolic link
189- if os .path .islink (full_path ):
190- # Use a regular expression to find the numeric index at the end of the filename
191- match = re .search (r"index(\d+)$" , entry )
192- if match :
193- index_str = match .group (1 )
194- try :
195- index = int (index_str )
196-
197- # Resolve the symbolic link to its absolute path
198- resolved_path = os .path .realpath (full_path )
199-
200- # Get just the filename (e.g., "video0") from the resolved path
201- device_name = os .path .basename (resolved_path )
202-
203- # Remove the "video" prefix to get just the number
204- device_number = device_name .replace ("video" , "" )
205-
206- # Add the index and device number to the dictionary
207- devices_by_index [index ] = device_number
208-
209- except ValueError :
210- logger .warning (f"Warning: Could not convert index '{ index_str } ' to an integer for '{ entry } '. Skipping." )
211- continue
212- except OSError as e :
213- logger .error (f"Error accessing directory '{ directory_path } ': { e } " )
214- return devices_by_index
215-
216- return devices_by_index
217-
21885 def start (self ):
21986 """Starts the camera capture."""
220- with self ._cap_lock :
221- if self ._cap is not None :
222- return
223-
224- temp_cap = cv2 .VideoCapture (self .camera )
225- if not temp_cap .isOpened ():
226- raise CameraOpenError (f"Failed to open camera { self .camera } ." )
227-
228- self ._cap = temp_cap # Assign only after successful initialization
229- self ._last_capture_time_monotonic = time .monotonic ()
230-
231- if self .resolution [0 ] is not None and self .resolution [1 ] is not None :
232- self ._cap .set (cv2 .CAP_PROP_FRAME_WIDTH , self .resolution [0 ])
233- self ._cap .set (cv2 .CAP_PROP_FRAME_HEIGHT , self .resolution [1 ])
234- # Verify if setting resolution was successful
235- actual_width = self ._cap .get (cv2 .CAP_PROP_FRAME_WIDTH )
236- actual_height = self ._cap .get (cv2 .CAP_PROP_FRAME_HEIGHT )
237- if actual_width != self .resolution [0 ] or actual_height != self .resolution [1 ]:
238- logger .warning (
239- f"Camera { self .camera } could not be set to { self .resolution [0 ]} x{ self .resolution [1 ]} , "
240- f"actual resolution: { int (actual_width )} x{ int (actual_height )} " ,
241- )
87+ self ._wrapped_camera .start ()
24288
24389 def stop (self ):
24490 """Stops the camera and releases its resources."""
245- with self ._cap_lock :
246- if self ._cap is not None :
247- self ._cap .release ()
248- self ._cap = None
91+ self ._wrapped_camera .stop ()
24992
25093 def produce (self ):
25194 """Alias for capture method."""
0 commit comments