diff options
author | Vivia Nikolaidou <vivia@ahiru.eu> | 2019-11-05 13:52:55 +0000 |
---|---|---|
committer | GStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org> | 2019-11-05 13:52:55 +0000 |
commit | 2386858a9179aff2ec249bdffa904bf407de455f (patch) | |
tree | 46bf7595022397f01c369ec1ca808c0e3963b2e2 /gst/rtmp2 | |
parent | 5320bb9085ac3332d89ed9bfa3120b95ca2c1d97 (diff) | |
download | gstreamer-plugins-bad-2386858a9179aff2ec249bdffa904bf407de455f.tar.gz |
Add files from gst-rtmp
For master, without autotools.
Diffstat (limited to 'gst/rtmp2')
-rw-r--r-- | gst/rtmp2/TODO | 63 | ||||
-rw-r--r-- | gst/rtmp2/gstrtmp2.c | 44 | ||||
-rw-r--r-- | gst/rtmp2/gstrtmp2locationhandler.c | 267 | ||||
-rw-r--r-- | gst/rtmp2/gstrtmp2locationhandler.h | 49 | ||||
-rw-r--r-- | gst/rtmp2/gstrtmp2sink.c | 1064 | ||||
-rw-r--r-- | gst/rtmp2/gstrtmp2sink.h | 34 | ||||
-rw-r--r-- | gst/rtmp2/gstrtmp2src.c | 839 | ||||
-rw-r--r-- | gst/rtmp2/gstrtmp2src.h | 34 | ||||
-rw-r--r-- | gst/rtmp2/meson.build | 24 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/amf.c | 1141 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/amf.h | 112 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpchunkstream.c | 714 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpchunkstream.h | 59 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpclient.c | 1240 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpclient.h | 101 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpconnection.c | 996 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpconnection.h | 82 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmphandshake.c | 311 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmphandshake.h | 35 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpmessage.c | 494 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmpmessage.h | 142 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmputils.c | 246 | ||||
-rw-r--r-- | gst/rtmp2/rtmp/rtmputils.h | 46 |
23 files changed, 8137 insertions, 0 deletions
diff --git a/gst/rtmp2/TODO b/gst/rtmp2/TODO new file mode 100644 index 000000000..a43e48d8d --- /dev/null +++ b/gst/rtmp2/TODO @@ -0,0 +1,63 @@ +- rtmp2sink: Should look into reconnecting and resuming stream without + deleting and recreating stream, which drops clients. + +- Move AMF parser/serializer to GstRtmpMeta? +- Move AMF nodes from g_slice to GstMiniObject? + +- First video frame that comes from Wowza seems to be out-of-order; librtmp + does not have this problem + +- Refactor connection, pull out the ad-hoc read and write handling and put it + with the chunk layer into GBuffered{In,Out}putStream subclasses + +- Refactor elements and pull out the common connection+mainloop handling code + into a context object + +- Change the location properties into something with less boilerplate? + + Perhaps a GstStructure-based prop, custom GValue transforms or GstValue + (de)serializing + +- Use glib-mkenums to generate GEnumClasses + +- Post-connect onStatus handling (needed for src EOS and async errors?) + +- Better mux/demux, at the cost of losing compatibility with flvmux/demux. + + Something like (a/x = application/x-rtmp-messages): + + rtmp2src ! a/x ! rtmp2demux ! a/x,type=video ! rtmp2videodecode ! h264parse + ! a/x,type=audio ! rtmp2audiodecode ! aacparse + + x264enc ! rtmp2videoencode ! a/x,type=video ! rtmp2mux ! a/x ! rtmp2sink + fdkaacenc ! rtmp2audioencode ! a/x,type=audio ! + + And also, in case no muxing is required: + + x264enc ! rtmp2videoencode ! a/x,type=video ! rtmp2sink + fdkaacenc ! rtmp2audioencode ! a/x,type=video ! rtmp2sink + + Proper GstBuffer timestamps need proper timestamp wraparound handling + +- Better client element, which generalizes the existing sink/src to allow + multiple streams over one connection + - Request src pad to play a stream + - Request sink pad to publish a stream (base it on GstAggregator?) + - rtmp2sink/src just specialize the client element with a static pad + +- Server implementation + +- Support more protocols + - rtmpe (App-layer encryption) + - rtmpt (HTTP tunneling) + - rtmpte (HTTP tunneling + App-layer encryption) + - rtmpts (HTTPS tunneling) + - rtmfp (UDP) + +Needed testing: + +- AMF parsing + +- connection closure by peer + +- connection timeouts diff --git a/gst/rtmp2/gstrtmp2.c b/gst/rtmp2/gstrtmp2.c new file mode 100644 index 000000000..22c6399cf --- /dev/null +++ b/gst/rtmp2/gstrtmp2.c @@ -0,0 +1,44 @@ +/* GStreamer + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstrtmp2src.h" +#include "gstrtmp2sink.h" + +static gboolean +plugin_init (GstPlugin * plugin) +{ + gst_element_register (plugin, "rtmp2src", GST_RANK_PRIMARY + 1, + GST_TYPE_RTMP2_SRC); + gst_element_register (plugin, "rtmp2sink", GST_RANK_PRIMARY + 1, + GST_TYPE_RTMP2_SINK); + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + rtmp2, + "RTMP plugin", + plugin_init, VERSION, GST_LICENSE, PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/gst/rtmp2/gstrtmp2locationhandler.c b/gst/rtmp2/gstrtmp2locationhandler.c new file mode 100644 index 000000000..6c1eada87 --- /dev/null +++ b/gst/rtmp2/gstrtmp2locationhandler.c @@ -0,0 +1,267 @@ +/* GStreamer + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "gstrtmp2locationhandler.h" +#include "rtmp/rtmputils.h" +#include "rtmp/rtmpclient.h" +#include <string.h> + +#define DEFAULT_SCHEME GST_RTMP_SCHEME_RTMP +#define DEFAULT_HOST "localhost" +#define DEFAULT_APPLICATION "live" +#define DEFAULT_STREAM "myStream" +#define DEFAULT_LOCATION "rtmp://" DEFAULT_HOST "/" DEFAULT_APPLICATION "/" DEFAULT_STREAM +#define DEFAULT_SECURE_TOKEN NULL +#define DEFAULT_USERNAME NULL +#define DEFAULT_PASSWORD NULL +#define DEFAULT_AUTHMOD GST_RTMP_AUTHMOD_AUTO +#define DEFAULT_TIMEOUT 5 + +G_DEFINE_INTERFACE (GstRtmpLocationHandler, gst_rtmp_location_handler, 0); + +#define GST_CAT_DEFAULT gst_rtmp_location_handler_debug_category +GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); + +static void +gst_rtmp_location_handler_default_init (GstRtmpLocationHandlerInterface * iface) +{ + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "rtmp2locationhandler", 0, + "RTMP2 Location Handling"); + + g_object_interface_install_property (iface, g_param_spec_string ("location", + "Location", "Location of RTMP stream to access", DEFAULT_LOCATION, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_enum ("scheme", + "Scheme", "RTMP connection scheme", + GST_TYPE_RTMP_SCHEME, DEFAULT_SCHEME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_string ("host", + "Host", "RTMP server host name", DEFAULT_HOST, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_int ("port", "Port", + "RTMP server port", 1, 65535, + gst_rtmp_scheme_get_default_port (DEFAULT_SCHEME), + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, + g_param_spec_string ("application", "Application", + "RTMP application path", DEFAULT_APPLICATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_string ("stream", + "Stream", "RTMP stream path", DEFAULT_STREAM, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_string ("username", + "User name", "RTMP authorization user name", DEFAULT_USERNAME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_string ("password", + "Password", "RTMP authorization password", DEFAULT_PASSWORD, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, + g_param_spec_string ("secure-token", "Secure token", + "RTMP authorization token", DEFAULT_SECURE_TOKEN, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_enum ("authmod", + "Authorization mode", "RTMP authorization mode", + GST_TYPE_RTMP_AUTHMOD, DEFAULT_AUTHMOD, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, g_param_spec_uint ("timeout", + "Timeout", "RTMP timeout in seconds", 0, G_MAXUINT, DEFAULT_TIMEOUT, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_interface_install_property (iface, + g_param_spec_flags ("tls-validation-flags", "TLS validation flags", + "TLS validation flags to use", G_TYPE_TLS_CERTIFICATE_FLAGS, + G_TLS_CERTIFICATE_VALIDATE_ALL, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +static GstURIType +uri_handler_get_type_sink (GType type) +{ + return GST_URI_SINK; +} + +static GstURIType +uri_handler_get_type_src (GType type) +{ + return GST_URI_SRC; +} + +static const gchar *const * +uri_handler_get_protocols (GType type) +{ + return gst_rtmp_scheme_get_strings (); +} + +static gchar * +uri_handler_get_uri (GstURIHandler * handler) +{ + GstRtmpLocation location = { 0, }; + gchar *string; + + g_object_get (handler, "scheme", &location.scheme, "host", &location.host, + "port", &location.port, "application", &location.application, + "stream", &location.stream, NULL); + + string = gst_rtmp_location_get_string (&location, TRUE); + gst_rtmp_location_clear (&location); + return string; +} + +static gboolean +uri_handler_set_uri (GstURIHandler * handler, const gchar * string, + GError ** error) +{ + GstRtmpLocationHandler *self = GST_RTMP_LOCATION_HANDLER (handler); + GstUri *uri; + const gchar *scheme_sep, *path_sep, *stream_sep, *host, *userinfo; + GstRtmpScheme scheme; + guint port; + gboolean ret = FALSE; + + GST_DEBUG_OBJECT (self, "setting URI from %s", GST_STR_NULL (string)); + g_return_val_if_fail (string, FALSE); + + scheme_sep = strstr (string, "://"); + if (!scheme_sep) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE, + "URI lacks scheme: %s", string); + return FALSE; + } + + path_sep = strchr (scheme_sep + 3, '/'); + if (!path_sep) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE, + "URI lacks path: %s", string); + return FALSE; + } + + stream_sep = strrchr (path_sep + 1, '/'); + if (!stream_sep) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE, + "URI lacks stream: %s", string); + return FALSE; + } + + { + gchar *string_without_path = g_strndup (string, path_sep - string); + uri = gst_uri_from_string (string_without_path); + g_free (string_without_path); + } + + if (!uri) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, + "URI failed to parse: %s", string); + return FALSE; + } + + gst_uri_normalize (uri); + + scheme = gst_rtmp_scheme_from_uri (uri); + if (scheme < 0) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE, + "URI has bad scheme: %s", string); + goto out; + } + + host = gst_uri_get_host (uri); + if (!host) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE, + "URI lacks hostname: %s", string); + goto out; + } + + port = gst_uri_get_port (uri); + if (port == GST_URI_NO_PORT) { + port = gst_rtmp_scheme_get_default_port (scheme); + } + + { + const gchar *path = path_sep + 1, *stream = stream_sep + 1; + gchar *application = g_strndup (path, stream_sep - path); + + GST_DEBUG_OBJECT (self, "setting location to %s://%s:%u/%s stream %s", + gst_rtmp_scheme_to_string (scheme), host, port, application, stream); + + g_object_set (self, "scheme", scheme, "host", host, "port", port, + "application", application, "stream", stream, "username", NULL, + "password", NULL, NULL); + + g_free (application); + } + + userinfo = gst_uri_get_userinfo (uri); + if (userinfo) { + gchar **split = g_strsplit (userinfo, ":", 2); + + if (!split || !split[0] || !split[1]) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE, + "Failed to parse username:password data"); + g_strfreev (split); + goto out; + } + + g_object_set (self, "username", split[0], "password", split[1], NULL); + g_strfreev (split); + } + + ret = TRUE; + +out: + gst_uri_unref (uri); + return ret; +} + +void +gst_rtmp_location_handler_implement_uri_handler (GstURIHandlerInterface * iface, + GstURIType type) +{ + switch (type) { + case GST_URI_SINK: + iface->get_type = uri_handler_get_type_sink; + break; + case GST_URI_SRC: + iface->get_type = uri_handler_get_type_src; + break; + default: + g_return_if_reached (); + } + iface->get_protocols = uri_handler_get_protocols; + iface->get_uri = uri_handler_get_uri; + iface->set_uri = uri_handler_set_uri; +} + +gboolean +gst_rtmp_location_handler_set_uri (GstRtmpLocationHandler * handler, + const gchar * uri) +{ + GError *error = NULL; + gboolean ret; + + g_return_val_if_fail (GST_IS_RTMP_LOCATION_HANDLER (handler), FALSE); + + ret = gst_uri_handler_set_uri (GST_URI_HANDLER (handler), uri, &error); + if (!ret) { + GST_ERROR_OBJECT (handler, "Failed to set URI: %s", error->message); + g_object_set (handler, "scheme", DEFAULT_SCHEME, "host", NULL, + "port", gst_rtmp_scheme_get_default_port (DEFAULT_SCHEME), + "application", NULL, "stream", NULL, NULL); + g_error_free (error); + } + return ret; +} diff --git a/gst/rtmp2/gstrtmp2locationhandler.h b/gst/rtmp2/gstrtmp2locationhandler.h new file mode 100644 index 000000000..51b560ce6 --- /dev/null +++ b/gst/rtmp2/gstrtmp2locationhandler.h @@ -0,0 +1,49 @@ +/* GStreamer + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RTMP_LOCATION_HANDLER_H__ +#define __GST_RTMP_LOCATION_HANDLER_H__ + +#include <gst/gst.h> +#include "rtmp/rtmpclient.h" + +G_BEGIN_DECLS +#define GST_TYPE_RTMP_LOCATION_HANDLER (gst_rtmp_location_handler_get_type ()) +#define GST_RTMP_LOCATION_HANDLER(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_RTMP_LOCATION_HANDLER, GstRtmpLocationHandler)) +#define GST_IS_RTMP_LOCATION_HANDLER(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_RTMP_LOCATION_HANDLER)) +#define GST_RTMP_LOCATION_HANDLER_GET_INTERFACE(inst) (G_TYPE_INSTANCE_GET_INTERFACE ((inst), GST_TYPE_RTMP_LOCATION_HANDLER, GstRtmpLocationHandlerInterface)) +typedef struct _GstRtmpLocationHandler GstRtmpLocationHandler; /* dummy object */ +typedef struct _GstRtmpLocationHandlerInterface GstRtmpLocationHandlerInterface; + +struct _GstRtmpLocationHandlerInterface +{ + GTypeInterface parent_iface; +}; + +GType gst_rtmp_location_handler_get_type (void); + +void gst_rtmp_location_handler_implement_uri_handler (GstURIHandlerInterface * + iface, GstURIType type); + +gboolean gst_rtmp_location_handler_set_uri (GstRtmpLocationHandler * handler, + const gchar * uri); + +G_END_DECLS +#endif diff --git a/gst/rtmp2/gstrtmp2sink.c b/gst/rtmp2/gstrtmp2sink.c new file mode 100644 index 000000000..2ea8b2e38 --- /dev/null +++ b/gst/rtmp2/gstrtmp2sink.c @@ -0,0 +1,1064 @@ +/* GStreamer + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ +/** + * SECTION:element-gstrtmp2sink + * + * The rtmp2sink element sends audio and video streams to an RTMP + * server. + * + * <refsect2> + * <title>Example launch line</title> + * |[ + * gst-launch -v videotestsrc ! x264enc ! flvmux ! rtmp2sink + * location=rtmp://server.example.com/live/myStream + * ]| + * FIXME Describe what the pipeline does. + * </refsect2> + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstrtmp2sink.h" + +#include "gstrtmp2locationhandler.h" +#include "rtmp/rtmpclient.h" +#include "rtmp/rtmpmessage.h" + +#include <gst/gst.h> +#include <gst/base/gstbasesink.h> +#include <gio/gnetworking.h> +#include <string.h> + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_sink_debug_category); +#define GST_CAT_DEFAULT gst_rtmp2_sink_debug_category + +/* prototypes */ +#define GST_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SINK,GstRtmp2Sink)) +#define GST_IS_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SINK)) + +typedef struct +{ + GstBaseSink parent_instance; + + /* properties */ + GstRtmpLocation location; + gboolean async_connect; + guint peak_kbps; + + /* stuff */ + gboolean running, flushing; + GMutex lock; + GCond cond; + + GstTask *task; + GRecMutex task_lock; + + GMainLoop *loop; + GMainContext *context; + + GCancellable *cancellable; + GstRtmpConnection *connection; + guint32 stream_id; + + GPtrArray *headers; + guint64 last_ts, base_ts; /* timestamp fixup */ +} GstRtmp2Sink; + +typedef struct +{ + GstBaseSinkClass parent_class; +} GstRtmp2SinkClass; + +/* GObject virtual functions */ +static void gst_rtmp2_sink_set_property (GObject * object, + guint property_id, const GValue * value, GParamSpec * pspec); +static void gst_rtmp2_sink_get_property (GObject * object, + guint property_id, GValue * value, GParamSpec * pspec); +static void gst_rtmp2_sink_finalize (GObject * object); +static void gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface); + +/* GstBaseSink virtual functions */ +static gboolean gst_rtmp2_sink_start (GstBaseSink * sink); +static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink); +static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink); +static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink); +static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink, + GstBuffer * buffer); +static gboolean gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps); + +/* Internal API */ +static void gst_rtmp2_sink_task_func (gpointer user_data); + +static void client_connect_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void start_publish_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void connect_task_done (GObject * object, GAsyncResult * result, + gpointer user_data); + +static void set_pacing_rate (GstRtmp2Sink * self); + +enum +{ + PROP_0, + PROP_LOCATION, + PROP_SCHEME, + PROP_HOST, + PROP_PORT, + PROP_APPLICATION, + PROP_STREAM, + PROP_SECURE_TOKEN, + PROP_USERNAME, + PROP_PASSWORD, + PROP_AUTHMOD, + PROP_TIMEOUT, + PROP_TLS_VALIDATION_FLAGS, + PROP_ASYNC_CONNECT, + PROP_PEAK_KBPS, +}; + +/* pad templates */ + +static GstStaticPadTemplate gst_rtmp2_sink_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("video/x-flv") + ); + +/* class initialization */ + +G_DEFINE_TYPE_WITH_CODE (GstRtmp2Sink, gst_rtmp2_sink, GST_TYPE_BASE_SINK, + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, + gst_rtmp2_sink_uri_handler_init); + G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL)); + +static void +gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass); + + gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass), + &gst_rtmp2_sink_sink_template); + + gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass), + "RTMP sink element", "Sink", "Sink element for RTMP streams", + "Make.TV, Inc. <info@make.tv>"); + + gobject_class->set_property = gst_rtmp2_sink_set_property; + gobject_class->get_property = gst_rtmp2_sink_get_property; + gobject_class->finalize = gst_rtmp2_sink_finalize; + base_sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start); + base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop); + base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock); + base_sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop); + base_sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render); + base_sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_set_caps); + + g_object_class_override_property (gobject_class, PROP_LOCATION, "location"); + g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme"); + g_object_class_override_property (gobject_class, PROP_HOST, "host"); + g_object_class_override_property (gobject_class, PROP_PORT, "port"); + g_object_class_override_property (gobject_class, PROP_APPLICATION, + "application"); + g_object_class_override_property (gobject_class, PROP_STREAM, "stream"); + g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN, + "secure-token"); + g_object_class_override_property (gobject_class, PROP_USERNAME, "username"); + g_object_class_override_property (gobject_class, PROP_PASSWORD, "password"); + g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod"); + g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout"); + g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS, + "tls-validation-flags"); + + g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT, + g_param_spec_boolean ("async-connect", "Async connect", + "Connect on READY, otherwise on first push", TRUE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_PEAK_KBPS, + g_param_spec_uint ("peak-kbps", "Peak bitrate", + "Bitrate in kbit/sec to pace outgoing packets", 0, G_MAXINT / 125, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_PLAYING)); + + GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0, + "debug category for rtmp2sink element"); +} + +static void +gst_rtmp2_sink_init (GstRtmp2Sink * self) +{ + self->location.flash_ver = g_strdup ("FMLE/3.0 (compatible; FMSc/1.0)"); + self->async_connect = TRUE; + + g_mutex_init (&self->lock); + g_cond_init (&self->cond); + + self->task = gst_task_new (gst_rtmp2_sink_task_func, self, NULL); + g_rec_mutex_init (&self->task_lock); + gst_task_set_lock (self->task, &self->task_lock); + + self->headers = g_ptr_array_new_with_free_func + ((GDestroyNotify) gst_mini_object_unref); +} + +static void +gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface) +{ + gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SINK); +} + +static void +gst_rtmp2_sink_set_property (GObject * object, guint property_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (object); + + switch (property_id) { + case PROP_LOCATION: + gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self), + g_value_get_string (value)); + break; + case PROP_SCHEME: + GST_OBJECT_LOCK (self); + self->location.scheme = g_value_get_enum (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_HOST: + GST_OBJECT_LOCK (self); + g_free (self->location.host); + self->location.host = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PORT: + GST_OBJECT_LOCK (self); + self->location.port = g_value_get_int (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_APPLICATION: + GST_OBJECT_LOCK (self); + g_free (self->location.application); + self->location.application = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_STREAM: + GST_OBJECT_LOCK (self); + g_free (self->location.stream); + self->location.stream = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_SECURE_TOKEN: + GST_OBJECT_LOCK (self); + g_free (self->location.secure_token); + self->location.secure_token = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_USERNAME: + GST_OBJECT_LOCK (self); + g_free (self->location.username); + self->location.username = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PASSWORD: + GST_OBJECT_LOCK (self); + g_free (self->location.password); + self->location.password = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_AUTHMOD: + GST_OBJECT_LOCK (self); + self->location.authmod = g_value_get_enum (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TIMEOUT: + GST_OBJECT_LOCK (self); + self->location.timeout = g_value_get_uint (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TLS_VALIDATION_FLAGS: + GST_OBJECT_LOCK (self); + self->location.tls_flags = g_value_get_flags (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_ASYNC_CONNECT: + GST_OBJECT_LOCK (self); + self->async_connect = g_value_get_boolean (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PEAK_KBPS: + GST_OBJECT_LOCK (self); + self->peak_kbps = g_value_get_uint (value); + GST_OBJECT_UNLOCK (self); + + g_mutex_lock (&self->lock); + set_pacing_rate (self); + g_mutex_unlock (&self->lock); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +gst_rtmp2_sink_get_property (GObject * object, guint property_id, + GValue * value, GParamSpec * pspec) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (object); + + switch (property_id) { + case PROP_LOCATION: + GST_OBJECT_LOCK (self); + g_value_take_string (value, gst_rtmp_location_get_string (&self->location, + TRUE)); + GST_OBJECT_UNLOCK (self); + break; + case PROP_SCHEME: + GST_OBJECT_LOCK (self); + g_value_set_enum (value, self->location.scheme); + GST_OBJECT_UNLOCK (self); + break; + case PROP_HOST: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.host); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PORT: + GST_OBJECT_LOCK (self); + g_value_set_int (value, self->location.port); + GST_OBJECT_UNLOCK (self); + break; + case PROP_APPLICATION: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.application); + GST_OBJECT_UNLOCK (self); + break; + case PROP_STREAM: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.stream); + GST_OBJECT_UNLOCK (self); + break; + case PROP_SECURE_TOKEN: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.secure_token); + GST_OBJECT_UNLOCK (self); + break; + case PROP_USERNAME: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.username); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PASSWORD: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.password); + GST_OBJECT_UNLOCK (self); + break; + case PROP_AUTHMOD: + GST_OBJECT_LOCK (self); + g_value_set_enum (value, self->location.authmod); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TIMEOUT: + GST_OBJECT_LOCK (self); + g_value_set_uint (value, self->location.timeout); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TLS_VALIDATION_FLAGS: + GST_OBJECT_LOCK (self); + g_value_set_flags (value, self->location.tls_flags); + GST_OBJECT_UNLOCK (self); + break; + case PROP_ASYNC_CONNECT: + GST_OBJECT_LOCK (self); + g_value_set_boolean (value, self->async_connect); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PEAK_KBPS: + GST_OBJECT_LOCK (self); + g_value_set_uint (value, self->peak_kbps); + GST_OBJECT_UNLOCK (self); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +gst_rtmp2_sink_finalize (GObject * object) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (object); + + g_clear_pointer (&self->headers, g_ptr_array_unref); + + g_clear_object (&self->cancellable); + g_clear_object (&self->connection); + + g_clear_object (&self->task); + g_rec_mutex_clear (&self->task_lock); + + g_mutex_clear (&self->lock); + g_cond_clear (&self->cond); + + gst_rtmp_location_clear (&self->location); + + G_OBJECT_CLASS (gst_rtmp2_sink_parent_class)->finalize (object); +} + +static gboolean +gst_rtmp2_sink_start (GstBaseSink * sink) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (sink); + gboolean async; + + GST_OBJECT_LOCK (self); + async = self->async_connect; + GST_OBJECT_UNLOCK (self); + + GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed"); + + g_clear_object (&self->cancellable); + + self->running = TRUE; + self->cancellable = g_cancellable_new (); + self->stream_id = 0; + self->last_ts = 0; + self->base_ts = 0; + + if (async) { + gst_task_start (self->task); + } + + return TRUE; +} + +static gboolean +quit_invoker (gpointer user_data) +{ + g_main_loop_quit (user_data); + return G_SOURCE_REMOVE; +} + +static void +stop_task (GstRtmp2Sink * self) +{ + gst_task_stop (self->task); + self->running = FALSE; + + if (self->cancellable) { + GST_DEBUG_OBJECT (self, "Cancelling"); + g_cancellable_cancel (self->cancellable); + } + + if (self->loop) { + GST_DEBUG_OBJECT (self, "Stopping loop"); + g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE, + quit_invoker, g_main_loop_ref (self->loop), + (GDestroyNotify) g_main_loop_unref); + } + + g_cond_broadcast (&self->cond); +} + +static gboolean +gst_rtmp2_sink_stop (GstBaseSink * sink) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (sink); + + GST_DEBUG_OBJECT (self, "stop"); + + g_mutex_lock (&self->lock); + stop_task (self); + g_mutex_unlock (&self->lock); + + gst_task_join (self->task); + + return TRUE; +} + +static gboolean +gst_rtmp2_sink_unlock (GstBaseSink * sink) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (sink); + + GST_DEBUG_OBJECT (self, "unlock"); + + g_mutex_lock (&self->lock); + self->flushing = TRUE; + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + return TRUE; +} + +static gboolean +gst_rtmp2_sink_unlock_stop (GstBaseSink * sink) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (sink); + + GST_DEBUG_OBJECT (self, "unlock_stop"); + + g_mutex_lock (&self->lock); + self->flushing = FALSE; + g_mutex_unlock (&self->lock); + + return TRUE; +} + +static gboolean +buffer_to_message (GstRtmp2Sink * self, GstBuffer * buffer, GstBuffer ** outbuf) +{ + GstBuffer *message; + gsize payload_offset, payload_size; + guint64 timestamp; + guint32 cstream; + GstRtmpMessageType type; + + { + GstMapInfo info; + + if (G_UNLIKELY (!gst_buffer_map (buffer, &info, GST_MAP_READ))) { + GST_ERROR_OBJECT (self, "map failed: %" GST_PTR_FORMAT, buffer); + return FALSE; + } + + /* FIXME: This is ugly and only works behind flvmux. + * Implement true RTMP muxing. */ + + if (G_UNLIKELY (info.size >= 4 && memcmp (info.data, "FLV", 3) == 0)) { + /* drop the header, we don't need it */ + GST_DEBUG_OBJECT (self, "ignoring FLV header: %" GST_PTR_FORMAT, buffer); + gst_buffer_unmap (buffer, &info); + *outbuf = NULL; + return TRUE; + } + + if (G_UNLIKELY (info.size < 11 + 4)) { + GST_ERROR_OBJECT (self, "too small: %" GST_PTR_FORMAT, buffer); + gst_buffer_unmap (buffer, &info); + return FALSE; + } + + /* payload between 11 byte header and 4 byte size footer */ + payload_offset = 11; + payload_size = info.size - 11 - 4; + + type = GST_READ_UINT8 (info.data); + timestamp = GST_READ_UINT24_BE (info.data + 4); + timestamp |= (guint32) GST_READ_UINT8 (info.data + 7) << 24; + + /* flvmux timestamps roll over after about 49 days */ + if (timestamp + self->base_ts + G_MAXINT32 < self->last_ts) { + GST_WARNING_OBJECT (self, "Timestamp regression %" G_GUINT64_FORMAT + " -> %" G_GUINT64_FORMAT "; assuming overflow", self->last_ts, + timestamp + self->base_ts); + self->base_ts += G_MAXUINT32; + self->base_ts += 1; + } else if (timestamp + self->base_ts > self->last_ts + G_MAXINT32) { + GST_WARNING_OBJECT (self, "Timestamp jump %" G_GUINT64_FORMAT + " -> %" G_GUINT64_FORMAT "; assuming underflow", self->last_ts, + timestamp + self->base_ts); + if (self->base_ts > 0) { + self->base_ts -= G_MAXUINT32; + self->base_ts -= 1; + } else { + GST_WARNING_OBJECT (self, "Cannot regress further;" + " forcing timestamp to zero"); + timestamp = 0; + } + } + timestamp += self->base_ts; + self->last_ts = timestamp; + + gst_buffer_unmap (buffer, &info); + } + + switch (type) { + case GST_RTMP_MESSAGE_TYPE_DATA_AMF0: + cstream = 4; + break; + + case GST_RTMP_MESSAGE_TYPE_AUDIO: + cstream = 5; + break; + + case GST_RTMP_MESSAGE_TYPE_VIDEO: + cstream = 6; + break; + + default: + GST_ERROR_OBJECT (self, "unknown tag type %d", type); + return FALSE; + } + + /* May not know stream ID yet; set later */ + message = gst_rtmp_message_new (type, cstream, 0); + message = gst_buffer_append_region (message, gst_buffer_ref (buffer), + payload_offset, payload_size); + + GST_BUFFER_DTS (message) = timestamp * GST_MSECOND; + + if (type == GST_RTMP_MESSAGE_TYPE_DATA_AMF0) { + /* FIXME: HACK: Attach a setDataFrame header. + * This should be done using a command. */ + + static const guint8 header[] = { + 0x02, 0x00, 0x0d, 0x40, 0x73, 0x65, 0x74, 0x44, + 0x61, 0x74, 0x61, 0x46, 0x72, 0x61, 0x6d, 0x65 + }; + + GstMemory *memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, + (guint8 *) header, sizeof header, 0, sizeof header, NULL, NULL); + + gst_buffer_prepend_memory (message, memory); + } + + *outbuf = message; + return TRUE; +} + +static gboolean +should_drop_header (GstRtmp2Sink * self, GstBuffer * buffer) +{ + guint len; + + if (G_LIKELY (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER))) { + return FALSE; + } + + g_mutex_lock (&self->lock); + len = self->headers->len; + g_mutex_unlock (&self->lock); + + /* Drop header buffers when we have streamheader caps */ + return len > 0; +} + +static void +send_message (GstRtmp2Sink * self, GstBuffer * message) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (message); + g_return_if_fail (self->stream_id != 0); + meta->mstream = self->stream_id; + gst_rtmp_connection_queue_message (self->connection, message); +} + +static void +send_streamheader (GstRtmp2Sink * self) +{ + guint i; + + if (G_LIKELY (self->headers->len == 0)) { + return; + } + + GST_DEBUG_OBJECT (self, "Sending %u streamheader messages", + self->headers->len); + + for (i = 0; i < self->headers->len; i++) { + send_message (self, g_ptr_array_index (self->headers, i)); + } + + /* Steal pointers: suppress free */ + g_ptr_array_set_free_func (self->headers, NULL); + g_ptr_array_set_size (self->headers, 0); + g_ptr_array_set_free_func (self->headers, + (GDestroyNotify) gst_mini_object_unref); +} + +static inline gboolean +is_running (GstRtmp2Sink * self) +{ + return G_LIKELY (self->running && !self->flushing); +} + +static GstFlowReturn +gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (sink); + GstBuffer *message; + GstFlowReturn ret; + + if (G_UNLIKELY (should_drop_header (self, buffer))) { + GST_DEBUG_OBJECT (self, "Skipping header %" GST_PTR_FORMAT, buffer); + return GST_FLOW_OK; + } + + GST_LOG_OBJECT (self, "render %" GST_PTR_FORMAT, buffer); + + if (G_UNLIKELY (!buffer_to_message (self, buffer, &message))) { + GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to convert FLV to RTMP"), + ("Failed to convert %" GST_PTR_FORMAT, message)); + return GST_FLOW_ERROR; + } + + if (G_UNLIKELY (!message)) { + GST_DEBUG_OBJECT (self, "Skipping %" GST_PTR_FORMAT, buffer); + return GST_FLOW_OK; + } + + g_mutex_lock (&self->lock); + + if (G_UNLIKELY (is_running (self) && self->cancellable && + gst_task_get_state (self->task) != GST_TASK_STARTED)) { + GST_DEBUG_OBJECT (self, "Starting connect"); + gst_task_start (self->task); + } + + while (G_UNLIKELY (is_running (self) && !self->connection)) { + GST_DEBUG_OBJECT (self, "Waiting for connection"); + g_cond_wait (&self->cond, &self->lock); + } + + while (G_UNLIKELY (is_running (self) && self->connection && + gst_rtmp_connection_get_num_queued (self->connection) > 3)) { + GST_LOG_OBJECT (self, "Waiting for queue"); + g_cond_wait (&self->cond, &self->lock); + } + + if (G_UNLIKELY (!is_running (self))) { + gst_buffer_unref (message); + ret = GST_FLOW_FLUSHING; + } else if (G_UNLIKELY (!self->connection)) { + gst_buffer_unref (message); + /* send_connect_error has sent an ERROR message */ + ret = GST_FLOW_ERROR; + } else { + send_streamheader (self); + send_message (self, message); + ret = GST_FLOW_OK; + } + + g_mutex_unlock (&self->lock); + return ret; +} + +static gboolean +add_streamheader (GstRtmp2Sink * self, const GValue * value) +{ + GstBuffer *buffer, *message; + + g_return_val_if_fail (value, FALSE); + + if (!GST_VALUE_HOLDS_BUFFER (value)) { + GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'", + G_VALUE_TYPE_NAME (value)); + return FALSE; + } + + buffer = gst_value_get_buffer (value); + + if (!buffer_to_message (self, buffer, &message)) { + GST_ERROR_OBJECT (self, "Failed to read streamheader %" GST_PTR_FORMAT, + buffer); + return FALSE; + } + + if (message) { + GST_DEBUG_OBJECT (self, "Adding streamheader %" GST_PTR_FORMAT, buffer); + g_ptr_array_add (self->headers, message); + } else { + GST_DEBUG_OBJECT (self, "Skipping streamheader %" GST_PTR_FORMAT, buffer); + } + + return TRUE; +} + +static gboolean +gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (sink); + GstStructure *s; + const GValue *streamheader; + guint i = 0; + + GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps); + + g_ptr_array_set_size (self->headers, 0); + + s = gst_caps_get_structure (caps, 0); + streamheader = gst_structure_get_value (s, "streamheader"); + + if (!streamheader) { + GST_DEBUG_OBJECT (self, "'streamheader' field not present"); + } else if (GST_VALUE_HOLDS_BUFFER (streamheader)) { + GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer"); + if (!add_streamheader (self, streamheader)) { + return FALSE; + } + + i = 1; + } else if (GST_VALUE_HOLDS_ARRAY (streamheader)) { + guint size = gst_value_array_get_size (streamheader); + + GST_DEBUG_OBJECT (self, "'streamheader' field holds array"); + + for (; i < size; i++) { + const GValue *v = gst_value_array_get_value (streamheader, i); + + if (!add_streamheader (self, v)) { + return FALSE; + } + } + } else { + GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'", + G_VALUE_TYPE_NAME (streamheader)); + return FALSE; + } + + GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers -> %u messages", + i, self->headers->len); + + return TRUE; +} + +/* Mainloop task */ +static void +gst_rtmp2_sink_task_func (gpointer user_data) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (user_data); + GMainContext *context; + GMainLoop *loop; + GTask *connector; + + GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task starting"); + + g_mutex_lock (&self->lock); + context = self->context = g_main_context_new (); + g_main_context_push_thread_default (context); + loop = self->loop = g_main_loop_new (context, TRUE); + connector = g_task_new (self, self->cancellable, connect_task_done, NULL); + GST_OBJECT_LOCK (self); + gst_rtmp_client_connect_async (&self->location, self->cancellable, + client_connect_done, connector); + GST_OBJECT_UNLOCK (self); + g_mutex_unlock (&self->lock); + + g_main_loop_run (loop); + + g_mutex_lock (&self->lock); + g_clear_pointer (&self->loop, g_main_loop_unref); + g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref); + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + while (g_main_context_pending (context)) { + GST_DEBUG_OBJECT (self, "iterating main context to clean up"); + g_main_context_iteration (context, FALSE); + } + + g_main_context_pop_thread_default (context); + + g_mutex_lock (&self->lock); + g_clear_pointer (&self->context, g_main_context_unref); + g_ptr_array_set_size (self->headers, 0); + g_mutex_unlock (&self->lock); + + GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task exiting"); +} + +static void +client_connect_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GTask *task = user_data; + GstRtmp2Sink *self = g_task_get_source_object (task); + GError *error = NULL; + GstRtmpConnection *connection; + + connection = gst_rtmp_client_connect_finish (result, &error); + if (!connection) { + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + g_task_set_task_data (task, connection, g_object_unref); + + if (g_task_return_error_if_cancelled (task)) { + g_object_unref (task); + return; + } + + GST_OBJECT_LOCK (self); + gst_rtmp_client_start_publish_async (connection, self->location.stream, + g_task_get_cancellable (task), start_publish_done, task); + GST_OBJECT_UNLOCK (self); +} + +static void +start_publish_done (GObject * source, GAsyncResult * result, gpointer user_data) +{ + GTask *task = G_TASK (user_data); + GstRtmp2Sink *self = g_task_get_source_object (task); + GstRtmpConnection *connection = g_task_get_task_data (task); + GError *error = NULL; + + if (g_task_return_error_if_cancelled (task)) { + g_object_unref (task); + return; + } + + if (gst_rtmp_client_start_publish_finish (connection, result, + &self->stream_id, &error)) { + g_task_return_pointer (task, g_object_ref (connection), + gst_rtmp_connection_close_and_unref); + } else { + g_task_return_error (task, error); + } + + g_task_set_task_data (task, NULL, NULL); + g_object_unref (task); +} + +static void +put_chunk (GstRtmpConnection * connection, gpointer user_data) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (user_data); + + g_mutex_lock (&self->lock); + g_cond_signal (&self->cond); + g_mutex_unlock (&self->lock); +} + +static void +error_callback (GstRtmpConnection * connection, GstRtmp2Sink * self) +{ + g_mutex_lock (&self->lock); + if (self->cancellable) { + g_cancellable_cancel (self->cancellable); + } else if (self->loop) { + GST_ELEMENT_ERROR (self, RESOURCE, WRITE, ("Connection error"), (NULL)); + stop_task (self); + } + g_mutex_unlock (&self->lock); +} + +static void +send_connect_error (GstRtmp2Sink * self, GError * error) +{ + if (!error) { + GST_ERROR_OBJECT (self, "Connect failed with NULL error"); + GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL)); + return; + } + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)", + GST_STR_NULL (error->message)); + return; + } + + GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s", + g_quark_to_string (error->domain), error->code, + GST_STR_NULL (error->message)); + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) { + GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED, + ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message))); + } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, + ("Could not connect"), ("%s", GST_STR_NULL (error->message))); + } else { + GST_ELEMENT_ERROR (self, RESOURCE, FAILED, + ("Failed to connect"), + ("error %s:%d: %s", g_quark_to_string (error->domain), error->code, + GST_STR_NULL (error->message))); + } +} + +static void +connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data) +{ + GstRtmp2Sink *self = GST_RTMP2_SINK (object); + GTask *task = G_TASK (result); + GError *error = NULL; + + g_mutex_lock (&self->lock); + + g_warn_if_fail (g_task_is_valid (task, object)); + + if (self->cancellable == g_task_get_cancellable (task)) { + g_clear_object (&self->cancellable); + } + + self->connection = g_task_propagate_pointer (task, &error); + if (self->connection) { + set_pacing_rate (self); + gst_rtmp_connection_set_output_handler (self->connection, + put_chunk, g_object_ref (self), g_object_unref); + g_signal_connect_object (self->connection, "error", + G_CALLBACK (error_callback), self, 0); + } else { + send_connect_error (self, error); + stop_task (self); + g_error_free (error); + } + + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); +} + +static gboolean +socket_set_pacing_rate (GSocket * socket, gint pacing_rate, GError ** error) +{ +#ifdef SO_MAX_PACING_RATE + if (!g_socket_set_option (socket, SOL_SOCKET, SO_MAX_PACING_RATE, + pacing_rate, error)) { + g_prefix_error (error, "setsockopt failed: "); + return FALSE; + } +#else + if (pacing_rate != -1) { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + "SO_MAX_PACING_RATE is not supported"); + return FALSE; + } +#endif + + return TRUE; +} + +static void +set_pacing_rate (GstRtmp2Sink * self) +{ + GError *error = NULL; + gint pacing_rate; + + if (!self->connection) + return; + + GST_OBJECT_LOCK (self); + pacing_rate = self->peak_kbps ? self->peak_kbps * 125 : -1; + GST_OBJECT_UNLOCK (self); + + if (socket_set_pacing_rate (gst_rtmp_connection_get_socket (self->connection), + pacing_rate, &error)) + GST_INFO_OBJECT (self, "Set pacing rate to %d Bps", pacing_rate); + else + GST_WARNING_OBJECT (self, "Could not set pacing rate: %s", error->message); + + g_clear_error (&error); +} diff --git a/gst/rtmp2/gstrtmp2sink.h b/gst/rtmp2/gstrtmp2sink.h new file mode 100644 index 000000000..98f4d1173 --- /dev/null +++ b/gst/rtmp2/gstrtmp2sink.h @@ -0,0 +1,34 @@ +/* GStreamer + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP2_SINK_H_ + +#define _GST_RTMP2_SINK_H_ + +#include <gst/gst.h> + +G_BEGIN_DECLS + +#define GST_TYPE_RTMP2_SINK (gst_rtmp2_sink_get_type()) +GType gst_rtmp2_sink_get_type (void); + +G_END_DECLS +#endif diff --git a/gst/rtmp2/gstrtmp2src.c b/gst/rtmp2/gstrtmp2src.c new file mode 100644 index 000000000..0e7598e32 --- /dev/null +++ b/gst/rtmp2/gstrtmp2src.c @@ -0,0 +1,839 @@ +/* GStreamer + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ +/** + * SECTION:element-gstrtmp2src + * + * The rtmp2src element receives input streams from an RTMP server. + * + * <refsect2> + * <title>Example launch line</title> + * |[ + * gst-launch -v rtmp2src ! decodebin ! fakesink + * ]| + * FIXME Describe what the pipeline does. + * </refsect2> + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstrtmp2src.h" + +#include "gstrtmp2locationhandler.h" +#include "rtmp/rtmpclient.h" +#include "rtmp/rtmpmessage.h" + +#include <gst/base/gstpushsrc.h> +#include <string.h> + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category); +#define GST_CAT_DEFAULT gst_rtmp2_src_debug_category + +/* prototypes */ +#define GST_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SRC,GstRtmp2Src)) +#define GST_IS_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SRC)) + +typedef struct +{ + GstPushSrc parent_instance; + + /* properties */ + GstRtmpLocation location; + gboolean async_connect; + + /* stuff */ + gboolean running, flushing; + GMutex lock; + GCond cond; + + GstTask *task; + GRecMutex task_lock; + + GMainLoop *loop; + GMainContext *context; + + GCancellable *cancellable; + GstRtmpConnection *connection; + guint32 stream_id; + + GstBuffer *message; + gboolean sent_header; + GstClockTime last_ts; +} GstRtmp2Src; + +typedef struct +{ + GstPushSrcClass parent_class; +} GstRtmp2SrcClass; + +/* GObject virtual functions */ +static void gst_rtmp2_src_set_property (GObject * object, + guint property_id, const GValue * value, GParamSpec * pspec); +static void gst_rtmp2_src_get_property (GObject * object, + guint property_id, GValue * value, GParamSpec * pspec); +static void gst_rtmp2_src_finalize (GObject * object); +static void gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface); + +/* GstBaseSrc virtual functions */ +static gboolean gst_rtmp2_src_start (GstBaseSrc * src); +static gboolean gst_rtmp2_src_stop (GstBaseSrc * src); +static gboolean gst_rtmp2_src_unlock (GstBaseSrc * src); +static gboolean gst_rtmp2_src_unlock_stop (GstBaseSrc * src); +static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, + guint size, GstBuffer ** outbuf); + +/* Internal API */ +static void gst_rtmp2_src_task_func (gpointer user_data); +static void client_connect_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void start_play_done (GObject * object, GAsyncResult * result, + gpointer user_data); +static void connect_task_done (GObject * object, GAsyncResult * result, + gpointer user_data); + +enum +{ + PROP_0, + PROP_LOCATION, + PROP_SCHEME, + PROP_HOST, + PROP_PORT, + PROP_APPLICATION, + PROP_STREAM, + PROP_SECURE_TOKEN, + PROP_USERNAME, + PROP_PASSWORD, + PROP_AUTHMOD, + PROP_TIMEOUT, + PROP_TLS_VALIDATION_FLAGS, + PROP_ASYNC_CONNECT, +}; + +/* pad templates */ + +static GstStaticPadTemplate gst_rtmp2_src_src_template = +GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("video/x-flv") + ); + +/* class initialization */ + +G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC, + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, + gst_rtmp2_src_uri_handler_init); + G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL)); + +static void +gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBaseSrcClass *base_src_class = GST_BASE_SRC_CLASS (klass); + + gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass), + &gst_rtmp2_src_src_template); + + gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass), + "RTMP source element", "Source", "Source element for RTMP streams", + "Make.TV, Inc. <info@make.tv>"); + + gobject_class->set_property = gst_rtmp2_src_set_property; + gobject_class->get_property = gst_rtmp2_src_get_property; + gobject_class->finalize = gst_rtmp2_src_finalize; + base_src_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_src_start); + base_src_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_stop); + base_src_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock); + base_src_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock_stop); + base_src_class->create = GST_DEBUG_FUNCPTR (gst_rtmp2_src_create); + + g_object_class_override_property (gobject_class, PROP_LOCATION, "location"); + g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme"); + g_object_class_override_property (gobject_class, PROP_HOST, "host"); + g_object_class_override_property (gobject_class, PROP_PORT, "port"); + g_object_class_override_property (gobject_class, PROP_APPLICATION, + "application"); + g_object_class_override_property (gobject_class, PROP_STREAM, "stream"); + g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN, + "secure-token"); + g_object_class_override_property (gobject_class, PROP_USERNAME, "username"); + g_object_class_override_property (gobject_class, PROP_PASSWORD, "password"); + g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod"); + g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout"); + g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS, + "tls-validation-flags"); + + g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT, + g_param_spec_boolean ("async-connect", "Async connect", + "Connect on READY, otherwise on first push", TRUE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, + "debug category for rtmp2src element"); +} + +static void +gst_rtmp2_src_init (GstRtmp2Src * self) +{ + self->async_connect = TRUE; + + g_mutex_init (&self->lock); + g_cond_init (&self->cond); + + self->task = gst_task_new (gst_rtmp2_src_task_func, self, NULL); + g_rec_mutex_init (&self->task_lock); + gst_task_set_lock (self->task, &self->task_lock); +} + +static void +gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface) +{ + gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SRC); +} + +static void +gst_rtmp2_src_set_property (GObject * object, guint property_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (object); + + switch (property_id) { + case PROP_LOCATION: + gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self), + g_value_get_string (value)); + break; + case PROP_SCHEME: + GST_OBJECT_LOCK (self); + self->location.scheme = g_value_get_enum (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_HOST: + GST_OBJECT_LOCK (self); + g_free (self->location.host); + self->location.host = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PORT: + GST_OBJECT_LOCK (self); + self->location.port = g_value_get_int (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_APPLICATION: + GST_OBJECT_LOCK (self); + g_free (self->location.application); + self->location.application = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_STREAM: + GST_OBJECT_LOCK (self); + g_free (self->location.stream); + self->location.stream = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_SECURE_TOKEN: + GST_OBJECT_LOCK (self); + g_free (self->location.secure_token); + self->location.secure_token = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_USERNAME: + GST_OBJECT_LOCK (self); + g_free (self->location.username); + self->location.username = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PASSWORD: + GST_OBJECT_LOCK (self); + g_free (self->location.password); + self->location.password = g_value_dup_string (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_AUTHMOD: + GST_OBJECT_LOCK (self); + self->location.authmod = g_value_get_enum (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TIMEOUT: + GST_OBJECT_LOCK (self); + self->location.timeout = g_value_get_uint (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TLS_VALIDATION_FLAGS: + GST_OBJECT_LOCK (self); + self->location.tls_flags = g_value_get_flags (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_ASYNC_CONNECT: + GST_OBJECT_LOCK (self); + self->async_connect = g_value_get_boolean (value); + GST_OBJECT_UNLOCK (self); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +gst_rtmp2_src_get_property (GObject * object, guint property_id, + GValue * value, GParamSpec * pspec) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (object); + + switch (property_id) { + case PROP_LOCATION: + GST_OBJECT_LOCK (self); + g_value_take_string (value, gst_rtmp_location_get_string (&self->location, + TRUE)); + GST_OBJECT_UNLOCK (self); + break; + case PROP_SCHEME: + GST_OBJECT_LOCK (self); + g_value_set_enum (value, self->location.scheme); + GST_OBJECT_UNLOCK (self); + break; + case PROP_HOST: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.host); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PORT: + GST_OBJECT_LOCK (self); + g_value_set_int (value, self->location.port); + GST_OBJECT_UNLOCK (self); + break; + case PROP_APPLICATION: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.application); + GST_OBJECT_UNLOCK (self); + break; + case PROP_STREAM: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.stream); + GST_OBJECT_UNLOCK (self); + break; + case PROP_SECURE_TOKEN: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.secure_token); + GST_OBJECT_UNLOCK (self); + break; + case PROP_USERNAME: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.username); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PASSWORD: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->location.password); + GST_OBJECT_UNLOCK (self); + break; + case PROP_AUTHMOD: + GST_OBJECT_LOCK (self); + g_value_set_enum (value, self->location.authmod); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TIMEOUT: + GST_OBJECT_LOCK (self); + g_value_set_uint (value, self->location.timeout); + GST_OBJECT_UNLOCK (self); + break; + case PROP_TLS_VALIDATION_FLAGS: + GST_OBJECT_LOCK (self); + g_value_set_flags (value, self->location.tls_flags); + GST_OBJECT_UNLOCK (self); + break; + case PROP_ASYNC_CONNECT: + GST_OBJECT_LOCK (self); + g_value_set_boolean (value, self->async_connect); + GST_OBJECT_UNLOCK (self); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +gst_rtmp2_src_finalize (GObject * object) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (object); + + gst_buffer_replace (&self->message, NULL); + + g_clear_object (&self->cancellable); + g_clear_object (&self->connection); + + g_clear_object (&self->task); + g_rec_mutex_clear (&self->task_lock); + + g_mutex_clear (&self->lock); + g_cond_clear (&self->cond); + + gst_rtmp_location_clear (&self->location); + + G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object); +} + +static gboolean +gst_rtmp2_src_start (GstBaseSrc * src) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (src); + gboolean async; + + GST_OBJECT_LOCK (self); + async = self->async_connect; + GST_OBJECT_UNLOCK (self); + + GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed"); + + g_clear_object (&self->cancellable); + + self->running = TRUE; + self->cancellable = g_cancellable_new (); + self->stream_id = 0; + self->sent_header = FALSE; + self->last_ts = GST_CLOCK_TIME_NONE; + + if (async) { + gst_task_start (self->task); + } + + return TRUE; +} + +static gboolean +quit_invoker (gpointer user_data) +{ + g_main_loop_quit (user_data); + return G_SOURCE_REMOVE; +} + +static void +stop_task (GstRtmp2Src * self) +{ + gst_task_stop (self->task); + self->running = FALSE; + + if (self->cancellable) { + GST_DEBUG_OBJECT (self, "Cancelling"); + g_cancellable_cancel (self->cancellable); + } + + if (self->loop) { + GST_DEBUG_OBJECT (self, "Stopping loop"); + g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE, + quit_invoker, g_main_loop_ref (self->loop), + (GDestroyNotify) g_main_loop_unref); + } + + g_cond_broadcast (&self->cond); +} + +static gboolean +gst_rtmp2_src_stop (GstBaseSrc * src) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (src); + + GST_DEBUG_OBJECT (self, "stop"); + + g_mutex_lock (&self->lock); + stop_task (self); + g_mutex_unlock (&self->lock); + + gst_task_join (self->task); + + return TRUE; +} + +static gboolean +gst_rtmp2_src_unlock (GstBaseSrc * src) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (src); + + GST_DEBUG_OBJECT (self, "unlock"); + + g_mutex_lock (&self->lock); + self->flushing = TRUE; + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + return TRUE; +} + +static gboolean +gst_rtmp2_src_unlock_stop (GstBaseSrc * src) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (src); + + GST_DEBUG_OBJECT (self, "unlock_stop"); + + g_mutex_lock (&self->lock); + self->flushing = FALSE; + g_mutex_unlock (&self->lock); + + return TRUE; +} + +static GstFlowReturn +gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, + GstBuffer ** outbuf) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (src); + GstBuffer *message, *buffer; + GstRtmpMeta *meta; + guint32 timestamp = 0; + + static const guint8 flv_header_data[] = { + 0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00, + 0x09, 0x00, 0x00, 0x00, 0x00, + }; + + GST_LOG_OBJECT (self, "create"); + + g_mutex_lock (&self->lock); + + if (self->running) { + gst_task_start (self->task); + } + + while (!self->message) { + if (!self->running) { + g_mutex_unlock (&self->lock); + return GST_FLOW_EOS; + } + if (self->flushing) { + g_mutex_unlock (&self->lock); + return GST_FLOW_FLUSHING; + } + g_cond_wait (&self->cond, &self->lock); + } + + message = self->message; + self->message = NULL; + g_cond_signal (&self->cond); + g_mutex_unlock (&self->lock); + + meta = gst_buffer_get_rtmp_meta (message); + if (!meta) { + GST_ELEMENT_ERROR (self, CORE, FAILED, + ("Internal error: No RTMP meta on buffer"), + ("No RTMP meta on %" GST_PTR_FORMAT, message)); + gst_buffer_unref (message); + return GST_FLOW_ERROR; + } + + if (GST_BUFFER_DTS_IS_VALID (message)) { + GstClockTime last_ts = self->last_ts, ts = GST_BUFFER_DTS (message); + + if (GST_CLOCK_TIME_IS_VALID (last_ts) && last_ts > ts) { + GST_LOG_OBJECT (self, "Timestamp regression: %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT, GST_TIME_ARGS (last_ts), GST_TIME_ARGS (ts)); + } + + self->last_ts = ts; + timestamp = ts / GST_MSECOND; + } + + buffer = gst_buffer_copy_region (message, GST_BUFFER_COPY_MEMORY, 0, -1); + + { + guint8 *tag_header = g_malloc (11); + GstMemory *memory = + gst_memory_new_wrapped (0, tag_header, 11, 0, 11, tag_header, g_free); + GST_WRITE_UINT8 (tag_header, meta->type); + GST_WRITE_UINT24_BE (tag_header + 1, meta->size); + GST_WRITE_UINT24_BE (tag_header + 4, timestamp); + GST_WRITE_UINT8 (tag_header + 7, timestamp >> 24); + GST_WRITE_UINT24_BE (tag_header + 8, 0); + gst_buffer_prepend_memory (buffer, memory); + } + + { + guint8 *tag_footer = g_malloc (4); + GstMemory *memory = + gst_memory_new_wrapped (0, tag_footer, 4, 0, 4, tag_footer, g_free); + GST_WRITE_UINT32_BE (tag_footer, meta->size + 11); + gst_buffer_append_memory (buffer, memory); + } + + if (!self->sent_header) { + GstMemory *memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, + (guint8 *) flv_header_data, sizeof flv_header_data, 0, + sizeof flv_header_data, NULL, NULL); + gst_buffer_prepend_memory (buffer, memory); + self->sent_header = TRUE; + } + + *outbuf = buffer; + + gst_buffer_unref (message); + return GST_FLOW_OK; +} + +/* Mainloop task */ +static void +gst_rtmp2_src_task_func (gpointer user_data) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (user_data); + GMainContext *context; + GMainLoop *loop; + GTask *connector; + + GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting"); + + g_mutex_lock (&self->lock); + context = self->context = g_main_context_new (); + g_main_context_push_thread_default (context); + loop = self->loop = g_main_loop_new (context, TRUE); + connector = g_task_new (self, self->cancellable, connect_task_done, NULL); + GST_OBJECT_LOCK (self); + gst_rtmp_client_connect_async (&self->location, self->cancellable, + client_connect_done, connector); + GST_OBJECT_UNLOCK (self); + g_mutex_unlock (&self->lock); + + g_main_loop_run (loop); + + g_mutex_lock (&self->lock); + g_clear_pointer (&self->loop, g_main_loop_unref); + g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref); + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + while (g_main_context_pending (context)) { + GST_DEBUG_OBJECT (self, "iterating main context to clean up"); + g_main_context_iteration (context, FALSE); + } + + g_main_context_pop_thread_default (context); + + g_mutex_lock (&self->lock); + g_clear_pointer (&self->context, g_main_context_unref); + gst_buffer_replace (&self->message, NULL); + g_mutex_unlock (&self->lock); + + GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task exiting"); +} + +static void +client_connect_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GTask *task = user_data; + GstRtmp2Src *self = g_task_get_source_object (task); + GError *error = NULL; + GstRtmpConnection *connection; + + connection = gst_rtmp_client_connect_finish (result, &error); + if (!connection) { + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + g_task_set_task_data (task, connection, g_object_unref); + + if (g_task_return_error_if_cancelled (task)) { + g_object_unref (task); + return; + } + + GST_OBJECT_LOCK (self); + gst_rtmp_client_start_play_async (connection, self->location.stream, + g_task_get_cancellable (task), start_play_done, task); + GST_OBJECT_UNLOCK (self); +} + +static void +start_play_done (GObject * source, GAsyncResult * result, gpointer user_data) +{ + GTask *task = G_TASK (user_data); + GstRtmp2Src *self = g_task_get_source_object (task); + GstRtmpConnection *connection = g_task_get_task_data (task); + GError *error = NULL; + + if (g_task_return_error_if_cancelled (task)) { + g_object_unref (task); + return; + } + + if (gst_rtmp_client_start_play_finish (connection, result, + &self->stream_id, &error)) { + g_task_return_pointer (task, g_object_ref (connection), + gst_rtmp_connection_close_and_unref); + } else { + g_task_return_error (task, error); + } + + g_task_set_task_data (task, NULL, NULL); + g_object_unref (task); +} + +static void +got_message (GstRtmpConnection * connection, GstBuffer * buffer, + gpointer user_data) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (user_data); + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + guint32 min_size = 1; + + g_return_if_fail (meta); + + if (meta->mstream != self->stream_id) { + GST_DEBUG_OBJECT (self, "Ignoring %s message with stream %" G_GUINT32_FORMAT + " != %" G_GUINT32_FORMAT, gst_rtmp_message_type_get_nick (meta->type), + meta->mstream, self->stream_id); + return; + } + + switch (meta->type) { + case GST_RTMP_MESSAGE_TYPE_VIDEO: + min_size = 6; + break; + + case GST_RTMP_MESSAGE_TYPE_AUDIO: + min_size = 2; + break; + + case GST_RTMP_MESSAGE_TYPE_DATA_AMF0: + break; + + default: + GST_DEBUG_OBJECT (self, "Ignoring %s message, wrong type", + gst_rtmp_message_type_get_nick (meta->type)); + return; + } + + if (meta->size < min_size) { + GST_DEBUG_OBJECT (self, "Ignoring too small %s message (%" G_GUINT32_FORMAT + " < %" G_GUINT32_FORMAT ")", + gst_rtmp_message_type_get_nick (meta->type), meta->size, min_size); + return; + } + + g_mutex_lock (&self->lock); + while (self->message) { + if (!self->running) { + goto out; + } + g_cond_wait (&self->cond, &self->lock); + } + + self->message = gst_buffer_ref (buffer); + g_cond_signal (&self->cond); + +out: + g_mutex_unlock (&self->lock); + return; +} + +static void +error_callback (GstRtmpConnection * connection, GstRtmp2Src * self) +{ + g_mutex_lock (&self->lock); + if (self->cancellable) { + g_cancellable_cancel (self->cancellable); + } else if (self->loop) { + GST_INFO_OBJECT (self, "Connection error"); + stop_task (self); + } + g_mutex_unlock (&self->lock); +} + +static void +control_callback (GstRtmpConnection * connection, gint uc_type, + guint stream_id, GstRtmp2Src * self) +{ + GST_INFO_OBJECT (self, "stream %u got %s", stream_id, + gst_rtmp_user_control_type_get_nick (uc_type)); + + if (uc_type == GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF && stream_id == 1) { + GST_INFO_OBJECT (self, "went EOS"); + stop_task (self); + } +} + +static void +send_connect_error (GstRtmp2Src * self, GError * error) +{ + if (!error) { + GST_ERROR_OBJECT (self, "Connect failed with NULL error"); + GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL)); + return; + } + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)", + GST_STR_NULL (error->message)); + return; + } + + GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s", + g_quark_to_string (error->domain), error->code, + GST_STR_NULL (error->message)); + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) { + GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED, + ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message))); + } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, + ("Could not connect"), ("%s", GST_STR_NULL (error->message))); + } else { + GST_ELEMENT_ERROR (self, RESOURCE, FAILED, + ("Failed to connect"), + ("error %s:%d: %s", g_quark_to_string (error->domain), error->code, + GST_STR_NULL (error->message))); + } +} + +static void +connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data) +{ + GstRtmp2Src *self = GST_RTMP2_SRC (object); + GTask *task = G_TASK (result); + GError *error = NULL; + + g_mutex_lock (&self->lock); + + g_warn_if_fail (g_task_is_valid (task, object)); + + if (self->cancellable == g_task_get_cancellable (task)) { + g_clear_object (&self->cancellable); + } + + self->connection = g_task_propagate_pointer (task, &error); + if (self->connection) { + gst_rtmp_connection_set_input_handler (self->connection, + got_message, g_object_ref (self), g_object_unref); + g_signal_connect_object (self->connection, "error", + G_CALLBACK (error_callback), self, 0); + g_signal_connect_object (self->connection, "stream-control", + G_CALLBACK (control_callback), self, 0); + } else { + send_connect_error (self, error); + stop_task (self); + g_error_free (error); + } + + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); +} diff --git a/gst/rtmp2/gstrtmp2src.h b/gst/rtmp2/gstrtmp2src.h new file mode 100644 index 000000000..e2c40a66e --- /dev/null +++ b/gst/rtmp2/gstrtmp2src.h @@ -0,0 +1,34 @@ +/* GStreamer + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP2_SRC_H_ + +#define _GST_RTMP2_SRC_H_ + +#include <gst/gst.h> + +G_BEGIN_DECLS + +#define GST_TYPE_RTMP2_SRC (gst_rtmp2_src_get_type()) +GType gst_rtmp2_src_get_type (void); + +G_END_DECLS +#endif diff --git a/gst/rtmp2/meson.build b/gst/rtmp2/meson.build new file mode 100644 index 000000000..c67a24867 --- /dev/null +++ b/gst/rtmp2/meson.build @@ -0,0 +1,24 @@ +rtmp2_sources = [ + 'gstrtmp2.c', + 'gstrtmp2locationhandler.c', + 'gstrtmp2sink.c', + 'gstrtmp2src.c', + 'rtmp/amf.c', + 'rtmp/rtmpchunkstream.c', + 'rtmp/rtmpclient.c', + 'rtmp/rtmpconnection.c', + 'rtmp/rtmphandshake.c', + 'rtmp/rtmpmessage.c', + 'rtmp/rtmputils.c', +] + +gstrtmp2 = library('gstrtmp2', + rtmp2_sources, + c_args : gst_plugins_bad_args, + include_directories : [configinc, libsinc], + dependencies : [gstbase_dep, gio_dep, libm], + install : true, + install_dir : plugins_install_dir, +) +pkgconfig.generate(gstrtmp2, install_dir : plugins_pkgconfig_install_dir) +plugins += [gstrtmp2] diff --git a/gst/rtmp2/rtmp/amf.c b/gst/rtmp2/rtmp/amf.c new file mode 100644 index 000000000..a6f5b3be0 --- /dev/null +++ b/gst/rtmp2/rtmp/amf.c @@ -0,0 +1,1141 @@ +/* GStreamer RTMP Library + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "amf.h" +#include "rtmputils.h" +#include <string.h> +#include <gst/gst.h> + +#define MAX_RECURSION_DEPTH 16 + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_amf_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_amf_debug_category + +static GBytes *empty_bytes; + +static void +init_static (void) +{ + static volatile gsize done = 0; + if (g_once_init_enter (&done)) { + empty_bytes = g_bytes_new_static ("", 0); + GST_DEBUG_CATEGORY_INIT (gst_rtmp_amf_debug_category, "rtmpamf", 0, + "debug category for the amf parser"); + g_once_init_leave (&done, 1); + } +} + +const gchar * +gst_amf_type_get_nick (GstAmfType type) +{ + switch (type) { + case GST_AMF_TYPE_INVALID: + return "invalid"; + case GST_AMF_TYPE_NUMBER: + return "number"; + case GST_AMF_TYPE_BOOLEAN: + return "boolean"; + case GST_AMF_TYPE_STRING: + return "string"; + case GST_AMF_TYPE_OBJECT: + return "object"; + case GST_AMF_TYPE_MOVIECLIP: + return "movieclip"; + case GST_AMF_TYPE_NULL: + return "null"; + case GST_AMF_TYPE_UNDEFINED: + return "undefined"; + case GST_AMF_TYPE_REFERENCE: + return "reference"; + case GST_AMF_TYPE_ECMA_ARRAY: + return "ecma-array"; + case GST_AMF_TYPE_OBJECT_END: + return "object-end"; + case GST_AMF_TYPE_STRICT_ARRAY: + return "strict-array"; + case GST_AMF_TYPE_DATE: + return "date"; + case GST_AMF_TYPE_LONG_STRING: + return "long-string"; + case GST_AMF_TYPE_UNSUPPORTED: + return "unsupported"; + case GST_AMF_TYPE_RECORDSET: + return "recordset"; + case GST_AMF_TYPE_XML_DOCUMENT: + return "xml-document"; + case GST_AMF_TYPE_TYPED_OBJECT: + return "typed-object"; + case GST_AMF_TYPE_AVMPLUS_OBJECT: + return "avmplus-object"; + default: + return "unknown"; + } +} + +typedef struct +{ + gchar *name; + GstAmfNode *value; +} AmfObjectField; + +static void +amf_object_field_clear (gpointer ptr) +{ + AmfObjectField *field = ptr; + g_clear_pointer (&field->name, g_free); + g_clear_pointer (&field->value, gst_amf_node_free); +} + +struct _GstAmfNode +{ + GstAmfType type; + union + { + gint v_int; + gdouble v_double; + GBytes *v_bytes; + GArray *v_fields; + GPtrArray *v_elements; + } value; +}; + +static inline const AmfObjectField * +get_field (const GstAmfNode * node, guint index) +{ + return &g_array_index (node->value.v_fields, const AmfObjectField, index); +} + +static inline void +append_field (GstAmfNode * node, gchar * name, GstAmfNode * value) +{ + AmfObjectField field = { + .name = name, + .value = value, + }; + g_array_append_val (node->value.v_fields, field); +} + +static inline const GstAmfNode * +get_element (const GstAmfNode * node, guint index) +{ + return g_ptr_array_index (node->value.v_elements, index); +} + +static inline void +append_element (GstAmfNode * node, GstAmfNode * value) +{ + g_ptr_array_add (node->value.v_elements, value); +} + +static GstAmfNode * +node_new (GstAmfType type) +{ + GstAmfNode *node; + + init_static (); + + node = g_slice_alloc0 (sizeof *node); + node->type = type; + + switch (type) { + case GST_AMF_TYPE_STRING: + case GST_AMF_TYPE_LONG_STRING: + node->value.v_bytes = g_bytes_ref (empty_bytes); + break; + + case GST_AMF_TYPE_OBJECT: + case GST_AMF_TYPE_ECMA_ARRAY: + node->value.v_fields = + g_array_new (FALSE, FALSE, sizeof (AmfObjectField)); + g_array_set_clear_func (node->value.v_fields, amf_object_field_clear); + break; + + case GST_AMF_TYPE_STRICT_ARRAY: + node->value.v_elements = + g_ptr_array_new_with_free_func (gst_amf_node_free); + break; + + default: + break; + } + + return node; +} + +GstAmfNode * +gst_amf_node_new_null (void) +{ + return node_new (GST_AMF_TYPE_NULL); +} + +GstAmfNode * +gst_amf_node_new_boolean (gboolean value) +{ + GstAmfNode *node = node_new (GST_AMF_TYPE_BOOLEAN); + node->value.v_int = ! !value; + return node; +} + +GstAmfNode * +gst_amf_node_new_number (gdouble value) +{ + GstAmfNode *node = node_new (GST_AMF_TYPE_NUMBER); + node->value.v_double = value; + return node; +} + +GstAmfNode * +gst_amf_node_new_string (const gchar * value, gssize size) +{ + GstAmfNode *node = node_new (GST_AMF_TYPE_STRING); + gst_amf_node_set_string (node, value, size); + return node; +} + +GstAmfNode * +gst_amf_node_new_take_string (gchar * value, gssize size) +{ + GstAmfNode *node = node_new (GST_AMF_TYPE_STRING); + gst_amf_node_take_string (node, value, size); + return node; +} + +GstAmfNode * +gst_amf_node_new_object (void) +{ + return node_new (GST_AMF_TYPE_OBJECT); +} + +GstAmfNode * +gst_amf_node_copy (const GstAmfNode * node) +{ + GstAmfNode *copy; + + g_return_val_if_fail (node, NULL); + + copy = node_new (node->type); + + switch (node->type) { + case GST_AMF_TYPE_STRING: + case GST_AMF_TYPE_LONG_STRING: + copy->value.v_bytes = g_bytes_ref (node->value.v_bytes); + break; + + case GST_AMF_TYPE_OBJECT: + case GST_AMF_TYPE_ECMA_ARRAY:{ + guint i, len = gst_amf_node_get_num_fields (node); + for (i = 0; i < len; i++) { + const AmfObjectField *field = get_field (node, i); + append_field (copy, g_strdup (field->name), + gst_amf_node_copy (field->value)); + } + break; + } + + case GST_AMF_TYPE_STRICT_ARRAY:{ + guint i, len = gst_amf_node_get_num_elements (node); + for (i = 0; i < len; i++) { + const GstAmfNode *elem = get_element (node, i); + append_element (copy, gst_amf_node_copy (elem)); + } + break; + } + + default: + copy->value = node->value; + break; + } + + return copy; +} + +void +gst_amf_node_free (gpointer ptr) +{ + GstAmfNode *node = ptr; + + switch (node->type) { + case GST_AMF_TYPE_STRING: + case GST_AMF_TYPE_LONG_STRING: + g_bytes_unref (node->value.v_bytes); + break; + + case GST_AMF_TYPE_OBJECT: + case GST_AMF_TYPE_ECMA_ARRAY: + g_array_unref (node->value.v_fields); + break; + + case GST_AMF_TYPE_STRICT_ARRAY: + g_ptr_array_unref (node->value.v_elements); + break; + + default: + break; + } + + g_slice_free (GstAmfNode, node); +} + +GstAmfType +gst_amf_node_get_type (const GstAmfNode * node) +{ + g_return_val_if_fail (node, GST_AMF_TYPE_INVALID); + return node->type; +} + +gboolean +gst_amf_node_get_boolean (const GstAmfNode * node) +{ + g_return_val_if_fail (gst_amf_node_get_type (node) == GST_AMF_TYPE_BOOLEAN, + FALSE); + return node->value.v_int; +} + +gdouble +gst_amf_node_get_number (const GstAmfNode * node) +{ + g_return_val_if_fail (gst_amf_node_get_type (node) == GST_AMF_TYPE_NUMBER, + FALSE); + return node->value.v_double; +} + +gchar * +gst_amf_node_get_string (const GstAmfNode * node, gsize * out_size) +{ + gsize size; + const gchar *data = gst_amf_node_peek_string (node, &size); + + if (out_size) { + *out_size = size; + return g_memdup (data, size); + } else { + return g_strndup (data, size); + } +} + +const gchar * +gst_amf_node_peek_string (const GstAmfNode * node, gsize * size) +{ + GstAmfType type = gst_amf_node_get_type (node); + g_return_val_if_fail (type == GST_AMF_TYPE_STRING || + type == GST_AMF_TYPE_LONG_STRING, FALSE); + return g_bytes_get_data (node->value.v_bytes, size); +} + +const GstAmfNode * +gst_amf_node_get_field (const GstAmfNode * node, const gchar * name) +{ + guint i, len = gst_amf_node_get_num_fields (node); + + g_return_val_if_fail (name, NULL); + + for (i = 0; i < len; i++) { + const AmfObjectField *field = get_field (node, i); + if (strcmp (field->name, name) == 0) { + return field->value; + } + } + + return NULL; +} + +const GstAmfNode * +gst_amf_node_get_field_by_index (const GstAmfNode * node, guint index) +{ + guint len = gst_amf_node_get_num_fields (node); + g_return_val_if_fail (index < len, NULL); + return get_field (node, index)->value; +} + +guint +gst_amf_node_get_num_fields (const GstAmfNode * node) +{ + GstAmfType type = gst_amf_node_get_type (node); + g_return_val_if_fail (type == GST_AMF_TYPE_OBJECT || + type == GST_AMF_TYPE_ECMA_ARRAY, 0); + return node->value.v_fields->len; +} + +const GstAmfNode * +gst_amf_node_get_element (const GstAmfNode * node, guint index) +{ + guint len = gst_amf_node_get_num_elements (node); + g_return_val_if_fail (index < len, NULL); + return get_element (node, index); +} + +guint +gst_amf_node_get_num_elements (const GstAmfNode * node) +{ + GstAmfType type = gst_amf_node_get_type (node); + g_return_val_if_fail (type == GST_AMF_TYPE_STRICT_ARRAY, 0); + return node->value.v_elements->len; +} + +void +gst_amf_node_set_boolean (GstAmfNode * node, gboolean value) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_BOOLEAN); + node->value.v_int = ! !value; +} + +void +gst_amf_node_set_number (GstAmfNode * node, gdouble value) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_NUMBER); + node->value.v_double = value; +} + +void +gst_amf_node_take_string (GstAmfNode * node, gchar * value, gssize size) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_STRING || + node->type == GST_AMF_TYPE_LONG_STRING); + + g_return_if_fail (value); + + if (size < 0) { + size = strlen (value); + } + + if (size > G_MAXUINT32) { + GST_WARNING ("Long string too long (%" G_GSSIZE_FORMAT "), truncating", + size); + size = G_MAXUINT32; + value[size] = 0; + } + + if (size > G_MAXUINT16) { + node->type = GST_AMF_TYPE_LONG_STRING; + } + + g_bytes_unref (node->value.v_bytes); + node->value.v_bytes = g_bytes_new_take (value, size); +} + +void +gst_amf_node_set_string (GstAmfNode * node, const gchar * value, gssize size) +{ + gchar *copy; + + g_return_if_fail (value); + + if (size < 0) { + size = strlen (value); + copy = g_memdup (value, size + 1); + } else { + copy = g_memdup (value, size); + } + + gst_amf_node_take_string (node, copy, size); +} + +void +gst_amf_node_append_field (GstAmfNode * node, const gchar * name, + const GstAmfNode * value) +{ + gst_amf_node_append_take_field (node, name, gst_amf_node_copy (value)); +} + +void +gst_amf_node_append_take_field (GstAmfNode * node, const gchar * name, + GstAmfNode * value) +{ + g_return_if_fail (node->type == GST_AMF_TYPE_OBJECT || + node->type == GST_AMF_TYPE_ECMA_ARRAY); + g_return_if_fail (name); + append_field (node, g_strdup (name), value); +} + +void +gst_amf_node_append_field_number (GstAmfNode * node, const gchar * name, + gdouble value) +{ + gst_amf_node_append_take_field (node, name, gst_amf_node_new_number (value)); +} + +void +gst_amf_node_append_field_boolean (GstAmfNode * node, const gchar * name, + gboolean value) +{ + gst_amf_node_append_take_field (node, name, gst_amf_node_new_boolean (value)); +} + +void +gst_amf_node_append_field_string (GstAmfNode * node, const gchar * name, + const gchar * value, gssize size) +{ + gst_amf_node_append_take_field (node, name, + gst_amf_node_new_string (value, size)); +} + +void +gst_amf_node_append_field_take_string (GstAmfNode * node, const gchar * name, + gchar * value, gssize size) +{ + gst_amf_node_append_take_field (node, name, + gst_amf_node_new_take_string (value, size)); +} + +/* Dumper *******************************************************************/ + +static inline void +dump_indent (GString * string, gint indent, guint depth) +{ + if (indent < 0) { + g_string_append_c (string, ' '); + } else { + guint i; + g_string_append_c (string, '\n'); + for (i = 0; i < indent + depth * 2; i++) { + g_string_append_c (string, ' '); + } + } +} + +static inline void +dump_bytes (GString * string, GBytes * value) +{ + gsize size; + const gchar *data = g_bytes_get_data (value, &size); + gst_rtmp_string_print_escaped (string, data, size); +} + +static void +dump_node (GString * string, const GstAmfNode * node, gint indent, + guint recursion_depth) +{ + const gchar *object_delim = "{}"; + + switch (gst_amf_node_get_type (node)) { + case GST_AMF_TYPE_NUMBER: + g_string_append_printf (string, "%g", node->value.v_double); + break; + + case GST_AMF_TYPE_BOOLEAN: + g_string_append (string, node->value.v_int ? "True" : "False"); + break; + + case GST_AMF_TYPE_LONG_STRING: + g_string_append_c (string, 'L'); + /* no break */ + case GST_AMF_TYPE_STRING: + dump_bytes (string, node->value.v_bytes); + break; + + case GST_AMF_TYPE_ECMA_ARRAY: + object_delim = "[]"; + /* no break */ + case GST_AMF_TYPE_OBJECT:{ + guint i, len = gst_amf_node_get_num_fields (node); + g_string_append_c (string, object_delim[0]); + if (len) { + for (i = 0; i < len; i++) { + const AmfObjectField *field = get_field (node, i); + dump_indent (string, indent, recursion_depth + 1); + gst_rtmp_string_print_escaped (string, field->name, -1); + g_string_append_c (string, ':'); + g_string_append_c (string, ' '); + dump_node (string, field->value, indent, recursion_depth + 1); + if (i < len - 1) { + g_string_append_c (string, ','); + } + } + dump_indent (string, indent, recursion_depth); + } + g_string_append_c (string, object_delim[1]); + break; + } + + case GST_AMF_TYPE_STRICT_ARRAY:{ + guint i, len = gst_amf_node_get_num_elements (node); + g_string_append_c (string, '('); + if (len) { + for (i = 0; i < len; i++) { + const GstAmfNode *value = get_element (node, i); + dump_indent (string, indent, recursion_depth + 1); + dump_node (string, value, indent, recursion_depth + 1); + if (i < len - 1) { + g_string_append_c (string, ','); + } + } + dump_indent (string, indent, recursion_depth); + } + g_string_append_c (string, ')'); + break; + } + + default: + g_string_append (string, gst_amf_type_get_nick (node->type)); + break; + } +} + +void +gst_amf_node_dump (const GstAmfNode * node, gint indent, GString * string) +{ + dump_node (string, node, indent, 0); +} + +static void +dump_argument (const GstAmfNode * node, guint n) +{ + if (G_UNLIKELY (GST_LEVEL_LOG <= _gst_debug_min) && + GST_LEVEL_LOG <= gst_debug_category_get_threshold (GST_CAT_DEFAULT)) { + GString *string = g_string_new (NULL); + gst_amf_node_dump (node, -1, string); + GST_LOG ("Argument #%u: %s", n, string->str); + g_string_free (string, TRUE); + } +} + +/* Parser *******************************************************************/ + +typedef struct +{ + const guint8 *data; + gsize size, offset; + guint8 recursion_depth; +} AmfParser; + +static GstAmfNode *parse_value (AmfParser * parser); + +static inline guint8 +parse_u8 (AmfParser * parser) +{ + guint8 value; + value = parser->data[parser->offset]; + parser->offset += sizeof value; + return value; +} + +static inline guint16 +parse_u16 (AmfParser * parser) +{ + guint16 value; + value = GST_READ_UINT16_BE (parser->data + parser->offset); + parser->offset += sizeof value; + return value; +} + +static inline guint32 +parse_u32 (AmfParser * parser) +{ + guint32 value; + value = GST_READ_UINT32_BE (parser->data + parser->offset); + parser->offset += sizeof value; + return value; +} + +static gdouble +parse_number (AmfParser * parser) +{ + gdouble value; + + if (sizeof value > parser->size - parser->offset) { + GST_ERROR ("number too long"); + return 0.0; + } + + value = GST_READ_DOUBLE_BE (parser->data + parser->offset); + parser->offset += sizeof value; + return value; +} + +static gboolean +parse_boolean (AmfParser * parser) +{ + guint8 value; + + if (sizeof value > parser->size - parser->offset) { + GST_ERROR ("boolean too long"); + return FALSE; + } + + value = parse_u8 (parser); + return ! !value; +} + +static inline GBytes * +read_string (AmfParser * parser, gsize size) +{ + gchar *string; + + if (size == 0) { + return g_bytes_ref (empty_bytes); + } + + if (size > parser->size - parser->offset) { + GST_ERROR ("string too long (%" G_GSIZE_FORMAT ")", size); + return NULL; + } + + /* Null-terminate all incoming strings for internal safety */ + if (parser->data[parser->offset + size - 1] == 0) { + string = g_malloc (size); + } else { + string = g_malloc (size + 1); + string[size] = 0; + } + + memcpy (string, parser->data + parser->offset, size); + + parser->offset += size; + return g_bytes_new_take (string, size); +} + +static GBytes * +parse_string (AmfParser * parser) +{ + guint16 size; + + if (sizeof size > parser->size - parser->offset) { + GST_ERROR ("string size too long"); + return NULL; + } + + size = parse_u16 (parser); + return read_string (parser, size); +} + +static GBytes * +parse_long_string (AmfParser * parser) +{ + guint32 size; + + if (sizeof size > parser->size - parser->offset) { + GST_ERROR ("long string size too long"); + return NULL; + } + + size = parse_u32 (parser); + return read_string (parser, size); +} + +static guint32 +parse_object (AmfParser * parser, GstAmfNode * node) +{ + guint32 n_fields = 0; + + while (TRUE) { + GBytes *name; + gsize size; + GstAmfNode *value; + + name = parse_string (parser); + if (!name) { + GST_ERROR ("object too long"); + break; + } + + value = parse_value (parser); + if (!value) { + GST_ERROR ("object too long"); + g_bytes_unref (name); + break; + } + + if (gst_amf_node_get_type (value) == GST_AMF_TYPE_OBJECT_END) { + g_bytes_unref (name); + gst_amf_node_free (value); + break; + } + + if (g_bytes_get_size (name) == 0) { + GST_ERROR ("empty object field name"); + g_bytes_unref (name); + gst_amf_node_free (value); + break; + } + + append_field (node, g_bytes_unref_to_data (name, &size), value); + n_fields++; + }; + + return n_fields; +} + +static void +parse_ecma_array (AmfParser * parser, GstAmfNode * node) +{ + guint32 n_elements, n_read; + + if (sizeof n_elements > parser->size - parser->offset) { + GST_ERROR ("array size too long"); + return; + } + + n_elements = parse_u32 (parser); + + /* FIXME This is weird. The one time I've seen this, the encoded value + * was 0, but the number of elements was 1. */ + if (n_elements == 0) { + GST_DEBUG ("Interpreting ECMA array length 0 as 1"); + n_elements = 1; + } + + n_read = parse_object (parser, node); + + if (n_read != n_elements) { + GST_WARNING ("Expected array with %" G_GUINT32_FORMAT " elements," + " but read %" G_GUINT32_FORMAT, n_elements, n_read); + } +} + +static void +parse_strict_array (AmfParser * parser, GstAmfNode * node) +{ + GstAmfNode *value = NULL; + guint32 n_elements, i; + + if (sizeof n_elements > parser->size - parser->offset) { + GST_ERROR ("array size too long"); + return; + } + + n_elements = parse_u32 (parser); + + for (i = 0; i < n_elements; i++) { + value = parse_value (parser); + if (!value) { + GST_ERROR ("array too long"); + break; + } + + append_element (node, value); + } +} + +static GstAmfNode * +parse_value (AmfParser * parser) +{ + GstAmfNode *node = NULL; + GstAmfType type; + + if (1 > parser->size - parser->offset) { + GST_ERROR ("value too long"); + return NULL; + } + + type = parse_u8 (parser); + node = node_new (type); + GST_TRACE ("parsing AMF type %d (%s)", type, gst_amf_type_get_nick (type)); + + parser->recursion_depth++; + if (parser->recursion_depth > MAX_RECURSION_DEPTH) { + GST_ERROR ("maximum recursion depth %d reached", parser->recursion_depth); + return node; + } + + switch (type) { + case GST_AMF_TYPE_NUMBER: + node->value.v_double = parse_number (parser); + break; + case GST_AMF_TYPE_BOOLEAN: + node->value.v_int = parse_boolean (parser); + break; + case GST_AMF_TYPE_STRING: + node->value.v_bytes = parse_string (parser); + break; + case GST_AMF_TYPE_LONG_STRING: + node->value.v_bytes = parse_long_string (parser); + break; + case GST_AMF_TYPE_OBJECT: + parse_object (parser, node); + break; + case GST_AMF_TYPE_ECMA_ARRAY: + parse_ecma_array (parser, node); + break; + case GST_AMF_TYPE_STRICT_ARRAY: + parse_strict_array (parser, node); + break; + case GST_AMF_TYPE_NULL: + case GST_AMF_TYPE_UNDEFINED: + case GST_AMF_TYPE_OBJECT_END: + case GST_AMF_TYPE_UNSUPPORTED: + break; + default: + GST_ERROR ("unimplemented AMF type %d (%s)", type, + gst_amf_type_get_nick (type)); + break; + } + + parser->recursion_depth--; + return node; +} + +GPtrArray * +gst_amf_parse_command (const guint8 * data, gsize size, + gdouble * transaction_id, gchar ** command_name) +{ + AmfParser parser = { + .data = data, + .size = size, + .recursion_depth = 0, + }; + GstAmfNode *node1 = NULL, *node2 = NULL; + GPtrArray *args = NULL; + + g_return_val_if_fail (data, NULL); + g_return_val_if_fail (size, NULL); + + init_static (); + + GST_TRACE ("Starting parse with %" G_GSIZE_FORMAT " bytes", parser.size); + + node1 = parse_value (&parser); + if (gst_amf_node_get_type (node1) != GST_AMF_TYPE_STRING) { + GST_ERROR ("no command name"); + goto out; + } + + node2 = parse_value (&parser); + if (gst_amf_node_get_type (node2) != GST_AMF_TYPE_NUMBER) { + GST_ERROR ("no transaction ID"); + goto out; + } + + GST_LOG ("Parsing command '%s', transid %.0f", + gst_amf_node_peek_string (node1, NULL), gst_amf_node_get_number (node2)); + + args = g_ptr_array_new_with_free_func (gst_amf_node_free); + + while (parser.offset < parser.size) { + GstAmfNode *node = parse_value (&parser); + if (!node) { + break; + } + + dump_argument (node, args->len); + g_ptr_array_add (args, node); + } + + GST_TRACE ("Done parsing; consumed %" G_GSIZE_FORMAT " bytes and left %" + G_GSIZE_FORMAT " bytes", parser.offset, parser.size - parser.offset); + + if (args->len == 0) { + GST_ERROR ("no command arguments"); + g_clear_pointer (&args, g_ptr_array_unref); + goto out; + } + + if (command_name) { + *command_name = gst_amf_node_get_string (node1, NULL); + } + + if (transaction_id) { + *transaction_id = gst_amf_node_get_number (node2); + } + +out: + g_clear_pointer (&node1, gst_amf_node_free); + g_clear_pointer (&node2, gst_amf_node_free); + return args; +} + +/* Serializer ***************************************************************/ + +static void serialize_value (GByteArray * array, const GstAmfNode * node); + +static inline void +serialize_u8 (GByteArray * array, guint8 value) +{ + g_byte_array_append (array, (guint8 *) & value, sizeof value); +} + +static inline void +serialize_u16 (GByteArray * array, guint16 value) +{ + value = GUINT16_TO_BE (value); + g_byte_array_append (array, (guint8 *) & value, sizeof value); +} + +static inline void +serialize_u32 (GByteArray * array, guint32 value) +{ + value = GUINT32_TO_BE (value); + g_byte_array_append (array, (guint8 *) & value, sizeof value); +} + +static inline void +serialize_number (GByteArray * array, gdouble value) +{ + value = GDOUBLE_TO_BE (value); + g_byte_array_append (array, (guint8 *) & value, sizeof value); +} + +static inline void +serialize_boolean (GByteArray * array, gboolean value) +{ + serialize_u8 (array, value); +} + +static void +serialize_string (GByteArray * array, const gchar * string, gssize size) +{ + if (size < 0) { + size = strlen (string); + } + + if (size > G_MAXUINT16) { + GST_WARNING ("String too long: %" G_GSSIZE_FORMAT, size); + size = G_MAXUINT16; + } + + serialize_u16 (array, size); + g_byte_array_append (array, (guint8 *) string, size); +} + +static void +serialize_long_string (GByteArray * array, const gchar * string, gssize size) +{ + if (size < 0) { + size = strlen (string); + } + + if (size > G_MAXUINT32) { + GST_WARNING ("Long string too long: %" G_GSSIZE_FORMAT, size); + size = G_MAXUINT32; + } + + serialize_u32 (array, size); + g_byte_array_append (array, (guint8 *) string, size); +} + +static inline void +serialize_bytes (GByteArray * array, GBytes * bytes, gboolean long_string) +{ + gsize size; + const gchar *data = g_bytes_get_data (bytes, &size); + + if (long_string) { + serialize_long_string (array, data, size); + } else { + serialize_string (array, data, size); + } +} + +static void +serialize_object (GByteArray * array, const GstAmfNode * node) +{ + guint i; + + for (i = 0; i < gst_amf_node_get_num_fields (node); i++) { + const AmfObjectField *field = get_field (node, i); + serialize_string (array, field->name, -1); + serialize_value (array, field->value); + } + serialize_u16 (array, 0); + serialize_u8 (array, GST_AMF_TYPE_OBJECT_END); +} + +static void +serialize_ecma_array (GByteArray * array, const GstAmfNode * node) +{ + /* FIXME: Shouldn't this be the field count? */ + serialize_u32 (array, 0); + serialize_object (array, node); +} + +static void +serialize_value (GByteArray * array, const GstAmfNode * node) +{ + serialize_u8 (array, node->type); + switch (node->type) { + case GST_AMF_TYPE_NUMBER: + serialize_number (array, node->value.v_double); + break; + case GST_AMF_TYPE_BOOLEAN: + serialize_boolean (array, node->value.v_int); + break; + case GST_AMF_TYPE_STRING: + serialize_bytes (array, node->value.v_bytes, FALSE); + break; + case GST_AMF_TYPE_LONG_STRING: + serialize_bytes (array, node->value.v_bytes, TRUE); + break; + case GST_AMF_TYPE_OBJECT: + serialize_object (array, node); + break; + case GST_AMF_TYPE_ECMA_ARRAY: + serialize_ecma_array (array, node); + break; + case GST_AMF_TYPE_NULL: + case GST_AMF_TYPE_UNDEFINED: + case GST_AMF_TYPE_OBJECT_END: + case GST_AMF_TYPE_UNSUPPORTED: + break; + default: + GST_ERROR ("unimplemented AMF type %d (%s)", node->type, + gst_amf_type_get_nick (node->type)); + break; + } +} + +GBytes * +gst_amf_serialize_command (gdouble transaction_id, const gchar * command_name, + const GstAmfNode * argument, ...) +{ + va_list ap; + GBytes *ret; + + va_start (ap, argument); + ret = gst_amf_serialize_command_valist (transaction_id, command_name, + argument, ap); + va_end (ap); + + return ret; +} + +GBytes * +gst_amf_serialize_command_valist (gdouble transaction_id, + const gchar * command_name, const GstAmfNode * argument, va_list var_args) +{ + GByteArray *array = g_byte_array_new (); + guint i = 0; + + g_return_val_if_fail (command_name, NULL); + g_return_val_if_fail (argument, NULL); + + GST_LOG ("Serializing command '%s', transid %.0f", command_name, + transaction_id); + + serialize_u8 (array, GST_AMF_TYPE_STRING); + serialize_string (array, command_name, -1); + serialize_u8 (array, GST_AMF_TYPE_NUMBER); + serialize_number (array, transaction_id); + + while (argument) { + serialize_value (array, argument); + dump_argument (argument, i++); + + argument = va_arg (var_args, const GstAmfNode *); + } + + GST_TRACE ("Done serializing; consumed %u args and produced %u bytes", i, + array->len); + + return g_byte_array_free_to_bytes (array); +} diff --git a/gst/rtmp2/rtmp/amf.h b/gst/rtmp2/rtmp/amf.h new file mode 100644 index 000000000..639456e4f --- /dev/null +++ b/gst/rtmp2/rtmp/amf.h @@ -0,0 +1,112 @@ +/* GStreamer RTMP Library + * Copyright (C) 2014 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_AMF_H_ +#define _GST_RTMP_AMF_H_ + +#include <glib.h> + +G_BEGIN_DECLS + +typedef enum +{ + GST_AMF_TYPE_INVALID = -1, + GST_AMF_TYPE_NUMBER = 0, + GST_AMF_TYPE_BOOLEAN = 1, + GST_AMF_TYPE_STRING = 2, + GST_AMF_TYPE_OBJECT = 3, + GST_AMF_TYPE_MOVIECLIP = 4, + GST_AMF_TYPE_NULL = 5, + GST_AMF_TYPE_UNDEFINED = 6, + GST_AMF_TYPE_REFERENCE = 7, + GST_AMF_TYPE_ECMA_ARRAY = 8, + GST_AMF_TYPE_OBJECT_END = 9, + GST_AMF_TYPE_STRICT_ARRAY = 10, + GST_AMF_TYPE_DATE = 11, + GST_AMF_TYPE_LONG_STRING = 12, + GST_AMF_TYPE_UNSUPPORTED = 13, + GST_AMF_TYPE_RECORDSET = 14, + GST_AMF_TYPE_XML_DOCUMENT = 15, + GST_AMF_TYPE_TYPED_OBJECT = 16, + GST_AMF_TYPE_AVMPLUS_OBJECT = 17 +} GstAmfType; + +const gchar * gst_amf_type_get_nick (GstAmfType type); + +typedef struct _GstAmfNode GstAmfNode; + +GstAmfNode * gst_amf_node_new_null (void); +GstAmfNode * gst_amf_node_new_number (gdouble value); +GstAmfNode * gst_amf_node_new_boolean (gboolean value); +GstAmfNode * gst_amf_node_new_string (const gchar * value, gssize size); +GstAmfNode * gst_amf_node_new_take_string (gchar * value, gssize size); +GstAmfNode * gst_amf_node_new_object (void); + +GstAmfNode * gst_amf_node_copy (const GstAmfNode * node); +void gst_amf_node_free (gpointer ptr); + +GstAmfType gst_amf_node_get_type (const GstAmfNode * node); +gdouble gst_amf_node_get_number (const GstAmfNode * node); +gboolean gst_amf_node_get_boolean (const GstAmfNode * node); +gchar * gst_amf_node_get_string (const GstAmfNode * node, gsize * size); +const gchar * gst_amf_node_peek_string (const GstAmfNode * node, gsize * size); + +const GstAmfNode * gst_amf_node_get_field (const GstAmfNode * node, + const gchar * name); +const GstAmfNode * gst_amf_node_get_field_by_index (const GstAmfNode * node, + guint index); +guint gst_amf_node_get_num_fields (const GstAmfNode * node); + +const GstAmfNode * gst_amf_node_get_element (const GstAmfNode * node, + guint index); +guint gst_amf_node_get_num_elements (const GstAmfNode * node); + +void gst_amf_node_set_number (GstAmfNode * node, gdouble value); +void gst_amf_node_set_boolean (GstAmfNode * node, gboolean value); +void gst_amf_node_set_string (GstAmfNode * node, const gchar * value, gssize size); +void gst_amf_node_take_string (GstAmfNode * node, gchar * value, gssize size); + +void gst_amf_node_append_field (GstAmfNode * node, + const gchar * name, const GstAmfNode * value); +void gst_amf_node_append_take_field (GstAmfNode * node, + const gchar * name, GstAmfNode * value); +void gst_amf_node_append_field_number (GstAmfNode * node, + const gchar * name, gdouble value); +void gst_amf_node_append_field_boolean (GstAmfNode * node, + const gchar * name, gboolean value); +void gst_amf_node_append_field_string (GstAmfNode * node, + const gchar * name, const gchar * value, gssize size); +void gst_amf_node_append_field_take_string (GstAmfNode * node, + const gchar * name, gchar * value, gssize size); + +void gst_amf_node_dump (const GstAmfNode * node, gint indent, + GString * string); + +GPtrArray * gst_amf_parse_command (const guint8 * data, gsize size, + gdouble * transaction_id, gchar ** command_name); + +GBytes * gst_amf_serialize_command (gdouble transaction_id, + const gchar * command_name, const GstAmfNode * argument, ...) G_GNUC_NULL_TERMINATED; +GBytes * gst_amf_serialize_command_valist (gdouble transaction_id, + const gchar * command_name, const GstAmfNode * argument, va_list va_args); + +G_END_DECLS +#endif diff --git a/gst/rtmp2/rtmp/rtmpchunkstream.c b/gst/rtmp2/rtmp/rtmpchunkstream.c new file mode 100644 index 000000000..95cb267a5 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpchunkstream.c @@ -0,0 +1,714 @@ +/* GStreamer RTMP Library + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "rtmpchunkstream.h" +#include "rtmputils.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_stream_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_chunk_stream_debug_category + +static void +init_debug (void) +{ + static volatile gsize done = 0; + if (g_once_init_enter (&done)) { + GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_stream_debug_category, + "rtmpchunkstream", 0, "debug category for rtmp chunk streams"); + g_once_init_leave (&done, 1); + } +} + +enum +{ + CHUNK_BYTE_TWOBYTE = 0, + CHUNK_BYTE_THREEBYTE = 1, + CHUNK_BYTE_MASK = 0x3f, + CHUNK_STREAM_MIN_TWOBYTE = 0x40, + CHUNK_STREAM_MIN_THREEBYTE = 0x140, + CHUNK_STREAM_MAX_THREEBYTE = 0x1003f, +}; + +typedef enum +{ + CHUNK_TYPE_0 = 0, + CHUNK_TYPE_1 = 1, + CHUNK_TYPE_2 = 2, + CHUNK_TYPE_3 = 3, +} ChunkType; + +static const gsize chunk_header_sizes[4] = { 11, 7, 3, 0 }; + +struct _GstRtmpChunkStream +{ + GstBuffer *buffer; + GstRtmpMeta *meta; + GstMapInfo map; /* Only used for parsing */ + guint32 id; + guint32 offset; + guint64 bytes; +}; + +struct _GstRtmpChunkStreams +{ + GArray *array; +}; + +static inline gboolean +chunk_stream_is_open (GstRtmpChunkStream * cstream) +{ + return cstream->map.data != NULL; +} + +static void +chunk_stream_take_buffer (GstRtmpChunkStream * cstream, GstBuffer * buffer) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + g_assert (meta); + g_assert (cstream->buffer == NULL); + cstream->buffer = buffer; + cstream->meta = meta; +} + +static void +chunk_stream_clear (GstRtmpChunkStream * cstream) +{ + if (chunk_stream_is_open (cstream)) { + gst_buffer_unmap (cstream->buffer, &cstream->map); + cstream->map.data = NULL; + } + + gst_buffer_replace (&cstream->buffer, NULL); + cstream->meta = NULL; + cstream->offset = 0; +} + +static guint32 +chunk_stream_next_size (GstRtmpChunkStream * cstream, guint32 chunk_size) +{ + guint32 size, offset; + + size = cstream->meta->size; + offset = cstream->offset; + + g_return_val_if_fail (offset <= size, 0); + return MIN (size - offset, chunk_size); +} + +static inline gboolean +needs_ext_ts (GstRtmpMeta * meta) +{ + return meta->ts_delta >= 0xffffff; +} + + +static guint32 +dts_to_abs_ts (GstBuffer * buffer) +{ + GstClockTime dts = GST_BUFFER_DTS (buffer); + guint32 ret = 0; + + if (GST_CLOCK_TIME_IS_VALID (dts)) { + ret = gst_util_uint64_scale_round (dts, 1, GST_MSECOND); + } + + GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " into abs TS %" + G_GUINT32_FORMAT " ms", GST_TIME_ARGS (dts), ret); + return ret; +} + +static gboolean +dts_diff_to_delta_ts (GstBuffer * old_buffer, GstBuffer * buffer, + guint32 * out_ts) +{ + GstClockTime dts = GST_BUFFER_DTS (buffer), + old_dts = GST_BUFFER_DTS (old_buffer); + guint32 abs_ts, old_abs_ts, delta_32 = 0; + + if (!GST_CLOCK_TIME_IS_VALID (dts) || !GST_CLOCK_TIME_IS_VALID (old_dts)) { + GST_LOG ("Timestamps not valid; using delta TS 0"); + goto out; + } + + if (ABS (GST_CLOCK_DIFF (old_dts, dts)) > GST_MSECOND * G_MAXINT32) { + GST_WARNING ("Timestamp delta too large: %" GST_TIME_FORMAT " -> %" + GST_TIME_FORMAT, GST_TIME_ARGS (old_dts), GST_TIME_ARGS (dts)); + return FALSE; + } + + abs_ts = gst_util_uint64_scale_round (dts, 1, GST_MSECOND); + old_abs_ts = gst_util_uint64_scale_round (old_dts, 1, GST_MSECOND); + + /* underflow wraps around */ + delta_32 = abs_ts - old_abs_ts; + + GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT + " ms) -> %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT " ms) into delta TS %" + G_GUINT32_FORMAT " ms", GST_TIME_ARGS (old_dts), old_abs_ts, + GST_TIME_ARGS (dts), abs_ts, delta_32); + +out: + *out_ts = delta_32; + return TRUE; +} + +static ChunkType +select_chunk_type (GstRtmpChunkStream * cstream, GstBuffer * buffer) +{ + GstBuffer *old_buffer = cstream->buffer; + GstRtmpMeta *meta, *old_meta; + + g_return_val_if_fail (buffer, -1); + + meta = gst_buffer_get_rtmp_meta (buffer); + + g_return_val_if_fail (meta, -1); + g_return_val_if_fail (gst_rtmp_message_type_is_valid (meta->type), -1); + + meta->size = gst_buffer_get_size (buffer); + meta->cstream = cstream->id; + + if (!old_buffer) { + GST_TRACE ("Picking header 0: no previous header"); + meta->ts_delta = dts_to_abs_ts (buffer); + return CHUNK_TYPE_0; + } + + old_meta = gst_buffer_get_rtmp_meta (old_buffer); + + if (old_meta->mstream != meta->mstream) { + GST_TRACE ("Picking header 0: stream mismatch; " + "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT, + old_meta->mstream, meta->mstream); + meta->ts_delta = dts_to_abs_ts (buffer); + return CHUNK_TYPE_0; + } + + if (!dts_diff_to_delta_ts (old_buffer, buffer, &meta->ts_delta)) { + GST_TRACE ("Picking header 0: timestamp delta overflow"); + meta->ts_delta = dts_to_abs_ts (buffer); + return CHUNK_TYPE_0; + } + + /* now at least type 1 */ + + if (old_meta->type != meta->type) { + GST_TRACE ("Picking header 1: type mismatch; want %d got %d", + old_meta->type, meta->type); + return CHUNK_TYPE_1; + } + + if (old_meta->size != meta->size) { + GST_TRACE ("Picking header 1: size mismatch; " + "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT, + old_meta->size, meta->size); + return CHUNK_TYPE_1; + } + + /* now at least type 2 */ + + if (old_meta->ts_delta != meta->ts_delta) { + GST_TRACE ("Picking header 2: timestamp delta mismatch; " + "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT, + old_meta->ts_delta, meta->ts_delta); + return CHUNK_TYPE_2; + } + + /* now at least type 3 */ + + GST_TRACE ("Picking header 3"); + return CHUNK_TYPE_3; +} + +static GstBuffer * +serialize_next (GstRtmpChunkStream * cstream, guint32 chunk_size, + ChunkType type) +{ + GstRtmpMeta *meta = cstream->meta; + guint8 small_stream_id; + gsize header_size = chunk_header_sizes[type], offset; + gboolean ext_ts; + GstBuffer *ret; + GstMapInfo map; + + GST_TRACE ("Serializing a chunk of type %d, offset %" G_GUINT32_FORMAT, + type, cstream->offset); + + if (cstream->id < CHUNK_STREAM_MIN_TWOBYTE) { + small_stream_id = cstream->id; + header_size += 1; + } else if (cstream->id < CHUNK_STREAM_MIN_THREEBYTE) { + small_stream_id = CHUNK_BYTE_TWOBYTE; + header_size += 2; + } else { + small_stream_id = CHUNK_BYTE_THREEBYTE; + header_size += 3; + } + + ext_ts = needs_ext_ts (meta); + if (ext_ts) { + header_size += 4; + } + + GST_TRACE ("Allocating buffer, header size %" G_GSIZE_FORMAT, header_size); + + ret = gst_buffer_new_allocate (NULL, header_size, NULL); + if (!ret) { + GST_ERROR ("Failed to allocate chunk buffer"); + return NULL; + } + + if (!gst_buffer_map (ret, &map, GST_MAP_WRITE)) { + GST_ERROR ("Failed to map %" GST_PTR_FORMAT, ret); + gst_buffer_unref (ret); + return NULL; + } + + /* Chunk Basic Header */ + GST_WRITE_UINT8 (map.data, (type << 6) | small_stream_id); + offset = 1; + + switch (small_stream_id) { + case CHUNK_BYTE_TWOBYTE: + GST_WRITE_UINT8 (map.data + 1, cstream->id - CHUNK_STREAM_MIN_TWOBYTE); + offset += 1; + break; + + case CHUNK_BYTE_THREEBYTE: + GST_WRITE_UINT16_LE (map.data + 1, + cstream->id - CHUNK_STREAM_MIN_TWOBYTE); + offset += 2; + break; + } + + switch (type) { + case CHUNK_TYPE_0: + /* SRSLY: "Message stream ID is stored in little-endian format." */ + GST_WRITE_UINT32_LE (map.data + offset + 7, meta->mstream); + /* no break */ + case CHUNK_TYPE_1: + GST_WRITE_UINT24_BE (map.data + offset + 3, meta->size); + GST_WRITE_UINT8 (map.data + offset + 6, meta->type); + /* no break */ + case CHUNK_TYPE_2: + GST_WRITE_UINT24_BE (map.data + offset, + ext_ts ? 0xffffff : meta->ts_delta); + /* no break */ + case CHUNK_TYPE_3: + offset += chunk_header_sizes[type]; + + if (ext_ts) { + GST_WRITE_UINT32_BE (map.data + offset, meta->ts_delta); + offset += 4; + } + } + + g_assert (offset == header_size); + GST_MEMDUMP (">>> chunk header", map.data, offset); + + gst_buffer_unmap (ret, &map); + + GST_BUFFER_OFFSET (ret) = GST_BUFFER_OFFSET_IS_VALID (cstream->buffer) ? + GST_BUFFER_OFFSET (cstream->buffer) + cstream->offset : cstream->bytes; + GST_BUFFER_OFFSET_END (ret) = GST_BUFFER_OFFSET (ret); + + if (meta->size > 0) { + guint32 payload_size = chunk_stream_next_size (cstream, chunk_size); + + GST_TRACE ("Appending %" G_GUINT32_FORMAT " bytes of payload", + payload_size); + + gst_buffer_copy_into (ret, cstream->buffer, GST_BUFFER_COPY_MEMORY, + cstream->offset, payload_size); + + GST_BUFFER_OFFSET_END (ret) += payload_size; + cstream->offset += payload_size; + cstream->bytes += payload_size; + } else { + GST_TRACE ("Chunk has no payload"); + } + + gst_rtmp_buffer_dump (ret, ">>> chunk"); + + return ret; +} + +void +gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream) +{ + g_return_if_fail (cstream); + GST_LOG ("Clearing chunk stream %" G_GUINT32_FORMAT, cstream->id); + chunk_stream_clear (cstream); +} + +guint32 +gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size) +{ + guint32 ret; + + if (size < 1) { + GST_TRACE ("Not enough bytes to read ID"); + return 0; + } + + ret = GST_READ_UINT8 (data) & CHUNK_BYTE_MASK; + + switch (ret) { + case CHUNK_BYTE_TWOBYTE: + if (size < 2) { + GST_TRACE ("Not enough bytes to read two-byte ID"); + return 0; + } + + ret = GST_READ_UINT8 (data + 1) + CHUNK_STREAM_MIN_TWOBYTE; + break; + + case CHUNK_BYTE_THREEBYTE: + if (size < 3) { + GST_TRACE ("Not enough bytes to read three-byte ID"); + return 0; + } + + ret = GST_READ_UINT16_LE (data + 1) + CHUNK_STREAM_MIN_TWOBYTE; + break; + } + + GST_TRACE ("Parsed chunk stream ID %" G_GUINT32_FORMAT, ret); + return ret; +} + +guint32 +gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream, + const guint8 * data, gsize size) +{ + GstBuffer *buffer; + GstRtmpMeta *meta; + const guint8 *message_header; + guint32 header_size; + ChunkType type; + gboolean has_abs_timestamp = FALSE; + + g_return_val_if_fail (cstream, 0); + g_return_val_if_fail (cstream->id == gst_rtmp_chunk_stream_parse_id (data, + size), 0); + + type = GST_READ_UINT8 (data) >> 6; + GST_TRACE ("Parsing chunk stream %" G_GUINT32_FORMAT " header type %d", + cstream->id, type); + + switch (GST_READ_UINT8 (data) & CHUNK_BYTE_MASK) { + case CHUNK_BYTE_TWOBYTE: + header_size = 2; + break; + case CHUNK_BYTE_THREEBYTE: + header_size = 3; + break; + default: + header_size = 1; + break; + } + + message_header = data + header_size; + header_size += chunk_header_sizes[type]; + + if (cstream->buffer) { + buffer = cstream->buffer; + meta = cstream->meta; + g_assert (meta->cstream == cstream->id); + } else { + buffer = gst_buffer_new (); + GST_BUFFER_DTS (buffer) = 0; + GST_BUFFER_OFFSET (buffer) = cstream->bytes; + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + + meta = gst_buffer_add_rtmp_meta (buffer); + meta->cstream = cstream->id; + + chunk_stream_take_buffer (cstream, buffer); + GST_DEBUG ("Starting parse with new %" GST_PTR_FORMAT, buffer); + } + + if (size < header_size) { + GST_TRACE ("not enough bytes to read header"); + return header_size; + } + + switch (type) { + case CHUNK_TYPE_0: + has_abs_timestamp = TRUE; + /* SRSLY: "Message stream ID is stored in little-endian format." */ + meta->mstream = GST_READ_UINT32_LE (message_header + 7); + /* no break */ + case CHUNK_TYPE_1: + meta->type = GST_READ_UINT8 (message_header + 6); + meta->size = GST_READ_UINT24_BE (message_header + 3); + /* no break */ + case CHUNK_TYPE_2: + meta->ts_delta = GST_READ_UINT24_BE (message_header); + /* no break */ + case CHUNK_TYPE_3: + if (needs_ext_ts (meta)) { + guint32 timestamp; + + if (size < header_size + 4) { + GST_TRACE ("not enough bytes to read extended timestamp"); + return header_size + 4; + } + + GST_TRACE ("Reading extended timestamp"); + timestamp = GST_READ_UINT32_BE (data + header_size); + + if (type == 3 && meta->ts_delta != timestamp) { + GST_WARNING ("Type 3 extended timestamp does not match expected" + " timestamp (want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT + "); assuming it's not present", meta->ts_delta, timestamp); + } else { + meta->ts_delta = timestamp; + header_size += 4; + } + } + } + + GST_MEMDUMP ("<<< chunk header", data, header_size); + + if (!chunk_stream_is_open (cstream)) { + GstClockTime dts = GST_BUFFER_DTS (buffer); + guint32 delta_32, abs_32; + gint64 delta_64; + + if (has_abs_timestamp) { + abs_32 = meta->ts_delta; + delta_32 = abs_32 - dts / GST_MSECOND; + } else { + delta_32 = meta->ts_delta; + abs_32 = delta_32 + dts / GST_MSECOND; + } + + GST_TRACE ("Timestamp delta is %" G_GUINT32_FORMAT " (absolute %" + G_GUINT32_FORMAT ")", delta_32, abs_32); + + /* emulate signed overflow */ + delta_64 = delta_32; + if (delta_64 > G_MAXINT32) { + delta_64 -= G_MAXUINT32; + delta_64 -= 1; + } + + delta_64 *= GST_MSECOND; + + if (G_LIKELY (delta_64 >= 0)) { + /* Normal advancement */ + } else if (G_LIKELY ((guint64) (-delta_64) <= dts)) { + /* In-bounds regression */ + GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT, + GST_STIME_ARGS (delta_64)); + } else { + /* Out-of-bounds regression */ + GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT ", offsetting", + GST_STIME_ARGS (delta_64)); + delta_64 = delta_32 * GST_MSECOND; + } + + GST_BUFFER_DTS (buffer) += delta_64; + + GST_TRACE ("Adjusted buffer DTS (%" GST_TIME_FORMAT ") by %" + GST_STIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (dts), + GST_STIME_ARGS (delta_64), GST_TIME_ARGS (GST_BUFFER_DTS (buffer))); + } else { + GST_TRACE ("Message payload already started; not touching timestamp"); + } + + return header_size; +} + +guint32 +gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream, + guint32 chunk_size, guint8 ** data) +{ + GstMemory *mem; + + g_return_val_if_fail (cstream, 0); + g_return_val_if_fail (cstream->buffer, 0); + + if (!chunk_stream_is_open (cstream)) { + guint32 size = cstream->meta->size; + + GST_TRACE ("Allocating buffer, payload size %" G_GUINT32_FORMAT, size); + + mem = gst_allocator_alloc (NULL, size, 0); + if (!mem) { + GST_ERROR ("Failed to allocate buffer for payload size %" + G_GUINT32_FORMAT, size); + return 0; + } + + gst_buffer_append_memory (cstream->buffer, mem); + gst_buffer_map (cstream->buffer, &cstream->map, GST_MAP_WRITE); + } + + g_return_val_if_fail (cstream->map.size == cstream->meta->size, 0); + + if (data) { + *data = cstream->map.data + cstream->offset; + } + + return chunk_stream_next_size (cstream, chunk_size); +} + +guint32 +gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream, + guint32 chunk_size) +{ + guint32 size; + + g_return_val_if_fail (cstream, FALSE); + g_return_val_if_fail (chunk_stream_is_open (cstream), FALSE); + + size = chunk_stream_next_size (cstream, chunk_size); + cstream->offset += size; + cstream->bytes += size; + + return chunk_stream_next_size (cstream, chunk_size); +} + +GstBuffer * +gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream) +{ + GstBuffer *buffer, *empty; + + g_return_val_if_fail (cstream, NULL); + g_return_val_if_fail (cstream->buffer, NULL); + + buffer = gst_buffer_ref (cstream->buffer); + GST_BUFFER_OFFSET_END (buffer) = cstream->bytes; + + gst_rtmp_buffer_dump (buffer, "<<< message"); + + chunk_stream_clear (cstream); + + empty = gst_buffer_new (); + + if (!gst_buffer_copy_into (empty, buffer, GST_BUFFER_COPY_META, 0, 0)) { + GST_ERROR ("copy_into failed"); + return NULL; + } + + GST_BUFFER_DTS (empty) = GST_BUFFER_DTS (buffer); + GST_BUFFER_OFFSET (empty) = GST_BUFFER_OFFSET_END (buffer); + + chunk_stream_take_buffer (cstream, empty); + + return buffer; +} + +GstBuffer * +gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream, + GstBuffer * buffer, guint32 chunk_size) +{ + ChunkType type; + + g_return_val_if_fail (cstream, NULL); + g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL); + + type = select_chunk_type (cstream, buffer); + g_return_val_if_fail (type >= 0, NULL); + + GST_TRACE ("Starting serialization of message %" GST_PTR_FORMAT + " into stream %" G_GUINT32_FORMAT, buffer, cstream->id); + + gst_rtmp_buffer_dump (buffer, ">>> message"); + + chunk_stream_clear (cstream); + chunk_stream_take_buffer (cstream, buffer); + + return serialize_next (cstream, chunk_size, type); +} + +GstBuffer * +gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream, + guint32 chunk_size) +{ + g_return_val_if_fail (cstream, NULL); + g_return_val_if_fail (cstream->buffer, NULL); + + if (chunk_stream_next_size (cstream, chunk_size) == 0) { + GST_TRACE ("Message serialization finished"); + return NULL; + } + + GST_TRACE ("Continuing serialization of message %" GST_PTR_FORMAT + " into stream %" G_GUINT32_FORMAT, cstream->buffer, cstream->id); + + return serialize_next (cstream, chunk_size, CHUNK_TYPE_3); +} + +GstRtmpChunkStreams * +gst_rtmp_chunk_streams_new (void) +{ + GstRtmpChunkStreams *cstreams; + + init_debug (); + + cstreams = g_slice_new (GstRtmpChunkStreams); + cstreams->array = g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkStream)); + g_array_set_clear_func (cstreams->array, + (GDestroyNotify) gst_rtmp_chunk_stream_clear); + return cstreams; +} + +void +gst_rtmp_chunk_streams_free (gpointer ptr) +{ + GstRtmpChunkStreams *cstreams = ptr; + g_clear_pointer (&cstreams->array, g_array_unref); + g_slice_free (GstRtmpChunkStreams, cstreams); +} + +GstRtmpChunkStream * +gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id) +{ + GArray *array; + GstRtmpChunkStream *entry; + guint i; + + g_return_val_if_fail (cstreams, NULL); + g_return_val_if_fail (id > CHUNK_BYTE_THREEBYTE, NULL); + g_return_val_if_fail (id <= CHUNK_STREAM_MAX_THREEBYTE, NULL); + + array = cstreams->array; + + for (i = 0; i < array->len; i++) { + entry = &g_array_index (array, GstRtmpChunkStream, i); + if (entry->id == id) { + GST_TRACE ("Obtaining chunk stream %" G_GUINT32_FORMAT, id); + return entry; + } + } + + GST_DEBUG ("Allocating chunk stream %" G_GUINT32_FORMAT, id); + + g_array_set_size (array, i + 1); + entry = &g_array_index (array, GstRtmpChunkStream, i); + entry->id = id; + return entry; +} diff --git a/gst/rtmp2/rtmp/rtmpchunkstream.h b/gst/rtmp2/rtmp/rtmpchunkstream.h new file mode 100644 index 000000000..bcc9b7d4a --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpchunkstream.h @@ -0,0 +1,59 @@ +/* GStreamer RTMP Library + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_CHUNK_STREAM_H_ +#define _GST_RTMP_CHUNK_STREAM_H_ + +#include "rtmpmessage.h" + +G_BEGIN_DECLS + +#define GST_RTMP_DEFAULT_CHUNK_SIZE 128 +#define GST_RTMP_CHUNK_STREAM_PROTOCOL 2 + +typedef struct _GstRtmpChunkStream GstRtmpChunkStream; +typedef struct _GstRtmpChunkStreams GstRtmpChunkStreams; + +void gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream); + +guint32 gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size); +guint32 gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream, + const guint8 * data, gsize size); +guint32 gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream, + guint32 chunk_size, guint8 ** data); +guint32 gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream, + guint32 chunk_size); +GstBuffer * gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream); + +GstBuffer * gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream, + GstBuffer * buffer, guint32 chunk_size); +GstBuffer * gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream, + guint32 chunk_size); + +GstRtmpChunkStreams * gst_rtmp_chunk_streams_new (void); +void gst_rtmp_chunk_streams_free (gpointer ptr); + +GstRtmpChunkStream * +gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id); + + +G_END_DECLS + +#endif diff --git a/gst/rtmp2/rtmp/rtmpclient.c b/gst/rtmp2/rtmp/rtmpclient.c new file mode 100644 index 000000000..657482c10 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpclient.c @@ -0,0 +1,1240 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include <gio/gio.h> +#include <string.h> +#include "rtmpclient.h" +#include "rtmphandshake.h" +#include "rtmpmessage.h" +#include "rtmputils.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_client_debug_category + +static void send_connect_done (const gchar * command_name, GPtrArray * args, + gpointer user_data); +static void create_stream_done (const gchar * command_name, GPtrArray * args, + gpointer user_data); +static void on_publish_or_play_status (const gchar * command_name, + GPtrArray * args, gpointer user_data); + +static void +init_debug (void) +{ + static volatile gsize done = 0; + if (g_once_init_enter (&done)) { + GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category, + "rtmpclient", 0, "debug category for the rtmp client"); + GST_DEBUG_REGISTER_FUNCPTR (send_connect_done); + GST_DEBUG_REGISTER_FUNCPTR (create_stream_done); + GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status); + g_once_init_leave (&done, 1); + } +} + +static const gchar *scheme_strings[] = { + "rtmp", + "rtmps", + NULL +}; + +#define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1) + +GType +gst_rtmp_scheme_get_type (void) +{ + static volatile gsize scheme_type = 0; + static const GEnumValue scheme[] = { + {GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"}, + {GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"}, + {0, NULL, NULL}, + }; + + if (g_once_init_enter (&scheme_type)) { + GType tmp = g_enum_register_static ("GstRtmpScheme", scheme); + g_once_init_leave (&scheme_type, tmp); + } + + return (GType) scheme_type; +} + +GstRtmpScheme +gst_rtmp_scheme_from_string (const gchar * string) +{ + if (string) { + gint value; + + for (value = 0; value < NUM_SCHEMES; value++) { + if (strcmp (scheme_strings[value], string) == 0) { + return value; + } + } + } + + return -1; +} + +GstRtmpScheme +gst_rtmp_scheme_from_uri (const GstUri * uri) +{ + const gchar *scheme = gst_uri_get_scheme (uri); + if (!scheme) { + return GST_RTMP_SCHEME_RTMP; + } + + return gst_rtmp_scheme_from_string (scheme); +} + +const gchar * +gst_rtmp_scheme_to_string (GstRtmpScheme scheme) +{ + if (scheme >= 0 && scheme < NUM_SCHEMES) { + return scheme_strings[scheme]; + } + + return "invalid"; +} + +const gchar *const * +gst_rtmp_scheme_get_strings (void) +{ + return scheme_strings; +} + +guint +gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme) +{ + switch (scheme) { + case GST_RTMP_SCHEME_RTMP: + return 1935; + + case GST_RTMP_SCHEME_RTMPS: + return 443; + + default: + g_return_val_if_reached (0); + } +} + +GType +gst_rtmp_authmod_get_type (void) +{ + static volatile gsize authmod_type = 0; + static const GEnumValue authmod[] = { + {GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"}, + {GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"}, + {GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"}, + {0, NULL, NULL}, + }; + + if (g_once_init_enter (&authmod_type)) { + GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod); + g_once_init_leave (&authmod_type, tmp); + } + + return (GType) authmod_type; +} + +static const gchar * +gst_rtmp_authmod_get_nick (GstRtmpAuthmod value) +{ + GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD); + GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL; + return ev ? ev->value_nick : "(unknown)"; +} + +void +gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src) +{ + g_return_if_fail (dest); + g_return_if_fail (src); + + dest->scheme = src->scheme; + dest->host = g_strdup (src->host); + dest->port = src->port; + dest->application = g_strdup (src->application); + dest->stream = g_strdup (src->stream); + dest->username = g_strdup (src->username); + dest->password = g_strdup (src->password); + dest->secure_token = g_strdup (src->secure_token); + dest->authmod = src->authmod; + dest->timeout = src->timeout; + dest->tls_flags = src->tls_flags; + dest->flash_ver = g_strdup (src->flash_ver); +} + +void +gst_rtmp_location_clear (GstRtmpLocation * location) +{ + g_return_if_fail (location); + + g_clear_pointer (&location->host, g_free); + location->port = 0; + g_clear_pointer (&location->application, g_free); + g_clear_pointer (&location->stream, g_free); + g_clear_pointer (&location->username, g_free); + g_clear_pointer (&location->password, g_free); + g_clear_pointer (&location->secure_token, g_free); + g_clear_pointer (&location->flash_ver, g_free); +} + +gchar * +gst_rtmp_location_get_string (const GstRtmpLocation * location, + gboolean with_stream) +{ + GstUri *uri; + gchar *base, *string; + const gchar *scheme_string; + guint default_port; + + g_return_val_if_fail (location, NULL); + + scheme_string = gst_rtmp_scheme_to_string (location->scheme); + default_port = gst_rtmp_scheme_get_default_port (location->scheme); + + uri = gst_uri_new (scheme_string, NULL, location->host, + location->port == default_port ? GST_URI_NO_PORT : location->port, "/", + NULL, NULL); + base = gst_uri_to_string (uri); + + string = g_strconcat (base, location->application, with_stream ? "/" : NULL, + location->stream, NULL); + + g_free (base); + gst_uri_unref (uri); + + return string; +} + +static void socket_connect (GTask * task); +static void socket_connect_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void handshake_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void send_connect (GTask * task); +static void send_secure_token_response (GTask * task, + GstRtmpConnection * connection, const gchar * challenge); +static void connection_error (GstRtmpConnection * connection, + gpointer user_data); + +#define DEFAULT_TIMEOUT 5 + +typedef struct +{ + GstRtmpLocation location; + gchar *auth_query; + GstRtmpConnection *connection; + gulong error_handler_id; +} ConnectTaskData; + +static ConnectTaskData * +connect_task_data_new (const GstRtmpLocation * location) +{ + ConnectTaskData *data = g_slice_new0 (ConnectTaskData); + gst_rtmp_location_copy (&data->location, location); + return data; +} + +static void +connect_task_data_free (gpointer ptr) +{ + ConnectTaskData *data = ptr; + gst_rtmp_location_clear (&data->location); + g_clear_pointer (&data->auth_query, g_free); + if (data->error_handler_id) { + g_signal_handler_disconnect (data->connection, data->error_handler_id); + } + g_clear_object (&data->connection); + g_slice_free (ConnectTaskData, data); +} + +static GRegex *auth_regex = NULL; + +void +gst_rtmp_client_connect_async (const GstRtmpLocation * location, + GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + + init_debug (); + + if (g_once_init_enter (&auth_regex)) { + GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *" + "\\[ *authmod=(?<authmod>.*?) *\\] *: *" + "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL); + g_once_init_leave (&auth_regex, re); + } + + task = g_task_new (NULL, cancellable, callback, user_data); + + g_task_set_task_data (task, connect_task_data_new (location), + connect_task_data_free); + + socket_connect (task); +} + +static void +socket_connect (GTask * task) +{ + ConnectTaskData *data = g_task_get_task_data (task); + GSocketConnectable *addr; + GSocketClient *socket_client; + + if (data->location.timeout < 0) { + data->location.timeout = DEFAULT_TIMEOUT; + } + + if (data->error_handler_id) { + g_signal_handler_disconnect (data->connection, data->error_handler_id); + data->error_handler_id = 0; + } + + if (data->connection) { + gst_rtmp_connection_close (data->connection); + g_clear_object (&data->connection); + } + + if (!data->location.host) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED, + "Host is not set"); + g_object_unref (task); + return; + } + + if (!data->location.port) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED, + "Port is not set"); + g_object_unref (task); + return; + } + + socket_client = g_socket_client_new (); + g_socket_client_set_timeout (socket_client, data->location.timeout); + + switch (data->location.scheme) { + case GST_RTMP_SCHEME_RTMP: + break; + + case GST_RTMP_SCHEME_RTMPS: + GST_DEBUG ("Configuring TLS, validation flags 0x%02x", + data->location.tls_flags); + g_socket_client_set_tls (socket_client, TRUE); + g_socket_client_set_tls_validation_flags (socket_client, + data->location.tls_flags); + break; + + default: + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + "Invalid scheme ID %d", data->location.scheme); + g_object_unref (socket_client); + g_object_unref (task); + return; + } + + addr = g_network_address_new (data->location.host, data->location.port); + + GST_DEBUG ("Starting socket connection"); + + g_socket_client_connect_async (socket_client, addr, + g_task_get_cancellable (task), socket_connect_done, task); + g_object_unref (addr); + g_object_unref (socket_client); +} + +static void +socket_connect_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GSocketClient *socket_client = G_SOCKET_CLIENT (source); + GSocketConnection *socket_connection; + GTask *task = user_data; + GError *error = NULL; + + socket_connection = + g_socket_client_connect_finish (socket_client, result, &error); + + if (g_task_return_error_if_cancelled (task)) { + GST_DEBUG ("Socket connection was cancelled"); + g_object_unref (task); + return; + } + + if (socket_connection == NULL) { + GST_ERROR ("Socket connection error"); + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + GST_DEBUG ("Socket connection established"); + + gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE, + g_task_get_cancellable (task), handshake_done, task); + g_object_unref (socket_connection); +} + + +static void +handshake_done (GObject * source, GAsyncResult * result, gpointer user_data) +{ + GIOStream *stream = G_IO_STREAM (source); + GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream); + GTask *task = user_data; + ConnectTaskData *data = g_task_get_task_data (task); + GError *error = NULL; + gboolean res; + + res = gst_rtmp_client_handshake_finish (stream, result, &error); + if (!res) { + g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL); + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + data->connection = gst_rtmp_connection_new (socket_connection); + data->error_handler_id = g_signal_connect (data->connection, + "error", G_CALLBACK (connection_error), task); + + send_connect (task); +} + +static void +connection_error (GstRtmpConnection * connection, gpointer user_data) +{ + GTask *task = user_data; + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "error during connection attempt"); +} + +static gchar * +do_adobe_auth (const gchar * username, const gchar * password, + const gchar * salt, const gchar * opaque, const gchar * challenge) +{ + guint8 hash[16]; /* MD5 digest */ + gsize hashlen = sizeof hash; + gchar *challenge2, *auth_query; + GChecksum *md5; + + g_return_val_if_fail (username, NULL); + g_return_val_if_fail (password, NULL); + g_return_val_if_fail (salt, NULL); + + md5 = g_checksum_new (G_CHECKSUM_MD5); + g_checksum_update (md5, (guchar *) username, -1); + g_checksum_update (md5, (guchar *) salt, -1); + g_checksum_update (md5, (guchar *) password, -1); + + g_checksum_get_digest (md5, hash, &hashlen); + g_warn_if_fail (hashlen == sizeof hash); + + { + gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash); + g_checksum_reset (md5); + g_checksum_update (md5, (guchar *) hashstr, -1); + g_free (hashstr); + } + + if (opaque) + g_checksum_update (md5, (guchar *) opaque, -1); + else if (challenge) + g_checksum_update (md5, (guchar *) challenge, -1); + + challenge2 = g_strdup_printf ("%08x", g_random_int ()); + g_checksum_update (md5, (guchar *) challenge2, -1); + + g_checksum_get_digest (md5, hash, &hashlen); + g_warn_if_fail (hashlen == sizeof hash); + + { + gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash); + + if (opaque) { + auth_query = + g_strdup_printf + ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe", + username, challenge2, hashstr, opaque); + } else { + auth_query = + g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s", + "adobe", username, challenge2, hashstr); + } + g_free (hashstr); + } + + g_checksum_free (md5); + g_free (challenge2); + + return auth_query; +} + +static void +send_connect (GTask * task) +{ + ConnectTaskData *data = g_task_get_task_data (task); + GstAmfNode *node; + const gchar *app, *flash_ver; + gchar *uri, *appstr = NULL, *uristr = NULL; + + node = gst_amf_node_new_object (); + app = data->location.application; + flash_ver = data->location.flash_ver; + uri = gst_rtmp_location_get_string (&data->location, FALSE); + + if (!app) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED, + "Application is not set"); + g_object_unref (task); + goto out; + } + + if (!flash_ver) { + flash_ver = "LNX 10,0,32,18"; + } + + if (data->auth_query) { + const gchar *query = data->auth_query; + appstr = g_strdup_printf ("%s?%s", app, query); + uristr = g_strdup_printf ("%s?%s", uri, query); + } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) { + const gchar *user = data->location.username; + const gchar *authmod = "adobe"; + + if (!user) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "no username for adobe authentication"); + g_object_unref (task); + goto out; + } + + if (!data->location.password) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "no password for adobe authentication"); + g_object_unref (task); + goto out; + } + + appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user); + uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user); + } else { + appstr = g_strdup (app); + uristr = g_strdup (uri); + } + + gst_amf_node_append_field_take_string (node, "app", appstr, -1); + gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1); + gst_amf_node_append_field_string (node, "type", "nonprivate", -1); + gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1); + + gst_rtmp_connection_send_command (data->connection, send_connect_done, + task, 0, "connect", node, NULL); + +out: + gst_amf_node_free (node); + g_free (uri); +} + +static void +send_connect_done (const gchar * command_name, GPtrArray * args, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + ConnectTaskData *data = g_task_get_task_data (task); + const GstAmfNode *node, *optional_args; + const gchar *code; + + if (g_task_return_error_if_cancelled (task)) { + g_object_unref (task); + return; + } + + if (!args) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "connect failed: %s", command_name); + g_object_unref (task); + return; + } + + if (args->len < 2) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "connect failed; not enough return arguments"); + g_object_unref (task); + return; + } + + optional_args = g_ptr_array_index (args, 1); + + node = gst_amf_node_get_field (optional_args, "code"); + if (!node) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "result code missing from connect cmd result"); + g_object_unref (task); + return; + } + + code = gst_amf_node_peek_string (node, NULL); + GST_INFO ("connect result: %s", GST_STR_NULL (code)); + + if (g_str_equal (code, "NetConnection.Connect.Success")) { + node = gst_amf_node_get_field (optional_args, "secureToken"); + send_secure_token_response (task, data->connection, + node ? gst_amf_node_peek_string (node, NULL) : NULL); + return; + } + + if (g_str_equal (code, "NetConnection.Connect.Rejected")) { + GstRtmpAuthmod authmod = data->location.authmod; + GMatchInfo *match_info; + const gchar *desc; + GstUri *query; + + node = gst_amf_node_get_field (optional_args, "description"); + if (!node) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "Connect rejected; no description"); + g_object_unref (task); + return; + } + + desc = gst_amf_node_peek_string (node, NULL); + GST_DEBUG ("connect result desc: %s", GST_STR_NULL (desc)); + + if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) { + if (strstr (desc, "authmod=adobe")) { + GST_INFO ("Reconnecting with authmod=adobe"); + data->location.authmod = GST_RTMP_AUTHMOD_ADOBE; + socket_connect (task); + return; + } + + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "unhandled authentication mode: %s", desc); + g_object_unref (task); + return; + } + + if (!g_regex_match (auth_regex, desc, 0, &match_info)) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "failed to parse auth rejection: %s", desc); + g_object_unref (task); + return; + } + + { + gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod"); + gchar *query_str = g_match_info_fetch_named (match_info, "query"); + gboolean matches; + + GST_INFO ("regex parsed auth: authmod=%s, query=%s", + GST_STR_NULL (authmod_str), GST_STR_NULL (query_str)); + g_match_info_free (match_info); + + switch (authmod) { + case GST_RTMP_AUTHMOD_ADOBE: + matches = g_str_equal (authmod_str, "adobe"); + break; + + default: + matches = FALSE; + break; + } + + if (!matches) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "server uses wrong authentication mode '%s'; expected %s", + GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod)); + g_object_unref (task); + g_free (authmod_str); + g_free (query_str); + return; + } + g_free (authmod_str); + + query = gst_uri_from_string (query_str); + if (!query) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "failed to parse authentication query '%s'", + GST_STR_NULL (query_str)); + g_object_unref (task); + g_free (query_str); + return; + } + g_free (query_str); + } + + { + const gchar *reason = gst_uri_get_query_value (query, "reason"); + + if (g_str_equal (reason, "authfailed")) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "authentication failed! wrong credentials?"); + g_object_unref (task); + gst_uri_unref (query); + return; + } + + if (!g_str_equal (reason, "needauth")) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "unhandled rejection reason '%s'", reason ? reason : ""); + g_object_unref (task); + gst_uri_unref (query); + return; + } + } + + g_warn_if_fail (!data->auth_query); + data->auth_query = do_adobe_auth (data->location.username, + data->location.password, gst_uri_get_query_value (query, "salt"), + gst_uri_get_query_value (query, "opaque"), + gst_uri_get_query_value (query, "challenge")); + + gst_uri_unref (query); + + if (!data->auth_query) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "couldn't generate adobe style authentication query"); + g_object_unref (task); + return; + } + + socket_connect (task); + return; + } + + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "unhandled connect result code: %s", GST_STR_NULL (code)); + g_object_unref (task); +} + +/* prep key: pack 1st 16 chars into 4 LittleEndian ints */ +static void +rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4]) +{ + gchar copy[17]; + + g_return_if_fail (key); + g_return_if_fail (out); + + /* ensure we can read 16 bytes */ + strncpy (copy, key, 16); + /* placate GCC 8 -Wstringop-truncation */ + copy[16] = 0; + + out[0] = GST_READ_UINT32_LE (copy); + out[1] = GST_READ_UINT32_LE (copy + 4); + out[2] = GST_READ_UINT32_LE (copy + 8); + out[3] = GST_READ_UINT32_LE (copy + 12); +} + +/* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */ +static GArray * +rtmp_tea_decode_prep_text (const gchar * text) +{ + GArray *arr; + gsize len, i; + + g_return_val_if_fail (text, NULL); + + len = strlen (text); + arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8); + + for (i = 0; i < len; i += 8) { + gchar copy[9]; + guchar chars[4]; + gsize j; + guint32 val; + + /* ensure we can read 8 bytes */ + strncpy (copy, text + i, 8); + /* placate GCC 8 -Wstringop-truncation */ + copy[8] = 0; + + for (j = 0; j < 4; j++) { + gint hi, lo; + + hi = g_ascii_xdigit_value (copy[2 * j]); + lo = g_ascii_xdigit_value (copy[2 * j + 1]); + + chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0); + } + + val = GST_READ_UINT32_LE (chars); + g_array_append_val (arr, val); + } + + return arr; +} + +/* return text from uint32s to chars */ +static gchar * +rtmp_tea_decode_return_text (GArray * arr) +{ +#if G_BYTE_ORDER != G_LITTLE_ENDIAN + gsize i; + + g_return_val_if_fail (arr, NULL); + + for (i = 0; i < arr->len; i++) { + guint32 *val = &g_array_index (arr, guint32, i); + *val = GUINT32_TO_LE (*val); + } +#endif + + /* array is alredy zero-terminated */ + return g_array_free (arr, FALSE); +} + +/* http://www.movable-type.co.uk/scripts/tea-block.html */ +static void +rtmp_tea_decode_btea (GArray * text, guint32 key[4]) +{ + guint32 *v, n, *k; + guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9; + guint32 p, q; + + g_return_if_fail (text); + g_return_if_fail (text->len > 0); + g_return_if_fail (key); + + v = (guint32 *) text->data; + n = text->len; + k = key; + z = v[n - 1]; + y = v[0]; + q = 6 + 52 / n; + sum = q * DELTA; + +#define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z)); + + while (sum != 0) { + e = sum >> 2 & 3; + for (p = n - 1; p > 0; p--) + z = v[p - 1], y = v[p] -= MX; + z = v[n - 1]; + y = v[0] -= MX; + sum -= DELTA; + } + +#undef MX +} + +/* taken from librtmp */ +static gchar * +rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text) +{ + guint32 key[4]; + GArray *text; + + rtmp_tea_decode_prep_key (bin_key, key); + text = rtmp_tea_decode_prep_text (hex_text); + rtmp_tea_decode_btea (text, key); + return rtmp_tea_decode_return_text (text); +} + +static void +send_secure_token_response (GTask * task, GstRtmpConnection * connection, + const gchar * challenge) +{ + ConnectTaskData *data = g_task_get_task_data (task); + if (challenge) { + GstAmfNode *node1; + GstAmfNode *node2; + gchar *response; + + if (!data->location.secure_token || !data->location.secure_token[0]) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED, + "server requires secure token authentication"); + g_object_unref (task); + return; + } + + response = rtmp_tea_decode (data->location.secure_token, challenge); + + GST_DEBUG ("response: %s", response); + + node1 = gst_amf_node_new_null (); + node2 = gst_amf_node_new_take_string (response, -1); + gst_rtmp_connection_send_command (connection, NULL, NULL, 0, + "secureTokenResponse", node1, node2, NULL); + gst_amf_node_free (node1); + gst_amf_node_free (node2); + } + + g_signal_handler_disconnect (connection, data->error_handler_id); + data->error_handler_id = 0; + + g_task_return_pointer (task, g_object_ref (connection), + gst_rtmp_connection_close_and_unref); + g_object_unref (task); +} + +GstRtmpConnection * +gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error) +{ + GTask *task = G_TASK (result); + return g_task_propagate_pointer (task, error); +} + +static void send_create_stream (GTask * task); +static void send_publish_or_play (GTask * task); + +typedef struct +{ + GstRtmpConnection *connection; + gulong error_handler_id; + gchar *stream; + gboolean publish; + guint32 id; +} StreamTaskData; + +static StreamTaskData * +stream_task_data_new (GstRtmpConnection * connection, const gchar * stream, + gboolean publish) +{ + StreamTaskData *data = g_slice_new0 (StreamTaskData); + data->connection = g_object_ref (connection); + data->stream = g_strdup (stream); + data->publish = publish; + return data; +} + +static void +stream_task_data_free (gpointer ptr) +{ + StreamTaskData *data = ptr; + g_clear_pointer (&data->stream, g_free); + if (data->error_handler_id) { + g_signal_handler_disconnect (data->connection, data->error_handler_id); + } + g_clear_object (&data->connection); + g_slice_free (StreamTaskData, data); +} + +static void +start_stream (GstRtmpConnection * connection, const gchar * stream, + gboolean publish, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data) +{ + GTask *task; + StreamTaskData *data; + + init_debug (); + + task = g_task_new (connection, cancellable, callback, user_data); + + if (!stream) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED, + "Stream is not set"); + g_object_unref (task); + return; + } + + data = stream_task_data_new (connection, stream, publish); + g_task_set_task_data (task, data, stream_task_data_free); + + data->error_handler_id = g_signal_connect (connection, + "error", G_CALLBACK (connection_error), task); + + send_create_stream (task); +} + +void +gst_rtmp_client_start_publish_async (GstRtmpConnection * connection, + const gchar * stream, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data) +{ + start_stream (connection, stream, TRUE, cancellable, callback, user_data); +} + +void +gst_rtmp_client_start_play_async (GstRtmpConnection * connection, + const gchar * stream, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data) +{ + start_stream (connection, stream, FALSE, cancellable, callback, user_data); +} + +static void +send_set_buffer_length (GstRtmpConnection * connection, guint32 stream, + guint32 ms) +{ + GstRtmpUserControl uc = { + .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH, + .param = stream, + .param2 = ms, + }; + + gst_rtmp_connection_queue_message (connection, + gst_rtmp_message_new_user_control (&uc)); +} + +static void +send_create_stream (GTask * task) +{ + GstRtmpConnection *connection = g_task_get_source_object (task); + StreamTaskData *data = g_task_get_task_data (task); + GstAmfNode *command_object, *stream_name; + + command_object = gst_amf_node_new_null (); + stream_name = gst_amf_node_new_string (data->stream, -1); + + if (data->publish) { + /* Not part of RTMP documentation */ + GST_DEBUG ("Releasing stream '%s'", data->stream); + gst_rtmp_connection_send_command (connection, NULL, NULL, 0, + "releaseStream", command_object, stream_name, NULL); + gst_rtmp_connection_send_command (connection, NULL, NULL, 0, + "FCPublish", command_object, stream_name, NULL); + } else { + /* Matches librtmp */ + gst_rtmp_connection_request_window_size (connection, 2500000); + send_set_buffer_length (connection, 0, 300); + } + + GST_INFO ("Creating stream '%s'", data->stream); + gst_rtmp_connection_send_command (connection, create_stream_done, task, 0, + "createStream", command_object, NULL); + + gst_amf_node_free (stream_name); + gst_amf_node_free (command_object); +} + +static void +create_stream_done (const gchar * command_name, GPtrArray * args, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + StreamTaskData *data = g_task_get_task_data (task); + GstAmfNode *result; + + if (g_task_return_error_if_cancelled (task)) { + g_object_unref (task); + return; + } + + if (!args) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "createStream failed: %s", command_name); + g_object_unref (task); + return; + } + + if (args->len < 2) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "createStream failed; not enough return arguments"); + g_object_unref (task); + return; + } + + result = g_ptr_array_index (args, 1); + if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) { + GString *error_dump = g_string_new (""); + + gst_amf_node_dump (result, -1, error_dump); + + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "createStream failed: %s", error_dump->str); + g_object_unref (task); + + g_string_free (error_dump, TRUE); + return; + } + + data->id = gst_amf_node_get_number (result); + GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id); + + if (data->id == 0) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA, + "createStream returned ID 0"); + g_object_unref (task); + return; + } + + send_publish_or_play (task); +} + +static void +send_publish_or_play (GTask * task) +{ + GstRtmpConnection *connection = g_task_get_source_object (task); + StreamTaskData *data = g_task_get_task_data (task); + const gchar *command = data->publish ? "publish" : "play"; + GstAmfNode *command_object, *stream_name, *argument; + + command_object = gst_amf_node_new_null (); + stream_name = gst_amf_node_new_string (data->stream, -1); + + if (data->publish) { + /* publishing type (live, record, append) */ + argument = gst_amf_node_new_string ("live", -1); + } else { + /* "Start" argument: -2 = live or recording, -1 = only live + 0 or positive = only recording, seek to X seconds */ + argument = gst_amf_node_new_number (-2); + } + + GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT, + command, data->stream, data->id); + gst_rtmp_connection_expect_command (connection, on_publish_or_play_status, + task, data->id, "onStatus"); + gst_rtmp_connection_send_command (connection, NULL, NULL, data->id, + command, command_object, stream_name, argument, NULL); + + if (!data->publish) { + /* Matches librtmp */ + send_set_buffer_length (connection, data->id, 30000); + } + + gst_amf_node_free (command_object); + gst_amf_node_free (stream_name); + gst_amf_node_free (argument); +} + +static void +on_publish_or_play_status (const gchar * command_name, GPtrArray * args, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + GstRtmpConnection *connection = g_task_get_source_object (task); + StreamTaskData *data = g_task_get_task_data (task); + const gchar *command = data->publish ? "publish" : "play", *code = NULL; + GString *info_dump; + + if (g_task_return_error_if_cancelled (task)) { + g_object_unref (task); + return; + } + + if (!args) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "%s failed: %s", command, command_name); + g_object_unref (task); + return; + } + + if (args->len < 2) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "%s failed; not enough return arguments", command); + g_object_unref (task); + return; + } + + { + const GstAmfNode *info_object, *code_object; + info_object = g_ptr_array_index (args, 1); + code_object = gst_amf_node_get_field (info_object, "code"); + + if (code_object) { + code = gst_amf_node_peek_string (code_object, NULL); + } + + info_dump = g_string_new (""); + gst_amf_node_dump (info_object, -1, info_dump); + } + + if (data->publish) { + if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) { + GST_INFO ("publish success: %s", info_dump->str); + g_task_return_boolean (task, TRUE); + goto out; + } + + if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS, + "publish denied: stream already exists: %s", info_dump->str); + goto out; + } + + if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) { + g_task_return_new_error (task, G_IO_ERROR, + G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str); + goto out; + } + } else { + if (g_strcmp0 (code, "NetStream.Play.Start") == 0 || + g_strcmp0 (code, "NetStream.Play.Reset") == 0) { + GST_INFO ("play success: %s", info_dump->str); + g_task_return_boolean (task, TRUE); + goto out; + } + + if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) { + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND, + "play denied: stream not found: %s", info_dump->str); + goto out; + } + } + + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED, + "unhandled %s result: %s", command, info_dump->str); + +out: + g_string_free (info_dump, TRUE); + + g_signal_handler_disconnect (connection, data->error_handler_id); + data->error_handler_id = 0; + + g_object_unref (task); +} + +static gboolean +start_stream_finish (GstRtmpConnection * connection, + GAsyncResult * result, guint32 * stream_id, GError ** error) +{ + GTask *task; + StreamTaskData *data; + + g_return_val_if_fail (g_task_is_valid (result, connection), FALSE); + + task = G_TASK (result); + + if (!g_task_propagate_boolean (G_TASK (result), error)) { + return FALSE; + } + + data = g_task_get_task_data (task); + + if (stream_id) { + *stream_id = data->id; + } + + return TRUE; +} + +gboolean +gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection, + GAsyncResult * result, guint32 * stream_id, GError ** error) +{ + return start_stream_finish (connection, result, stream_id, error); +} + +gboolean +gst_rtmp_client_start_play_finish (GstRtmpConnection * connection, + GAsyncResult * result, guint32 * stream_id, GError ** error) +{ + return start_stream_finish (connection, result, stream_id, error); +} diff --git a/gst/rtmp2/rtmp/rtmpclient.h b/gst/rtmp2/rtmp/rtmpclient.h new file mode 100644 index 000000000..ec75d43f9 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpclient.h @@ -0,0 +1,101 @@ +/* GStreamer RTMP Library + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_CLIENT_H_ +#define _GST_RTMP_CLIENT_H_ + +#include "rtmpconnection.h" + +G_BEGIN_DECLS + +#define GST_TYPE_RTMP_SCHEME (gst_rtmp_scheme_get_type ()) + +typedef enum +{ + GST_RTMP_SCHEME_RTMP = 0, + GST_RTMP_SCHEME_RTMPS, +} GstRtmpScheme; + +GType gst_rtmp_scheme_get_type (void); + +GstRtmpScheme gst_rtmp_scheme_from_string (const gchar * string); +GstRtmpScheme gst_rtmp_scheme_from_uri (const GstUri * uri); +const gchar * gst_rtmp_scheme_to_string (GstRtmpScheme scheme); +const gchar * const * gst_rtmp_scheme_get_strings (void); +guint gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme); + + + +#define GST_TYPE_RTMP_AUTHMOD (gst_rtmp_authmod_get_type ()) + +typedef enum +{ + GST_RTMP_AUTHMOD_NONE = 0, + GST_RTMP_AUTHMOD_AUTO, + GST_RTMP_AUTHMOD_ADOBE, +} GstRtmpAuthmod; + +GType gst_rtmp_authmod_get_type (void); + + + +typedef struct _GstRtmpLocation +{ + GstRtmpScheme scheme; + gchar *host; + guint port; + gchar *application; + gchar *stream; + gchar *username; + gchar *password; + gchar *secure_token; + GstRtmpAuthmod authmod; + gint timeout; + GTlsCertificateFlags tls_flags; + gchar *flash_ver; +} GstRtmpLocation; + +void gst_rtmp_location_copy (GstRtmpLocation * dest, + const GstRtmpLocation * src); +void gst_rtmp_location_clear (GstRtmpLocation * uri); +gchar *gst_rtmp_location_get_string (const GstRtmpLocation * location, + gboolean with_stream); + + + +void gst_rtmp_client_connect_async (const GstRtmpLocation * location, + GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data); +GstRtmpConnection *gst_rtmp_client_connect_finish (GAsyncResult * result, + GError ** error); +void gst_rtmp_client_start_publish_async (GstRtmpConnection * connection, + const gchar * stream, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data); +gboolean gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection, + GAsyncResult * result, guint * stream_id, GError ** error); + +void gst_rtmp_client_start_play_async (GstRtmpConnection * connection, + const gchar * stream, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data); +gboolean gst_rtmp_client_start_play_finish (GstRtmpConnection * connection, + GAsyncResult * result, guint * stream_id, GError ** error); + +G_END_DECLS +#endif diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c new file mode 100644 index 000000000..937adcbab --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpconnection.c @@ -0,0 +1,996 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> +#include <string.h> +#include <math.h> +#include "rtmpconnection.h" +#include "rtmpchunkstream.h" +#include "rtmpmessage.h" +#include "rtmputils.h" +#include "amf.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_connection_debug_category + +#define READ_SIZE 8192 + +typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection); + +struct _GstRtmpConnection +{ + GObject parent_instance; + + /* should be properties */ + gboolean input_paused; + gboolean error; + + /* private */ + GThread *thread; + GSocketConnection *connection; + GCancellable *cancellable; + GSocketClient *socket_client; + GAsyncQueue *output_queue; + GMainContext *main_context; + + GSource *input_source; + GByteArray *input_bytes; + guint input_needed_bytes; + GstRtmpChunkStreams *input_streams, *output_streams; + GList *transactions; + GList *expected_commands; + guint transaction_count; + + GstRtmpConnectionMessageFunc input_handler; + gpointer input_handler_user_data; + GDestroyNotify input_handler_user_data_destroy; + + GstRtmpConnectionFunc output_handler; + gpointer output_handler_user_data; + GDestroyNotify output_handler_user_data_destroy; + + gboolean writing; + + /* RTMP configuration */ + gsize in_chunk_size; + gsize out_chunk_size; + gsize in_window_ack_size; + gsize out_window_ack_size; + gsize total_input_bytes; + gsize bytes_since_ack; +}; + + +typedef struct +{ + GObjectClass parent_class; +} GstRtmpConnectionClass; + +/* prototypes */ + +static void gst_rtmp_connection_dispose (GObject * object); +static void gst_rtmp_connection_finalize (GObject * object); +static void gst_rtmp_connection_emit_error (GstRtmpConnection * self); +static gboolean gst_rtmp_connection_input_ready (GInputStream * is, + gpointer user_data); +static void gst_rtmp_connection_start_write (GstRtmpConnection * self); +static void gst_rtmp_connection_write_bytes_done (GObject * obj, + GAsyncResult * result, gpointer user_data); +static void gst_rtmp_connection_start_read (GstRtmpConnection * sc, + guint needed_bytes); +static void gst_rtmp_connection_try_read (GstRtmpConnection * sc); +static void gst_rtmp_connection_do_read (GstRtmpConnection * sc); +static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * + connection, GstBuffer * buffer); +static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection, + GstBuffer * buffer); +static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc, + GstBuffer * buffer); +static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc, + GstBuffer * buffer); + +static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection); +static void +gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection, + guint32 event_data); + +typedef struct +{ + gdouble transaction_id; + GstRtmpCommandCallback func; + gpointer user_data; +} Transaction; + +static Transaction * +transaction_new (gdouble transaction_id, GstRtmpCommandCallback func, + gpointer user_data) +{ + Transaction *data = g_slice_new (Transaction); + data->transaction_id = transaction_id; + data->func = func; + data->user_data = user_data; + return data; +} + +static void +transaction_free (gpointer ptr) +{ + Transaction *data = ptr; + g_slice_free (Transaction, data); +} + +typedef struct +{ + guint32 stream_id; + gchar *command_name; + GstRtmpCommandCallback func; + gpointer user_data; +} ExpectedCommand; + +static ExpectedCommand * +expected_command_new (guint32 stream_id, const gchar * command_name, + GstRtmpCommandCallback func, gpointer user_data) +{ + ExpectedCommand *data = g_slice_new (ExpectedCommand); + data->stream_id = stream_id; + data->command_name = g_strdup (command_name); + data->func = func; + data->user_data = user_data; + return data; +} + +static void +expected_command_free (gpointer ptr) +{ + ExpectedCommand *data = ptr; + g_free (data->command_name); + g_slice_free (ExpectedCommand, data); +} + +enum +{ + SIGNAL_ERROR, + SIGNAL_STREAM_CONTROL, + + N_SIGNALS +}; + +static guint signals[N_SIGNALS] = { 0, }; + +/* class initialization */ + +G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection, + G_TYPE_OBJECT, + GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category, + "rtmpconnection", 0, "debug category for GstRtmpConnection class")); + +static void +gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->dispose = gst_rtmp_connection_dispose; + gobject_class->finalize = gst_rtmp_connection_finalize; + + signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0); + + signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control", + G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, + G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT); + + GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read); +} + +static void +gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection) +{ + rtmpconnection->cancellable = g_cancellable_new (); + rtmpconnection->output_queue = + g_async_queue_new_full ((GDestroyNotify) g_bytes_unref); + rtmpconnection->input_streams = gst_rtmp_chunk_streams_new (); + rtmpconnection->output_streams = gst_rtmp_chunk_streams_new (); + + rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE; + rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE; + + rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE); + rtmpconnection->input_needed_bytes = 1; +} + +void +gst_rtmp_connection_dispose (GObject * object) +{ + GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object); + GST_DEBUG_OBJECT (rtmpconnection, "dispose"); + + /* clean up as possible. may be called multiple times */ + + gst_rtmp_connection_close (rtmpconnection); + g_cancellable_cancel (rtmpconnection->cancellable); + gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL); + gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL); + + G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object); +} + +void +gst_rtmp_connection_finalize (GObject * object) +{ + GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object); + GST_DEBUG_OBJECT (rtmpconnection, "finalize"); + + /* clean up object here */ + + g_clear_object (&rtmpconnection->cancellable); + g_clear_object (&rtmpconnection->connection); + g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref); + g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free); + g_clear_pointer (&rtmpconnection->output_streams, + gst_rtmp_chunk_streams_free); + g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref); + g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref); + g_clear_pointer (&rtmpconnection->thread, g_thread_unref); + + G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object); +} + +GSocket * +gst_rtmp_connection_get_socket (GstRtmpConnection * sc) +{ + return g_socket_connection_get_socket (sc->connection); +} + +static void +gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc, + GSocketConnection * connection) +{ + GInputStream *is; + + sc->thread = g_thread_ref (g_thread_self ()); + sc->main_context = g_main_context_ref_thread_default (); + sc->connection = g_object_ref (connection); + + /* refs the socket because it's creating an input stream, which holds a ref */ + is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection)); + /* refs the socket because it's creating a socket source */ + g_warn_if_fail (!sc->input_source); + sc->input_source = + g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is), + sc->cancellable); + g_source_set_callback (sc->input_source, + (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc), + g_object_unref); + g_source_attach (sc->input_source, sc->main_context); +} + +GstRtmpConnection * +gst_rtmp_connection_new (GSocketConnection * connection) +{ + GstRtmpConnection *sc; + + sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL); + + gst_rtmp_connection_set_socket_connection (sc, connection); + + return sc; +} + +static void +cancel_all_commands (GstRtmpConnection * self) +{ + GList *l; + + for (l = self->transactions; l; l = g_list_next (l)) { + Transaction *cc = l->data; + GST_LOG ("calling transaction callback %s", + GST_DEBUG_FUNCPTR_NAME (cc->func)); + cc->func ("<cancelled>", NULL, cc->user_data); + } + g_list_free_full (self->transactions, transaction_free); + self->transactions = NULL; + + for (l = self->expected_commands; l; l = g_list_next (l)) { + ExpectedCommand *cc = l->data; + GST_LOG ("calling expected command callback %s", + GST_DEBUG_FUNCPTR_NAME (cc->func)); + cc->func ("<cancelled>", NULL, cc->user_data); + } + g_list_free_full (self->expected_commands, expected_command_free); + self->expected_commands = NULL; +} + +void +gst_rtmp_connection_close (GstRtmpConnection * self) +{ + if (self->thread != g_thread_self ()) { + GST_ERROR ("Called from wrong thread"); + } + + g_cancellable_cancel (self->cancellable); + cancel_all_commands (self); + + if (self->input_source) { + g_source_destroy (self->input_source); + g_clear_pointer (&self->input_source, g_source_unref); + } + + if (self->connection) { + g_io_stream_close_async (G_IO_STREAM (self->connection), + G_PRIORITY_DEFAULT, NULL, NULL, NULL); + } +} + +void +gst_rtmp_connection_close_and_unref (gpointer ptr) +{ + GstRtmpConnection *connection; + + g_return_if_fail (ptr); + + connection = GST_RTMP_CONNECTION (ptr); + gst_rtmp_connection_close (connection); + g_object_unref (connection); +} + +void +gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc, + GstRtmpConnectionMessageFunc callback, gpointer user_data, + GDestroyNotify user_data_destroy) +{ + if (sc->input_handler_user_data_destroy) { + sc->input_handler_user_data_destroy (sc->input_handler_user_data); + } + + sc->input_handler = callback; + sc->input_handler_user_data = user_data; + sc->input_handler_user_data_destroy = user_data_destroy; +} + +void +gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc, + GstRtmpConnectionFunc callback, gpointer user_data, + GDestroyNotify user_data_destroy) +{ + if (sc->output_handler_user_data_destroy) { + sc->output_handler_user_data_destroy (sc->output_handler_user_data); + } + + sc->output_handler = callback; + sc->output_handler_user_data = user_data; + sc->output_handler_user_data_destroy = user_data_destroy; +} + +static gboolean +gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data) +{ + GstRtmpConnection *sc = user_data; + gssize ret; + guint oldsize; + GError *error = NULL; + + GST_TRACE ("input ready"); + + oldsize = sc->input_bytes->len; + g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE); + ret = + g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is), + sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error); + g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0)); + + if (ret < 0) { + gint code = error->code; + + if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK || + code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) { + /* should retry */ + GST_DEBUG ("read IO error %d %s, continuing", code, error->message); + g_error_free (error); + return G_SOURCE_CONTINUE; + } + + GST_ERROR ("read error: %s %d %s", g_quark_to_string (error->domain), + code, error->message); + g_error_free (error); + } else if (ret == 0) { + GST_INFO ("read EOF"); + } + + if (ret <= 0) { + gst_rtmp_connection_emit_error (sc); + return G_SOURCE_REMOVE; + } + + GST_TRACE ("read %" G_GSIZE_FORMAT " bytes", ret); + + sc->total_input_bytes += ret; + sc->bytes_since_ack += ret; + if (sc->bytes_since_ack >= sc->in_window_ack_size) { + gst_rtmp_connection_send_ack (sc); + } + + gst_rtmp_connection_try_read (sc); + return G_SOURCE_CONTINUE; +} + +static void +gst_rtmp_connection_start_write (GstRtmpConnection * self) +{ + GOutputStream *os; + GBytes *bytes; + + if (self->writing) { + return; + } + + bytes = g_async_queue_try_pop (self->output_queue); + if (!bytes) { + return; + } + + self->writing = TRUE; + if (self->output_handler) { + self->output_handler (self, self->output_handler_user_data); + } + + os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection)); + gst_rtmp_output_stream_write_all_bytes_async (os, bytes, + G_PRIORITY_DEFAULT, self->cancellable, + gst_rtmp_connection_write_bytes_done, g_object_ref (self)); + g_bytes_unref (bytes); +} + +static void +gst_rtmp_connection_emit_error (GstRtmpConnection * self) +{ + if (self->error) { + return; + } + + GST_INFO ("connection error"); + self->error = TRUE; + + cancel_all_commands (self); + + g_signal_emit (self, signals[SIGNAL_ERROR], 0); +} + +static void +gst_rtmp_connection_write_bytes_done (GObject * obj, + GAsyncResult * result, gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (obj); + GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data); + GError *error = NULL; + gboolean res; + + self->writing = FALSE; + + res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error); + if (!res) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + GST_INFO ("write cancelled"); + } else { + GST_ERROR ("write error: %s", error->message); + } + gst_rtmp_connection_emit_error (self); + g_error_free (error); + g_object_unref (self); + return; + } + + GST_LOG ("write completed"); + gst_rtmp_connection_start_write (self); + g_object_unref (self); +} + +static void +gst_rtmp_connection_start_read (GstRtmpConnection * connection, + guint needed_bytes) +{ + g_return_if_fail (needed_bytes > 0); + connection->input_needed_bytes = needed_bytes; + gst_rtmp_connection_try_read (connection); +} + +static void +gst_rtmp_connection_try_read (GstRtmpConnection * connection) +{ + guint need = connection->input_needed_bytes, + len = connection->input_bytes->len; + + if (len < need) { + GST_TRACE ("got %u < %u bytes, need more", len, need); + return; + } + + GST_TRACE ("got %u >= %u bytes, proceeding", len, need); + gst_rtmp_connection_do_read (connection); +} + +static void +gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size, + GBytes ** outbytes) +{ + g_return_if_fail (size <= sc->input_bytes->len); + + if (outbytes) { + *outbytes = g_bytes_new (sc->input_bytes->data, size); + } + + g_byte_array_remove_range (sc->input_bytes, 0, size); +} + +static void +gst_rtmp_connection_do_read (GstRtmpConnection * sc) +{ + GByteArray *input_bytes = sc->input_bytes; + gsize needed_bytes = 1; + + while (1) { + GstRtmpChunkStream *cstream; + guint32 chunk_stream_id, header_size, next_size; + guint8 *data; + + chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data, + input_bytes->len); + + if (!chunk_stream_id) { + needed_bytes = input_bytes->len + 1; + break; + } + + cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id); + header_size = gst_rtmp_chunk_stream_parse_header (cstream, + input_bytes->data, input_bytes->len); + + if (input_bytes->len < header_size) { + needed_bytes = header_size; + break; + } + + next_size = gst_rtmp_chunk_stream_parse_payload (cstream, + sc->in_chunk_size, &data); + + if (input_bytes->len < header_size + next_size) { + needed_bytes = header_size + next_size; + break; + } + + memcpy (data, input_bytes->data + header_size, next_size); + gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL); + + next_size = gst_rtmp_chunk_stream_wrote_payload (cstream, + sc->in_chunk_size); + + if (next_size == 0) { + GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream); + gst_rtmp_connection_handle_message (sc, buffer); + gst_buffer_unref (buffer); + } + } + + gst_rtmp_connection_start_read (sc, needed_bytes); +} + +static void +gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer) +{ + if (gst_rtmp_message_is_protocol_control (buffer)) { + gst_rtmp_connection_handle_protocol_control (sc, buffer); + return; + } + + if (gst_rtmp_message_is_user_control (buffer)) { + gst_rtmp_connection_handle_user_control (sc, buffer); + return; + } + + switch (gst_rtmp_message_get_type (buffer)) { + case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0: + gst_rtmp_connection_handle_cm (sc, buffer); + return; + + default: + if (sc->input_handler) { + sc->input_handler (sc, buffer, sc->input_handler_user_data); + } + return; + } +} + +static void +gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection, + GstBuffer * buffer) +{ + GstRtmpProtocolControl pc; + + if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) { + GST_ERROR ("can't parse protocol control message"); + return; + } + + GST_LOG ("got protocol control message %d:%s", pc.type, + gst_rtmp_message_type_get_nick (pc.type)); + + switch (pc.type) { + case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE: + GST_INFO ("new chunk size %" G_GUINT32_FORMAT, pc.param); + connection->in_chunk_size = pc.param; + break; + + case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE: + GST_ERROR ("unimplemented: chunk abort, stream_id = %" G_GUINT32_FORMAT, + pc.param); + break; + + case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT: + /* We don't really send ack requests that we care about, so ignore */ + GST_DEBUG ("acknowledgement %" G_GUINT32_FORMAT, pc.param); + break; + + case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE: + GST_INFO ("window ack size: %" G_GUINT32_FORMAT, pc.param); + connection->in_window_ack_size = pc.param; + break; + + case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH: + GST_FIXME ("set peer bandwidth: %" G_GUINT32_FORMAT ", %" + G_GUINT32_FORMAT, pc.param, pc.param2); + /* FIXME this is not correct, but close enough */ + gst_rtmp_connection_request_window_size (connection, pc.param); + break; + + default: + GST_ERROR ("unimplemented protocol control type %d:%s", pc.type, + gst_rtmp_message_type_get_nick (pc.type)); + break; + } +} + +static void +gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection, + GstBuffer * buffer) +{ + GstRtmpUserControl uc; + + if (!gst_rtmp_message_parse_user_control (buffer, &uc)) { + GST_ERROR ("can't parse user control message"); + return; + } + + GST_LOG ("got user control message %d:%s", uc.type, + gst_rtmp_user_control_type_get_nick (uc.type)); + + switch (uc.type) { + case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN: + case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF: + case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY: + case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED: + GST_INFO ("stream %u got %s", uc.param, + gst_rtmp_user_control_type_get_nick (uc.type)); + g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0, + uc.type, uc.param); + break; + + case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH: + GST_FIXME ("ignoring set buffer length: %" G_GUINT32_FORMAT ", %" + G_GUINT32_FORMAT " ms", uc.param, uc.param2); + break; + + case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST: + GST_DEBUG ("ping request: %" G_GUINT32_FORMAT, uc.param); + gst_rtmp_connection_send_ping_response (connection, uc.param); + break; + + case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE: + GST_DEBUG ("ignoring ping response: %" G_GUINT32_FORMAT, uc.param); + break; + + case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY: + GST_LOG ("ignoring buffer empty: %" G_GUINT32_FORMAT, uc.param); + break; + + case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY: + GST_LOG ("ignoring buffer ready: %" G_GUINT32_FORMAT, uc.param); + break; + + default: + GST_ERROR ("unimplemented user control type %d:%s", uc.type, + gst_rtmp_user_control_type_get_nick (uc.type)); + break; + } +} + +static gboolean +is_command_response (const gchar * command_name) +{ + return g_strcmp0 (command_name, "_result") == 0 || + g_strcmp0 (command_name, "_error") == 0; +} + +static void +gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + gchar *command_name; + gdouble transaction_id; + GPtrArray *args; + + { + GstMapInfo map; + gst_buffer_map (buffer, &map, GST_MAP_READ); + args = gst_amf_parse_command (map.data, map.size, &transaction_id, + &command_name); + gst_buffer_unmap (buffer, &map); + } + + if (!args) { + return; + } + + if (!isfinite (transaction_id) || transaction_id < 0 || + transaction_id > G_MAXUINT) { + GST_WARNING ("Server sent command \"%s\" with extreme transaction ID %.0f", + GST_STR_NULL (command_name), transaction_id); + } else if (transaction_id > sc->transaction_count) { + GST_WARNING ("Server sent command \"%s\" with unused transaction ID " + "(%.0f > %u)", GST_STR_NULL (command_name), transaction_id, + sc->transaction_count); + sc->transaction_count = transaction_id; + } + + GST_DEBUG ("got control message \"%s\" transaction %.0f size %" + G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id, + meta->size); + + if (is_command_response (command_name)) { + if (transaction_id != 0) { + GList *l; + + for (l = sc->transactions; l; l = g_list_next (l)) { + Transaction *t = l->data; + + if (t->transaction_id != transaction_id) { + continue; + } + + GST_LOG ("calling transaction callback %s", + GST_DEBUG_FUNCPTR_NAME (t->func)); + sc->transactions = g_list_remove_link (sc->transactions, l); + t->func (command_name, args, t->user_data); + g_list_free_full (l, transaction_free); + break; + } + } else { + GST_WARNING ("Server sent response \"%s\" without transaction", + GST_STR_NULL (command_name)); + } + } else { + GList *l; + + if (transaction_id != 0) { + GST_FIXME ("Server sent command \"%s\" expecting reply", + GST_STR_NULL (command_name)); + } + + for (l = sc->expected_commands; l; l = g_list_next (l)) { + ExpectedCommand *ec = l->data; + + if (ec->stream_id != meta->mstream) { + continue; + } + + if (g_strcmp0 (ec->command_name, command_name)) { + continue; + } + + GST_LOG ("calling expected command callback %s", + GST_DEBUG_FUNCPTR_NAME (ec->func)); + sc->expected_commands = g_list_remove_link (sc->expected_commands, l); + ec->func (command_name, args, ec->user_data); + g_list_free_full (l, expected_command_free); + break; + } + } + + g_free (command_name); + g_ptr_array_unref (args); +} + +static gboolean +start_write (gpointer user_data) +{ + GstRtmpConnection *sc = user_data; + gst_rtmp_connection_start_write (sc); + g_object_unref (sc); + return G_SOURCE_REMOVE; +} + +static void +byte_array_take_buffer (GByteArray * byte_array, GstBuffer * buffer) +{ + GstMapInfo map; + gboolean ret; + ret = gst_buffer_map (buffer, &map, GST_MAP_READ); + g_assert (ret); + g_assert (byte_array->len + map.size <= (guint64) G_MAXUINT); + g_byte_array_append (byte_array, map.data, map.size); + gst_buffer_unmap (buffer, &map); + gst_buffer_unref (buffer); +} + +void +gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer) +{ + GstRtmpMeta *meta; + GstRtmpChunkStream *cstream; + GstBuffer *out_buffer; + GByteArray *out_ba; + + g_return_if_fail (GST_IS_RTMP_CONNECTION (self)); + g_return_if_fail (GST_IS_BUFFER (buffer)); + + meta = gst_buffer_get_rtmp_meta (buffer); + g_return_if_fail (meta); + + cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream); + g_return_if_fail (cstream); + + out_buffer = gst_rtmp_chunk_stream_serialize_start (cstream, buffer, + self->out_chunk_size); + g_return_if_fail (out_buffer); + + out_ba = g_byte_array_new (); + + while (out_buffer) { + byte_array_take_buffer (out_ba, out_buffer); + + out_buffer = gst_rtmp_chunk_stream_serialize_next (cstream, + self->out_chunk_size); + } + + g_async_queue_push (self->output_queue, g_byte_array_free_to_bytes (out_ba)); + g_main_context_invoke (self->main_context, start_write, g_object_ref (self)); +} + +guint +gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection) +{ + return g_async_queue_length (connection->output_queue); +} + +guint +gst_rtmp_connection_send_command (GstRtmpConnection * connection, + GstRtmpCommandCallback response_command, gpointer user_data, + guint32 stream_id, const gchar * command_name, const GstAmfNode * argument, + ...) +{ + GstBuffer *buffer; + gdouble transaction_id = 0; + va_list ap; + GBytes *payload; + guint8 *data; + gsize size; + + if (connection->thread != g_thread_self ()) { + GST_ERROR ("Called from wrong thread"); + } + + GST_DEBUG ("Sending command '%s' on stream id %" G_GUINT32_FORMAT, + command_name, stream_id); + + if (response_command) { + Transaction *t; + + transaction_id = ++connection->transaction_count; + + GST_LOG ("Registering %s for transid %.0f", + GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id); + + t = transaction_new (transaction_id, response_command, user_data); + + connection->transactions = g_list_append (connection->transactions, t); + } + + va_start (ap, argument); + payload = gst_amf_serialize_command_valist (transaction_id, + command_name, argument, ap); + va_end (ap); + + data = g_bytes_unref_to_data (payload, &size); + buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0, + 3, stream_id, data, size); + + gst_rtmp_connection_queue_message (connection, buffer); + return transaction_id; +} + +void +gst_rtmp_connection_expect_command (GstRtmpConnection * connection, + GstRtmpCommandCallback response_command, gpointer user_data, + guint32 stream_id, const gchar * command_name) +{ + ExpectedCommand *ec; + + g_return_if_fail (response_command); + g_return_if_fail (command_name); + g_return_if_fail (!is_command_response (command_name)); + + GST_LOG ("Registering %s for stream id %" G_GUINT32_FORMAT + " name \"%s\"", GST_DEBUG_FUNCPTR_NAME (response_command), + stream_id, command_name); + + ec = expected_command_new (stream_id, command_name, response_command, + user_data); + + connection->expected_commands = + g_list_append (connection->expected_commands, ec); +} + +static void +gst_rtmp_connection_send_ack (GstRtmpConnection * connection) +{ + GstRtmpProtocolControl pc = { + .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT, + .param = (guint32) connection->total_input_bytes, + }; + + gst_rtmp_connection_queue_message (connection, + gst_rtmp_message_new_protocol_control (&pc)); + + connection->bytes_since_ack = 0; +} + +static void +gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection, + guint32 event_data) +{ + GstRtmpUserControl uc = { + .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE, + .param = event_data, + }; + + gst_rtmp_connection_queue_message (connection, + gst_rtmp_message_new_user_control (&uc)); +} + +void +gst_rtmp_connection_request_window_size (GstRtmpConnection * connection, + guint32 window_ack_size) +{ + GstRtmpProtocolControl pc = { + .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE, + .param = window_ack_size, + }; + + if (connection->out_window_ack_size == window_ack_size) + return; + + connection->out_window_ack_size = window_ack_size; + + gst_rtmp_connection_queue_message (connection, + gst_rtmp_message_new_protocol_control (&pc)); +} diff --git a/gst/rtmp2/rtmp/rtmpconnection.h b/gst/rtmp2/rtmp/rtmpconnection.h new file mode 100644 index 000000000..7b5eb6c09 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpconnection.h @@ -0,0 +1,82 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_CONNECTION_H_ +#define _GST_RTMP_CONNECTION_H_ + +#include <gio/gio.h> +#include <gst/gst.h> +#include "amf.h" + +G_BEGIN_DECLS + +#define GST_TYPE_RTMP_CONNECTION (gst_rtmp_connection_get_type()) +#define GST_RTMP_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_CONNECTION,GstRtmpConnection)) +#define GST_IS_RTMP_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_CONNECTION)) + +typedef struct _GstRtmpConnection GstRtmpConnection; + +typedef void (*GstRtmpConnectionFunc) + (GstRtmpConnection * connection, gpointer user_data); +typedef void (*GstRtmpConnectionMessageFunc) + (GstRtmpConnection * connection, GstBuffer * buffer, gpointer user_data); + +typedef void (*GstRtmpCommandCallback) (const gchar * command_name, + GPtrArray * arguments, gpointer user_data); + +GType gst_rtmp_connection_get_type (void); + +GstRtmpConnection *gst_rtmp_connection_new (GSocketConnection * connection); + +GSocket *gst_rtmp_connection_get_socket (GstRtmpConnection * connection); + +void gst_rtmp_connection_close (GstRtmpConnection * connection); +void gst_rtmp_connection_close_and_unref (gpointer ptr); + +void gst_rtmp_connection_set_input_handler (GstRtmpConnection * connection, + GstRtmpConnectionMessageFunc callback, gpointer user_data, + GDestroyNotify user_data_destroy); + +void gst_rtmp_connection_set_output_handler (GstRtmpConnection * connection, + GstRtmpConnectionFunc callback, gpointer user_data, + GDestroyNotify user_data_destroy); + +void gst_rtmp_connection_queue_bytes (GstRtmpConnection *self, + GBytes * bytes); +void gst_rtmp_connection_queue_message (GstRtmpConnection * connection, + GstBuffer * buffer); +guint gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection); + +guint gst_rtmp_connection_send_command (GstRtmpConnection * connection, + GstRtmpCommandCallback response_command, gpointer user_data, + guint32 stream_id, const gchar * command_name, const GstAmfNode * argument, + ...) G_GNUC_NULL_TERMINATED; + +void gst_rtmp_connection_expect_command (GstRtmpConnection * connection, + GstRtmpCommandCallback response_command, gpointer user_data, + guint32 stream_id, const gchar * command_name); + +void gst_rtmp_connection_request_window_size (GstRtmpConnection * connection, + guint32 window_ack_size); + +G_END_DECLS + +#endif diff --git a/gst/rtmp2/rtmp/rtmphandshake.c b/gst/rtmp2/rtmp/rtmphandshake.c new file mode 100644 index 000000000..0d821c3d8 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmphandshake.c @@ -0,0 +1,311 @@ +/* GStreamer RTMP Library + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "rtmphandshake.h" +#include "rtmputils.h" + +#include <gst/gst.h> +#include <string.h> + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_handshake_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_handshake_debug_category + +static void +init_debug (void) +{ + static volatile gsize done = 0; + if (g_once_init_enter (&done)) { + GST_DEBUG_CATEGORY_INIT (gst_rtmp_handshake_debug_category, "rtmphandshake", + 0, "debug category for the rtmp connection handshake"); + g_once_init_leave (&done, 1); + } +} + +static void client_handshake1_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void client_handshake2_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void client_handshake3_done (GObject * source, GAsyncResult * result, + gpointer user_data); + +static inline void +serialize_u8 (GByteArray * array, guint8 value) +{ + g_byte_array_append (array, (guint8 *) & value, sizeof value); +} + +static inline void +serialize_u32 (GByteArray * array, guint32 value) +{ + value = GUINT32_TO_BE (value); + g_byte_array_append (array, (guint8 *) & value, sizeof value); +} + +#define SIZE_P0 1 +#define SIZE_P1 1536 +#define SIZE_P2 SIZE_P1 +#define SIZE_P0P1 (SIZE_P0 + SIZE_P1) +#define SIZE_P0P1P2 (SIZE_P0P1 + SIZE_P2) +#define SIZE_RANDOM (SIZE_P1 - 8) + +typedef struct +{ + GBytes *random_bytes; + gboolean strict; +} HandshakeData; + +static GBytes * +handshake_random_data (void) +{ + G_STATIC_ASSERT (SIZE_RANDOM % 4 == 0); + + GByteArray *ba = g_byte_array_sized_new (SIZE_RANDOM); + gint i; + + for (i = 0; i < SIZE_RANDOM; i += 4) { + serialize_u32 (ba, g_random_int ()); + } + + return g_byte_array_free_to_bytes (ba); +} + +static HandshakeData * +handshake_data_new (gboolean strict) +{ + HandshakeData *data = g_slice_new0 (HandshakeData); + data->random_bytes = handshake_random_data (); + data->strict = strict; + return data; +} + +static void +handshake_data_free (gpointer ptr) +{ + HandshakeData *data = ptr; + g_clear_pointer (&data->random_bytes, g_bytes_unref); + g_slice_free (HandshakeData, data); +} + +static gboolean +handshake_data_check (HandshakeData * data, const guint8 * p2) +{ + const guint8 *ourrandom = g_bytes_get_data (data->random_bytes, NULL); + return memcmp (ourrandom, p2 + 8, SIZE_P2 - 8) == 0; +} + +static GBytes * +create_c0c1 (GBytes * random_bytes) +{ + GByteArray *ba = g_byte_array_sized_new (SIZE_P0P1); + + /* C0 version */ + serialize_u8 (ba, 3); + + /* C1 time */ + serialize_u32 (ba, g_get_monotonic_time () / 1000); + + /* C1 zero */ + serialize_u32 (ba, 0); + + /* C1 random data */ + gst_rtmp_byte_array_append_bytes (ba, random_bytes); + + GST_DEBUG ("Sending C0+C1"); + GST_MEMDUMP (">>> C0", ba->data, SIZE_P0); + GST_MEMDUMP (">>> C1", ba->data + SIZE_P0, SIZE_P1); + + return g_byte_array_free_to_bytes (ba); +} + +void +gst_rtmp_client_handshake (GIOStream * stream, gboolean strict, + GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + HandshakeData *data; + + g_return_if_fail (G_IS_IO_STREAM (stream)); + + init_debug (); + GST_INFO ("Starting client handshake"); + + task = g_task_new (stream, cancellable, callback, user_data); + data = handshake_data_new (strict); + g_task_set_task_data (task, data, handshake_data_free); + + { + GOutputStream *os = g_io_stream_get_output_stream (stream); + GBytes *bytes = create_c0c1 (data->random_bytes); + + gst_rtmp_output_stream_write_all_bytes_async (os, + bytes, G_PRIORITY_DEFAULT, + g_task_get_cancellable (task), client_handshake1_done, task); + + g_bytes_unref (bytes); + } +} + +static void +client_handshake1_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (source); + GTask *task = user_data; + GIOStream *stream = g_task_get_source_object (task); + GInputStream *is = g_io_stream_get_input_stream (stream); + GError *error = NULL; + gboolean res; + + res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error); + if (!res) { + GST_ERROR ("Failed to send C0+C1: %s", error->message); + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + GST_DEBUG ("Sent C0+C1, waiting for S0+S1+S2"); + gst_rtmp_input_stream_read_all_bytes_async (is, SIZE_P0P1P2, + G_PRIORITY_DEFAULT, g_task_get_cancellable (task), + client_handshake2_done, task); +} + +static GBytes * +create_c2 (const guint8 * s0s1s2) +{ + G_STATIC_ASSERT (SIZE_P1 == SIZE_P2); + + GByteArray *ba = g_byte_array_sized_new (SIZE_P2); + gint64 c2time = g_get_monotonic_time (); + + /* Copy S1 to C2 */ + g_byte_array_set_size (ba, SIZE_P2); + memcpy (ba->data, s0s1s2 + SIZE_P0, SIZE_P1); + + /* C2 time2 */ + GST_WRITE_UINT32_BE (ba->data + 4, c2time / 1000); + + GST_DEBUG ("Sending C2"); + GST_MEMDUMP (">>> C2", ba->data, SIZE_P2); + + return g_byte_array_free_to_bytes (ba); +} + +static void +client_handshake2_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (source); + GTask *task = user_data; + GIOStream *stream = g_task_get_source_object (task); + HandshakeData *data = g_task_get_task_data (task); + GError *error = NULL; + GBytes *res; + const guint8 *s0s1s2; + gsize size; + + res = gst_rtmp_input_stream_read_all_bytes_finish (is, result, &error); + if (!res) { + GST_ERROR ("Failed to read S0+S1+S2: %s", error->message); + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + s0s1s2 = g_bytes_get_data (res, &size); + if (size < SIZE_P0P1P2) { + GST_ERROR ("Short read (want %d have %" G_GSIZE_FORMAT ")", SIZE_P0P1P2, + size); + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT, + "Short read (want %d have %" G_GSIZE_FORMAT ")", SIZE_P0P1P2, size); + g_object_unref (task); + goto out; + } + + GST_DEBUG ("Got S0+S1+S2"); + GST_MEMDUMP ("<<< S0", s0s1s2, SIZE_P0); + GST_MEMDUMP ("<<< S1", s0s1s2 + SIZE_P0, SIZE_P1); + GST_MEMDUMP ("<<< S2", s0s1s2 + SIZE_P0P1, SIZE_P2); + + if (handshake_data_check (data, s0s1s2 + SIZE_P0P1)) { + GST_DEBUG ("S2 random data matches C1"); + } else { + if (data->strict) { + GST_ERROR ("Handshake response data did not match"); + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA, + "Handshake response data did not match"); + g_object_unref (task); + goto out; + } + + GST_WARNING ("Handshake reponse data did not match; continuing anyway"); + } + + { + GOutputStream *os = g_io_stream_get_output_stream (stream); + GBytes *bytes = create_c2 (s0s1s2); + + gst_rtmp_output_stream_write_all_bytes_async (os, + bytes, G_PRIORITY_DEFAULT, + g_task_get_cancellable (task), client_handshake3_done, task); + + g_bytes_unref (bytes); + } + +out: + g_bytes_unref (res); +} + +static void +client_handshake3_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (source); + GTask *task = user_data; + GError *error = NULL; + gboolean res; + + res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error); + if (!res) { + GST_ERROR ("Failed to send C2: %s", error->message); + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + GST_DEBUG ("Sent C2"); + GST_INFO ("Client handshake finished"); + + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +gboolean +gst_rtmp_client_handshake_finish (GIOStream * stream, GAsyncResult * result, + GError ** error) +{ + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + return g_task_propagate_boolean (G_TASK (result), error); +} diff --git a/gst/rtmp2/rtmp/rtmphandshake.h b/gst/rtmp2/rtmp/rtmphandshake.h new file mode 100644 index 000000000..1147948d4 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmphandshake.h @@ -0,0 +1,35 @@ +/* GStreamer RTMP Library + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_HANDSHAKE_H_ +#define _GST_RTMP_HANDSHAKE_H_ + +#include <gio/gio.h> + +G_BEGIN_DECLS + +void gst_rtmp_client_handshake (GIOStream * stream, gboolean strict, + GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data); +gboolean gst_rtmp_client_handshake_finish (GIOStream * stream, + GAsyncResult * result, GError ** error); + +G_END_DECLS +#endif diff --git a/gst/rtmp2/rtmp/rtmpmessage.c b/gst/rtmp2/rtmp/rtmpmessage.c new file mode 100644 index 000000000..1fba1d47b --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpmessage.c @@ -0,0 +1,494 @@ +/* GStreamer RTMP Library + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "rtmpmessage.h" +#include "rtmpchunkstream.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtmp_message_debug_category); +#define GST_CAT_DEFAULT gst_rtmp_message_debug_category + +gboolean +gst_rtmp_message_type_is_valid (GstRtmpMessageType type) +{ + switch (type) { + case GST_RTMP_MESSAGE_TYPE_INVALID: + case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE: + case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE: + case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT: + case GST_RTMP_MESSAGE_TYPE_USER_CONTROL: + case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE: + case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH: + case GST_RTMP_MESSAGE_TYPE_AUDIO: + case GST_RTMP_MESSAGE_TYPE_VIDEO: + case GST_RTMP_MESSAGE_TYPE_DATA_AMF3: + case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF3: + case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF3: + case GST_RTMP_MESSAGE_TYPE_DATA_AMF0: + case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF0: + case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0: + case GST_RTMP_MESSAGE_TYPE_AGGREGATE: + return TRUE; + default: + return FALSE; + } +} + +gboolean +gst_rtmp_message_type_is_protocol_control (GstRtmpMessageType type) +{ + switch (type) { + case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE: + case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE: + case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT: + case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE: + case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH: + return TRUE; + + default: + return FALSE; + } +} + +const gchar * +gst_rtmp_message_type_get_nick (GstRtmpMessageType type) +{ + switch (type) { + case GST_RTMP_MESSAGE_TYPE_INVALID: + return "invalid"; + case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE: + return "set-chunk-size"; + case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE: + return "abort-message"; + case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT: + return "acknowledgement"; + case GST_RTMP_MESSAGE_TYPE_USER_CONTROL: + return "user-control"; + case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE: + return "window-ack-size"; + case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH: + return "set-peer-bandwidth"; + case GST_RTMP_MESSAGE_TYPE_AUDIO: + return "audio"; + case GST_RTMP_MESSAGE_TYPE_VIDEO: + return "video"; + case GST_RTMP_MESSAGE_TYPE_DATA_AMF3: + return "data-amf3"; + case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF3: + return "shared-object-amf3"; + case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF3: + return "command-amf3"; + case GST_RTMP_MESSAGE_TYPE_DATA_AMF0: + return "data-amf0"; + case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF0: + return "shared-object-amf0"; + case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0: + return "command-amf0"; + case GST_RTMP_MESSAGE_TYPE_AGGREGATE: + return "aggregate"; + default: + return "unknown"; + } +} + +const gchar * +gst_rtmp_user_control_type_get_nick (GstRtmpUserControlType type) +{ + switch (type) { + case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN: + return "stream-begin"; + case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF: + return "stream-eof"; + case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY: + return "stream-dry"; + case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH: + return "set-buffer-length"; + case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED: + return "stream-is-recorded"; + case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST: + return "ping-request"; + case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE: + return "ping-response"; + case GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_REQUEST: + return "swf-verification-request"; + case GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_RESPONSE: + return "swf-verification-response"; + case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY: + return "buffer-empty"; + case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY: + return "buffer-ready"; + default: + return "unknown"; + } +} + +GType +gst_rtmp_meta_api_get_type (void) +{ + static volatile GType type = 0; + static const gchar *tags[] = { + NULL + }; + + if (g_once_init_enter (&type)) { + GType _type = gst_meta_api_type_register ("GstRtmpMetaAPI", tags); + GST_DEBUG_CATEGORY_INIT (gst_rtmp_message_debug_category, + "rtmpmessage", 0, "debug category for rtmp messages"); + g_once_init_leave (&type, _type); + } + return type; +} + +static gboolean +gst_rtmp_meta_init (GstMeta * meta, gpointer params, GstBuffer * buffer) +{ + GstRtmpMeta *emeta = (GstRtmpMeta *) meta; + + emeta->cstream = 0; + emeta->ts_delta = 0; + emeta->size = 0; + emeta->type = GST_RTMP_MESSAGE_TYPE_INVALID; + emeta->mstream = 0; + + return TRUE; +} + +static gboolean +gst_rtmp_meta_transform (GstBuffer * dest, GstMeta * meta, GstBuffer * buffer, + GQuark type, gpointer data) +{ + GstRtmpMeta *smeta, *dmeta; + + if (!GST_META_TRANSFORM_IS_COPY (type)) { + /* We only support copy transforms */ + return FALSE; + } + + smeta = (GstRtmpMeta *) meta; + dmeta = gst_buffer_get_rtmp_meta (dest); + if (!dmeta) { + dmeta = gst_buffer_add_rtmp_meta (dest); + } + + dmeta->cstream = smeta->cstream; + dmeta->ts_delta = smeta->ts_delta; + dmeta->size = smeta->size; + dmeta->type = smeta->type; + dmeta->mstream = smeta->mstream; + + return dmeta != NULL; +} + +const GstMetaInfo * +gst_rtmp_meta_get_info (void) +{ + static const GstMetaInfo *rtmp_meta_info = NULL; + + if (g_once_init_enter (&rtmp_meta_info)) { + const GstMetaInfo *meta = gst_meta_register (GST_RTMP_META_API_TYPE, + "GstRtmpMeta", sizeof *meta, gst_rtmp_meta_init, NULL, + gst_rtmp_meta_transform); + g_once_init_leave (&rtmp_meta_info, meta); + } + return rtmp_meta_info; +} + +GstRtmpMeta * +gst_buffer_add_rtmp_meta (GstBuffer * buffer) +{ + GstRtmpMeta *meta; + + g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL); + + meta = (GstRtmpMeta *) gst_buffer_add_meta (buffer, GST_RTMP_META_INFO, NULL); + + return meta; +} + +GstBuffer * +gst_rtmp_message_new (GstRtmpMessageType type, guint32 cstream, guint32 mstream) +{ + GstBuffer *buffer = gst_buffer_new (); + GstRtmpMeta *meta = gst_buffer_add_rtmp_meta (buffer); + + meta->type = type; + meta->cstream = cstream; + meta->mstream = mstream; + + return buffer; +} + +GstBuffer * +gst_rtmp_message_new_wrapped (GstRtmpMessageType type, guint32 cstream, + guint32 mstream, guint8 * data, gsize size) +{ + GstBuffer *message = gst_rtmp_message_new (type, cstream, mstream); + + gst_buffer_append_memory (message, + gst_memory_new_wrapped (0, data, size, 0, size, data, g_free)); + + return message; +} + +void +gst_rtmp_buffer_dump (GstBuffer * buffer, const gchar * prefix) +{ + GstRtmpMeta *meta; + GstMapInfo map; + + if (G_LIKELY (GST_LEVEL_LOG > _gst_debug_min || GST_LEVEL_LOG > + gst_debug_category_get_threshold (GST_CAT_DEFAULT))) { + return; + } + + g_return_if_fail (GST_IS_BUFFER (buffer)); + g_return_if_fail (prefix); + + GST_LOG ("%s %" GST_PTR_FORMAT, prefix, buffer); + + meta = gst_buffer_get_rtmp_meta (buffer); + if (meta) { + GST_LOG ("%s cstream:%-4" G_GUINT32_FORMAT " mstream:%-4" G_GUINT32_FORMAT + " ts:%-8" G_GUINT32_FORMAT " len:%-6" G_GUINT32_FORMAT " type:%s", + prefix, meta->cstream, meta->mstream, meta->ts_delta, meta->size, + gst_rtmp_message_type_get_nick (meta->type)); + } + + if (G_LIKELY (GST_LEVEL_MEMDUMP > _gst_debug_min || GST_LEVEL_MEMDUMP > + gst_debug_category_get_threshold (GST_CAT_DEFAULT))) { + return; + } + + if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) { + GST_ERROR ("Failed to map %" GST_PTR_FORMAT " for memdump", buffer); + return; + } + + if (map.size > 0) { + GST_MEMDUMP (prefix, map.data, map.size); + } + + gst_buffer_unmap (buffer, &map); +} + +GstRtmpMessageType +gst_rtmp_message_get_type (GstBuffer * buffer) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + g_return_val_if_fail (meta, GST_RTMP_MESSAGE_TYPE_INVALID); + return meta->type; +} + +gboolean +gst_rtmp_message_is_protocol_control (GstBuffer * buffer) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + + g_return_val_if_fail (meta, FALSE); + + if (!gst_rtmp_message_type_is_protocol_control (meta->type)) { + return FALSE; + } + + if (meta->cstream != GST_RTMP_CHUNK_STREAM_PROTOCOL) { + GST_WARNING ("Protocol control message on chunk stream %" + G_GUINT32_FORMAT ", not 2", meta->cstream); + } + + if (meta->mstream != 0) { + GST_WARNING ("Protocol control message on message stream %" + G_GUINT32_FORMAT ", not 0", meta->mstream); + } + + return TRUE; +} + +gboolean +gst_rtmp_message_is_user_control (GstBuffer * buffer) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + + g_return_val_if_fail (meta, FALSE); + + if (meta->type != GST_RTMP_MESSAGE_TYPE_USER_CONTROL) { + return FALSE; + } + + if (meta->cstream != GST_RTMP_CHUNK_STREAM_PROTOCOL) { + GST_WARNING ("User control message on chunk stream %" + G_GUINT32_FORMAT ", not 2", meta->cstream); + } + + if (meta->mstream != 0) { + GST_WARNING ("User control message on message stream %" + G_GUINT32_FORMAT ", not 0", meta->mstream); + } + + return TRUE; +} + +static inline gboolean +pc_has_param2 (GstRtmpMessageType type) +{ + return type == GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH; +} + +gboolean +gst_rtmp_message_parse_protocol_control (GstBuffer * buffer, + GstRtmpProtocolControl * out) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + GstMapInfo map; + GstRtmpProtocolControl pc; + gsize pc_size = 4; + gboolean ret = FALSE; + + g_return_val_if_fail (meta, FALSE); + g_return_val_if_fail (gst_rtmp_message_type_is_protocol_control (meta->type), + FALSE); + + if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) { + GST_ERROR ("can't map protocol control message"); + return FALSE; + } + + pc.type = meta->type; + pc_size = pc_has_param2 (pc.type) ? 5 : 4; + + if (map.size < pc_size) { + GST_ERROR ("can't read protocol control param"); + goto err; + } else if (map.size > pc_size) { + GST_WARNING ("overlength protocol control: %" G_GSIZE_FORMAT " > %" + G_GSIZE_FORMAT, map.size, pc_size); + } + + pc.param = GST_READ_UINT32_BE (map.data); + pc.param2 = pc_has_param2 (pc.type) ? GST_READ_UINT8 (map.data + 4) : 0; + + ret = TRUE; + if (out) { + *out = pc; + } + +err: + gst_buffer_unmap (buffer, &map); + return ret; +} + +GstBuffer * +gst_rtmp_message_new_protocol_control (GstRtmpProtocolControl * pc) +{ + guint8 *data; + gsize size; + + g_return_val_if_fail (pc, NULL); + g_return_val_if_fail (gst_rtmp_message_type_is_protocol_control (pc->type), + NULL); + + size = pc_has_param2 (pc->type) ? 5 : 4; + + data = g_malloc (size); + GST_WRITE_UINT32_BE (data, pc->param); + if (pc_has_param2 (pc->type)) { + GST_WRITE_UINT32_BE (data + 4, pc->param2); + } + + return gst_rtmp_message_new_wrapped (pc->type, + GST_RTMP_CHUNK_STREAM_PROTOCOL, 0, data, size); +} + +static inline gboolean +uc_has_param2 (GstRtmpUserControlType type) +{ + return type == GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH; +} + +gboolean +gst_rtmp_message_parse_user_control (GstBuffer * buffer, + GstRtmpUserControl * out) +{ + GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer); + GstMapInfo map; + GstRtmpUserControl uc; + gsize uc_size; + gboolean ret = FALSE; + + g_return_val_if_fail (meta, FALSE); + g_return_val_if_fail (meta->type == GST_RTMP_MESSAGE_TYPE_USER_CONTROL, + FALSE); + + if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) { + GST_ERROR ("can't map user control message"); + return FALSE; + } + + if (map.size < 2) { + GST_ERROR ("can't read user control type"); + goto err; + } + + uc.type = GST_READ_UINT16_BE (map.data); + uc_size = uc_has_param2 (uc.type) ? 10 : 6; + + if (map.size < uc_size) { + GST_ERROR ("can't read user control param"); + goto err; + } else if (map.size > uc_size) { + GST_WARNING ("overlength user control: %" G_GSIZE_FORMAT " > %" + G_GSIZE_FORMAT, map.size, uc_size); + } + + uc.param = GST_READ_UINT32_BE (map.data + 2); + uc.param2 = uc_has_param2 (uc.type) ? GST_READ_UINT32_BE (map.data + 6) : 0; + + ret = TRUE; + if (out) { + *out = uc; + } + +err: + gst_buffer_unmap (buffer, &map); + return ret; +} + +GstBuffer * +gst_rtmp_message_new_user_control (GstRtmpUserControl * uc) +{ + guint8 *data; + gsize size; + + g_return_val_if_fail (uc, NULL); + + size = uc_has_param2 (uc->type) ? 10 : 6; + + data = g_malloc (size); + GST_WRITE_UINT16_BE (data, uc->type); + GST_WRITE_UINT32_BE (data + 2, uc->param); + if (uc_has_param2 (uc->type)) { + GST_WRITE_UINT32_BE (data + 6, uc->param2); + } + + return gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_USER_CONTROL, + GST_RTMP_CHUNK_STREAM_PROTOCOL, 0, data, size); +} diff --git a/gst/rtmp2/rtmp/rtmpmessage.h b/gst/rtmp2/rtmp/rtmpmessage.h new file mode 100644 index 000000000..92f3fdc95 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmpmessage.h @@ -0,0 +1,142 @@ +/* GStreamer RTMP Library + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_MESSAGE_H_ +#define _GST_RTMP_MESSAGE_H_ + +#include <gst/gst.h> + +G_BEGIN_DECLS + +typedef enum { + GST_RTMP_MESSAGE_TYPE_INVALID = 0, + GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE = 1, + GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE = 2, + GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT = 3, + GST_RTMP_MESSAGE_TYPE_USER_CONTROL = 4, + GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE = 5, + GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH = 6, + GST_RTMP_MESSAGE_TYPE_AUDIO = 8, + GST_RTMP_MESSAGE_TYPE_VIDEO = 9, + GST_RTMP_MESSAGE_TYPE_DATA_AMF3 = 15, + GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF3 = 16, + GST_RTMP_MESSAGE_TYPE_COMMAND_AMF3 = 17, + GST_RTMP_MESSAGE_TYPE_DATA_AMF0 = 18, + GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF0 = 19, + GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0 = 20, + GST_RTMP_MESSAGE_TYPE_AGGREGATE = 22, +} GstRtmpMessageType; + +gboolean gst_rtmp_message_type_is_valid (GstRtmpMessageType type); +gboolean gst_rtmp_message_type_is_protocol_control (GstRtmpMessageType type); +const gchar * gst_rtmp_message_type_get_nick (GstRtmpMessageType type); + +typedef enum +{ + GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN = 0, + GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF = 1, + GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY = 2, + GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH = 3, + GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED = 4, + GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST = 6, + GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE = 7, + + /* undocumented */ + GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_REQUEST = 26, + GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_RESPONSE = 27, + GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY = 31, + GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY = 32, +} GstRtmpUserControlType; + +const gchar * gst_rtmp_user_control_type_get_nick ( + GstRtmpUserControlType type); + +#define GST_RTMP_META_API_TYPE (gst_rtmp_meta_api_get_type()) +#define GST_RTMP_META_INFO (gst_rtmp_meta_get_info()) +typedef struct _GstRtmpMeta GstRtmpMeta; + +struct _GstRtmpMeta { + GstMeta meta; + guint32 cstream; + guint32 ts_delta; + guint32 size; + GstRtmpMessageType type; + guint32 mstream; +}; + +GType gst_rtmp_meta_api_get_type (void); +const GstMetaInfo * gst_rtmp_meta_get_info (void); + +GstRtmpMeta * gst_buffer_add_rtmp_meta (GstBuffer * buffer); + +static inline GstRtmpMeta * +gst_buffer_get_rtmp_meta (GstBuffer * buffer) +{ + return (GstRtmpMeta *) gst_buffer_get_meta (buffer, GST_RTMP_META_API_TYPE); +} + +GstBuffer * gst_rtmp_message_new (GstRtmpMessageType type, guint32 cstream, + guint32 mstream); +GstBuffer * gst_rtmp_message_new_wrapped (GstRtmpMessageType type, guint32 cstream, + guint32 mstream, guint8 * data, gsize size); + +void gst_rtmp_buffer_dump (GstBuffer * buffer, const gchar * prefix); + +GstRtmpMessageType gst_rtmp_message_get_type (GstBuffer * buffer); +gboolean gst_rtmp_message_is_protocol_control (GstBuffer * buffer); +gboolean gst_rtmp_message_is_user_control (GstBuffer * buffer); + +typedef struct { + GstRtmpMessageType type; + + /* for SET_CHUNK_SIZE: chunk size */ + /* for ABORT_MESSAGE: chunk stream ID */ + /* for ACKNOWLEDGEMENT: acknowledged byte count */ + /* for WINDOW_ACK_SIZE and SET_PEER_BANDWIDTH: acknowledgement window size */ + guint32 param; + + /* for SET_PEER_BANDWIDTH: limit type */ + guint8 param2; +} GstRtmpProtocolControl; + +gboolean gst_rtmp_message_parse_protocol_control (GstBuffer * buffer, + GstRtmpProtocolControl * out); + +GstBuffer * gst_rtmp_message_new_protocol_control (GstRtmpProtocolControl * pc); + +typedef struct { + GstRtmpUserControlType type; + + /* for STREAM_BEGIN to STREAM_IS_RECORDED: message stream ID */ + /* for PING_REQUEST and PING_RESPONSE: timestamp of request */ + guint32 param; + + /* for SET_BUFFER_LENGTH: buffer length in ms */ + guint32 param2; +} GstRtmpUserControl; + +gboolean gst_rtmp_message_parse_user_control (GstBuffer * buffer, + GstRtmpUserControl * out); + +GstBuffer * gst_rtmp_message_new_user_control (GstRtmpUserControl * uc); + +G_END_DECLS + +#endif diff --git a/gst/rtmp2/rtmp/rtmputils.c b/gst/rtmp2/rtmp/rtmputils.c new file mode 100644 index 000000000..c9bbc3a98 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmputils.c @@ -0,0 +1,246 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * Copyright (C) 2017 Make.TV, Inc. <info@make.tv> + * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "rtmputils.h" +#include <string.h> + +static void read_all_bytes_done (GObject * source, GAsyncResult * result, + gpointer user_data); +static void write_all_bytes_done (GObject * source, GAsyncResult * result, + gpointer user_data); + +void +gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes) +{ + const guint8 *data; + gsize size; + guint offset; + + g_return_if_fail (bytearray); + + offset = bytearray->len; + data = g_bytes_get_data (bytes, &size); + + g_return_if_fail (data); + + g_byte_array_set_size (bytearray, offset + size); + memcpy (bytearray->data + offset, data, size); +} + +void +gst_rtmp_input_stream_read_all_bytes_async (GInputStream * stream, gsize count, + int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + GByteArray *ba; + + g_return_if_fail (G_IS_INPUT_STREAM (stream)); + + task = g_task_new (stream, cancellable, callback, user_data); + + ba = g_byte_array_sized_new (count); + g_byte_array_set_size (ba, count); + g_task_set_task_data (task, ba, (GDestroyNotify) g_byte_array_unref); + + g_input_stream_read_all_async (stream, ba->data, count, io_priority, + cancellable, read_all_bytes_done, task); +} + +static void +read_all_bytes_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GInputStream *is = G_INPUT_STREAM (source); + GTask *task = user_data; + GByteArray *ba = g_task_get_task_data (task); + GError *error = NULL; + gboolean res; + gsize bytes_read; + GBytes *bytes; + + res = g_input_stream_read_all_finish (is, result, &bytes_read, &error); + if (!res) { + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + g_byte_array_set_size (ba, bytes_read); + bytes = g_byte_array_free_to_bytes (g_byte_array_ref (ba)); + + g_task_return_pointer (task, bytes, (GDestroyNotify) g_bytes_unref); + g_object_unref (task); +} + +GBytes * +gst_rtmp_input_stream_read_all_bytes_finish (GInputStream * stream, + GAsyncResult * result, GError ** error) +{ + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + return g_task_propagate_pointer (G_TASK (result), error); +} + +void +gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream, + GBytes * bytes, int io_priority, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data) +{ + GTask *task; + const void *data; + gsize size; + + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (bytes); + + data = g_bytes_get_data (bytes, &size); + g_return_if_fail (data); + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_task_data (task, g_bytes_ref (bytes), + (GDestroyNotify) g_bytes_unref); + + g_output_stream_write_all_async (stream, data, size, io_priority, + cancellable, write_all_bytes_done, task); +} + +static void +write_all_bytes_done (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GOutputStream *os = G_OUTPUT_STREAM (source); + GTask *task = user_data; + GError *error = NULL; + gboolean res; + + res = g_output_stream_write_all_finish (os, result, NULL, &error); + if (!res) { + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +gboolean +gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream, + GAsyncResult * result, GError ** error) +{ + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + return g_task_propagate_boolean (G_TASK (result), error); +} + +static const gchar ascii_table[128] = { + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + ' ', '!', 0x0, '#', '$', '%', '&', '\'', + '(', ')', '*', '+', ',', '-', '.', '/', + '0', '1', '2', '3', '4', '5', '6', '7', + '8', '9', ':', ';', '<', '=', '>', '?', + '@', 'A', 'B', 'C', 'D', 'E', 'F', 'G', + 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', + 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', + 'X', 'Y', 'Z', '[', 0x0, ']', '^', '_', + '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', + 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', + 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', + 'x', 'y', 'z', '{', '|', '}', '~', 0x0, +}; + +static const gchar ascii_escapes[128] = { + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 'a', + 'b', 't', 'n', 'v', 'f', 'r', 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, '"', 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, '\\', 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, +}; + +void +gst_rtmp_string_print_escaped (GString * string, const gchar * data, + gssize size) +{ + gssize i; + + g_return_if_fail (string); + + if (!data) { + g_string_append (string, "(NULL)"); + return; + } + + g_string_append_c (string, '"'); + + for (i = 0; size < 0 ? data[i] != 0 : i < size; i++) { + guchar c = data[i]; + + if (G_LIKELY (c < G_N_ELEMENTS (ascii_table))) { + if (ascii_table[c]) { + g_string_append_c (string, c); + continue; + } + + if (ascii_escapes[c]) { + g_string_append_c (string, '\\'); + g_string_append_c (string, ascii_escapes[c]); + continue; + } + } else { + gunichar uc = g_utf8_get_char_validated (data + i, + size < 0 ? -1 : size - i); + if (uc != (gunichar) (-2) && uc != (gunichar) (-1)) { + if (g_unichar_isprint (uc)) { + g_string_append_unichar (string, uc); + } else if (uc <= G_MAXUINT16) { + g_string_append_printf (string, "\\u%04X", uc); + } else { + g_string_append_printf (string, "\\U%08X", uc); + } + + i += g_utf8_skip[c] - 1; + continue; + } + } + + g_string_append_printf (string, "\\x%02X", c); + } + + g_string_append_c (string, '"'); + +} diff --git a/gst/rtmp2/rtmp/rtmputils.h b/gst/rtmp2/rtmp/rtmputils.h new file mode 100644 index 000000000..b427f37b4 --- /dev/null +++ b/gst/rtmp2/rtmp/rtmputils.h @@ -0,0 +1,46 @@ +/* GStreamer RTMP Library + * Copyright (C) 2013 David Schleef <ds@schleef.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef _GST_RTMP_UTILS_H_ +#define _GST_RTMP_UTILS_H_ + +#include <gio/gio.h> + +G_BEGIN_DECLS + +void gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes); + +void gst_rtmp_input_stream_read_all_bytes_async (GInputStream * stream, + gsize count, int io_priority, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data); +GBytes * gst_rtmp_input_stream_read_all_bytes_finish (GInputStream * stream, + GAsyncResult * result, GError ** error); + +void gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream, + GBytes * bytes, int io_priority, GCancellable * cancellable, + GAsyncReadyCallback callback, gpointer user_data); +gboolean gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream, + GAsyncResult * result, GError ** error); + +void gst_rtmp_string_print_escaped (GString * string, const gchar *data, + gssize size); + +G_END_DECLS + +#endif |