@@ -79,7 +79,8 @@ static void dump(void* index, uint8_t* buf, int len)
7979 }
8080}
8181
82- VideoGstAnalyzer::VideoGstAnalyzer () {
82+ VideoGstAnalyzer::VideoGstAnalyzer (EventRegistry *handle) : m_asyncHandle(handle)
83+ {
8384 ELOG_INFO (" Init" );
8485 sourceid = 0 ;
8586 sink = NULL ;
@@ -94,78 +95,104 @@ VideoGstAnalyzer::VideoGstAnalyzer() {
9495 }
9596}
9697
97- VideoGstAnalyzer::~VideoGstAnalyzer () {
98+ VideoGstAnalyzer::~VideoGstAnalyzer ()
99+ {
98100 ELOG_DEBUG (" Closed all media in this Analyzer" );
99- if (pipeline_ != nullptr && pipelineHandle != nullptr ) {
100- destroyPlugin (pipeline_);
101- dlclose (pipelineHandle);
101+ destroyPipeline ();
102+ }
103+
104+ bool VideoGstAnalyzer::notifyAsyncEvent (const std::string& event, const std::string& data)
105+ {
106+ if (m_asyncHandle) {
107+ return m_asyncHandle->notifyAsyncEvent (event, data);
108+ } else {
109+ return false ;
102110 }
103- stopLoop ();
104111}
105112
106- gboolean VideoGstAnalyzer::StreamEventCallBack (GstBus *bus, GstMessage *message, gpointer data)
107- {
108- ELOG_DEBUG (" Got %s message\n " , GST_MESSAGE_TYPE_NAME (message));
109-
110- VideoGstAnalyzer* pStreamObj = static_cast <VideoGstAnalyzer*>(data);
111-
112- switch (GST_MESSAGE_TYPE (message)) {
113- case GST_MESSAGE_ERROR: {
114- GError *err;
115- gchar *debug;
116- gst_message_parse_error (message, &err, &debug);
117- ELOG_ERROR (" Error: %s\n " , err->message );
118- g_error_free (err);
119- g_free (debug);
120- g_main_loop_quit (pStreamObj->loop );
121- break ;
122- }
123- case GST_MESSAGE_EOS:
124- /* end-of-stream */
125- ELOG_ERROR (" End of stream\n " );
126- g_main_loop_quit (pStreamObj->loop );
127- break ;
128- case GST_MESSAGE_TAG:{
129- /* end-of-stream */
130- GstTagList *tags = NULL ;
131- gst_message_parse_tag (message, &tags);
132-
133- ELOG_DEBUG (" Got tags from element %s:\n " , GST_OBJECT_NAME (message->src ));
134- gst_tag_list_unref (tags);
135- break ;
136- }
137- case GST_MESSAGE_QOS:{
138- /* end-of-stream */
139- ELOG_DEBUG (" Got QOS message from %s \n " ,message->src ->name );
140- break ;
141- }
142- case GST_MESSAGE_STATE_CHANGED:{
143- GstState old_state, new_state, pending_state;
144- gst_message_parse_state_changed (message, &old_state, &new_state, &pending_state);
145- ELOG_DEBUG (" State change from %d to %d, play:%d \n " ,old_state, new_state, GST_STATE_PAUSED);
146- break ;
147- }
148- default :
149- /* unhandled message */
150- break ;
113+ bool VideoGstAnalyzer::notifyAsyncEventInEmergency (const std::string& event, const std::string& data)
114+ {
115+ if (m_asyncHandle) {
116+ return m_asyncHandle->notifyAsyncEventInEmergency (event, data);
117+ } else {
118+ return false ;
151119 }
152- return true ;
120+ }
121+
122+ gboolean VideoGstAnalyzer::StreamEventCallBack (GstBus *bus, GstMessage *message, gpointer data)
123+ {
124+ ELOG_DEBUG (" Got %s message\n " , GST_MESSAGE_TYPE_NAME (message));
125+
126+ VideoGstAnalyzer* pStreamObj = static_cast <VideoGstAnalyzer*>(data);
127+
128+ switch (GST_MESSAGE_TYPE (message)) {
129+ case GST_MESSAGE_ERROR: {
130+ GError *err;
131+ gchar *debug;
132+ gst_message_parse_error (message, &err, &debug);
133+ ELOG_ERROR (" Error: %s\n " , err->message );
134+ g_error_free (err);
135+ g_free (debug);
136+ pStreamObj->notifyAsyncEvent (" fatal" , " GStreamer pipeline error" );
137+ break ;
138+ }
139+ case GST_MESSAGE_EOS:
140+ /* end-of-stream */
141+ ELOG_ERROR (" End of stream\n " );
142+ g_main_loop_quit (pStreamObj->loop );
143+ break ;
144+ case GST_MESSAGE_TAG:{
145+ /* end-of-stream */
146+ GstTagList *tags = NULL ;
147+ gst_message_parse_tag (message, &tags);
148+
149+ ELOG_DEBUG (" Got tags from element %s:\n " , GST_OBJECT_NAME (message->src ));
150+ gst_tag_list_unref (tags);
151+ break ;
152+ }
153+ case GST_MESSAGE_QOS:{
154+ /* end-of-stream */
155+ ELOG_DEBUG (" Got QOS message from %s \n " ,message->src ->name );
156+ break ;
153157 }
158+ case GST_MESSAGE_STATE_CHANGED:{
159+ GstState old_state, new_state, pending_state;
160+ gst_message_parse_state_changed (message, &old_state, &new_state, &pending_state);
161+ ELOG_DEBUG (" State change from %d to %d, play:%d \n " ,old_state, new_state, GST_STATE_PAUSED);
162+ break ;
163+ }
164+ default :
165+ /* unhandled message */
166+ break ;
167+ }
168+ return true ;
169+ }
154170
155171void VideoGstAnalyzer::clearPipeline ()
156- {
157- if (pipeline != nullptr ){
158- gst_element_set_state (pipeline, GST_STATE_NULL);
159- gst_object_unref (GST_OBJECT (pipeline));
160- g_source_remove (m_bus_watch_id);
161- g_main_loop_unref (loop);
162- gst_object_unref (m_bus);
163- }
164-
172+ {
173+ if (pipeline != nullptr ){
174+ gst_element_set_state (pipeline, GST_STATE_NULL);
175+ gst_object_unref (GST_OBJECT (pipeline));
176+ g_source_remove (m_bus_watch_id);
177+ g_main_loop_unref (loop);
178+ gst_object_unref (m_bus);
165179 }
166180
167- int VideoGstAnalyzer::createPipeline () {
181+ }
182+
183+ void VideoGstAnalyzer::destroyPipeline ()
184+ {
185+ ELOG_DEBUG (" Closed all media in this Analyzer" );
186+ setState (GST_STATE_NULL);
187+ if (pipeline_ != nullptr && pipelineHandle != nullptr ) {
188+ destroyPlugin (pipeline_);
189+ dlclose (pipelineHandle);
190+ stopLoop ();
191+ }
192+ }
168193
194+ int VideoGstAnalyzer::createPipeline ()
195+ {
169196 pipelineHandle = dlopen (libraryName.c_str (), RTLD_LAZY);
170197 if (pipelineHandle == nullptr ) {
171198 ELOG_ERROR_T (" Failed to open the plugin.(%s)" , libraryName.c_str ());
@@ -263,7 +290,8 @@ void VideoGstAnalyzer::new_sample_from_sink (GstElement * source, gpointer data)
263290 gst_sample_unref (sample);
264291}
265292
266- int VideoGstAnalyzer::addElementMany () {
293+ int VideoGstAnalyzer::addElementMany ()
294+ {
267295 if (pipeline_){
268296 rvaStatus status = pipeline_->LinkElements ();
269297 if (status != RVA_ERR_OK) {
@@ -290,20 +318,22 @@ int VideoGstAnalyzer::addElementMany() {
290318}
291319
292320
293- void VideoGstAnalyzer::stopLoop (){
321+ void VideoGstAnalyzer::stopLoop ()
322+ {
294323 if (loop){
295324 ELOG_DEBUG (" main loop quit\n " );
296325 g_main_loop_quit (loop);
297326 }
298- g_thread_join (m_thread);
299327}
300328
301- void VideoGstAnalyzer::main_loop_thread (gpointer data){
329+ void VideoGstAnalyzer::main_loop_thread (gpointer data)
330+ {
302331 g_main_loop_run (loop);
303332 g_thread_exit (0 );
304333}
305334
306- void VideoGstAnalyzer::setState (GstState newstate) {
335+ void VideoGstAnalyzer::setState (GstState newstate)
336+ {
307337 ret = gst_element_set_state (pipeline, newstate);
308338 if (ret == GST_STATE_CHANGE_FAILURE) {
309339 ELOG_ERROR (" Unable to set the pipeline to the PLAYING state.\n " );
@@ -312,7 +342,8 @@ void VideoGstAnalyzer::setState(GstState newstate) {
312342}
313343
314344
315- int VideoGstAnalyzer::setPlaying () {
345+ int VideoGstAnalyzer::setPlaying ()
346+ {
316347
317348 setState (GST_STATE_PLAYING);
318349
@@ -321,12 +352,14 @@ int VideoGstAnalyzer::setPlaying() {
321352 return 0 ;
322353}
323354
324- void VideoGstAnalyzer::emitListenTo (int minPort, int maxPort) {
355+ void VideoGstAnalyzer::emitListenTo (int minPort, int maxPort)
356+ {
325357 ELOG_DEBUG (" Listening\n " );
326358 m_internalin.reset (new GstInternalIn ((GstAppSrc*)source, minPort, maxPort));
327359}
328360
329- void VideoGstAnalyzer::addOutput (int connectionID, owt_base::FrameDestination* out) {
361+ void VideoGstAnalyzer::addOutput (int connectionID, owt_base::FrameDestination* out)
362+ {
330363 ELOG_DEBUG (" Add analyzed stream back to OWT\n " );
331364 if (sink != nullptr ){
332365
@@ -338,8 +371,7 @@ void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* o
338371 }
339372 m_gstinternalout->setPad (encoder_pad);
340373 }
341- // gst_pad_send_event(encoder_pad, gst_event_new_custom( GST_EVENT_CUSTOM_UPSTREAM, gst_structure_new( "GstForceKeyUnit", "all-headers", G_TYPE_BOOLEAN, TRUE, NULL)));
342- // m_internalout.push_back(out);
374+
343375 m_gstinternalout->addVideoDestination (out);
344376 if (!addlistener) {
345377 g_object_set (G_OBJECT (sink), " emit-signals" , TRUE , " sync" , FALSE , NULL );
@@ -352,22 +384,23 @@ void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* o
352384
353385}
354386
355- void VideoGstAnalyzer::disconnect (owt_base::FrameDestination* out){
387+ void VideoGstAnalyzer::disconnect (owt_base::FrameDestination* out)
388+ {
356389 ELOG_DEBUG (" Disconnect remote connection\n " );
357- // m_internalout.remove(out);
358390 m_gstinternalout->removeVideoDestination (out);
359391}
360392
361- int VideoGstAnalyzer::getListeningPort () {
393+ int VideoGstAnalyzer::getListeningPort ()
394+ {
362395 int listeningPort;
363396 listeningPort = m_internalin->getListeningPort ();
364397 ELOG_DEBUG (" >>>>>Listen port is :%d\n " , listeningPort);
365398 return listeningPort;
366399}
367400
368401void VideoGstAnalyzer::setOutputParam (std::string codec, int width, int height,
369- int framerate, int bitrate, int kfi, std::string algo, std::string libraryName){
370-
402+ int framerate, int bitrate, int kfi, std::string algo, std::string libraryName)
403+ {
371404 this ->codec = codec;
372405 this ->width = width;
373406 this ->height = height;
0 commit comments