44
55import threading
66import time
7- import io
87from abc import ABC , abstractmethod
9- from typing import Optional , Tuple
10- from PIL import Image
11- import cv2
8+ from typing import Optional , Tuple , Callable
129import numpy as np
1310
1411from arduino .app_utils import Logger
1512
16- from .errors import CameraOpenError
13+ from .errors import CameraOpenError , CameraTransformError
1714
1815logger = Logger ("Camera" )
1916
@@ -27,21 +24,19 @@ class BaseCamera(ABC):
2724 """
2825
2926 def __init__ (self , resolution : Optional [Tuple [int , int ]] = None , fps : int = 10 ,
30- compression : bool = False , letterbox : bool = False , ** kwargs ):
27+ transformer : Optional [ Callable [[ np . ndarray ], np . ndarray ]] = None , ** kwargs ):
3128 """
3229 Initialize the camera base.
3330
3431 Args:
35- resolution: Resolution as (width, height). None uses default resolution.
36- fps: Frames per second for the camera.
37- compression: Whether to compress captured images to PNG format.
38- letterbox: Whether to apply letterboxing to make images square.
32+ resolution (tuple, optional): Resolution as (width, height). None uses default resolution.
33+ fps (int): Frames per second for the camera.
34+ transformer (callable, optional): Function to transform frames that takes a numpy array and returns a numpy array. Default: None
3935 **kwargs: Additional camera-specific parameters.
4036 """
4137 self .resolution = resolution
4238 self .fps = fps
43- self .compression = compression
44- self .letterbox = letterbox
39+ self .transformer = transformer
4540 self ._is_started = False
4641 self ._cap_lock = threading .Lock ()
4742 self ._last_capture_time = time .monotonic ()
@@ -74,52 +69,17 @@ def stop(self) -> None:
7469 except Exception as e :
7570 logger .warning (f"Error stopping camera: { e } " )
7671
77- def capture (self ) -> Optional [Image . Image ]:
72+ def capture (self ) -> Optional [np . ndarray ]:
7873 """
7974 Capture a frame from the camera, respecting the configured FPS.
8075
8176 Returns:
82- PIL Image or None if no frame is available.
77+ Numpy array or None if no frame is available.
8378 """
8479 frame = self ._extract_frame ()
8580 if frame is None :
8681 return None
87-
88- try :
89- if self .compression :
90- # Convert to PNG bytes first, then to PIL Image
91- success , encoded = cv2 .imencode ('.png' , frame )
92- if success :
93- return Image .open (io .BytesIO (encoded .tobytes ()))
94- else :
95- return None
96- else :
97- # Convert BGR to RGB for PIL
98- if len (frame .shape ) == 3 and frame .shape [2 ] == 3 :
99- rgb_frame = cv2 .cvtColor (frame , cv2 .COLOR_BGR2RGB )
100- else :
101- rgb_frame = frame
102- return Image .fromarray (rgb_frame )
103- except Exception as e :
104- logger .exception (f"Error converting frame to PIL Image: { e } " )
105- return None
106-
107- def capture_bytes (self ) -> Optional [bytes ]:
108- """
109- Capture a frame and return as bytes.
110-
111- Returns:
112- Frame as bytes or None if no frame is available.
113- """
114- frame = self ._extract_frame ()
115- if frame is None :
116- return None
117-
118- if self .compression :
119- success , encoded = cv2 .imencode ('.png' , frame )
120- return encoded .tobytes () if success else None
121- else :
122- return frame .tobytes ()
82+ return frame
12383
12484 def _extract_frame (self ) -> Optional [np .ndarray ]:
12585 """Extract a frame with FPS throttling and post-processing."""
@@ -140,45 +100,24 @@ def _extract_frame(self) -> Optional[np.ndarray]:
140100
141101 self ._last_capture_time = time .monotonic ()
142102
143- # Apply post-processing
144- if self .letterbox :
145- frame = self ._letterbox (frame )
103+ if self .transformer is None :
104+ return frame
105+
106+ try :
107+ frame = frame | self .transformer
108+ except Exception as e :
109+ raise CameraTransformError (f"Frame transformation failed ({ self .transformer } ): { e } " )
146110
147111 return frame
148112
149- def _letterbox (self , frame : np .ndarray ) -> np .ndarray :
150- """Apply letterboxing to make the frame square."""
151- h , w = frame .shape [:2 ]
152- if w != h :
153- size = max (h , w )
154- return cv2 .copyMakeBorder (
155- frame ,
156- top = (size - h ) // 2 ,
157- bottom = (size - h + 1 ) // 2 ,
158- left = (size - w ) // 2 ,
159- right = (size - w + 1 ) // 2 ,
160- borderType = cv2 .BORDER_CONSTANT ,
161- value = (114 , 114 , 114 )
162- )
163- return frame
164-
165113 def is_started (self ) -> bool :
166114 """Check if the camera is started."""
167115 return self ._is_started
168116
169- def produce (self ) -> Optional [Image . Image ]:
117+ def produce (self ) -> Optional [np . ndarray ]:
170118 """Alias for capture method for compatibility."""
171119 return self .capture ()
172120
173- def __enter__ (self ):
174- """Context manager entry."""
175- self .start ()
176- return self
177-
178- def __exit__ (self , exc_type , exc_val , exc_tb ):
179- """Context manager exit."""
180- self .stop ()
181-
182121 @abstractmethod
183122 def _open_camera (self ) -> None :
184123 """Open the camera connection. Must be implemented by subclasses."""
@@ -193,3 +132,12 @@ def _close_camera(self) -> None:
193132 def _read_frame (self ) -> Optional [np .ndarray ]:
194133 """Read a single frame from the camera. Must be implemented by subclasses."""
195134 pass
135+
136+ def __enter__ (self ):
137+ """Context manager entry."""
138+ self .start ()
139+ return self
140+
141+ def __exit__ (self , exc_type , exc_val , exc_tb ):
142+ """Context manager exit."""
143+ self .stop ()
0 commit comments