summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorOlivier Crête <olivier.crete@collabora.com>2019-08-23 16:21:47 -0400
committerTim-Philipp Müller <tim@centricular.com>2019-09-01 00:22:05 +0100
commitc692f6ac4409e99e205f91b0c35b25e7d59fb897 (patch)
tree938000ac4c2cf355b2ebbb862e0a9b9fd6471cc4 /ext
parent1298e6a18668b229384c61c347e920065507228d (diff)
downloadgstreamer-plugins-bad-c692f6ac4409e99e205f91b0c35b25e7d59fb897.tar.gz
srtobject: Remove pointless GMainLoop
Just use srt's blocking epoll function and fix locking while we're at it.
Diffstat (limited to 'ext')
-rw-r--r--ext/srt/gstsrtobject.c215
-rw-r--r--ext/srt/gstsrtobject.h3
2 files changed, 107 insertions, 111 deletions
diff --git a/ext/srt/gstsrtobject.c b/ext/srt/gstsrtobject.c
index ea7bb6f72..069f255c8 100644
--- a/ext/srt/gstsrtobject.c
+++ b/ext/srt/gstsrtobject.c
@@ -586,102 +586,101 @@ static gpointer
thread_func (gpointer data)
{
GstSRTObject *srtobject = data;
-
- g_main_loop_run (srtobject->loop);
-
- return NULL;
-}
-
-static gboolean
-idle_listen_source_cb (gpointer data)
-{
- GstSRTObject *srtobject = data;
SRTSOCKET caller_sock;
- struct sockaddr caller_sa;
- gsize caller_sa_len;
+ union
+ {
+ struct sockaddr_storage ss;
+ struct sockaddr sa;
+ } caller_sa;
+ int caller_sa_len;
gint poll_timeout;
SRTSOCKET rsock;
gint rsocklen = 1;
- if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
- &poll_timeout)) {
- poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
- }
+ for (;;) {
+ if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
+ &poll_timeout)) {
+ poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
+ }
- GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
+ GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
- if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
- &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
- gint srt_errno = srt_getlasterror (NULL);
+ if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
+ &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
+ gint srt_errno = srt_getlasterror (NULL);
- if (srt_errno == SRT_ETIMEOUT) {
- return TRUE;
- } else {
- GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
- ("abort polling: %s", srt_getlasterror_str ()), (NULL));
- return FALSE;
+ if (srtobject->listener_poll_id == SRT_ERROR)
+ return NULL;
+ if (srt_errno == SRT_ETIMEOUT) {
+ continue;
+ } else {
+ GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
+ ("abort polling: %s", srt_getlasterror_str ()), (NULL));
+ return NULL;
+ }
}
- }
- caller_sock =
- srt_accept (srtobject->listener_sock, &caller_sa, (int *) &caller_sa_len);
+ caller_sock =
+ srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
- if (caller_sock != SRT_INVALID_SOCK) {
- SRTCaller *caller;
- gint flag = SRT_EPOLL_ERR;
-
- caller = srt_caller_new ();
- caller->sockaddr =
- g_socket_address_new_from_native (&caller_sa, caller_sa_len);
- caller->poll_id = srt_epoll_create ();
- caller->sock = caller_sock;
-
- if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
- (srtobject->element)) == GST_URI_SRC) {
- flag |= SRT_EPOLL_IN;
- } else {
- flag |= SRT_EPOLL_OUT;
- }
+ if (caller_sock != SRT_INVALID_SOCK) {
+ SRTCaller *caller;
+ gint flag = SRT_EPOLL_ERR;
- if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
+ caller = srt_caller_new ();
+ caller->sockaddr =
+ g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
+ caller->poll_id = srt_epoll_create ();
+ caller->sock = caller_sock;
- GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
- ("%s", srt_getlasterror_str ()), (NULL));
+ if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
+ (srtobject->element)) == GST_URI_SRC) {
+ flag |= SRT_EPOLL_IN;
+ } else {
+ flag |= SRT_EPOLL_OUT;
+ }
- srt_caller_free (caller);
+ if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
- /* try-again */
- return TRUE;
- }
+ GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
+ ("%s", srt_getlasterror_str ()), (NULL));
- GST_OBJECT_LOCK (srtobject->element);
- srtobject->callers = g_list_append (srtobject->callers, caller);
- g_cond_signal (&srtobject->sock_cond);
- GST_OBJECT_UNLOCK (srtobject->element);
+ srt_caller_free (caller);
- /* notifying caller-added */
- if (srtobject->caller_added_closure != NULL) {
- GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
+ /* try-again */
+ continue;
+ }
- g_value_init (&values[0], G_TYPE_INT);
- g_value_set_int (&values[0], caller->sock);
+ GST_OBJECT_LOCK (srtobject->element);
+ srtobject->callers = g_list_append (srtobject->callers, caller);
+ g_cond_signal (&srtobject->sock_cond);
+ GST_OBJECT_UNLOCK (srtobject->element);
- g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
- g_value_set_object (&values[1], caller->sockaddr);
+ /* notifying caller-added */
+ if (srtobject->caller_added_closure != NULL) {
+ GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
- g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values, NULL);
+ g_value_init (&values[0], G_TYPE_INT);
+ g_value_set_int (&values[0], caller->sock);
- g_value_unset (&values[1]);
- }
+ g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
+ g_value_set_object (&values[1], caller->sockaddr);
- GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
- }
+ g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
+ NULL);
+
+ g_value_unset (&values[1]);
+ }
- /* only one caller is allowed if the element is source. */
- return gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) !=
- GST_URI_SRC;
+ GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
+
+ if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
+ GST_URI_SRC)
+ return NULL;
+ }
+ }
}
static gboolean
@@ -752,15 +751,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
srtobject->listener_sock = sock;
- srtobject->context = g_main_context_new ();
- srtobject->loop = g_main_loop_new (srtobject->context, TRUE);
-
- srtobject->listener_source = g_idle_source_new ();
- g_source_set_callback (srtobject->listener_source,
- (GSourceFunc) idle_listen_source_cb, srtobject, NULL);
-
- g_source_attach (srtobject->listener_source, srtobject->context);
-
srtobject->thread =
g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
@@ -772,9 +762,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
failed:
- g_clear_pointer (&srtobject->loop, g_main_loop_unref);
- g_clear_pointer (&srtobject->context, g_main_context_unref);
-
if (srtobject->listener_poll_id != SRT_ERROR) {
srt_epoll_release (srtobject->listener_poll_id);
}
@@ -1019,6 +1006,7 @@ out:
void
gst_srt_object_close (GstSRTObject * srtobject)
{
+ GST_OBJECT_LOCK (srtobject->element);
if (srtobject->poll_id != SRT_ERROR) {
srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
}
@@ -1032,20 +1020,16 @@ gst_srt_object_close (GstSRTObject * srtobject)
srtobject->sock = SRT_INVALID_SOCK;
}
- if (srtobject->loop) {
- g_main_loop_quit (srtobject->loop);
-
- if (srtobject->listener_poll_id != SRT_ERROR) {
- srt_epoll_remove_usock (srtobject->listener_poll_id,
- srtobject->listener_sock);
- srtobject->listener_poll_id = SRT_ERROR;
- }
-
- g_thread_join (srtobject->thread);
-
- g_clear_pointer (&srtobject->thread, g_thread_unref);
- g_clear_pointer (&srtobject->loop, g_main_loop_unref);
- g_clear_pointer (&srtobject->context, g_main_context_unref);
+ if (srtobject->listener_poll_id != SRT_ERROR) {
+ srt_epoll_remove_usock (srtobject->listener_poll_id,
+ srtobject->listener_sock);
+ srtobject->listener_poll_id = SRT_ERROR;
+ }
+ if (srtobject->thread) {
+ GThread *thread = g_steal_pointer (&srtobject->thread);
+ GST_OBJECT_UNLOCK (srtobject->element);
+ g_thread_join (thread);
+ GST_OBJECT_LOCK (srtobject->element);
}
if (srtobject->listener_sock != SRT_INVALID_SOCK) {
@@ -1056,14 +1040,20 @@ gst_srt_object_close (GstSRTObject * srtobject)
srtobject->listener_sock = SRT_INVALID_SOCK;
}
- g_list_foreach (srtobject->callers, (GFunc) srt_caller_invoke_removed_closure,
- srtobject);
- g_list_free_full (srtobject->callers, (GDestroyNotify) srt_caller_free);
+ if (srtobject->callers) {
+ GList *callers = g_steal_pointer (&srtobject->callers);
+ GST_OBJECT_UNLOCK (srtobject->element);
+ g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
+ srtobject);
+ GST_OBJECT_LOCK (srtobject->element);
+ g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
+ }
g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
srtobject->opened = FALSE;
+ GST_OBJECT_UNLOCK (srtobject->element);
}
static gboolean
@@ -1076,7 +1066,7 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
GST_OBJECT_LOCK (srtobject->element);
while (!g_cancellable_is_cancelled (cancellable)) {
- ret = g_list_length (srtobject->callers) >= 1;
+ ret = (srtobject->callers != NULL);
if (ret)
break;
g_cond_wait (&srtobject->sock_cond,
@@ -1096,7 +1086,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
gssize len = 0;
gint poll_timeout;
GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
- gint poll_id;
+ gint poll_id = SRT_ERROR;
/* Only source element can read data */
g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
@@ -1111,9 +1101,13 @@ gst_srt_object_read (GstSRTObject * srtobject,
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
+ GST_OBJECT_LOCK (srtobject->element);
caller = srtobject->callers->data;
- poll_id = caller->poll_id;
-
+ if (srtobject->callers)
+ poll_id = caller->poll_id;
+ GST_OBJECT_UNLOCK (srtobject->element);
+ if (poll_id == SRT_ERROR)
+ return 0;
} else {
poll_id = srtobject->poll_id;
}
@@ -1127,9 +1121,16 @@ gst_srt_object_read (GstSRTObject * srtobject,
SRTSOCKET rsock;
gint rsocklen = 1;
+ int pollret;
- if (srt_epoll_wait (poll_id, &rsock,
- &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
+ pollret = srt_epoll_wait (poll_id, &rsock,
+ &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
+ if (pollret < 0) {
+ gint srt_errno = srt_getlasterror (NULL);
+
+ if (srt_errno != SRT_ETIMEOUT) {
+ return 0;
+ }
continue;
}
@@ -1288,9 +1289,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
err:
srtobject->callers = g_list_remove (srtobject->callers, caller);
srt_caller_invoke_removed_closure (caller, srtobject);
- GST_OBJECT_UNLOCK (srtobject->element);
srt_caller_free (caller);
- GST_OBJECT_LOCK (srtobject->element);
}
GST_OBJECT_UNLOCK (srtobject->element);
diff --git a/ext/srt/gstsrtobject.h b/ext/srt/gstsrtobject.h
index 86bc413c4..06d4598dd 100644
--- a/ext/srt/gstsrtobject.h
+++ b/ext/srt/gstsrtobject.h
@@ -61,9 +61,6 @@ struct _GstSRTObject
SRTSOCKET listener_sock;
gint listener_poll_id;
- GMainLoop *loop;
- GMainContext *context;
- GSource *listener_source;
GThread *thread;
GList *callers;