summaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>2020-09-01 13:28:44 +0200
committerTim-Philipp Müller <tim@centricular.com>2020-10-02 01:09:19 +0100
commit273ae7cd1e4a5a697d60d07d6da61c9c994a2169 (patch)
tree6706f94377039ce80f3f51754024082adbd0487f /gst
parentfa4a72033d7297d3ed03c0765fd4481637aa27c8 (diff)
downloadgstreamer-plugins-bad-273ae7cd1e4a5a697d60d07d6da61c9c994a2169.tar.gz
rtmp2: Replace stats queue with stats lock
Making the thread receiving the stats wait on the loop to respond was not a good idea, as the latter can get blocked on the streaming thread. Have get_stats read the values directly, adding a lock to ensure we don't read garbage. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1629>
Diffstat (limited to 'gst')
-rw-r--r--gst/rtmp2/gstrtmp2sink.c13
-rw-r--r--gst/rtmp2/gstrtmp2src.c13
-rw-r--r--gst/rtmp2/rtmp/rtmpconnection.c58
3 files changed, 38 insertions, 46 deletions
diff --git a/gst/rtmp2/gstrtmp2sink.c b/gst/rtmp2/gstrtmp2sink.c
index 6a105ee9a..9140738c8 100644
--- a/gst/rtmp2/gstrtmp2sink.c
+++ b/gst/rtmp2/gstrtmp2sink.c
@@ -1147,18 +1147,7 @@ gst_rtmp2_sink_get_stats (GstRtmp2Sink * self)
g_mutex_lock (&self->lock);
if (self->connection) {
- GstRtmpConnection *connection = g_object_ref (self->connection);
-
- g_mutex_unlock (&self->lock);
-
- /* We need to do this without holding the lock as the g_async_queue_pop
- * waits on the loop thread to deliver the stats. The loop thread might
- * attempt to take the lock as well, leading to a deadlock. */
- s = gst_rtmp_connection_get_stats (connection);
-
- g_mutex_lock (&self->lock);
-
- g_object_unref (connection);
+ s = gst_rtmp_connection_get_stats (self->connection);
} else if (self->stats) {
s = gst_structure_copy (self->stats);
} else {
diff --git a/gst/rtmp2/gstrtmp2src.c b/gst/rtmp2/gstrtmp2src.c
index 2b7a05f98..f5c356b11 100644
--- a/gst/rtmp2/gstrtmp2src.c
+++ b/gst/rtmp2/gstrtmp2src.c
@@ -1008,18 +1008,7 @@ gst_rtmp2_src_get_stats (GstRtmp2Src * self)
g_mutex_lock (&self->lock);
if (self->connection) {
- GstRtmpConnection *connection = g_object_ref (self->connection);
-
- g_mutex_unlock (&self->lock);
-
- /* We need to do this without holding the lock as the g_async_queue_pop
- * waits on the loop thread to deliver the stats. The loop thread might
- * attempt to take the lock as well, leading to a deadlock. */
- s = gst_rtmp_connection_get_stats (connection);
-
- g_mutex_lock (&self->lock);
-
- g_object_unref (connection);
+ s = gst_rtmp_connection_get_stats (self->connection);
} else if (self->stats) {
s = gst_structure_copy (self->stats);
} else {
diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c
index b9fe7675a..22d1e69eb 100644
--- a/gst/rtmp2/rtmp/rtmpconnection.c
+++ b/gst/rtmp2/rtmp/rtmpconnection.c
@@ -52,7 +52,7 @@ struct _GstRtmpConnection
GSocketConnection *connection;
GCancellable *cancellable;
GSocketClient *socket_client;
- GAsyncQueue *output_queue, *stats_queue;
+ GAsyncQueue *output_queue;
GMainContext *main_context;
GSource *input_source;
@@ -73,6 +73,12 @@ struct _GstRtmpConnection
gboolean writing;
+ /* Protects the values below during concurrent access.
+ * - Taken by the loop thread when writing, but not reading.
+ * - Taken by other threads when reading (calling get_stats).
+ */
+ GMutex stats_lock;
+
/* RTMP configuration */
guint32 in_chunk_size;
guint32 out_chunk_size, out_chunk_size_pending;
@@ -248,8 +254,6 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue =
g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
- rtmpconnection->stats_queue =
- g_async_queue_new_full ((GDestroyNotify) gst_structure_free);
rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
@@ -258,6 +262,8 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
rtmpconnection->input_needed_bytes = 1;
+
+ g_mutex_init (&rtmpconnection->stats_lock);
}
void
@@ -284,10 +290,10 @@ gst_rtmp_connection_finalize (GObject * object)
/* clean up object here */
+ g_mutex_clear (&rtmpconnection->stats_lock);
g_clear_object (&rtmpconnection->cancellable);
g_clear_object (&rtmpconnection->connection);
g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
- g_clear_pointer (&rtmpconnection->stats_queue, g_async_queue_unref);
g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
g_clear_pointer (&rtmpconnection->output_streams,
gst_rtmp_chunk_streams_free);
@@ -468,7 +474,10 @@ gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
+ g_mutex_lock (&sc->stats_lock);
sc->in_bytes_total += ret;
+ g_mutex_unlock (&sc->stats_lock);
+
bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
gst_rtmp_connection_send_ack (sc);
@@ -569,7 +578,9 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
&bytes_written, &error);
+ g_mutex_lock (&self->stats_lock);
self->out_bytes_total += bytes_written;
+ g_mutex_unlock (&self->stats_lock);
if (!res) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
@@ -923,7 +934,9 @@ gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
"peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
}
+ g_mutex_lock (&self->stats_lock);
self->in_chunk_size = chunk_size;
+ g_mutex_unlock (&self->stats_lock);
}
static void
@@ -948,7 +961,9 @@ gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
new_ack - last_ack);
+ g_mutex_lock (&self->stats_lock);
self->out_bytes_acked = new_ack;
+ g_mutex_unlock (&self->stats_lock);
}
static void
@@ -961,7 +976,9 @@ gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
window_ack_size);
}
+ g_mutex_lock (&self->stats_lock);
self->in_window_ack_size = window_ack_size;
+ g_mutex_unlock (&self->stats_lock);
}
static gboolean
@@ -1173,7 +1190,9 @@ gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_protocol_control (&pc));
+ g_mutex_lock (&connection->stats_lock);
connection->in_bytes_acked = in_bytes_total;
+ g_mutex_unlock (&connection->stats_lock);
}
static void
@@ -1301,15 +1320,23 @@ gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
chunk_size = self->out_chunk_size_pending;
if (chunk_size) {
- self->out_chunk_size = chunk_size;
self->out_chunk_size_pending = 0;
+
+ g_mutex_lock (&self->stats_lock);
+ self->out_chunk_size = chunk_size;
+ g_mutex_unlock (&self->stats_lock);
+
GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
}
window_ack_size = self->out_window_ack_size_pending;
if (window_ack_size) {
- self->out_window_ack_size = window_ack_size;
self->out_window_ack_size_pending = 0;
+
+ g_mutex_lock (&self->stats_lock);
+ self->out_window_ack_size = window_ack_size;
+ g_mutex_unlock (&self->stats_lock);
+
GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
window_ack_size);
}
@@ -1335,14 +1362,6 @@ gst_rtmp_connection_get_null_stats (void)
return get_stats (NULL);
}
-static gboolean
-get_stats_invoker (gpointer ptr)
-{
- GstRtmpConnection *self = ptr;
- g_async_queue_push (self->stats_queue, get_stats (self));
- return G_SOURCE_REMOVE;
-}
-
GstStructure *
gst_rtmp_connection_get_stats (GstRtmpConnection * self)
{
@@ -1350,14 +1369,9 @@ gst_rtmp_connection_get_stats (GstRtmpConnection * self)
g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
- if (g_main_context_acquire (self->main_context)) {
- s = get_stats (self);
- g_main_context_release (self->main_context);
- } else {
- g_main_context_invoke_full (self->main_context, G_PRIORITY_HIGH,
- get_stats_invoker, g_object_ref (self), g_object_unref);
- s = g_async_queue_pop (self->stats_queue);
- }
+ g_mutex_lock (&self->stats_lock);
+ s = get_stats (self);
+ g_mutex_unlock (&self->stats_lock);
return s;
}