diff options
author | Olivier Crête <olivier.crete@collabora.com> | 2019-08-23 16:21:47 -0400 |
---|---|---|
committer | Tim-Philipp Müller <tim@centricular.com> | 2019-09-01 00:22:05 +0100 |
commit | c692f6ac4409e99e205f91b0c35b25e7d59fb897 (patch) | |
tree | 938000ac4c2cf355b2ebbb862e0a9b9fd6471cc4 /ext | |
parent | 1298e6a18668b229384c61c347e920065507228d (diff) | |
download | gstreamer-plugins-bad-c692f6ac4409e99e205f91b0c35b25e7d59fb897.tar.gz |
srtobject: Remove pointless GMainLoop
Just use srt's blocking epoll function and fix locking while we're at it.
Diffstat (limited to 'ext')
-rw-r--r-- | ext/srt/gstsrtobject.c | 215 | ||||
-rw-r--r-- | ext/srt/gstsrtobject.h | 3 |
2 files changed, 107 insertions, 111 deletions
diff --git a/ext/srt/gstsrtobject.c b/ext/srt/gstsrtobject.c index ea7bb6f72..069f255c8 100644 --- a/ext/srt/gstsrtobject.c +++ b/ext/srt/gstsrtobject.c @@ -586,102 +586,101 @@ static gpointer thread_func (gpointer data) { GstSRTObject *srtobject = data; - - g_main_loop_run (srtobject->loop); - - return NULL; -} - -static gboolean -idle_listen_source_cb (gpointer data) -{ - GstSRTObject *srtobject = data; SRTSOCKET caller_sock; - struct sockaddr caller_sa; - gsize caller_sa_len; + union + { + struct sockaddr_storage ss; + struct sockaddr sa; + } caller_sa; + int caller_sa_len; gint poll_timeout; SRTSOCKET rsock; gint rsocklen = 1; - if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", - &poll_timeout)) { - poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; - } + for (;;) { + if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", + &poll_timeout)) { + poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; + } - GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller"); + GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller"); - if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, - &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) { - gint srt_errno = srt_getlasterror (NULL); + if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, + &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) { + gint srt_errno = srt_getlasterror (NULL); - if (srt_errno == SRT_ETIMEOUT) { - return TRUE; - } else { - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED, - ("abort polling: %s", srt_getlasterror_str ()), (NULL)); - return FALSE; + if (srtobject->listener_poll_id == SRT_ERROR) + return NULL; + if (srt_errno == SRT_ETIMEOUT) { + continue; + } else { + GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED, + ("abort polling: %s", srt_getlasterror_str ()), (NULL)); + return NULL; + } } - } - caller_sock = - srt_accept (srtobject->listener_sock, &caller_sa, (int *) &caller_sa_len); + caller_sock = + srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len); - if (caller_sock != SRT_INVALID_SOCK) { - SRTCaller *caller; - gint flag = SRT_EPOLL_ERR; - - caller = srt_caller_new (); - caller->sockaddr = - g_socket_address_new_from_native (&caller_sa, caller_sa_len); - caller->poll_id = srt_epoll_create (); - caller->sock = caller_sock; - - if (gst_uri_handler_get_uri_type (GST_URI_HANDLER - (srtobject->element)) == GST_URI_SRC) { - flag |= SRT_EPOLL_IN; - } else { - flag |= SRT_EPOLL_OUT; - } + if (caller_sock != SRT_INVALID_SOCK) { + SRTCaller *caller; + gint flag = SRT_EPOLL_ERR; - if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) { + caller = srt_caller_new (); + caller->sockaddr = + g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len); + caller->poll_id = srt_epoll_create (); + caller->sock = caller_sock; - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS, - ("%s", srt_getlasterror_str ()), (NULL)); + if (gst_uri_handler_get_uri_type (GST_URI_HANDLER + (srtobject->element)) == GST_URI_SRC) { + flag |= SRT_EPOLL_IN; + } else { + flag |= SRT_EPOLL_OUT; + } - srt_caller_free (caller); + if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) { - /* try-again */ - return TRUE; - } + GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS, + ("%s", srt_getlasterror_str ()), (NULL)); - GST_OBJECT_LOCK (srtobject->element); - srtobject->callers = g_list_append (srtobject->callers, caller); - g_cond_signal (&srtobject->sock_cond); - GST_OBJECT_UNLOCK (srtobject->element); + srt_caller_free (caller); - /* notifying caller-added */ - if (srtobject->caller_added_closure != NULL) { - GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT }; + /* try-again */ + continue; + } - g_value_init (&values[0], G_TYPE_INT); - g_value_set_int (&values[0], caller->sock); + GST_OBJECT_LOCK (srtobject->element); + srtobject->callers = g_list_append (srtobject->callers, caller); + g_cond_signal (&srtobject->sock_cond); + GST_OBJECT_UNLOCK (srtobject->element); - g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS); - g_value_set_object (&values[1], caller->sockaddr); + /* notifying caller-added */ + if (srtobject->caller_added_closure != NULL) { + GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT }; - g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values, NULL); + g_value_init (&values[0], G_TYPE_INT); + g_value_set_int (&values[0], caller->sock); - g_value_unset (&values[1]); - } + g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS); + g_value_set_object (&values[1], caller->sockaddr); - GST_DEBUG_OBJECT (srtobject->element, "Accept to connect"); - } + g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values, + NULL); + + g_value_unset (&values[1]); + } - /* only one caller is allowed if the element is source. */ - return gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) != - GST_URI_SRC; + GST_DEBUG_OBJECT (srtobject->element, "Accept to connect"); + + if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) == + GST_URI_SRC) + return NULL; + } + } } static gboolean @@ -752,15 +751,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, srtobject->listener_sock = sock; - srtobject->context = g_main_context_new (); - srtobject->loop = g_main_loop_new (srtobject->context, TRUE); - - srtobject->listener_source = g_idle_source_new (); - g_source_set_callback (srtobject->listener_source, - (GSourceFunc) idle_listen_source_cb, srtobject, NULL); - - g_source_attach (srtobject->listener_source, srtobject->context); - srtobject->thread = g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error); @@ -772,9 +762,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, failed: - g_clear_pointer (&srtobject->loop, g_main_loop_unref); - g_clear_pointer (&srtobject->context, g_main_context_unref); - if (srtobject->listener_poll_id != SRT_ERROR) { srt_epoll_release (srtobject->listener_poll_id); } @@ -1019,6 +1006,7 @@ out: void gst_srt_object_close (GstSRTObject * srtobject) { + GST_OBJECT_LOCK (srtobject->element); if (srtobject->poll_id != SRT_ERROR) { srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock); } @@ -1032,20 +1020,16 @@ gst_srt_object_close (GstSRTObject * srtobject) srtobject->sock = SRT_INVALID_SOCK; } - if (srtobject->loop) { - g_main_loop_quit (srtobject->loop); - - if (srtobject->listener_poll_id != SRT_ERROR) { - srt_epoll_remove_usock (srtobject->listener_poll_id, - srtobject->listener_sock); - srtobject->listener_poll_id = SRT_ERROR; - } - - g_thread_join (srtobject->thread); - - g_clear_pointer (&srtobject->thread, g_thread_unref); - g_clear_pointer (&srtobject->loop, g_main_loop_unref); - g_clear_pointer (&srtobject->context, g_main_context_unref); + if (srtobject->listener_poll_id != SRT_ERROR) { + srt_epoll_remove_usock (srtobject->listener_poll_id, + srtobject->listener_sock); + srtobject->listener_poll_id = SRT_ERROR; + } + if (srtobject->thread) { + GThread *thread = g_steal_pointer (&srtobject->thread); + GST_OBJECT_UNLOCK (srtobject->element); + g_thread_join (thread); + GST_OBJECT_LOCK (srtobject->element); } if (srtobject->listener_sock != SRT_INVALID_SOCK) { @@ -1056,14 +1040,20 @@ gst_srt_object_close (GstSRTObject * srtobject) srtobject->listener_sock = SRT_INVALID_SOCK; } - g_list_foreach (srtobject->callers, (GFunc) srt_caller_invoke_removed_closure, - srtobject); - g_list_free_full (srtobject->callers, (GDestroyNotify) srt_caller_free); + if (srtobject->callers) { + GList *callers = g_steal_pointer (&srtobject->callers); + GST_OBJECT_UNLOCK (srtobject->element); + g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure, + srtobject); + GST_OBJECT_LOCK (srtobject->element); + g_list_free_full (callers, (GDestroyNotify) srt_caller_free); + } g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref); g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref); srtobject->opened = FALSE; + GST_OBJECT_UNLOCK (srtobject->element); } static gboolean @@ -1076,7 +1066,7 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject, GST_OBJECT_LOCK (srtobject->element); while (!g_cancellable_is_cancelled (cancellable)) { - ret = g_list_length (srtobject->callers) >= 1; + ret = (srtobject->callers != NULL); if (ret) break; g_cond_wait (&srtobject->sock_cond, @@ -1096,7 +1086,7 @@ gst_srt_object_read (GstSRTObject * srtobject, gssize len = 0; gint poll_timeout; GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE; - gint poll_id; + gint poll_id = SRT_ERROR; /* Only source element can read data */ g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER @@ -1111,9 +1101,13 @@ gst_srt_object_read (GstSRTObject * srtobject, if (!gst_srt_object_wait_caller (srtobject, cancellable, error)) return -1; + GST_OBJECT_LOCK (srtobject->element); caller = srtobject->callers->data; - poll_id = caller->poll_id; - + if (srtobject->callers) + poll_id = caller->poll_id; + GST_OBJECT_UNLOCK (srtobject->element); + if (poll_id == SRT_ERROR) + return 0; } else { poll_id = srtobject->poll_id; } @@ -1127,9 +1121,16 @@ gst_srt_object_read (GstSRTObject * srtobject, SRTSOCKET rsock; gint rsocklen = 1; + int pollret; - if (srt_epoll_wait (poll_id, &rsock, - &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) { + pollret = srt_epoll_wait (poll_id, &rsock, + &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0); + if (pollret < 0) { + gint srt_errno = srt_getlasterror (NULL); + + if (srt_errno != SRT_ETIMEOUT) { + return 0; + } continue; } @@ -1288,9 +1289,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject, err: srtobject->callers = g_list_remove (srtobject->callers, caller); srt_caller_invoke_removed_closure (caller, srtobject); - GST_OBJECT_UNLOCK (srtobject->element); srt_caller_free (caller); - GST_OBJECT_LOCK (srtobject->element); } GST_OBJECT_UNLOCK (srtobject->element); diff --git a/ext/srt/gstsrtobject.h b/ext/srt/gstsrtobject.h index 86bc413c4..06d4598dd 100644 --- a/ext/srt/gstsrtobject.h +++ b/ext/srt/gstsrtobject.h @@ -61,9 +61,6 @@ struct _GstSRTObject SRTSOCKET listener_sock; gint listener_poll_id; - GMainLoop *loop; - GMainContext *context; - GSource *listener_source; GThread *thread; GList *callers; |