summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>2020-01-27 16:22:20 +0100
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>2020-02-21 15:20:41 +0000
commit63ec837824aef0710067d7d5bda75bcd3018fcee (patch)
treebdd61f1e1ef1336a9447bbd27f43403fef0f0003
parenta566461294a706545e8ccbcd48aa7fb1a6fb9872 (diff)
downloadgstreamer-plugins-bad-63ec837824aef0710067d7d5bda75bcd3018fcee.tar.gz
rtmp2: Handle outgoing set chunk/window size properly
Apply outgoing sizes only after writing the chunk to the peer. This is important particularly for the set chunk size and allows exposing it without threading issues.
-rw-r--r--gst/rtmp2/rtmp/rtmpconnection.c110
1 files changed, 103 insertions, 7 deletions
diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c
index 7d7837779..02d3e80ac 100644
--- a/gst/rtmp2/rtmp/rtmpconnection.c
+++ b/gst/rtmp2/rtmp/rtmpconnection.c
@@ -75,9 +75,9 @@ struct _GstRtmpConnection
/* RTMP configuration */
guint32 in_chunk_size;
- guint32 out_chunk_size;
+ guint32 out_chunk_size, out_chunk_size_pending;
guint32 in_window_ack_size;
- guint32 out_window_ack_size;
+ guint32 out_window_ack_size, out_window_ack_size_pending;
guint64 in_bytes_total;
guint64 in_bytes_acked;
@@ -121,6 +121,12 @@ static void
gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
guint32 event_data);
+static gboolean
+gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
+ GstBuffer * buffer);
+static void
+gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
+
typedef struct
{
gdouble transaction_id;
@@ -483,6 +489,14 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self)
goto out;
}
+ if (gst_rtmp_message_is_protocol_control (message)) {
+ if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
+ GST_ERROR_OBJECT (self,
+ "Failed to prepare protocol control %" GST_PTR_FORMAT, message);
+ goto out;
+ }
+ }
+
cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
if (!cstream) {
GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
@@ -555,6 +569,8 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
}
GST_LOG_OBJECT (self, "write completed");
+
+ gst_rtmp_connection_apply_protocol_control (self);
gst_rtmp_connection_start_write (self);
g_object_unref (self);
}
@@ -1059,11 +1075,6 @@ gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
- if (connection->out_window_ack_size == window_ack_size)
- return;
-
- connection->out_window_ack_size = window_ack_size;
-
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_protocol_control (&pc));
}
@@ -1078,3 +1089,88 @@ gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
gst_rtmp_connection_queue_message (connection, buffer);
}
+
+static gboolean
+gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
+ GstBuffer * buffer)
+{
+ GstRtmpProtocolControl pc;
+
+ if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
+ GST_ERROR_OBJECT (self, "can't parse protocol control message");
+ return FALSE;
+ }
+
+ switch (pc.type) {
+ case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
+ guint32 chunk_size = pc.param;
+
+ GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
+ chunk_size);
+
+ if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
+ GST_ERROR_OBJECT (self,
+ "requested chunk size %" G_GUINT32_FORMAT " is too small",
+ chunk_size);
+ return FALSE;
+ }
+
+ if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
+ GST_ERROR_OBJECT (self,
+ "requested chunk size %" G_GUINT32_FORMAT " is too large",
+ chunk_size);
+ return FALSE;
+ }
+
+ if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
+ GST_WARNING_OBJECT (self,
+ "requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
+ }
+
+ self->out_chunk_size_pending = pc.param;
+ break;
+ }
+
+ case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
+ guint32 window_ack_size = pc.param;
+
+ GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
+ window_ack_size);
+
+ if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
+ GST_WARNING_OBJECT (self,
+ "requesting small window ack size %" G_GUINT32_FORMAT,
+ window_ack_size);
+ }
+
+ self->out_window_ack_size_pending = window_ack_size;
+ break;
+ }
+
+ default:
+ break;
+ }
+
+ return TRUE;
+}
+
+static void
+gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
+{
+ guint32 chunk_size, window_ack_size;
+
+ chunk_size = self->out_chunk_size_pending;
+ if (chunk_size) {
+ self->out_chunk_size = chunk_size;
+ self->out_chunk_size_pending = 0;
+ GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
+ }
+
+ window_ack_size = self->out_window_ack_size_pending;
+ if (window_ack_size) {
+ self->out_window_ack_size = window_ack_size;
+ self->out_window_ack_size_pending = 0;
+ GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
+ window_ack_size);
+ }
+}