1+ #pragma once
2+
3+ #ifndef MQTT_MAX_PACKET_SIZE
4+ #define MQTT_MAX_PACKET_SIZE 30000
5+ #endif
6+
7+ #include < WiFi.h>
8+ #include < MQTT.h>
9+ #include " ./extra/exception.h"
10+
11+ using Eloquent::Error::Exception;
12+
13+ typedef struct {
14+ uint8_t *buf;
15+ uint32_t len;
16+ } mqtt_frame_t ;
17+
18+ namespace Eloquent {
19+ namespace Esp32cam {
20+ /* *
21+ * Stream camera frames over MQTT
22+ */
23+ class MqttStream {
24+ public:
25+ Exception exception;
26+
27+ /* *
28+ *
29+ */
30+ MqttStream () :
31+ exception (" MqttStream" ),
32+ _throttle (100 ) {
33+
34+ }
35+
36+ /* *
37+ * Don't send frames too often
38+ * @param ms
39+ */
40+ void throttle (size_t ms) {
41+ _throttle = ms;
42+ }
43+
44+ /* *
45+ *
46+ * @return
47+ */
48+ Exception& begin (String clientID) {
49+ _clientID = clientID;
50+ _mqttClient.begin (" mqtt.eloquentarduino.com" , 1883 , _wifiClient);
51+
52+ if (WiFi.status () != WL_CONNECTED)
53+ return exception.set (" WiFi not connected" );
54+
55+ if (!_mqttClient.connect (_clientID.c_str (), _clientID.c_str (), " password" )) {
56+ ESP_LOGE (" MqttStream" , " Can't connect to MQTT broker. Are you sure your CLIENT_ID is valid?" );
57+ return exception.set (" Can't connect to MQTT broker" );
58+ }
59+
60+ _queue = xQueueCreate (1 , sizeof (mqtt_frame_t ));
61+
62+ xTaskCreate ([](void *args) {
63+ MqttStream *self = (MqttStream*) args;
64+ mqtt_frame_t frame;
65+ String topic = String (" /" ) + self->_clientID + " /stream" ;
66+
67+ while (true ) {
68+ // await for new frame
69+ if (xQueueReceive (self->_queue , &frame, 10 ) != pdPASS)
70+ continue ;
71+
72+ if (WiFi.status () != WL_CONNECTED) {
73+ ESP_LOGE (" MqttStream" , " WiFi not connected" );
74+ delay (500 );
75+ continue ;
76+ }
77+
78+ if (!self->_mqttClient .connected ()) {
79+ ESP_LOGE (" MqttStream" , " Can't connect to MQTT broker. Are you sure your CLIENT_ID is valid?" );
80+ delay (10 );
81+ continue ;
82+ }
83+
84+ // publish
85+ if (frame.len > 0 ) {
86+ if (frame.len > MQTT_MAX_PACKET_SIZE) {
87+ ESP_LOGE (" MqttStream" , " Frame too large (%d bytes, max %d allowed)" , frame.len , MQTT_MAX_PACKET_SIZE);
88+ continue ;
89+ }
90+
91+ if (!self->_mqttClient .publish (topic.c_str (), (const char *) frame.buf , frame.len ))
92+ ESP_LOGE (" MqttStream" , " Can't publish frame" );
93+ }
94+
95+ // self->_mqttClient.loop();
96+ }
97+ }, " MqttStream" , 50000 , this , 0 , NULL );
98+
99+ return exception.clear ();
100+ }
101+
102+ /* *
103+ *
104+ * @tparam Camera
105+ * @param camera
106+ * @return
107+ */
108+ template <typename Camera>
109+ Exception& queue (Camera& camera) {
110+ if (_queue == NULL )
111+ return exception.set (" Queue is not initialized. Did you call begin()?" );
112+
113+ const size_t now = millis ();
114+
115+ // throttle
116+ if (now > _queuedAt && now - _queuedAt < _throttle)
117+ return exception.clear ();
118+
119+ // discard old frame, if not consumed yet
120+ if (uxQueueSpacesAvailable (_queue) > 0 )
121+ xQueueReset (_queue);
122+
123+ if (!camera.hasFrame ())
124+ return exception.set (" Cannot queue empty frame" );
125+
126+ mqtt_frame_t frame = {
127+ .buf = camera.frame ->buf ,
128+ .len = camera.frame ->len
129+ };
130+
131+ if (xQueueSendToFront (_queue, (void *) &frame, 0 ) != pdTRUE)
132+ return exception.set (" Cannot queue frame" );
133+
134+ _queuedAt = millis ();
135+ return exception.clear ();
136+ }
137+
138+ protected:
139+ size_t _queuedAt;
140+ size_t _throttle;
141+ String _clientID;
142+ String _location;
143+ WiFiClient _wifiClient;
144+ MQTTClient _mqttClient;
145+ QueueHandle_t _queue;
146+ };
147+ }
148+ }
149+
150+ namespace eloq {
151+ static Eloquent::Esp32cam::MqttStream mqttStream;
152+ }
0 commit comments