diff options
Diffstat (limited to 'gst')
-rw-r--r-- | gst/rtmp2/gstrtmp2sink.c | 90 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpclient.c | 38 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpclient.h | 11 |
3 files changed, 139 insertions, 0 deletions
diff --git a/gst/rtmp2/gstrtmp2sink.c b/gst/rtmp2/gstrtmp2sink.c index 6a105ee9a..391c92415 100644 --- a/gst/rtmp2/gstrtmp2sink.c +++ b/gst/rtmp2/gstrtmp2sink.c @@ -88,6 +88,8 @@ typedef struct GPtrArray *headers; guint64 last_ts, base_ts; /* timestamp fixup */ + + GstRtmpStopCommands stop_commands; } GstRtmp2Sink; typedef struct @@ -106,6 +108,7 @@ static void gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface); /* GstBaseSink virtual functions */ static gboolean gst_rtmp2_sink_start (GstBaseSink * sink); static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink); +static gboolean gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event); static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink); static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink); static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink, @@ -147,8 +150,35 @@ enum PROP_PEAK_KBPS, PROP_CHUNK_SIZE, PROP_STATS, + PROP_STOP_COMMANDS, }; +#define DEFAULT_STOP_COMMANDS GST_RTMP_STOP_COMMAND_FCUNPUBLISH | \ + GST_RTMP_STOP_COMMAND_DELETE_STREAM /* FCUnpublish + deleteStream */ + +#define GST_RTMP_STOP_COMMANDS_TYPE \ + (gst_rtmp2_sink_stop_commands_get_type()) + +static GType +gst_rtmp2_sink_stop_commands_get_type (void) +{ + static GType type = 0; + static const GFlagsValue types[] = { + {GST_RTMP_STOP_COMMAND_NONE, "No command", "none"}, + {GST_RTMP_STOP_COMMAND_FCUNPUBLISH, "FCUnpublish", "fcunpublish"}, + {GST_RTMP_STOP_COMMAND_CLOSE_STREAM, "closeStream", "closestream"}, + {GST_RTMP_STOP_COMMAND_DELETE_STREAM, "deleteStream", "deletestream"}, + {0, NULL, NULL}, + }; + + if (g_once_init_enter (&type)) { + GType tmp = g_flags_register_static ("GstRtmp2SinkStopCommandsFlags", + types); + g_once_init_leave (&type, tmp); + } + return type; +} + /* pad templates */ static GstStaticPadTemplate gst_rtmp2_sink_sink_template = @@ -183,6 +213,7 @@ gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass) gobject_class->finalize = gst_rtmp2_sink_finalize; base_sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start); base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop); + base_sink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_event); base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock); base_sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop); base_sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render); @@ -227,6 +258,12 @@ gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass) g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_STOP_COMMANDS, + g_param_spec_flags ("stop-commands", "Stop commands", + "RTMP commands to send on EOS event before closing connection", + GST_RTMP_STOP_COMMANDS_TYPE, DEFAULT_STOP_COMMANDS, + (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS))); + gst_type_mark_as_plugin_api (GST_TYPE_RTMP_LOCATION_HANDLER, 0); GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0, "debug category for rtmp2sink element"); @@ -249,6 +286,8 @@ gst_rtmp2_sink_init (GstRtmp2Sink * self) self->headers = g_ptr_array_new_with_free_func ((GDestroyNotify) gst_mini_object_unref); + + self->stop_commands = DEFAULT_STOP_COMMANDS; } static void @@ -360,6 +399,11 @@ gst_rtmp2_sink_set_property (GObject * object, guint property_id, set_chunk_size (self); g_mutex_unlock (&self->lock); break; + case PROP_STOP_COMMANDS: + GST_OBJECT_LOCK (self); + self->stop_commands = g_value_get_flags (value); + GST_OBJECT_UNLOCK (self); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -457,6 +501,11 @@ gst_rtmp2_sink_get_property (GObject * object, guint property_id, case PROP_STATS: g_value_take_boxed (value, gst_rtmp2_sink_get_stats (self)); break; + case PROP_STOP_COMMANDS: + GST_OBJECT_LOCK (self); + g_value_set_flags (value, self->stop_commands); + GST_OBJECT_UNLOCK (self); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -557,6 +606,47 @@ gst_rtmp2_sink_stop (GstBaseSink * sink) } static gboolean +stop_publish_invoker (gpointer user_data) +{ + GstRtmp2Sink *self = user_data; + + if (self->connection) { + GST_OBJECT_LOCK (self); + if (self->stop_commands != GST_RTMP_STOP_COMMAND_NONE) { + gst_rtmp_client_stop_publish (self->connection, self->location.stream, + self->stop_commands); + } + GST_OBJECT_UNLOCK (self); + } + + return G_SOURCE_REMOVE; +} + +static gboolean +gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event) +{ + GstEventType type; + GstRtmp2Sink *self = GST_RTMP2_SINK (sink); + + type = GST_EVENT_TYPE (event); + + switch (type) { + case GST_EVENT_EOS: + g_mutex_lock (&self->lock); + if (self->loop) { + GST_DEBUG_OBJECT (self, "Got EOS: stopping publish"); + g_main_context_invoke (self->context, stop_publish_invoker, self); + } + g_mutex_unlock (&self->lock); + break; + default: + break; + } + + return GST_BASE_SINK_CLASS (gst_rtmp2_sink_parent_class)->event (sink, event);; +} + +static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink) { GstRtmp2Sink *self = GST_RTMP2_SINK (sink); diff --git a/gst/rtmp2/rtmp/rtmpclient.c b/gst/rtmp2/rtmp/rtmpclient.c index 73f397d3d..a49e8b7b3 100644 --- a/gst/rtmp2/rtmp/rtmpclient.c +++ b/gst/rtmp2/rtmp/rtmpclient.c @@ -296,6 +296,8 @@ static void socket_connect_done (GObject * source, GAsyncResult * result, static void handshake_done (GObject * source, GAsyncResult * result, gpointer user_data); static void send_connect (GTask * task); +static void send_stop (GstRtmpConnection * connection, const gchar * stream, + const GstRtmpStopCommands stop_commands); static void send_secure_token_response (GTask * task, GstRtmpConnection * connection, const gchar * challenge); static void connection_error (GstRtmpConnection * connection, @@ -1346,3 +1348,39 @@ gst_rtmp_client_start_play_finish (GstRtmpConnection * connection, { return start_stream_finish (connection, result, stream_id, error); } + +void +gst_rtmp_client_stop_publish (GstRtmpConnection * connection, + const gchar * stream, const GstRtmpStopCommands stop_commands) +{ + send_stop (connection, stream, stop_commands); +} + +static void +send_stop (GstRtmpConnection * connection, const gchar * stream, + const GstRtmpStopCommands stop_commands) +{ + GstAmfNode *command_object, *stream_name; + + command_object = gst_amf_node_new_null (); + stream_name = gst_amf_node_new_string (stream, -1); + + if (stop_commands & GST_RTMP_STOP_COMMAND_FCUNPUBLISH) { + GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream); + gst_rtmp_connection_send_command (connection, NULL, NULL, 0, + "FCUnpublish", command_object, stream_name, NULL); + } + if (stop_commands & GST_RTMP_STOP_COMMAND_CLOSE_STREAM) { + GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream); + gst_rtmp_connection_send_command (connection, NULL, NULL, 0, + "closeStream", command_object, stream_name, NULL); + } + if (stop_commands & GST_RTMP_STOP_COMMAND_DELETE_STREAM) { + GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream); + gst_rtmp_connection_send_command (connection, NULL, NULL, 0, + "deleteStream", command_object, stream_name, NULL); + } + + gst_amf_node_free (stream_name); + gst_amf_node_free (command_object); +} diff --git a/gst/rtmp2/rtmp/rtmpclient.h b/gst/rtmp2/rtmp/rtmpclient.h index 10a6e22de..2ab0774f9 100644 --- a/gst/rtmp2/rtmp/rtmpclient.h +++ b/gst/rtmp2/rtmp/rtmpclient.h @@ -73,6 +73,14 @@ typedef struct _GstRtmpLocation gboolean publish; } GstRtmpLocation; +typedef enum +{ + GST_RTMP_STOP_COMMAND_NONE = 0, + GST_RTMP_STOP_COMMAND_FCUNPUBLISH = (1 << 0), + GST_RTMP_STOP_COMMAND_CLOSE_STREAM = (1 << 1), + GST_RTMP_STOP_COMMAND_DELETE_STREAM = (1 << 2) +} GstRtmpStopCommands; + void gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src); void gst_rtmp_location_clear (GstRtmpLocation * uri); @@ -98,5 +106,8 @@ void gst_rtmp_client_start_play_async (GstRtmpConnection * connection, gboolean gst_rtmp_client_start_play_finish (GstRtmpConnection * connection, GAsyncResult * result, guint * stream_id, GError ** error); +void gst_rtmp_client_stop_publish (GstRtmpConnection * connection, + const gchar * stream, const GstRtmpStopCommands stop_commands); + G_END_DECLS #endif |