summaryrefslogtreecommitdiff
path: root/gst/rtmp2
diff options
context:
space:
mode:
authorVivia Nikolaidou <vivia@ahiru.eu>2019-11-05 13:52:55 +0000
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>2019-11-05 13:52:55 +0000
commit2386858a9179aff2ec249bdffa904bf407de455f (patch)
tree46bf7595022397f01c369ec1ca808c0e3963b2e2 /gst/rtmp2
parent5320bb9085ac3332d89ed9bfa3120b95ca2c1d97 (diff)
downloadgstreamer-plugins-bad-2386858a9179aff2ec249bdffa904bf407de455f.tar.gz
Add files from gst-rtmp
For master, without autotools.
Diffstat (limited to 'gst/rtmp2')
-rw-r--r--gst/rtmp2/TODO63
-rw-r--r--gst/rtmp2/gstrtmp2.c44
-rw-r--r--gst/rtmp2/gstrtmp2locationhandler.c267
-rw-r--r--gst/rtmp2/gstrtmp2locationhandler.h49
-rw-r--r--gst/rtmp2/gstrtmp2sink.c1064
-rw-r--r--gst/rtmp2/gstrtmp2sink.h34
-rw-r--r--gst/rtmp2/gstrtmp2src.c839
-rw-r--r--gst/rtmp2/gstrtmp2src.h34
-rw-r--r--gst/rtmp2/meson.build24
-rw-r--r--gst/rtmp2/rtmp/amf.c1141
-rw-r--r--gst/rtmp2/rtmp/amf.h112
-rw-r--r--gst/rtmp2/rtmp/rtmpchunkstream.c714
-rw-r--r--gst/rtmp2/rtmp/rtmpchunkstream.h59
-rw-r--r--gst/rtmp2/rtmp/rtmpclient.c1240
-rw-r--r--gst/rtmp2/rtmp/rtmpclient.h101
-rw-r--r--gst/rtmp2/rtmp/rtmpconnection.c996
-rw-r--r--gst/rtmp2/rtmp/rtmpconnection.h82
-rw-r--r--gst/rtmp2/rtmp/rtmphandshake.c311
-rw-r--r--gst/rtmp2/rtmp/rtmphandshake.h35
-rw-r--r--gst/rtmp2/rtmp/rtmpmessage.c494
-rw-r--r--gst/rtmp2/rtmp/rtmpmessage.h142
-rw-r--r--gst/rtmp2/rtmp/rtmputils.c246
-rw-r--r--gst/rtmp2/rtmp/rtmputils.h46
23 files changed, 8137 insertions, 0 deletions
diff --git a/gst/rtmp2/TODO b/gst/rtmp2/TODO
new file mode 100644
index 000000000..a43e48d8d
--- /dev/null
+++ b/gst/rtmp2/TODO
@@ -0,0 +1,63 @@
+- rtmp2sink: Should look into reconnecting and resuming stream without
+ deleting and recreating stream, which drops clients.
+
+- Move AMF parser/serializer to GstRtmpMeta?
+- Move AMF nodes from g_slice to GstMiniObject?
+
+- First video frame that comes from Wowza seems to be out-of-order; librtmp
+ does not have this problem
+
+- Refactor connection, pull out the ad-hoc read and write handling and put it
+ with the chunk layer into GBuffered{In,Out}putStream subclasses
+
+- Refactor elements and pull out the common connection+mainloop handling code
+ into a context object
+
+- Change the location properties into something with less boilerplate?
+
+ Perhaps a GstStructure-based prop, custom GValue transforms or GstValue
+ (de)serializing
+
+- Use glib-mkenums to generate GEnumClasses
+
+- Post-connect onStatus handling (needed for src EOS and async errors?)
+
+- Better mux/demux, at the cost of losing compatibility with flvmux/demux.
+
+ Something like (a/x = application/x-rtmp-messages):
+
+ rtmp2src ! a/x ! rtmp2demux ! a/x,type=video ! rtmp2videodecode ! h264parse
+ ! a/x,type=audio ! rtmp2audiodecode ! aacparse
+
+ x264enc ! rtmp2videoencode ! a/x,type=video ! rtmp2mux ! a/x ! rtmp2sink
+ fdkaacenc ! rtmp2audioencode ! a/x,type=audio !
+
+ And also, in case no muxing is required:
+
+ x264enc ! rtmp2videoencode ! a/x,type=video ! rtmp2sink
+ fdkaacenc ! rtmp2audioencode ! a/x,type=video ! rtmp2sink
+
+ Proper GstBuffer timestamps need proper timestamp wraparound handling
+
+- Better client element, which generalizes the existing sink/src to allow
+ multiple streams over one connection
+ - Request src pad to play a stream
+ - Request sink pad to publish a stream (base it on GstAggregator?)
+ - rtmp2sink/src just specialize the client element with a static pad
+
+- Server implementation
+
+- Support more protocols
+ - rtmpe (App-layer encryption)
+ - rtmpt (HTTP tunneling)
+ - rtmpte (HTTP tunneling + App-layer encryption)
+ - rtmpts (HTTPS tunneling)
+ - rtmfp (UDP)
+
+Needed testing:
+
+- AMF parsing
+
+- connection closure by peer
+
+- connection timeouts
diff --git a/gst/rtmp2/gstrtmp2.c b/gst/rtmp2/gstrtmp2.c
new file mode 100644
index 000000000..22c6399cf
--- /dev/null
+++ b/gst/rtmp2/gstrtmp2.c
@@ -0,0 +1,44 @@
+/* GStreamer
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstrtmp2src.h"
+#include "gstrtmp2sink.h"
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ gst_element_register (plugin, "rtmp2src", GST_RANK_PRIMARY + 1,
+ GST_TYPE_RTMP2_SRC);
+ gst_element_register (plugin, "rtmp2sink", GST_RANK_PRIMARY + 1,
+ GST_TYPE_RTMP2_SINK);
+
+ return TRUE;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ rtmp2,
+ "RTMP plugin",
+ plugin_init, VERSION, GST_LICENSE, PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/gst/rtmp2/gstrtmp2locationhandler.c b/gst/rtmp2/gstrtmp2locationhandler.c
new file mode 100644
index 000000000..6c1eada87
--- /dev/null
+++ b/gst/rtmp2/gstrtmp2locationhandler.c
@@ -0,0 +1,267 @@
+/* GStreamer
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "gstrtmp2locationhandler.h"
+#include "rtmp/rtmputils.h"
+#include "rtmp/rtmpclient.h"
+#include <string.h>
+
+#define DEFAULT_SCHEME GST_RTMP_SCHEME_RTMP
+#define DEFAULT_HOST "localhost"
+#define DEFAULT_APPLICATION "live"
+#define DEFAULT_STREAM "myStream"
+#define DEFAULT_LOCATION "rtmp://" DEFAULT_HOST "/" DEFAULT_APPLICATION "/" DEFAULT_STREAM
+#define DEFAULT_SECURE_TOKEN NULL
+#define DEFAULT_USERNAME NULL
+#define DEFAULT_PASSWORD NULL
+#define DEFAULT_AUTHMOD GST_RTMP_AUTHMOD_AUTO
+#define DEFAULT_TIMEOUT 5
+
+G_DEFINE_INTERFACE (GstRtmpLocationHandler, gst_rtmp_location_handler, 0);
+
+#define GST_CAT_DEFAULT gst_rtmp_location_handler_debug_category
+GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
+
+static void
+gst_rtmp_location_handler_default_init (GstRtmpLocationHandlerInterface * iface)
+{
+ GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "rtmp2locationhandler", 0,
+ "RTMP2 Location Handling");
+
+ g_object_interface_install_property (iface, g_param_spec_string ("location",
+ "Location", "Location of RTMP stream to access", DEFAULT_LOCATION,
+ G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_enum ("scheme",
+ "Scheme", "RTMP connection scheme",
+ GST_TYPE_RTMP_SCHEME, DEFAULT_SCHEME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_string ("host",
+ "Host", "RTMP server host name", DEFAULT_HOST,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_int ("port", "Port",
+ "RTMP server port", 1, 65535,
+ gst_rtmp_scheme_get_default_port (DEFAULT_SCHEME),
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface,
+ g_param_spec_string ("application", "Application",
+ "RTMP application path", DEFAULT_APPLICATION,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_string ("stream",
+ "Stream", "RTMP stream path", DEFAULT_STREAM,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_string ("username",
+ "User name", "RTMP authorization user name", DEFAULT_USERNAME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_string ("password",
+ "Password", "RTMP authorization password", DEFAULT_PASSWORD,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface,
+ g_param_spec_string ("secure-token", "Secure token",
+ "RTMP authorization token", DEFAULT_SECURE_TOKEN,
+ G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_enum ("authmod",
+ "Authorization mode", "RTMP authorization mode",
+ GST_TYPE_RTMP_AUTHMOD, DEFAULT_AUTHMOD,
+ G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface, g_param_spec_uint ("timeout",
+ "Timeout", "RTMP timeout in seconds", 0, G_MAXUINT, DEFAULT_TIMEOUT,
+ G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_interface_install_property (iface,
+ g_param_spec_flags ("tls-validation-flags", "TLS validation flags",
+ "TLS validation flags to use", G_TYPE_TLS_CERTIFICATE_FLAGS,
+ G_TLS_CERTIFICATE_VALIDATE_ALL,
+ G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
+
+static GstURIType
+uri_handler_get_type_sink (GType type)
+{
+ return GST_URI_SINK;
+}
+
+static GstURIType
+uri_handler_get_type_src (GType type)
+{
+ return GST_URI_SRC;
+}
+
+static const gchar *const *
+uri_handler_get_protocols (GType type)
+{
+ return gst_rtmp_scheme_get_strings ();
+}
+
+static gchar *
+uri_handler_get_uri (GstURIHandler * handler)
+{
+ GstRtmpLocation location = { 0, };
+ gchar *string;
+
+ g_object_get (handler, "scheme", &location.scheme, "host", &location.host,
+ "port", &location.port, "application", &location.application,
+ "stream", &location.stream, NULL);
+
+ string = gst_rtmp_location_get_string (&location, TRUE);
+ gst_rtmp_location_clear (&location);
+ return string;
+}
+
+static gboolean
+uri_handler_set_uri (GstURIHandler * handler, const gchar * string,
+ GError ** error)
+{
+ GstRtmpLocationHandler *self = GST_RTMP_LOCATION_HANDLER (handler);
+ GstUri *uri;
+ const gchar *scheme_sep, *path_sep, *stream_sep, *host, *userinfo;
+ GstRtmpScheme scheme;
+ guint port;
+ gboolean ret = FALSE;
+
+ GST_DEBUG_OBJECT (self, "setting URI from %s", GST_STR_NULL (string));
+ g_return_val_if_fail (string, FALSE);
+
+ scheme_sep = strstr (string, "://");
+ if (!scheme_sep) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+ "URI lacks scheme: %s", string);
+ return FALSE;
+ }
+
+ path_sep = strchr (scheme_sep + 3, '/');
+ if (!path_sep) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+ "URI lacks path: %s", string);
+ return FALSE;
+ }
+
+ stream_sep = strrchr (path_sep + 1, '/');
+ if (!stream_sep) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+ "URI lacks stream: %s", string);
+ return FALSE;
+ }
+
+ {
+ gchar *string_without_path = g_strndup (string, path_sep - string);
+ uri = gst_uri_from_string (string_without_path);
+ g_free (string_without_path);
+ }
+
+ if (!uri) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
+ "URI failed to parse: %s", string);
+ return FALSE;
+ }
+
+ gst_uri_normalize (uri);
+
+ scheme = gst_rtmp_scheme_from_uri (uri);
+ if (scheme < 0) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+ "URI has bad scheme: %s", string);
+ goto out;
+ }
+
+ host = gst_uri_get_host (uri);
+ if (!host) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+ "URI lacks hostname: %s", string);
+ goto out;
+ }
+
+ port = gst_uri_get_port (uri);
+ if (port == GST_URI_NO_PORT) {
+ port = gst_rtmp_scheme_get_default_port (scheme);
+ }
+
+ {
+ const gchar *path = path_sep + 1, *stream = stream_sep + 1;
+ gchar *application = g_strndup (path, stream_sep - path);
+
+ GST_DEBUG_OBJECT (self, "setting location to %s://%s:%u/%s stream %s",
+ gst_rtmp_scheme_to_string (scheme), host, port, application, stream);
+
+ g_object_set (self, "scheme", scheme, "host", host, "port", port,
+ "application", application, "stream", stream, "username", NULL,
+ "password", NULL, NULL);
+
+ g_free (application);
+ }
+
+ userinfo = gst_uri_get_userinfo (uri);
+ if (userinfo) {
+ gchar **split = g_strsplit (userinfo, ":", 2);
+
+ if (!split || !split[0] || !split[1]) {
+ g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+ "Failed to parse username:password data");
+ g_strfreev (split);
+ goto out;
+ }
+
+ g_object_set (self, "username", split[0], "password", split[1], NULL);
+ g_strfreev (split);
+ }
+
+ ret = TRUE;
+
+out:
+ gst_uri_unref (uri);
+ return ret;
+}
+
+void
+gst_rtmp_location_handler_implement_uri_handler (GstURIHandlerInterface * iface,
+ GstURIType type)
+{
+ switch (type) {
+ case GST_URI_SINK:
+ iface->get_type = uri_handler_get_type_sink;
+ break;
+ case GST_URI_SRC:
+ iface->get_type = uri_handler_get_type_src;
+ break;
+ default:
+ g_return_if_reached ();
+ }
+ iface->get_protocols = uri_handler_get_protocols;
+ iface->get_uri = uri_handler_get_uri;
+ iface->set_uri = uri_handler_set_uri;
+}
+
+gboolean
+gst_rtmp_location_handler_set_uri (GstRtmpLocationHandler * handler,
+ const gchar * uri)
+{
+ GError *error = NULL;
+ gboolean ret;
+
+ g_return_val_if_fail (GST_IS_RTMP_LOCATION_HANDLER (handler), FALSE);
+
+ ret = gst_uri_handler_set_uri (GST_URI_HANDLER (handler), uri, &error);
+ if (!ret) {
+ GST_ERROR_OBJECT (handler, "Failed to set URI: %s", error->message);
+ g_object_set (handler, "scheme", DEFAULT_SCHEME, "host", NULL,
+ "port", gst_rtmp_scheme_get_default_port (DEFAULT_SCHEME),
+ "application", NULL, "stream", NULL, NULL);
+ g_error_free (error);
+ }
+ return ret;
+}
diff --git a/gst/rtmp2/gstrtmp2locationhandler.h b/gst/rtmp2/gstrtmp2locationhandler.h
new file mode 100644
index 000000000..51b560ce6
--- /dev/null
+++ b/gst/rtmp2/gstrtmp2locationhandler.h
@@ -0,0 +1,49 @@
+/* GStreamer
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_RTMP_LOCATION_HANDLER_H__
+#define __GST_RTMP_LOCATION_HANDLER_H__
+
+#include <gst/gst.h>
+#include "rtmp/rtmpclient.h"
+
+G_BEGIN_DECLS
+#define GST_TYPE_RTMP_LOCATION_HANDLER (gst_rtmp_location_handler_get_type ())
+#define GST_RTMP_LOCATION_HANDLER(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_RTMP_LOCATION_HANDLER, GstRtmpLocationHandler))
+#define GST_IS_RTMP_LOCATION_HANDLER(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_RTMP_LOCATION_HANDLER))
+#define GST_RTMP_LOCATION_HANDLER_GET_INTERFACE(inst) (G_TYPE_INSTANCE_GET_INTERFACE ((inst), GST_TYPE_RTMP_LOCATION_HANDLER, GstRtmpLocationHandlerInterface))
+typedef struct _GstRtmpLocationHandler GstRtmpLocationHandler; /* dummy object */
+typedef struct _GstRtmpLocationHandlerInterface GstRtmpLocationHandlerInterface;
+
+struct _GstRtmpLocationHandlerInterface
+{
+ GTypeInterface parent_iface;
+};
+
+GType gst_rtmp_location_handler_get_type (void);
+
+void gst_rtmp_location_handler_implement_uri_handler (GstURIHandlerInterface *
+ iface, GstURIType type);
+
+gboolean gst_rtmp_location_handler_set_uri (GstRtmpLocationHandler * handler,
+ const gchar * uri);
+
+G_END_DECLS
+#endif
diff --git a/gst/rtmp2/gstrtmp2sink.c b/gst/rtmp2/gstrtmp2sink.c
new file mode 100644
index 000000000..2ea8b2e38
--- /dev/null
+++ b/gst/rtmp2/gstrtmp2sink.c
@@ -0,0 +1,1064 @@
+/* GStreamer
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+/**
+ * SECTION:element-gstrtmp2sink
+ *
+ * The rtmp2sink element sends audio and video streams to an RTMP
+ * server.
+ *
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * gst-launch -v videotestsrc ! x264enc ! flvmux ! rtmp2sink
+ * location=rtmp://server.example.com/live/myStream
+ * ]|
+ * FIXME Describe what the pipeline does.
+ * </refsect2>
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstrtmp2sink.h"
+
+#include "gstrtmp2locationhandler.h"
+#include "rtmp/rtmpclient.h"
+#include "rtmp/rtmpmessage.h"
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+#include <gio/gnetworking.h>
+#include <string.h>
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_sink_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp2_sink_debug_category
+
+/* prototypes */
+#define GST_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SINK,GstRtmp2Sink))
+#define GST_IS_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SINK))
+
+typedef struct
+{
+ GstBaseSink parent_instance;
+
+ /* properties */
+ GstRtmpLocation location;
+ gboolean async_connect;
+ guint peak_kbps;
+
+ /* stuff */
+ gboolean running, flushing;
+ GMutex lock;
+ GCond cond;
+
+ GstTask *task;
+ GRecMutex task_lock;
+
+ GMainLoop *loop;
+ GMainContext *context;
+
+ GCancellable *cancellable;
+ GstRtmpConnection *connection;
+ guint32 stream_id;
+
+ GPtrArray *headers;
+ guint64 last_ts, base_ts; /* timestamp fixup */
+} GstRtmp2Sink;
+
+typedef struct
+{
+ GstBaseSinkClass parent_class;
+} GstRtmp2SinkClass;
+
+/* GObject virtual functions */
+static void gst_rtmp2_sink_set_property (GObject * object,
+ guint property_id, const GValue * value, GParamSpec * pspec);
+static void gst_rtmp2_sink_get_property (GObject * object,
+ guint property_id, GValue * value, GParamSpec * pspec);
+static void gst_rtmp2_sink_finalize (GObject * object);
+static void gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface);
+
+/* GstBaseSink virtual functions */
+static gboolean gst_rtmp2_sink_start (GstBaseSink * sink);
+static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink);
+static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink);
+static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink);
+static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink,
+ GstBuffer * buffer);
+static gboolean gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
+
+/* Internal API */
+static void gst_rtmp2_sink_task_func (gpointer user_data);
+
+static void client_connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void start_publish_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void connect_task_done (GObject * object, GAsyncResult * result,
+ gpointer user_data);
+
+static void set_pacing_rate (GstRtmp2Sink * self);
+
+enum
+{
+ PROP_0,
+ PROP_LOCATION,
+ PROP_SCHEME,
+ PROP_HOST,
+ PROP_PORT,
+ PROP_APPLICATION,
+ PROP_STREAM,
+ PROP_SECURE_TOKEN,
+ PROP_USERNAME,
+ PROP_PASSWORD,
+ PROP_AUTHMOD,
+ PROP_TIMEOUT,
+ PROP_TLS_VALIDATION_FLAGS,
+ PROP_ASYNC_CONNECT,
+ PROP_PEAK_KBPS,
+};
+
+/* pad templates */
+
+static GstStaticPadTemplate gst_rtmp2_sink_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("video/x-flv")
+ );
+
+/* class initialization */
+
+G_DEFINE_TYPE_WITH_CODE (GstRtmp2Sink, gst_rtmp2_sink, GST_TYPE_BASE_SINK,
+ G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
+ gst_rtmp2_sink_uri_handler_init);
+ G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
+
+static void
+gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
+
+ gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
+ &gst_rtmp2_sink_sink_template);
+
+ gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
+ "RTMP sink element", "Sink", "Sink element for RTMP streams",
+ "Make.TV, Inc. <info@make.tv>");
+
+ gobject_class->set_property = gst_rtmp2_sink_set_property;
+ gobject_class->get_property = gst_rtmp2_sink_get_property;
+ gobject_class->finalize = gst_rtmp2_sink_finalize;
+ base_sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start);
+ base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop);
+ base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock);
+ base_sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop);
+ base_sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render);
+ base_sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_set_caps);
+
+ g_object_class_override_property (gobject_class, PROP_LOCATION, "location");
+ g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme");
+ g_object_class_override_property (gobject_class, PROP_HOST, "host");
+ g_object_class_override_property (gobject_class, PROP_PORT, "port");
+ g_object_class_override_property (gobject_class, PROP_APPLICATION,
+ "application");
+ g_object_class_override_property (gobject_class, PROP_STREAM, "stream");
+ g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN,
+ "secure-token");
+ g_object_class_override_property (gobject_class, PROP_USERNAME, "username");
+ g_object_class_override_property (gobject_class, PROP_PASSWORD, "password");
+ g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod");
+ g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout");
+ g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
+ "tls-validation-flags");
+
+ g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT,
+ g_param_spec_boolean ("async-connect", "Async connect",
+ "Connect on READY, otherwise on first push", TRUE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_PEAK_KBPS,
+ g_param_spec_uint ("peak-kbps", "Peak bitrate",
+ "Bitrate in kbit/sec to pace outgoing packets", 0, G_MAXINT / 125, 0,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+ GST_PARAM_MUTABLE_PLAYING));
+
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0,
+ "debug category for rtmp2sink element");
+}
+
+static void
+gst_rtmp2_sink_init (GstRtmp2Sink * self)
+{
+ self->location.flash_ver = g_strdup ("FMLE/3.0 (compatible; FMSc/1.0)");
+ self->async_connect = TRUE;
+
+ g_mutex_init (&self->lock);
+ g_cond_init (&self->cond);
+
+ self->task = gst_task_new (gst_rtmp2_sink_task_func, self, NULL);
+ g_rec_mutex_init (&self->task_lock);
+ gst_task_set_lock (self->task, &self->task_lock);
+
+ self->headers = g_ptr_array_new_with_free_func
+ ((GDestroyNotify) gst_mini_object_unref);
+}
+
+static void
+gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface)
+{
+ gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SINK);
+}
+
+static void
+gst_rtmp2_sink_set_property (GObject * object, guint property_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (object);
+
+ switch (property_id) {
+ case PROP_LOCATION:
+ gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
+ g_value_get_string (value));
+ break;
+ case PROP_SCHEME:
+ GST_OBJECT_LOCK (self);
+ self->location.scheme = g_value_get_enum (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_HOST:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.host);
+ self->location.host = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PORT:
+ GST_OBJECT_LOCK (self);
+ self->location.port = g_value_get_int (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_APPLICATION:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.application);
+ self->location.application = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_STREAM:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.stream);
+ self->location.stream = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_SECURE_TOKEN:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.secure_token);
+ self->location.secure_token = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_USERNAME:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.username);
+ self->location.username = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PASSWORD:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.password);
+ self->location.password = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_AUTHMOD:
+ GST_OBJECT_LOCK (self);
+ self->location.authmod = g_value_get_enum (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TIMEOUT:
+ GST_OBJECT_LOCK (self);
+ self->location.timeout = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TLS_VALIDATION_FLAGS:
+ GST_OBJECT_LOCK (self);
+ self->location.tls_flags = g_value_get_flags (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_ASYNC_CONNECT:
+ GST_OBJECT_LOCK (self);
+ self->async_connect = g_value_get_boolean (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PEAK_KBPS:
+ GST_OBJECT_LOCK (self);
+ self->peak_kbps = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (self);
+
+ g_mutex_lock (&self->lock);
+ set_pacing_rate (self);
+ g_mutex_unlock (&self->lock);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtmp2_sink_get_property (GObject * object, guint property_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (object);
+
+ switch (property_id) {
+ case PROP_LOCATION:
+ GST_OBJECT_LOCK (self);
+ g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
+ TRUE));
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_SCHEME:
+ GST_OBJECT_LOCK (self);
+ g_value_set_enum (value, self->location.scheme);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_HOST:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.host);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PORT:
+ GST_OBJECT_LOCK (self);
+ g_value_set_int (value, self->location.port);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_APPLICATION:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.application);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_STREAM:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.stream);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_SECURE_TOKEN:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.secure_token);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_USERNAME:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.username);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PASSWORD:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.password);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_AUTHMOD:
+ GST_OBJECT_LOCK (self);
+ g_value_set_enum (value, self->location.authmod);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TIMEOUT:
+ GST_OBJECT_LOCK (self);
+ g_value_set_uint (value, self->location.timeout);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TLS_VALIDATION_FLAGS:
+ GST_OBJECT_LOCK (self);
+ g_value_set_flags (value, self->location.tls_flags);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_ASYNC_CONNECT:
+ GST_OBJECT_LOCK (self);
+ g_value_set_boolean (value, self->async_connect);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PEAK_KBPS:
+ GST_OBJECT_LOCK (self);
+ g_value_set_uint (value, self->peak_kbps);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtmp2_sink_finalize (GObject * object)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (object);
+
+ g_clear_pointer (&self->headers, g_ptr_array_unref);
+
+ g_clear_object (&self->cancellable);
+ g_clear_object (&self->connection);
+
+ g_clear_object (&self->task);
+ g_rec_mutex_clear (&self->task_lock);
+
+ g_mutex_clear (&self->lock);
+ g_cond_clear (&self->cond);
+
+ gst_rtmp_location_clear (&self->location);
+
+ G_OBJECT_CLASS (gst_rtmp2_sink_parent_class)->finalize (object);
+}
+
+static gboolean
+gst_rtmp2_sink_start (GstBaseSink * sink)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
+ gboolean async;
+
+ GST_OBJECT_LOCK (self);
+ async = self->async_connect;
+ GST_OBJECT_UNLOCK (self);
+
+ GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed");
+
+ g_clear_object (&self->cancellable);
+
+ self->running = TRUE;
+ self->cancellable = g_cancellable_new ();
+ self->stream_id = 0;
+ self->last_ts = 0;
+ self->base_ts = 0;
+
+ if (async) {
+ gst_task_start (self->task);
+ }
+
+ return TRUE;
+}
+
+static gboolean
+quit_invoker (gpointer user_data)
+{
+ g_main_loop_quit (user_data);
+ return G_SOURCE_REMOVE;
+}
+
+static void
+stop_task (GstRtmp2Sink * self)
+{
+ gst_task_stop (self->task);
+ self->running = FALSE;
+
+ if (self->cancellable) {
+ GST_DEBUG_OBJECT (self, "Cancelling");
+ g_cancellable_cancel (self->cancellable);
+ }
+
+ if (self->loop) {
+ GST_DEBUG_OBJECT (self, "Stopping loop");
+ g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
+ quit_invoker, g_main_loop_ref (self->loop),
+ (GDestroyNotify) g_main_loop_unref);
+ }
+
+ g_cond_broadcast (&self->cond);
+}
+
+static gboolean
+gst_rtmp2_sink_stop (GstBaseSink * sink)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
+
+ GST_DEBUG_OBJECT (self, "stop");
+
+ g_mutex_lock (&self->lock);
+ stop_task (self);
+ g_mutex_unlock (&self->lock);
+
+ gst_task_join (self->task);
+
+ return TRUE;
+}
+
+static gboolean
+gst_rtmp2_sink_unlock (GstBaseSink * sink)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
+
+ GST_DEBUG_OBJECT (self, "unlock");
+
+ g_mutex_lock (&self->lock);
+ self->flushing = TRUE;
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+
+ return TRUE;
+}
+
+static gboolean
+gst_rtmp2_sink_unlock_stop (GstBaseSink * sink)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
+
+ GST_DEBUG_OBJECT (self, "unlock_stop");
+
+ g_mutex_lock (&self->lock);
+ self->flushing = FALSE;
+ g_mutex_unlock (&self->lock);
+
+ return TRUE;
+}
+
+static gboolean
+buffer_to_message (GstRtmp2Sink * self, GstBuffer * buffer, GstBuffer ** outbuf)
+{
+ GstBuffer *message;
+ gsize payload_offset, payload_size;
+ guint64 timestamp;
+ guint32 cstream;
+ GstRtmpMessageType type;
+
+ {
+ GstMapInfo info;
+
+ if (G_UNLIKELY (!gst_buffer_map (buffer, &info, GST_MAP_READ))) {
+ GST_ERROR_OBJECT (self, "map failed: %" GST_PTR_FORMAT, buffer);
+ return FALSE;
+ }
+
+ /* FIXME: This is ugly and only works behind flvmux.
+ * Implement true RTMP muxing. */
+
+ if (G_UNLIKELY (info.size >= 4 && memcmp (info.data, "FLV", 3) == 0)) {
+ /* drop the header, we don't need it */
+ GST_DEBUG_OBJECT (self, "ignoring FLV header: %" GST_PTR_FORMAT, buffer);
+ gst_buffer_unmap (buffer, &info);
+ *outbuf = NULL;
+ return TRUE;
+ }
+
+ if (G_UNLIKELY (info.size < 11 + 4)) {
+ GST_ERROR_OBJECT (self, "too small: %" GST_PTR_FORMAT, buffer);
+ gst_buffer_unmap (buffer, &info);
+ return FALSE;
+ }
+
+ /* payload between 11 byte header and 4 byte size footer */
+ payload_offset = 11;
+ payload_size = info.size - 11 - 4;
+
+ type = GST_READ_UINT8 (info.data);
+ timestamp = GST_READ_UINT24_BE (info.data + 4);
+ timestamp |= (guint32) GST_READ_UINT8 (info.data + 7) << 24;
+
+ /* flvmux timestamps roll over after about 49 days */
+ if (timestamp + self->base_ts + G_MAXINT32 < self->last_ts) {
+ GST_WARNING_OBJECT (self, "Timestamp regression %" G_GUINT64_FORMAT
+ " -> %" G_GUINT64_FORMAT "; assuming overflow", self->last_ts,
+ timestamp + self->base_ts);
+ self->base_ts += G_MAXUINT32;
+ self->base_ts += 1;
+ } else if (timestamp + self->base_ts > self->last_ts + G_MAXINT32) {
+ GST_WARNING_OBJECT (self, "Timestamp jump %" G_GUINT64_FORMAT
+ " -> %" G_GUINT64_FORMAT "; assuming underflow", self->last_ts,
+ timestamp + self->base_ts);
+ if (self->base_ts > 0) {
+ self->base_ts -= G_MAXUINT32;
+ self->base_ts -= 1;
+ } else {
+ GST_WARNING_OBJECT (self, "Cannot regress further;"
+ " forcing timestamp to zero");
+ timestamp = 0;
+ }
+ }
+ timestamp += self->base_ts;
+ self->last_ts = timestamp;
+
+ gst_buffer_unmap (buffer, &info);
+ }
+
+ switch (type) {
+ case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
+ cstream = 4;
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_AUDIO:
+ cstream = 5;
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_VIDEO:
+ cstream = 6;
+ break;
+
+ default:
+ GST_ERROR_OBJECT (self, "unknown tag type %d", type);
+ return FALSE;
+ }
+
+ /* May not know stream ID yet; set later */
+ message = gst_rtmp_message_new (type, cstream, 0);
+ message = gst_buffer_append_region (message, gst_buffer_ref (buffer),
+ payload_offset, payload_size);
+
+ GST_BUFFER_DTS (message) = timestamp * GST_MSECOND;
+
+ if (type == GST_RTMP_MESSAGE_TYPE_DATA_AMF0) {
+ /* FIXME: HACK: Attach a setDataFrame header.
+ * This should be done using a command. */
+
+ static const guint8 header[] = {
+ 0x02, 0x00, 0x0d, 0x40, 0x73, 0x65, 0x74, 0x44,
+ 0x61, 0x74, 0x61, 0x46, 0x72, 0x61, 0x6d, 0x65
+ };
+
+ GstMemory *memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
+ (guint8 *) header, sizeof header, 0, sizeof header, NULL, NULL);
+
+ gst_buffer_prepend_memory (message, memory);
+ }
+
+ *outbuf = message;
+ return TRUE;
+}
+
+static gboolean
+should_drop_header (GstRtmp2Sink * self, GstBuffer * buffer)
+{
+ guint len;
+
+ if (G_LIKELY (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER))) {
+ return FALSE;
+ }
+
+ g_mutex_lock (&self->lock);
+ len = self->headers->len;
+ g_mutex_unlock (&self->lock);
+
+ /* Drop header buffers when we have streamheader caps */
+ return len > 0;
+}
+
+static void
+send_message (GstRtmp2Sink * self, GstBuffer * message)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (message);
+ g_return_if_fail (self->stream_id != 0);
+ meta->mstream = self->stream_id;
+ gst_rtmp_connection_queue_message (self->connection, message);
+}
+
+static void
+send_streamheader (GstRtmp2Sink * self)
+{
+ guint i;
+
+ if (G_LIKELY (self->headers->len == 0)) {
+ return;
+ }
+
+ GST_DEBUG_OBJECT (self, "Sending %u streamheader messages",
+ self->headers->len);
+
+ for (i = 0; i < self->headers->len; i++) {
+ send_message (self, g_ptr_array_index (self->headers, i));
+ }
+
+ /* Steal pointers: suppress free */
+ g_ptr_array_set_free_func (self->headers, NULL);
+ g_ptr_array_set_size (self->headers, 0);
+ g_ptr_array_set_free_func (self->headers,
+ (GDestroyNotify) gst_mini_object_unref);
+}
+
+static inline gboolean
+is_running (GstRtmp2Sink * self)
+{
+ return G_LIKELY (self->running && !self->flushing);
+}
+
+static GstFlowReturn
+gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
+ GstBuffer *message;
+ GstFlowReturn ret;
+
+ if (G_UNLIKELY (should_drop_header (self, buffer))) {
+ GST_DEBUG_OBJECT (self, "Skipping header %" GST_PTR_FORMAT, buffer);
+ return GST_FLOW_OK;
+ }
+
+ GST_LOG_OBJECT (self, "render %" GST_PTR_FORMAT, buffer);
+
+ if (G_UNLIKELY (!buffer_to_message (self, buffer, &message))) {
+ GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to convert FLV to RTMP"),
+ ("Failed to convert %" GST_PTR_FORMAT, message));
+ return GST_FLOW_ERROR;
+ }
+
+ if (G_UNLIKELY (!message)) {
+ GST_DEBUG_OBJECT (self, "Skipping %" GST_PTR_FORMAT, buffer);
+ return GST_FLOW_OK;
+ }
+
+ g_mutex_lock (&self->lock);
+
+ if (G_UNLIKELY (is_running (self) && self->cancellable &&
+ gst_task_get_state (self->task) != GST_TASK_STARTED)) {
+ GST_DEBUG_OBJECT (self, "Starting connect");
+ gst_task_start (self->task);
+ }
+
+ while (G_UNLIKELY (is_running (self) && !self->connection)) {
+ GST_DEBUG_OBJECT (self, "Waiting for connection");
+ g_cond_wait (&self->cond, &self->lock);
+ }
+
+ while (G_UNLIKELY (is_running (self) && self->connection &&
+ gst_rtmp_connection_get_num_queued (self->connection) > 3)) {
+ GST_LOG_OBJECT (self, "Waiting for queue");
+ g_cond_wait (&self->cond, &self->lock);
+ }
+
+ if (G_UNLIKELY (!is_running (self))) {
+ gst_buffer_unref (message);
+ ret = GST_FLOW_FLUSHING;
+ } else if (G_UNLIKELY (!self->connection)) {
+ gst_buffer_unref (message);
+ /* send_connect_error has sent an ERROR message */
+ ret = GST_FLOW_ERROR;
+ } else {
+ send_streamheader (self);
+ send_message (self, message);
+ ret = GST_FLOW_OK;
+ }
+
+ g_mutex_unlock (&self->lock);
+ return ret;
+}
+
+static gboolean
+add_streamheader (GstRtmp2Sink * self, const GValue * value)
+{
+ GstBuffer *buffer, *message;
+
+ g_return_val_if_fail (value, FALSE);
+
+ if (!GST_VALUE_HOLDS_BUFFER (value)) {
+ GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
+ G_VALUE_TYPE_NAME (value));
+ return FALSE;
+ }
+
+ buffer = gst_value_get_buffer (value);
+
+ if (!buffer_to_message (self, buffer, &message)) {
+ GST_ERROR_OBJECT (self, "Failed to read streamheader %" GST_PTR_FORMAT,
+ buffer);
+ return FALSE;
+ }
+
+ if (message) {
+ GST_DEBUG_OBJECT (self, "Adding streamheader %" GST_PTR_FORMAT, buffer);
+ g_ptr_array_add (self->headers, message);
+ } else {
+ GST_DEBUG_OBJECT (self, "Skipping streamheader %" GST_PTR_FORMAT, buffer);
+ }
+
+ return TRUE;
+}
+
+static gboolean
+gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
+ GstStructure *s;
+ const GValue *streamheader;
+ guint i = 0;
+
+ GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps);
+
+ g_ptr_array_set_size (self->headers, 0);
+
+ s = gst_caps_get_structure (caps, 0);
+ streamheader = gst_structure_get_value (s, "streamheader");
+
+ if (!streamheader) {
+ GST_DEBUG_OBJECT (self, "'streamheader' field not present");
+ } else if (GST_VALUE_HOLDS_BUFFER (streamheader)) {
+ GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer");
+ if (!add_streamheader (self, streamheader)) {
+ return FALSE;
+ }
+
+ i = 1;
+ } else if (GST_VALUE_HOLDS_ARRAY (streamheader)) {
+ guint size = gst_value_array_get_size (streamheader);
+
+ GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
+
+ for (; i < size; i++) {
+ const GValue *v = gst_value_array_get_value (streamheader, i);
+
+ if (!add_streamheader (self, v)) {
+ return FALSE;
+ }
+ }
+ } else {
+ GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'",
+ G_VALUE_TYPE_NAME (streamheader));
+ return FALSE;
+ }
+
+ GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers -> %u messages",
+ i, self->headers->len);
+
+ return TRUE;
+}
+
+/* Mainloop task */
+static void
+gst_rtmp2_sink_task_func (gpointer user_data)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
+ GMainContext *context;
+ GMainLoop *loop;
+ GTask *connector;
+
+ GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task starting");
+
+ g_mutex_lock (&self->lock);
+ context = self->context = g_main_context_new ();
+ g_main_context_push_thread_default (context);
+ loop = self->loop = g_main_loop_new (context, TRUE);
+ connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
+ GST_OBJECT_LOCK (self);
+ gst_rtmp_client_connect_async (&self->location, self->cancellable,
+ client_connect_done, connector);
+ GST_OBJECT_UNLOCK (self);
+ g_mutex_unlock (&self->lock);
+
+ g_main_loop_run (loop);
+
+ g_mutex_lock (&self->lock);
+ g_clear_pointer (&self->loop, g_main_loop_unref);
+ g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+
+ while (g_main_context_pending (context)) {
+ GST_DEBUG_OBJECT (self, "iterating main context to clean up");
+ g_main_context_iteration (context, FALSE);
+ }
+
+ g_main_context_pop_thread_default (context);
+
+ g_mutex_lock (&self->lock);
+ g_clear_pointer (&self->context, g_main_context_unref);
+ g_ptr_array_set_size (self->headers, 0);
+ g_mutex_unlock (&self->lock);
+
+ GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task exiting");
+}
+
+static void
+client_connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GTask *task = user_data;
+ GstRtmp2Sink *self = g_task_get_source_object (task);
+ GError *error = NULL;
+ GstRtmpConnection *connection;
+
+ connection = gst_rtmp_client_connect_finish (result, &error);
+ if (!connection) {
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ g_task_set_task_data (task, connection, g_object_unref);
+
+ if (g_task_return_error_if_cancelled (task)) {
+ g_object_unref (task);
+ return;
+ }
+
+ GST_OBJECT_LOCK (self);
+ gst_rtmp_client_start_publish_async (connection, self->location.stream,
+ g_task_get_cancellable (task), start_publish_done, task);
+ GST_OBJECT_UNLOCK (self);
+}
+
+static void
+start_publish_done (GObject * source, GAsyncResult * result, gpointer user_data)
+{
+ GTask *task = G_TASK (user_data);
+ GstRtmp2Sink *self = g_task_get_source_object (task);
+ GstRtmpConnection *connection = g_task_get_task_data (task);
+ GError *error = NULL;
+
+ if (g_task_return_error_if_cancelled (task)) {
+ g_object_unref (task);
+ return;
+ }
+
+ if (gst_rtmp_client_start_publish_finish (connection, result,
+ &self->stream_id, &error)) {
+ g_task_return_pointer (task, g_object_ref (connection),
+ gst_rtmp_connection_close_and_unref);
+ } else {
+ g_task_return_error (task, error);
+ }
+
+ g_task_set_task_data (task, NULL, NULL);
+ g_object_unref (task);
+}
+
+static void
+put_chunk (GstRtmpConnection * connection, gpointer user_data)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
+
+ g_mutex_lock (&self->lock);
+ g_cond_signal (&self->cond);
+ g_mutex_unlock (&self->lock);
+}
+
+static void
+error_callback (GstRtmpConnection * connection, GstRtmp2Sink * self)
+{
+ g_mutex_lock (&self->lock);
+ if (self->cancellable) {
+ g_cancellable_cancel (self->cancellable);
+ } else if (self->loop) {
+ GST_ELEMENT_ERROR (self, RESOURCE, WRITE, ("Connection error"), (NULL));
+ stop_task (self);
+ }
+ g_mutex_unlock (&self->lock);
+}
+
+static void
+send_connect_error (GstRtmp2Sink * self, GError * error)
+{
+ if (!error) {
+ GST_ERROR_OBJECT (self, "Connect failed with NULL error");
+ GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
+ return;
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)",
+ GST_STR_NULL (error->message));
+ return;
+ }
+
+ GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s",
+ g_quark_to_string (error->domain), error->code,
+ GST_STR_NULL (error->message));
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
+ ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message)));
+ } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
+ ("Could not connect"), ("%s", GST_STR_NULL (error->message)));
+ } else {
+ GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
+ ("Failed to connect"),
+ ("error %s:%d: %s", g_quark_to_string (error->domain), error->code,
+ GST_STR_NULL (error->message)));
+ }
+}
+
+static void
+connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
+{
+ GstRtmp2Sink *self = GST_RTMP2_SINK (object);
+ GTask *task = G_TASK (result);
+ GError *error = NULL;
+
+ g_mutex_lock (&self->lock);
+
+ g_warn_if_fail (g_task_is_valid (task, object));
+
+ if (self->cancellable == g_task_get_cancellable (task)) {
+ g_clear_object (&self->cancellable);
+ }
+
+ self->connection = g_task_propagate_pointer (task, &error);
+ if (self->connection) {
+ set_pacing_rate (self);
+ gst_rtmp_connection_set_output_handler (self->connection,
+ put_chunk, g_object_ref (self), g_object_unref);
+ g_signal_connect_object (self->connection, "error",
+ G_CALLBACK (error_callback), self, 0);
+ } else {
+ send_connect_error (self, error);
+ stop_task (self);
+ g_error_free (error);
+ }
+
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+}
+
+static gboolean
+socket_set_pacing_rate (GSocket * socket, gint pacing_rate, GError ** error)
+{
+#ifdef SO_MAX_PACING_RATE
+ if (!g_socket_set_option (socket, SOL_SOCKET, SO_MAX_PACING_RATE,
+ pacing_rate, error)) {
+ g_prefix_error (error, "setsockopt failed: ");
+ return FALSE;
+ }
+#else
+ if (pacing_rate != -1) {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ "SO_MAX_PACING_RATE is not supported");
+ return FALSE;
+ }
+#endif
+
+ return TRUE;
+}
+
+static void
+set_pacing_rate (GstRtmp2Sink * self)
+{
+ GError *error = NULL;
+ gint pacing_rate;
+
+ if (!self->connection)
+ return;
+
+ GST_OBJECT_LOCK (self);
+ pacing_rate = self->peak_kbps ? self->peak_kbps * 125 : -1;
+ GST_OBJECT_UNLOCK (self);
+
+ if (socket_set_pacing_rate (gst_rtmp_connection_get_socket (self->connection),
+ pacing_rate, &error))
+ GST_INFO_OBJECT (self, "Set pacing rate to %d Bps", pacing_rate);
+ else
+ GST_WARNING_OBJECT (self, "Could not set pacing rate: %s", error->message);
+
+ g_clear_error (&error);
+}
diff --git a/gst/rtmp2/gstrtmp2sink.h b/gst/rtmp2/gstrtmp2sink.h
new file mode 100644
index 000000000..98f4d1173
--- /dev/null
+++ b/gst/rtmp2/gstrtmp2sink.h
@@ -0,0 +1,34 @@
+/* GStreamer
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP2_SINK_H_
+
+#define _GST_RTMP2_SINK_H_
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_RTMP2_SINK (gst_rtmp2_sink_get_type())
+GType gst_rtmp2_sink_get_type (void);
+
+G_END_DECLS
+#endif
diff --git a/gst/rtmp2/gstrtmp2src.c b/gst/rtmp2/gstrtmp2src.c
new file mode 100644
index 000000000..0e7598e32
--- /dev/null
+++ b/gst/rtmp2/gstrtmp2src.c
@@ -0,0 +1,839 @@
+/* GStreamer
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+/**
+ * SECTION:element-gstrtmp2src
+ *
+ * The rtmp2src element receives input streams from an RTMP server.
+ *
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * gst-launch -v rtmp2src ! decodebin ! fakesink
+ * ]|
+ * FIXME Describe what the pipeline does.
+ * </refsect2>
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstrtmp2src.h"
+
+#include "gstrtmp2locationhandler.h"
+#include "rtmp/rtmpclient.h"
+#include "rtmp/rtmpmessage.h"
+
+#include <gst/base/gstpushsrc.h>
+#include <string.h>
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_src_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp2_src_debug_category
+
+/* prototypes */
+#define GST_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SRC,GstRtmp2Src))
+#define GST_IS_RTMP2_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SRC))
+
+typedef struct
+{
+ GstPushSrc parent_instance;
+
+ /* properties */
+ GstRtmpLocation location;
+ gboolean async_connect;
+
+ /* stuff */
+ gboolean running, flushing;
+ GMutex lock;
+ GCond cond;
+
+ GstTask *task;
+ GRecMutex task_lock;
+
+ GMainLoop *loop;
+ GMainContext *context;
+
+ GCancellable *cancellable;
+ GstRtmpConnection *connection;
+ guint32 stream_id;
+
+ GstBuffer *message;
+ gboolean sent_header;
+ GstClockTime last_ts;
+} GstRtmp2Src;
+
+typedef struct
+{
+ GstPushSrcClass parent_class;
+} GstRtmp2SrcClass;
+
+/* GObject virtual functions */
+static void gst_rtmp2_src_set_property (GObject * object,
+ guint property_id, const GValue * value, GParamSpec * pspec);
+static void gst_rtmp2_src_get_property (GObject * object,
+ guint property_id, GValue * value, GParamSpec * pspec);
+static void gst_rtmp2_src_finalize (GObject * object);
+static void gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface);
+
+/* GstBaseSrc virtual functions */
+static gboolean gst_rtmp2_src_start (GstBaseSrc * src);
+static gboolean gst_rtmp2_src_stop (GstBaseSrc * src);
+static gboolean gst_rtmp2_src_unlock (GstBaseSrc * src);
+static gboolean gst_rtmp2_src_unlock_stop (GstBaseSrc * src);
+static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset,
+ guint size, GstBuffer ** outbuf);
+
+/* Internal API */
+static void gst_rtmp2_src_task_func (gpointer user_data);
+static void client_connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void start_play_done (GObject * object, GAsyncResult * result,
+ gpointer user_data);
+static void connect_task_done (GObject * object, GAsyncResult * result,
+ gpointer user_data);
+
+enum
+{
+ PROP_0,
+ PROP_LOCATION,
+ PROP_SCHEME,
+ PROP_HOST,
+ PROP_PORT,
+ PROP_APPLICATION,
+ PROP_STREAM,
+ PROP_SECURE_TOKEN,
+ PROP_USERNAME,
+ PROP_PASSWORD,
+ PROP_AUTHMOD,
+ PROP_TIMEOUT,
+ PROP_TLS_VALIDATION_FLAGS,
+ PROP_ASYNC_CONNECT,
+};
+
+/* pad templates */
+
+static GstStaticPadTemplate gst_rtmp2_src_src_template =
+GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("video/x-flv")
+ );
+
+/* class initialization */
+
+G_DEFINE_TYPE_WITH_CODE (GstRtmp2Src, gst_rtmp2_src, GST_TYPE_PUSH_SRC,
+ G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
+ gst_rtmp2_src_uri_handler_init);
+ G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
+
+static void
+gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstBaseSrcClass *base_src_class = GST_BASE_SRC_CLASS (klass);
+
+ gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
+ &gst_rtmp2_src_src_template);
+
+ gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
+ "RTMP source element", "Source", "Source element for RTMP streams",
+ "Make.TV, Inc. <info@make.tv>");
+
+ gobject_class->set_property = gst_rtmp2_src_set_property;
+ gobject_class->get_property = gst_rtmp2_src_get_property;
+ gobject_class->finalize = gst_rtmp2_src_finalize;
+ base_src_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_src_start);
+ base_src_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_stop);
+ base_src_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock);
+ base_src_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_src_unlock_stop);
+ base_src_class->create = GST_DEBUG_FUNCPTR (gst_rtmp2_src_create);
+
+ g_object_class_override_property (gobject_class, PROP_LOCATION, "location");
+ g_object_class_override_property (gobject_class, PROP_SCHEME, "scheme");
+ g_object_class_override_property (gobject_class, PROP_HOST, "host");
+ g_object_class_override_property (gobject_class, PROP_PORT, "port");
+ g_object_class_override_property (gobject_class, PROP_APPLICATION,
+ "application");
+ g_object_class_override_property (gobject_class, PROP_STREAM, "stream");
+ g_object_class_override_property (gobject_class, PROP_SECURE_TOKEN,
+ "secure-token");
+ g_object_class_override_property (gobject_class, PROP_USERNAME, "username");
+ g_object_class_override_property (gobject_class, PROP_PASSWORD, "password");
+ g_object_class_override_property (gobject_class, PROP_AUTHMOD, "authmod");
+ g_object_class_override_property (gobject_class, PROP_TIMEOUT, "timeout");
+ g_object_class_override_property (gobject_class, PROP_TLS_VALIDATION_FLAGS,
+ "tls-validation-flags");
+
+ g_object_class_install_property (gobject_class, PROP_ASYNC_CONNECT,
+ g_param_spec_boolean ("async-connect", "Async connect",
+ "Connect on READY, otherwise on first push", TRUE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
+ "debug category for rtmp2src element");
+}
+
+static void
+gst_rtmp2_src_init (GstRtmp2Src * self)
+{
+ self->async_connect = TRUE;
+
+ g_mutex_init (&self->lock);
+ g_cond_init (&self->cond);
+
+ self->task = gst_task_new (gst_rtmp2_src_task_func, self, NULL);
+ g_rec_mutex_init (&self->task_lock);
+ gst_task_set_lock (self->task, &self->task_lock);
+}
+
+static void
+gst_rtmp2_src_uri_handler_init (GstURIHandlerInterface * iface)
+{
+ gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SRC);
+}
+
+static void
+gst_rtmp2_src_set_property (GObject * object, guint property_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (object);
+
+ switch (property_id) {
+ case PROP_LOCATION:
+ gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
+ g_value_get_string (value));
+ break;
+ case PROP_SCHEME:
+ GST_OBJECT_LOCK (self);
+ self->location.scheme = g_value_get_enum (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_HOST:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.host);
+ self->location.host = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PORT:
+ GST_OBJECT_LOCK (self);
+ self->location.port = g_value_get_int (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_APPLICATION:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.application);
+ self->location.application = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_STREAM:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.stream);
+ self->location.stream = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_SECURE_TOKEN:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.secure_token);
+ self->location.secure_token = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_USERNAME:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.username);
+ self->location.username = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PASSWORD:
+ GST_OBJECT_LOCK (self);
+ g_free (self->location.password);
+ self->location.password = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_AUTHMOD:
+ GST_OBJECT_LOCK (self);
+ self->location.authmod = g_value_get_enum (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TIMEOUT:
+ GST_OBJECT_LOCK (self);
+ self->location.timeout = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TLS_VALIDATION_FLAGS:
+ GST_OBJECT_LOCK (self);
+ self->location.tls_flags = g_value_get_flags (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_ASYNC_CONNECT:
+ GST_OBJECT_LOCK (self);
+ self->async_connect = g_value_get_boolean (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtmp2_src_get_property (GObject * object, guint property_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (object);
+
+ switch (property_id) {
+ case PROP_LOCATION:
+ GST_OBJECT_LOCK (self);
+ g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
+ TRUE));
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_SCHEME:
+ GST_OBJECT_LOCK (self);
+ g_value_set_enum (value, self->location.scheme);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_HOST:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.host);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PORT:
+ GST_OBJECT_LOCK (self);
+ g_value_set_int (value, self->location.port);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_APPLICATION:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.application);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_STREAM:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.stream);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_SECURE_TOKEN:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.secure_token);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_USERNAME:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.username);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_PASSWORD:
+ GST_OBJECT_LOCK (self);
+ g_value_set_string (value, self->location.password);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_AUTHMOD:
+ GST_OBJECT_LOCK (self);
+ g_value_set_enum (value, self->location.authmod);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TIMEOUT:
+ GST_OBJECT_LOCK (self);
+ g_value_set_uint (value, self->location.timeout);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_TLS_VALIDATION_FLAGS:
+ GST_OBJECT_LOCK (self);
+ g_value_set_flags (value, self->location.tls_flags);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_ASYNC_CONNECT:
+ GST_OBJECT_LOCK (self);
+ g_value_set_boolean (value, self->async_connect);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtmp2_src_finalize (GObject * object)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (object);
+
+ gst_buffer_replace (&self->message, NULL);
+
+ g_clear_object (&self->cancellable);
+ g_clear_object (&self->connection);
+
+ g_clear_object (&self->task);
+ g_rec_mutex_clear (&self->task_lock);
+
+ g_mutex_clear (&self->lock);
+ g_cond_clear (&self->cond);
+
+ gst_rtmp_location_clear (&self->location);
+
+ G_OBJECT_CLASS (gst_rtmp2_src_parent_class)->finalize (object);
+}
+
+static gboolean
+gst_rtmp2_src_start (GstBaseSrc * src)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (src);
+ gboolean async;
+
+ GST_OBJECT_LOCK (self);
+ async = self->async_connect;
+ GST_OBJECT_UNLOCK (self);
+
+ GST_INFO_OBJECT (self, "Starting (%s)", async ? "async" : "delayed");
+
+ g_clear_object (&self->cancellable);
+
+ self->running = TRUE;
+ self->cancellable = g_cancellable_new ();
+ self->stream_id = 0;
+ self->sent_header = FALSE;
+ self->last_ts = GST_CLOCK_TIME_NONE;
+
+ if (async) {
+ gst_task_start (self->task);
+ }
+
+ return TRUE;
+}
+
+static gboolean
+quit_invoker (gpointer user_data)
+{
+ g_main_loop_quit (user_data);
+ return G_SOURCE_REMOVE;
+}
+
+static void
+stop_task (GstRtmp2Src * self)
+{
+ gst_task_stop (self->task);
+ self->running = FALSE;
+
+ if (self->cancellable) {
+ GST_DEBUG_OBJECT (self, "Cancelling");
+ g_cancellable_cancel (self->cancellable);
+ }
+
+ if (self->loop) {
+ GST_DEBUG_OBJECT (self, "Stopping loop");
+ g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
+ quit_invoker, g_main_loop_ref (self->loop),
+ (GDestroyNotify) g_main_loop_unref);
+ }
+
+ g_cond_broadcast (&self->cond);
+}
+
+static gboolean
+gst_rtmp2_src_stop (GstBaseSrc * src)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (src);
+
+ GST_DEBUG_OBJECT (self, "stop");
+
+ g_mutex_lock (&self->lock);
+ stop_task (self);
+ g_mutex_unlock (&self->lock);
+
+ gst_task_join (self->task);
+
+ return TRUE;
+}
+
+static gboolean
+gst_rtmp2_src_unlock (GstBaseSrc * src)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (src);
+
+ GST_DEBUG_OBJECT (self, "unlock");
+
+ g_mutex_lock (&self->lock);
+ self->flushing = TRUE;
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+
+ return TRUE;
+}
+
+static gboolean
+gst_rtmp2_src_unlock_stop (GstBaseSrc * src)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (src);
+
+ GST_DEBUG_OBJECT (self, "unlock_stop");
+
+ g_mutex_lock (&self->lock);
+ self->flushing = FALSE;
+ g_mutex_unlock (&self->lock);
+
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
+ GstBuffer ** outbuf)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (src);
+ GstBuffer *message, *buffer;
+ GstRtmpMeta *meta;
+ guint32 timestamp = 0;
+
+ static const guint8 flv_header_data[] = {
+ 0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00,
+ 0x09, 0x00, 0x00, 0x00, 0x00,
+ };
+
+ GST_LOG_OBJECT (self, "create");
+
+ g_mutex_lock (&self->lock);
+
+ if (self->running) {
+ gst_task_start (self->task);
+ }
+
+ while (!self->message) {
+ if (!self->running) {
+ g_mutex_unlock (&self->lock);
+ return GST_FLOW_EOS;
+ }
+ if (self->flushing) {
+ g_mutex_unlock (&self->lock);
+ return GST_FLOW_FLUSHING;
+ }
+ g_cond_wait (&self->cond, &self->lock);
+ }
+
+ message = self->message;
+ self->message = NULL;
+ g_cond_signal (&self->cond);
+ g_mutex_unlock (&self->lock);
+
+ meta = gst_buffer_get_rtmp_meta (message);
+ if (!meta) {
+ GST_ELEMENT_ERROR (self, CORE, FAILED,
+ ("Internal error: No RTMP meta on buffer"),
+ ("No RTMP meta on %" GST_PTR_FORMAT, message));
+ gst_buffer_unref (message);
+ return GST_FLOW_ERROR;
+ }
+
+ if (GST_BUFFER_DTS_IS_VALID (message)) {
+ GstClockTime last_ts = self->last_ts, ts = GST_BUFFER_DTS (message);
+
+ if (GST_CLOCK_TIME_IS_VALID (last_ts) && last_ts > ts) {
+ GST_LOG_OBJECT (self, "Timestamp regression: %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT, GST_TIME_ARGS (last_ts), GST_TIME_ARGS (ts));
+ }
+
+ self->last_ts = ts;
+ timestamp = ts / GST_MSECOND;
+ }
+
+ buffer = gst_buffer_copy_region (message, GST_BUFFER_COPY_MEMORY, 0, -1);
+
+ {
+ guint8 *tag_header = g_malloc (11);
+ GstMemory *memory =
+ gst_memory_new_wrapped (0, tag_header, 11, 0, 11, tag_header, g_free);
+ GST_WRITE_UINT8 (tag_header, meta->type);
+ GST_WRITE_UINT24_BE (tag_header + 1, meta->size);
+ GST_WRITE_UINT24_BE (tag_header + 4, timestamp);
+ GST_WRITE_UINT8 (tag_header + 7, timestamp >> 24);
+ GST_WRITE_UINT24_BE (tag_header + 8, 0);
+ gst_buffer_prepend_memory (buffer, memory);
+ }
+
+ {
+ guint8 *tag_footer = g_malloc (4);
+ GstMemory *memory =
+ gst_memory_new_wrapped (0, tag_footer, 4, 0, 4, tag_footer, g_free);
+ GST_WRITE_UINT32_BE (tag_footer, meta->size + 11);
+ gst_buffer_append_memory (buffer, memory);
+ }
+
+ if (!self->sent_header) {
+ GstMemory *memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
+ (guint8 *) flv_header_data, sizeof flv_header_data, 0,
+ sizeof flv_header_data, NULL, NULL);
+ gst_buffer_prepend_memory (buffer, memory);
+ self->sent_header = TRUE;
+ }
+
+ *outbuf = buffer;
+
+ gst_buffer_unref (message);
+ return GST_FLOW_OK;
+}
+
+/* Mainloop task */
+static void
+gst_rtmp2_src_task_func (gpointer user_data)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
+ GMainContext *context;
+ GMainLoop *loop;
+ GTask *connector;
+
+ GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting");
+
+ g_mutex_lock (&self->lock);
+ context = self->context = g_main_context_new ();
+ g_main_context_push_thread_default (context);
+ loop = self->loop = g_main_loop_new (context, TRUE);
+ connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
+ GST_OBJECT_LOCK (self);
+ gst_rtmp_client_connect_async (&self->location, self->cancellable,
+ client_connect_done, connector);
+ GST_OBJECT_UNLOCK (self);
+ g_mutex_unlock (&self->lock);
+
+ g_main_loop_run (loop);
+
+ g_mutex_lock (&self->lock);
+ g_clear_pointer (&self->loop, g_main_loop_unref);
+ g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+
+ while (g_main_context_pending (context)) {
+ GST_DEBUG_OBJECT (self, "iterating main context to clean up");
+ g_main_context_iteration (context, FALSE);
+ }
+
+ g_main_context_pop_thread_default (context);
+
+ g_mutex_lock (&self->lock);
+ g_clear_pointer (&self->context, g_main_context_unref);
+ gst_buffer_replace (&self->message, NULL);
+ g_mutex_unlock (&self->lock);
+
+ GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task exiting");
+}
+
+static void
+client_connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GTask *task = user_data;
+ GstRtmp2Src *self = g_task_get_source_object (task);
+ GError *error = NULL;
+ GstRtmpConnection *connection;
+
+ connection = gst_rtmp_client_connect_finish (result, &error);
+ if (!connection) {
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ g_task_set_task_data (task, connection, g_object_unref);
+
+ if (g_task_return_error_if_cancelled (task)) {
+ g_object_unref (task);
+ return;
+ }
+
+ GST_OBJECT_LOCK (self);
+ gst_rtmp_client_start_play_async (connection, self->location.stream,
+ g_task_get_cancellable (task), start_play_done, task);
+ GST_OBJECT_UNLOCK (self);
+}
+
+static void
+start_play_done (GObject * source, GAsyncResult * result, gpointer user_data)
+{
+ GTask *task = G_TASK (user_data);
+ GstRtmp2Src *self = g_task_get_source_object (task);
+ GstRtmpConnection *connection = g_task_get_task_data (task);
+ GError *error = NULL;
+
+ if (g_task_return_error_if_cancelled (task)) {
+ g_object_unref (task);
+ return;
+ }
+
+ if (gst_rtmp_client_start_play_finish (connection, result,
+ &self->stream_id, &error)) {
+ g_task_return_pointer (task, g_object_ref (connection),
+ gst_rtmp_connection_close_and_unref);
+ } else {
+ g_task_return_error (task, error);
+ }
+
+ g_task_set_task_data (task, NULL, NULL);
+ g_object_unref (task);
+}
+
+static void
+got_message (GstRtmpConnection * connection, GstBuffer * buffer,
+ gpointer user_data)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (user_data);
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+ guint32 min_size = 1;
+
+ g_return_if_fail (meta);
+
+ if (meta->mstream != self->stream_id) {
+ GST_DEBUG_OBJECT (self, "Ignoring %s message with stream %" G_GUINT32_FORMAT
+ " != %" G_GUINT32_FORMAT, gst_rtmp_message_type_get_nick (meta->type),
+ meta->mstream, self->stream_id);
+ return;
+ }
+
+ switch (meta->type) {
+ case GST_RTMP_MESSAGE_TYPE_VIDEO:
+ min_size = 6;
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_AUDIO:
+ min_size = 2;
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
+ break;
+
+ default:
+ GST_DEBUG_OBJECT (self, "Ignoring %s message, wrong type",
+ gst_rtmp_message_type_get_nick (meta->type));
+ return;
+ }
+
+ if (meta->size < min_size) {
+ GST_DEBUG_OBJECT (self, "Ignoring too small %s message (%" G_GUINT32_FORMAT
+ " < %" G_GUINT32_FORMAT ")",
+ gst_rtmp_message_type_get_nick (meta->type), meta->size, min_size);
+ return;
+ }
+
+ g_mutex_lock (&self->lock);
+ while (self->message) {
+ if (!self->running) {
+ goto out;
+ }
+ g_cond_wait (&self->cond, &self->lock);
+ }
+
+ self->message = gst_buffer_ref (buffer);
+ g_cond_signal (&self->cond);
+
+out:
+ g_mutex_unlock (&self->lock);
+ return;
+}
+
+static void
+error_callback (GstRtmpConnection * connection, GstRtmp2Src * self)
+{
+ g_mutex_lock (&self->lock);
+ if (self->cancellable) {
+ g_cancellable_cancel (self->cancellable);
+ } else if (self->loop) {
+ GST_INFO_OBJECT (self, "Connection error");
+ stop_task (self);
+ }
+ g_mutex_unlock (&self->lock);
+}
+
+static void
+control_callback (GstRtmpConnection * connection, gint uc_type,
+ guint stream_id, GstRtmp2Src * self)
+{
+ GST_INFO_OBJECT (self, "stream %u got %s", stream_id,
+ gst_rtmp_user_control_type_get_nick (uc_type));
+
+ if (uc_type == GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF && stream_id == 1) {
+ GST_INFO_OBJECT (self, "went EOS");
+ stop_task (self);
+ }
+}
+
+static void
+send_connect_error (GstRtmp2Src * self, GError * error)
+{
+ if (!error) {
+ GST_ERROR_OBJECT (self, "Connect failed with NULL error");
+ GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
+ return;
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ GST_DEBUG_OBJECT (self, "Connection was cancelled (%s)",
+ GST_STR_NULL (error->message));
+ return;
+ }
+
+ GST_ERROR_OBJECT (self, "Failed to connect (%s:%d): %s",
+ g_quark_to_string (error->domain), error->code,
+ GST_STR_NULL (error->message));
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
+ ("Not authorized to connect"), ("%s", GST_STR_NULL (error->message)));
+ } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
+ GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
+ ("Could not connect"), ("%s", GST_STR_NULL (error->message)));
+ } else {
+ GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
+ ("Failed to connect"),
+ ("error %s:%d: %s", g_quark_to_string (error->domain), error->code,
+ GST_STR_NULL (error->message)));
+ }
+}
+
+static void
+connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
+{
+ GstRtmp2Src *self = GST_RTMP2_SRC (object);
+ GTask *task = G_TASK (result);
+ GError *error = NULL;
+
+ g_mutex_lock (&self->lock);
+
+ g_warn_if_fail (g_task_is_valid (task, object));
+
+ if (self->cancellable == g_task_get_cancellable (task)) {
+ g_clear_object (&self->cancellable);
+ }
+
+ self->connection = g_task_propagate_pointer (task, &error);
+ if (self->connection) {
+ gst_rtmp_connection_set_input_handler (self->connection,
+ got_message, g_object_ref (self), g_object_unref);
+ g_signal_connect_object (self->connection, "error",
+ G_CALLBACK (error_callback), self, 0);
+ g_signal_connect_object (self->connection, "stream-control",
+ G_CALLBACK (control_callback), self, 0);
+ } else {
+ send_connect_error (self, error);
+ stop_task (self);
+ g_error_free (error);
+ }
+
+ g_cond_broadcast (&self->cond);
+ g_mutex_unlock (&self->lock);
+}
diff --git a/gst/rtmp2/gstrtmp2src.h b/gst/rtmp2/gstrtmp2src.h
new file mode 100644
index 000000000..e2c40a66e
--- /dev/null
+++ b/gst/rtmp2/gstrtmp2src.h
@@ -0,0 +1,34 @@
+/* GStreamer
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP2_SRC_H_
+
+#define _GST_RTMP2_SRC_H_
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_RTMP2_SRC (gst_rtmp2_src_get_type())
+GType gst_rtmp2_src_get_type (void);
+
+G_END_DECLS
+#endif
diff --git a/gst/rtmp2/meson.build b/gst/rtmp2/meson.build
new file mode 100644
index 000000000..c67a24867
--- /dev/null
+++ b/gst/rtmp2/meson.build
@@ -0,0 +1,24 @@
+rtmp2_sources = [
+ 'gstrtmp2.c',
+ 'gstrtmp2locationhandler.c',
+ 'gstrtmp2sink.c',
+ 'gstrtmp2src.c',
+ 'rtmp/amf.c',
+ 'rtmp/rtmpchunkstream.c',
+ 'rtmp/rtmpclient.c',
+ 'rtmp/rtmpconnection.c',
+ 'rtmp/rtmphandshake.c',
+ 'rtmp/rtmpmessage.c',
+ 'rtmp/rtmputils.c',
+]
+
+gstrtmp2 = library('gstrtmp2',
+ rtmp2_sources,
+ c_args : gst_plugins_bad_args,
+ include_directories : [configinc, libsinc],
+ dependencies : [gstbase_dep, gio_dep, libm],
+ install : true,
+ install_dir : plugins_install_dir,
+)
+pkgconfig.generate(gstrtmp2, install_dir : plugins_pkgconfig_install_dir)
+plugins += [gstrtmp2]
diff --git a/gst/rtmp2/rtmp/amf.c b/gst/rtmp2/rtmp/amf.c
new file mode 100644
index 000000000..a6f5b3be0
--- /dev/null
+++ b/gst/rtmp2/rtmp/amf.c
@@ -0,0 +1,1141 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "amf.h"
+#include "rtmputils.h"
+#include <string.h>
+#include <gst/gst.h>
+
+#define MAX_RECURSION_DEPTH 16
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_amf_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_amf_debug_category
+
+static GBytes *empty_bytes;
+
+static void
+init_static (void)
+{
+ static volatile gsize done = 0;
+ if (g_once_init_enter (&done)) {
+ empty_bytes = g_bytes_new_static ("", 0);
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_amf_debug_category, "rtmpamf", 0,
+ "debug category for the amf parser");
+ g_once_init_leave (&done, 1);
+ }
+}
+
+const gchar *
+gst_amf_type_get_nick (GstAmfType type)
+{
+ switch (type) {
+ case GST_AMF_TYPE_INVALID:
+ return "invalid";
+ case GST_AMF_TYPE_NUMBER:
+ return "number";
+ case GST_AMF_TYPE_BOOLEAN:
+ return "boolean";
+ case GST_AMF_TYPE_STRING:
+ return "string";
+ case GST_AMF_TYPE_OBJECT:
+ return "object";
+ case GST_AMF_TYPE_MOVIECLIP:
+ return "movieclip";
+ case GST_AMF_TYPE_NULL:
+ return "null";
+ case GST_AMF_TYPE_UNDEFINED:
+ return "undefined";
+ case GST_AMF_TYPE_REFERENCE:
+ return "reference";
+ case GST_AMF_TYPE_ECMA_ARRAY:
+ return "ecma-array";
+ case GST_AMF_TYPE_OBJECT_END:
+ return "object-end";
+ case GST_AMF_TYPE_STRICT_ARRAY:
+ return "strict-array";
+ case GST_AMF_TYPE_DATE:
+ return "date";
+ case GST_AMF_TYPE_LONG_STRING:
+ return "long-string";
+ case GST_AMF_TYPE_UNSUPPORTED:
+ return "unsupported";
+ case GST_AMF_TYPE_RECORDSET:
+ return "recordset";
+ case GST_AMF_TYPE_XML_DOCUMENT:
+ return "xml-document";
+ case GST_AMF_TYPE_TYPED_OBJECT:
+ return "typed-object";
+ case GST_AMF_TYPE_AVMPLUS_OBJECT:
+ return "avmplus-object";
+ default:
+ return "unknown";
+ }
+}
+
+typedef struct
+{
+ gchar *name;
+ GstAmfNode *value;
+} AmfObjectField;
+
+static void
+amf_object_field_clear (gpointer ptr)
+{
+ AmfObjectField *field = ptr;
+ g_clear_pointer (&field->name, g_free);
+ g_clear_pointer (&field->value, gst_amf_node_free);
+}
+
+struct _GstAmfNode
+{
+ GstAmfType type;
+ union
+ {
+ gint v_int;
+ gdouble v_double;
+ GBytes *v_bytes;
+ GArray *v_fields;
+ GPtrArray *v_elements;
+ } value;
+};
+
+static inline const AmfObjectField *
+get_field (const GstAmfNode * node, guint index)
+{
+ return &g_array_index (node->value.v_fields, const AmfObjectField, index);
+}
+
+static inline void
+append_field (GstAmfNode * node, gchar * name, GstAmfNode * value)
+{
+ AmfObjectField field = {
+ .name = name,
+ .value = value,
+ };
+ g_array_append_val (node->value.v_fields, field);
+}
+
+static inline const GstAmfNode *
+get_element (const GstAmfNode * node, guint index)
+{
+ return g_ptr_array_index (node->value.v_elements, index);
+}
+
+static inline void
+append_element (GstAmfNode * node, GstAmfNode * value)
+{
+ g_ptr_array_add (node->value.v_elements, value);
+}
+
+static GstAmfNode *
+node_new (GstAmfType type)
+{
+ GstAmfNode *node;
+
+ init_static ();
+
+ node = g_slice_alloc0 (sizeof *node);
+ node->type = type;
+
+ switch (type) {
+ case GST_AMF_TYPE_STRING:
+ case GST_AMF_TYPE_LONG_STRING:
+ node->value.v_bytes = g_bytes_ref (empty_bytes);
+ break;
+
+ case GST_AMF_TYPE_OBJECT:
+ case GST_AMF_TYPE_ECMA_ARRAY:
+ node->value.v_fields =
+ g_array_new (FALSE, FALSE, sizeof (AmfObjectField));
+ g_array_set_clear_func (node->value.v_fields, amf_object_field_clear);
+ break;
+
+ case GST_AMF_TYPE_STRICT_ARRAY:
+ node->value.v_elements =
+ g_ptr_array_new_with_free_func (gst_amf_node_free);
+ break;
+
+ default:
+ break;
+ }
+
+ return node;
+}
+
+GstAmfNode *
+gst_amf_node_new_null (void)
+{
+ return node_new (GST_AMF_TYPE_NULL);
+}
+
+GstAmfNode *
+gst_amf_node_new_boolean (gboolean value)
+{
+ GstAmfNode *node = node_new (GST_AMF_TYPE_BOOLEAN);
+ node->value.v_int = ! !value;
+ return node;
+}
+
+GstAmfNode *
+gst_amf_node_new_number (gdouble value)
+{
+ GstAmfNode *node = node_new (GST_AMF_TYPE_NUMBER);
+ node->value.v_double = value;
+ return node;
+}
+
+GstAmfNode *
+gst_amf_node_new_string (const gchar * value, gssize size)
+{
+ GstAmfNode *node = node_new (GST_AMF_TYPE_STRING);
+ gst_amf_node_set_string (node, value, size);
+ return node;
+}
+
+GstAmfNode *
+gst_amf_node_new_take_string (gchar * value, gssize size)
+{
+ GstAmfNode *node = node_new (GST_AMF_TYPE_STRING);
+ gst_amf_node_take_string (node, value, size);
+ return node;
+}
+
+GstAmfNode *
+gst_amf_node_new_object (void)
+{
+ return node_new (GST_AMF_TYPE_OBJECT);
+}
+
+GstAmfNode *
+gst_amf_node_copy (const GstAmfNode * node)
+{
+ GstAmfNode *copy;
+
+ g_return_val_if_fail (node, NULL);
+
+ copy = node_new (node->type);
+
+ switch (node->type) {
+ case GST_AMF_TYPE_STRING:
+ case GST_AMF_TYPE_LONG_STRING:
+ copy->value.v_bytes = g_bytes_ref (node->value.v_bytes);
+ break;
+
+ case GST_AMF_TYPE_OBJECT:
+ case GST_AMF_TYPE_ECMA_ARRAY:{
+ guint i, len = gst_amf_node_get_num_fields (node);
+ for (i = 0; i < len; i++) {
+ const AmfObjectField *field = get_field (node, i);
+ append_field (copy, g_strdup (field->name),
+ gst_amf_node_copy (field->value));
+ }
+ break;
+ }
+
+ case GST_AMF_TYPE_STRICT_ARRAY:{
+ guint i, len = gst_amf_node_get_num_elements (node);
+ for (i = 0; i < len; i++) {
+ const GstAmfNode *elem = get_element (node, i);
+ append_element (copy, gst_amf_node_copy (elem));
+ }
+ break;
+ }
+
+ default:
+ copy->value = node->value;
+ break;
+ }
+
+ return copy;
+}
+
+void
+gst_amf_node_free (gpointer ptr)
+{
+ GstAmfNode *node = ptr;
+
+ switch (node->type) {
+ case GST_AMF_TYPE_STRING:
+ case GST_AMF_TYPE_LONG_STRING:
+ g_bytes_unref (node->value.v_bytes);
+ break;
+
+ case GST_AMF_TYPE_OBJECT:
+ case GST_AMF_TYPE_ECMA_ARRAY:
+ g_array_unref (node->value.v_fields);
+ break;
+
+ case GST_AMF_TYPE_STRICT_ARRAY:
+ g_ptr_array_unref (node->value.v_elements);
+ break;
+
+ default:
+ break;
+ }
+
+ g_slice_free (GstAmfNode, node);
+}
+
+GstAmfType
+gst_amf_node_get_type (const GstAmfNode * node)
+{
+ g_return_val_if_fail (node, GST_AMF_TYPE_INVALID);
+ return node->type;
+}
+
+gboolean
+gst_amf_node_get_boolean (const GstAmfNode * node)
+{
+ g_return_val_if_fail (gst_amf_node_get_type (node) == GST_AMF_TYPE_BOOLEAN,
+ FALSE);
+ return node->value.v_int;
+}
+
+gdouble
+gst_amf_node_get_number (const GstAmfNode * node)
+{
+ g_return_val_if_fail (gst_amf_node_get_type (node) == GST_AMF_TYPE_NUMBER,
+ FALSE);
+ return node->value.v_double;
+}
+
+gchar *
+gst_amf_node_get_string (const GstAmfNode * node, gsize * out_size)
+{
+ gsize size;
+ const gchar *data = gst_amf_node_peek_string (node, &size);
+
+ if (out_size) {
+ *out_size = size;
+ return g_memdup (data, size);
+ } else {
+ return g_strndup (data, size);
+ }
+}
+
+const gchar *
+gst_amf_node_peek_string (const GstAmfNode * node, gsize * size)
+{
+ GstAmfType type = gst_amf_node_get_type (node);
+ g_return_val_if_fail (type == GST_AMF_TYPE_STRING ||
+ type == GST_AMF_TYPE_LONG_STRING, FALSE);
+ return g_bytes_get_data (node->value.v_bytes, size);
+}
+
+const GstAmfNode *
+gst_amf_node_get_field (const GstAmfNode * node, const gchar * name)
+{
+ guint i, len = gst_amf_node_get_num_fields (node);
+
+ g_return_val_if_fail (name, NULL);
+
+ for (i = 0; i < len; i++) {
+ const AmfObjectField *field = get_field (node, i);
+ if (strcmp (field->name, name) == 0) {
+ return field->value;
+ }
+ }
+
+ return NULL;
+}
+
+const GstAmfNode *
+gst_amf_node_get_field_by_index (const GstAmfNode * node, guint index)
+{
+ guint len = gst_amf_node_get_num_fields (node);
+ g_return_val_if_fail (index < len, NULL);
+ return get_field (node, index)->value;
+}
+
+guint
+gst_amf_node_get_num_fields (const GstAmfNode * node)
+{
+ GstAmfType type = gst_amf_node_get_type (node);
+ g_return_val_if_fail (type == GST_AMF_TYPE_OBJECT ||
+ type == GST_AMF_TYPE_ECMA_ARRAY, 0);
+ return node->value.v_fields->len;
+}
+
+const GstAmfNode *
+gst_amf_node_get_element (const GstAmfNode * node, guint index)
+{
+ guint len = gst_amf_node_get_num_elements (node);
+ g_return_val_if_fail (index < len, NULL);
+ return get_element (node, index);
+}
+
+guint
+gst_amf_node_get_num_elements (const GstAmfNode * node)
+{
+ GstAmfType type = gst_amf_node_get_type (node);
+ g_return_val_if_fail (type == GST_AMF_TYPE_STRICT_ARRAY, 0);
+ return node->value.v_elements->len;
+}
+
+void
+gst_amf_node_set_boolean (GstAmfNode * node, gboolean value)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_BOOLEAN);
+ node->value.v_int = ! !value;
+}
+
+void
+gst_amf_node_set_number (GstAmfNode * node, gdouble value)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_NUMBER);
+ node->value.v_double = value;
+}
+
+void
+gst_amf_node_take_string (GstAmfNode * node, gchar * value, gssize size)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_STRING ||
+ node->type == GST_AMF_TYPE_LONG_STRING);
+
+ g_return_if_fail (value);
+
+ if (size < 0) {
+ size = strlen (value);
+ }
+
+ if (size > G_MAXUINT32) {
+ GST_WARNING ("Long string too long (%" G_GSSIZE_FORMAT "), truncating",
+ size);
+ size = G_MAXUINT32;
+ value[size] = 0;
+ }
+
+ if (size > G_MAXUINT16) {
+ node->type = GST_AMF_TYPE_LONG_STRING;
+ }
+
+ g_bytes_unref (node->value.v_bytes);
+ node->value.v_bytes = g_bytes_new_take (value, size);
+}
+
+void
+gst_amf_node_set_string (GstAmfNode * node, const gchar * value, gssize size)
+{
+ gchar *copy;
+
+ g_return_if_fail (value);
+
+ if (size < 0) {
+ size = strlen (value);
+ copy = g_memdup (value, size + 1);
+ } else {
+ copy = g_memdup (value, size);
+ }
+
+ gst_amf_node_take_string (node, copy, size);
+}
+
+void
+gst_amf_node_append_field (GstAmfNode * node, const gchar * name,
+ const GstAmfNode * value)
+{
+ gst_amf_node_append_take_field (node, name, gst_amf_node_copy (value));
+}
+
+void
+gst_amf_node_append_take_field (GstAmfNode * node, const gchar * name,
+ GstAmfNode * value)
+{
+ g_return_if_fail (node->type == GST_AMF_TYPE_OBJECT ||
+ node->type == GST_AMF_TYPE_ECMA_ARRAY);
+ g_return_if_fail (name);
+ append_field (node, g_strdup (name), value);
+}
+
+void
+gst_amf_node_append_field_number (GstAmfNode * node, const gchar * name,
+ gdouble value)
+{
+ gst_amf_node_append_take_field (node, name, gst_amf_node_new_number (value));
+}
+
+void
+gst_amf_node_append_field_boolean (GstAmfNode * node, const gchar * name,
+ gboolean value)
+{
+ gst_amf_node_append_take_field (node, name, gst_amf_node_new_boolean (value));
+}
+
+void
+gst_amf_node_append_field_string (GstAmfNode * node, const gchar * name,
+ const gchar * value, gssize size)
+{
+ gst_amf_node_append_take_field (node, name,
+ gst_amf_node_new_string (value, size));
+}
+
+void
+gst_amf_node_append_field_take_string (GstAmfNode * node, const gchar * name,
+ gchar * value, gssize size)
+{
+ gst_amf_node_append_take_field (node, name,
+ gst_amf_node_new_take_string (value, size));
+}
+
+/* Dumper *******************************************************************/
+
+static inline void
+dump_indent (GString * string, gint indent, guint depth)
+{
+ if (indent < 0) {
+ g_string_append_c (string, ' ');
+ } else {
+ guint i;
+ g_string_append_c (string, '\n');
+ for (i = 0; i < indent + depth * 2; i++) {
+ g_string_append_c (string, ' ');
+ }
+ }
+}
+
+static inline void
+dump_bytes (GString * string, GBytes * value)
+{
+ gsize size;
+ const gchar *data = g_bytes_get_data (value, &size);
+ gst_rtmp_string_print_escaped (string, data, size);
+}
+
+static void
+dump_node (GString * string, const GstAmfNode * node, gint indent,
+ guint recursion_depth)
+{
+ const gchar *object_delim = "{}";
+
+ switch (gst_amf_node_get_type (node)) {
+ case GST_AMF_TYPE_NUMBER:
+ g_string_append_printf (string, "%g", node->value.v_double);
+ break;
+
+ case GST_AMF_TYPE_BOOLEAN:
+ g_string_append (string, node->value.v_int ? "True" : "False");
+ break;
+
+ case GST_AMF_TYPE_LONG_STRING:
+ g_string_append_c (string, 'L');
+ /* no break */
+ case GST_AMF_TYPE_STRING:
+ dump_bytes (string, node->value.v_bytes);
+ break;
+
+ case GST_AMF_TYPE_ECMA_ARRAY:
+ object_delim = "[]";
+ /* no break */
+ case GST_AMF_TYPE_OBJECT:{
+ guint i, len = gst_amf_node_get_num_fields (node);
+ g_string_append_c (string, object_delim[0]);
+ if (len) {
+ for (i = 0; i < len; i++) {
+ const AmfObjectField *field = get_field (node, i);
+ dump_indent (string, indent, recursion_depth + 1);
+ gst_rtmp_string_print_escaped (string, field->name, -1);
+ g_string_append_c (string, ':');
+ g_string_append_c (string, ' ');
+ dump_node (string, field->value, indent, recursion_depth + 1);
+ if (i < len - 1) {
+ g_string_append_c (string, ',');
+ }
+ }
+ dump_indent (string, indent, recursion_depth);
+ }
+ g_string_append_c (string, object_delim[1]);
+ break;
+ }
+
+ case GST_AMF_TYPE_STRICT_ARRAY:{
+ guint i, len = gst_amf_node_get_num_elements (node);
+ g_string_append_c (string, '(');
+ if (len) {
+ for (i = 0; i < len; i++) {
+ const GstAmfNode *value = get_element (node, i);
+ dump_indent (string, indent, recursion_depth + 1);
+ dump_node (string, value, indent, recursion_depth + 1);
+ if (i < len - 1) {
+ g_string_append_c (string, ',');
+ }
+ }
+ dump_indent (string, indent, recursion_depth);
+ }
+ g_string_append_c (string, ')');
+ break;
+ }
+
+ default:
+ g_string_append (string, gst_amf_type_get_nick (node->type));
+ break;
+ }
+}
+
+void
+gst_amf_node_dump (const GstAmfNode * node, gint indent, GString * string)
+{
+ dump_node (string, node, indent, 0);
+}
+
+static void
+dump_argument (const GstAmfNode * node, guint n)
+{
+ if (G_UNLIKELY (GST_LEVEL_LOG <= _gst_debug_min) &&
+ GST_LEVEL_LOG <= gst_debug_category_get_threshold (GST_CAT_DEFAULT)) {
+ GString *string = g_string_new (NULL);
+ gst_amf_node_dump (node, -1, string);
+ GST_LOG ("Argument #%u: %s", n, string->str);
+ g_string_free (string, TRUE);
+ }
+}
+
+/* Parser *******************************************************************/
+
+typedef struct
+{
+ const guint8 *data;
+ gsize size, offset;
+ guint8 recursion_depth;
+} AmfParser;
+
+static GstAmfNode *parse_value (AmfParser * parser);
+
+static inline guint8
+parse_u8 (AmfParser * parser)
+{
+ guint8 value;
+ value = parser->data[parser->offset];
+ parser->offset += sizeof value;
+ return value;
+}
+
+static inline guint16
+parse_u16 (AmfParser * parser)
+{
+ guint16 value;
+ value = GST_READ_UINT16_BE (parser->data + parser->offset);
+ parser->offset += sizeof value;
+ return value;
+}
+
+static inline guint32
+parse_u32 (AmfParser * parser)
+{
+ guint32 value;
+ value = GST_READ_UINT32_BE (parser->data + parser->offset);
+ parser->offset += sizeof value;
+ return value;
+}
+
+static gdouble
+parse_number (AmfParser * parser)
+{
+ gdouble value;
+
+ if (sizeof value > parser->size - parser->offset) {
+ GST_ERROR ("number too long");
+ return 0.0;
+ }
+
+ value = GST_READ_DOUBLE_BE (parser->data + parser->offset);
+ parser->offset += sizeof value;
+ return value;
+}
+
+static gboolean
+parse_boolean (AmfParser * parser)
+{
+ guint8 value;
+
+ if (sizeof value > parser->size - parser->offset) {
+ GST_ERROR ("boolean too long");
+ return FALSE;
+ }
+
+ value = parse_u8 (parser);
+ return ! !value;
+}
+
+static inline GBytes *
+read_string (AmfParser * parser, gsize size)
+{
+ gchar *string;
+
+ if (size == 0) {
+ return g_bytes_ref (empty_bytes);
+ }
+
+ if (size > parser->size - parser->offset) {
+ GST_ERROR ("string too long (%" G_GSIZE_FORMAT ")", size);
+ return NULL;
+ }
+
+ /* Null-terminate all incoming strings for internal safety */
+ if (parser->data[parser->offset + size - 1] == 0) {
+ string = g_malloc (size);
+ } else {
+ string = g_malloc (size + 1);
+ string[size] = 0;
+ }
+
+ memcpy (string, parser->data + parser->offset, size);
+
+ parser->offset += size;
+ return g_bytes_new_take (string, size);
+}
+
+static GBytes *
+parse_string (AmfParser * parser)
+{
+ guint16 size;
+
+ if (sizeof size > parser->size - parser->offset) {
+ GST_ERROR ("string size too long");
+ return NULL;
+ }
+
+ size = parse_u16 (parser);
+ return read_string (parser, size);
+}
+
+static GBytes *
+parse_long_string (AmfParser * parser)
+{
+ guint32 size;
+
+ if (sizeof size > parser->size - parser->offset) {
+ GST_ERROR ("long string size too long");
+ return NULL;
+ }
+
+ size = parse_u32 (parser);
+ return read_string (parser, size);
+}
+
+static guint32
+parse_object (AmfParser * parser, GstAmfNode * node)
+{
+ guint32 n_fields = 0;
+
+ while (TRUE) {
+ GBytes *name;
+ gsize size;
+ GstAmfNode *value;
+
+ name = parse_string (parser);
+ if (!name) {
+ GST_ERROR ("object too long");
+ break;
+ }
+
+ value = parse_value (parser);
+ if (!value) {
+ GST_ERROR ("object too long");
+ g_bytes_unref (name);
+ break;
+ }
+
+ if (gst_amf_node_get_type (value) == GST_AMF_TYPE_OBJECT_END) {
+ g_bytes_unref (name);
+ gst_amf_node_free (value);
+ break;
+ }
+
+ if (g_bytes_get_size (name) == 0) {
+ GST_ERROR ("empty object field name");
+ g_bytes_unref (name);
+ gst_amf_node_free (value);
+ break;
+ }
+
+ append_field (node, g_bytes_unref_to_data (name, &size), value);
+ n_fields++;
+ };
+
+ return n_fields;
+}
+
+static void
+parse_ecma_array (AmfParser * parser, GstAmfNode * node)
+{
+ guint32 n_elements, n_read;
+
+ if (sizeof n_elements > parser->size - parser->offset) {
+ GST_ERROR ("array size too long");
+ return;
+ }
+
+ n_elements = parse_u32 (parser);
+
+ /* FIXME This is weird. The one time I've seen this, the encoded value
+ * was 0, but the number of elements was 1. */
+ if (n_elements == 0) {
+ GST_DEBUG ("Interpreting ECMA array length 0 as 1");
+ n_elements = 1;
+ }
+
+ n_read = parse_object (parser, node);
+
+ if (n_read != n_elements) {
+ GST_WARNING ("Expected array with %" G_GUINT32_FORMAT " elements,"
+ " but read %" G_GUINT32_FORMAT, n_elements, n_read);
+ }
+}
+
+static void
+parse_strict_array (AmfParser * parser, GstAmfNode * node)
+{
+ GstAmfNode *value = NULL;
+ guint32 n_elements, i;
+
+ if (sizeof n_elements > parser->size - parser->offset) {
+ GST_ERROR ("array size too long");
+ return;
+ }
+
+ n_elements = parse_u32 (parser);
+
+ for (i = 0; i < n_elements; i++) {
+ value = parse_value (parser);
+ if (!value) {
+ GST_ERROR ("array too long");
+ break;
+ }
+
+ append_element (node, value);
+ }
+}
+
+static GstAmfNode *
+parse_value (AmfParser * parser)
+{
+ GstAmfNode *node = NULL;
+ GstAmfType type;
+
+ if (1 > parser->size - parser->offset) {
+ GST_ERROR ("value too long");
+ return NULL;
+ }
+
+ type = parse_u8 (parser);
+ node = node_new (type);
+ GST_TRACE ("parsing AMF type %d (%s)", type, gst_amf_type_get_nick (type));
+
+ parser->recursion_depth++;
+ if (parser->recursion_depth > MAX_RECURSION_DEPTH) {
+ GST_ERROR ("maximum recursion depth %d reached", parser->recursion_depth);
+ return node;
+ }
+
+ switch (type) {
+ case GST_AMF_TYPE_NUMBER:
+ node->value.v_double = parse_number (parser);
+ break;
+ case GST_AMF_TYPE_BOOLEAN:
+ node->value.v_int = parse_boolean (parser);
+ break;
+ case GST_AMF_TYPE_STRING:
+ node->value.v_bytes = parse_string (parser);
+ break;
+ case GST_AMF_TYPE_LONG_STRING:
+ node->value.v_bytes = parse_long_string (parser);
+ break;
+ case GST_AMF_TYPE_OBJECT:
+ parse_object (parser, node);
+ break;
+ case GST_AMF_TYPE_ECMA_ARRAY:
+ parse_ecma_array (parser, node);
+ break;
+ case GST_AMF_TYPE_STRICT_ARRAY:
+ parse_strict_array (parser, node);
+ break;
+ case GST_AMF_TYPE_NULL:
+ case GST_AMF_TYPE_UNDEFINED:
+ case GST_AMF_TYPE_OBJECT_END:
+ case GST_AMF_TYPE_UNSUPPORTED:
+ break;
+ default:
+ GST_ERROR ("unimplemented AMF type %d (%s)", type,
+ gst_amf_type_get_nick (type));
+ break;
+ }
+
+ parser->recursion_depth--;
+ return node;
+}
+
+GPtrArray *
+gst_amf_parse_command (const guint8 * data, gsize size,
+ gdouble * transaction_id, gchar ** command_name)
+{
+ AmfParser parser = {
+ .data = data,
+ .size = size,
+ .recursion_depth = 0,
+ };
+ GstAmfNode *node1 = NULL, *node2 = NULL;
+ GPtrArray *args = NULL;
+
+ g_return_val_if_fail (data, NULL);
+ g_return_val_if_fail (size, NULL);
+
+ init_static ();
+
+ GST_TRACE ("Starting parse with %" G_GSIZE_FORMAT " bytes", parser.size);
+
+ node1 = parse_value (&parser);
+ if (gst_amf_node_get_type (node1) != GST_AMF_TYPE_STRING) {
+ GST_ERROR ("no command name");
+ goto out;
+ }
+
+ node2 = parse_value (&parser);
+ if (gst_amf_node_get_type (node2) != GST_AMF_TYPE_NUMBER) {
+ GST_ERROR ("no transaction ID");
+ goto out;
+ }
+
+ GST_LOG ("Parsing command '%s', transid %.0f",
+ gst_amf_node_peek_string (node1, NULL), gst_amf_node_get_number (node2));
+
+ args = g_ptr_array_new_with_free_func (gst_amf_node_free);
+
+ while (parser.offset < parser.size) {
+ GstAmfNode *node = parse_value (&parser);
+ if (!node) {
+ break;
+ }
+
+ dump_argument (node, args->len);
+ g_ptr_array_add (args, node);
+ }
+
+ GST_TRACE ("Done parsing; consumed %" G_GSIZE_FORMAT " bytes and left %"
+ G_GSIZE_FORMAT " bytes", parser.offset, parser.size - parser.offset);
+
+ if (args->len == 0) {
+ GST_ERROR ("no command arguments");
+ g_clear_pointer (&args, g_ptr_array_unref);
+ goto out;
+ }
+
+ if (command_name) {
+ *command_name = gst_amf_node_get_string (node1, NULL);
+ }
+
+ if (transaction_id) {
+ *transaction_id = gst_amf_node_get_number (node2);
+ }
+
+out:
+ g_clear_pointer (&node1, gst_amf_node_free);
+ g_clear_pointer (&node2, gst_amf_node_free);
+ return args;
+}
+
+/* Serializer ***************************************************************/
+
+static void serialize_value (GByteArray * array, const GstAmfNode * node);
+
+static inline void
+serialize_u8 (GByteArray * array, guint8 value)
+{
+ g_byte_array_append (array, (guint8 *) & value, sizeof value);
+}
+
+static inline void
+serialize_u16 (GByteArray * array, guint16 value)
+{
+ value = GUINT16_TO_BE (value);
+ g_byte_array_append (array, (guint8 *) & value, sizeof value);
+}
+
+static inline void
+serialize_u32 (GByteArray * array, guint32 value)
+{
+ value = GUINT32_TO_BE (value);
+ g_byte_array_append (array, (guint8 *) & value, sizeof value);
+}
+
+static inline void
+serialize_number (GByteArray * array, gdouble value)
+{
+ value = GDOUBLE_TO_BE (value);
+ g_byte_array_append (array, (guint8 *) & value, sizeof value);
+}
+
+static inline void
+serialize_boolean (GByteArray * array, gboolean value)
+{
+ serialize_u8 (array, value);
+}
+
+static void
+serialize_string (GByteArray * array, const gchar * string, gssize size)
+{
+ if (size < 0) {
+ size = strlen (string);
+ }
+
+ if (size > G_MAXUINT16) {
+ GST_WARNING ("String too long: %" G_GSSIZE_FORMAT, size);
+ size = G_MAXUINT16;
+ }
+
+ serialize_u16 (array, size);
+ g_byte_array_append (array, (guint8 *) string, size);
+}
+
+static void
+serialize_long_string (GByteArray * array, const gchar * string, gssize size)
+{
+ if (size < 0) {
+ size = strlen (string);
+ }
+
+ if (size > G_MAXUINT32) {
+ GST_WARNING ("Long string too long: %" G_GSSIZE_FORMAT, size);
+ size = G_MAXUINT32;
+ }
+
+ serialize_u32 (array, size);
+ g_byte_array_append (array, (guint8 *) string, size);
+}
+
+static inline void
+serialize_bytes (GByteArray * array, GBytes * bytes, gboolean long_string)
+{
+ gsize size;
+ const gchar *data = g_bytes_get_data (bytes, &size);
+
+ if (long_string) {
+ serialize_long_string (array, data, size);
+ } else {
+ serialize_string (array, data, size);
+ }
+}
+
+static void
+serialize_object (GByteArray * array, const GstAmfNode * node)
+{
+ guint i;
+
+ for (i = 0; i < gst_amf_node_get_num_fields (node); i++) {
+ const AmfObjectField *field = get_field (node, i);
+ serialize_string (array, field->name, -1);
+ serialize_value (array, field->value);
+ }
+ serialize_u16 (array, 0);
+ serialize_u8 (array, GST_AMF_TYPE_OBJECT_END);
+}
+
+static void
+serialize_ecma_array (GByteArray * array, const GstAmfNode * node)
+{
+ /* FIXME: Shouldn't this be the field count? */
+ serialize_u32 (array, 0);
+ serialize_object (array, node);
+}
+
+static void
+serialize_value (GByteArray * array, const GstAmfNode * node)
+{
+ serialize_u8 (array, node->type);
+ switch (node->type) {
+ case GST_AMF_TYPE_NUMBER:
+ serialize_number (array, node->value.v_double);
+ break;
+ case GST_AMF_TYPE_BOOLEAN:
+ serialize_boolean (array, node->value.v_int);
+ break;
+ case GST_AMF_TYPE_STRING:
+ serialize_bytes (array, node->value.v_bytes, FALSE);
+ break;
+ case GST_AMF_TYPE_LONG_STRING:
+ serialize_bytes (array, node->value.v_bytes, TRUE);
+ break;
+ case GST_AMF_TYPE_OBJECT:
+ serialize_object (array, node);
+ break;
+ case GST_AMF_TYPE_ECMA_ARRAY:
+ serialize_ecma_array (array, node);
+ break;
+ case GST_AMF_TYPE_NULL:
+ case GST_AMF_TYPE_UNDEFINED:
+ case GST_AMF_TYPE_OBJECT_END:
+ case GST_AMF_TYPE_UNSUPPORTED:
+ break;
+ default:
+ GST_ERROR ("unimplemented AMF type %d (%s)", node->type,
+ gst_amf_type_get_nick (node->type));
+ break;
+ }
+}
+
+GBytes *
+gst_amf_serialize_command (gdouble transaction_id, const gchar * command_name,
+ const GstAmfNode * argument, ...)
+{
+ va_list ap;
+ GBytes *ret;
+
+ va_start (ap, argument);
+ ret = gst_amf_serialize_command_valist (transaction_id, command_name,
+ argument, ap);
+ va_end (ap);
+
+ return ret;
+}
+
+GBytes *
+gst_amf_serialize_command_valist (gdouble transaction_id,
+ const gchar * command_name, const GstAmfNode * argument, va_list var_args)
+{
+ GByteArray *array = g_byte_array_new ();
+ guint i = 0;
+
+ g_return_val_if_fail (command_name, NULL);
+ g_return_val_if_fail (argument, NULL);
+
+ GST_LOG ("Serializing command '%s', transid %.0f", command_name,
+ transaction_id);
+
+ serialize_u8 (array, GST_AMF_TYPE_STRING);
+ serialize_string (array, command_name, -1);
+ serialize_u8 (array, GST_AMF_TYPE_NUMBER);
+ serialize_number (array, transaction_id);
+
+ while (argument) {
+ serialize_value (array, argument);
+ dump_argument (argument, i++);
+
+ argument = va_arg (var_args, const GstAmfNode *);
+ }
+
+ GST_TRACE ("Done serializing; consumed %u args and produced %u bytes", i,
+ array->len);
+
+ return g_byte_array_free_to_bytes (array);
+}
diff --git a/gst/rtmp2/rtmp/amf.h b/gst/rtmp2/rtmp/amf.h
new file mode 100644
index 000000000..639456e4f
--- /dev/null
+++ b/gst/rtmp2/rtmp/amf.h
@@ -0,0 +1,112 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2014 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_AMF_H_
+#define _GST_RTMP_AMF_H_
+
+#include <glib.h>
+
+G_BEGIN_DECLS
+
+typedef enum
+{
+ GST_AMF_TYPE_INVALID = -1,
+ GST_AMF_TYPE_NUMBER = 0,
+ GST_AMF_TYPE_BOOLEAN = 1,
+ GST_AMF_TYPE_STRING = 2,
+ GST_AMF_TYPE_OBJECT = 3,
+ GST_AMF_TYPE_MOVIECLIP = 4,
+ GST_AMF_TYPE_NULL = 5,
+ GST_AMF_TYPE_UNDEFINED = 6,
+ GST_AMF_TYPE_REFERENCE = 7,
+ GST_AMF_TYPE_ECMA_ARRAY = 8,
+ GST_AMF_TYPE_OBJECT_END = 9,
+ GST_AMF_TYPE_STRICT_ARRAY = 10,
+ GST_AMF_TYPE_DATE = 11,
+ GST_AMF_TYPE_LONG_STRING = 12,
+ GST_AMF_TYPE_UNSUPPORTED = 13,
+ GST_AMF_TYPE_RECORDSET = 14,
+ GST_AMF_TYPE_XML_DOCUMENT = 15,
+ GST_AMF_TYPE_TYPED_OBJECT = 16,
+ GST_AMF_TYPE_AVMPLUS_OBJECT = 17
+} GstAmfType;
+
+const gchar * gst_amf_type_get_nick (GstAmfType type);
+
+typedef struct _GstAmfNode GstAmfNode;
+
+GstAmfNode * gst_amf_node_new_null (void);
+GstAmfNode * gst_amf_node_new_number (gdouble value);
+GstAmfNode * gst_amf_node_new_boolean (gboolean value);
+GstAmfNode * gst_amf_node_new_string (const gchar * value, gssize size);
+GstAmfNode * gst_amf_node_new_take_string (gchar * value, gssize size);
+GstAmfNode * gst_amf_node_new_object (void);
+
+GstAmfNode * gst_amf_node_copy (const GstAmfNode * node);
+void gst_amf_node_free (gpointer ptr);
+
+GstAmfType gst_amf_node_get_type (const GstAmfNode * node);
+gdouble gst_amf_node_get_number (const GstAmfNode * node);
+gboolean gst_amf_node_get_boolean (const GstAmfNode * node);
+gchar * gst_amf_node_get_string (const GstAmfNode * node, gsize * size);
+const gchar * gst_amf_node_peek_string (const GstAmfNode * node, gsize * size);
+
+const GstAmfNode * gst_amf_node_get_field (const GstAmfNode * node,
+ const gchar * name);
+const GstAmfNode * gst_amf_node_get_field_by_index (const GstAmfNode * node,
+ guint index);
+guint gst_amf_node_get_num_fields (const GstAmfNode * node);
+
+const GstAmfNode * gst_amf_node_get_element (const GstAmfNode * node,
+ guint index);
+guint gst_amf_node_get_num_elements (const GstAmfNode * node);
+
+void gst_amf_node_set_number (GstAmfNode * node, gdouble value);
+void gst_amf_node_set_boolean (GstAmfNode * node, gboolean value);
+void gst_amf_node_set_string (GstAmfNode * node, const gchar * value, gssize size);
+void gst_amf_node_take_string (GstAmfNode * node, gchar * value, gssize size);
+
+void gst_amf_node_append_field (GstAmfNode * node,
+ const gchar * name, const GstAmfNode * value);
+void gst_amf_node_append_take_field (GstAmfNode * node,
+ const gchar * name, GstAmfNode * value);
+void gst_amf_node_append_field_number (GstAmfNode * node,
+ const gchar * name, gdouble value);
+void gst_amf_node_append_field_boolean (GstAmfNode * node,
+ const gchar * name, gboolean value);
+void gst_amf_node_append_field_string (GstAmfNode * node,
+ const gchar * name, const gchar * value, gssize size);
+void gst_amf_node_append_field_take_string (GstAmfNode * node,
+ const gchar * name, gchar * value, gssize size);
+
+void gst_amf_node_dump (const GstAmfNode * node, gint indent,
+ GString * string);
+
+GPtrArray * gst_amf_parse_command (const guint8 * data, gsize size,
+ gdouble * transaction_id, gchar ** command_name);
+
+GBytes * gst_amf_serialize_command (gdouble transaction_id,
+ const gchar * command_name, const GstAmfNode * argument, ...) G_GNUC_NULL_TERMINATED;
+GBytes * gst_amf_serialize_command_valist (gdouble transaction_id,
+ const gchar * command_name, const GstAmfNode * argument, va_list va_args);
+
+G_END_DECLS
+#endif
diff --git a/gst/rtmp2/rtmp/rtmpchunkstream.c b/gst/rtmp2/rtmp/rtmpchunkstream.c
new file mode 100644
index 000000000..95cb267a5
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpchunkstream.c
@@ -0,0 +1,714 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "rtmpchunkstream.h"
+#include "rtmputils.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_stream_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_chunk_stream_debug_category
+
+static void
+init_debug (void)
+{
+ static volatile gsize done = 0;
+ if (g_once_init_enter (&done)) {
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_stream_debug_category,
+ "rtmpchunkstream", 0, "debug category for rtmp chunk streams");
+ g_once_init_leave (&done, 1);
+ }
+}
+
+enum
+{
+ CHUNK_BYTE_TWOBYTE = 0,
+ CHUNK_BYTE_THREEBYTE = 1,
+ CHUNK_BYTE_MASK = 0x3f,
+ CHUNK_STREAM_MIN_TWOBYTE = 0x40,
+ CHUNK_STREAM_MIN_THREEBYTE = 0x140,
+ CHUNK_STREAM_MAX_THREEBYTE = 0x1003f,
+};
+
+typedef enum
+{
+ CHUNK_TYPE_0 = 0,
+ CHUNK_TYPE_1 = 1,
+ CHUNK_TYPE_2 = 2,
+ CHUNK_TYPE_3 = 3,
+} ChunkType;
+
+static const gsize chunk_header_sizes[4] = { 11, 7, 3, 0 };
+
+struct _GstRtmpChunkStream
+{
+ GstBuffer *buffer;
+ GstRtmpMeta *meta;
+ GstMapInfo map; /* Only used for parsing */
+ guint32 id;
+ guint32 offset;
+ guint64 bytes;
+};
+
+struct _GstRtmpChunkStreams
+{
+ GArray *array;
+};
+
+static inline gboolean
+chunk_stream_is_open (GstRtmpChunkStream * cstream)
+{
+ return cstream->map.data != NULL;
+}
+
+static void
+chunk_stream_take_buffer (GstRtmpChunkStream * cstream, GstBuffer * buffer)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+ g_assert (meta);
+ g_assert (cstream->buffer == NULL);
+ cstream->buffer = buffer;
+ cstream->meta = meta;
+}
+
+static void
+chunk_stream_clear (GstRtmpChunkStream * cstream)
+{
+ if (chunk_stream_is_open (cstream)) {
+ gst_buffer_unmap (cstream->buffer, &cstream->map);
+ cstream->map.data = NULL;
+ }
+
+ gst_buffer_replace (&cstream->buffer, NULL);
+ cstream->meta = NULL;
+ cstream->offset = 0;
+}
+
+static guint32
+chunk_stream_next_size (GstRtmpChunkStream * cstream, guint32 chunk_size)
+{
+ guint32 size, offset;
+
+ size = cstream->meta->size;
+ offset = cstream->offset;
+
+ g_return_val_if_fail (offset <= size, 0);
+ return MIN (size - offset, chunk_size);
+}
+
+static inline gboolean
+needs_ext_ts (GstRtmpMeta * meta)
+{
+ return meta->ts_delta >= 0xffffff;
+}
+
+
+static guint32
+dts_to_abs_ts (GstBuffer * buffer)
+{
+ GstClockTime dts = GST_BUFFER_DTS (buffer);
+ guint32 ret = 0;
+
+ if (GST_CLOCK_TIME_IS_VALID (dts)) {
+ ret = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
+ }
+
+ GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " into abs TS %"
+ G_GUINT32_FORMAT " ms", GST_TIME_ARGS (dts), ret);
+ return ret;
+}
+
+static gboolean
+dts_diff_to_delta_ts (GstBuffer * old_buffer, GstBuffer * buffer,
+ guint32 * out_ts)
+{
+ GstClockTime dts = GST_BUFFER_DTS (buffer),
+ old_dts = GST_BUFFER_DTS (old_buffer);
+ guint32 abs_ts, old_abs_ts, delta_32 = 0;
+
+ if (!GST_CLOCK_TIME_IS_VALID (dts) || !GST_CLOCK_TIME_IS_VALID (old_dts)) {
+ GST_LOG ("Timestamps not valid; using delta TS 0");
+ goto out;
+ }
+
+ if (ABS (GST_CLOCK_DIFF (old_dts, dts)) > GST_MSECOND * G_MAXINT32) {
+ GST_WARNING ("Timestamp delta too large: %" GST_TIME_FORMAT " -> %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (old_dts), GST_TIME_ARGS (dts));
+ return FALSE;
+ }
+
+ abs_ts = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
+ old_abs_ts = gst_util_uint64_scale_round (old_dts, 1, GST_MSECOND);
+
+ /* underflow wraps around */
+ delta_32 = abs_ts - old_abs_ts;
+
+ GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT
+ " ms) -> %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT " ms) into delta TS %"
+ G_GUINT32_FORMAT " ms", GST_TIME_ARGS (old_dts), old_abs_ts,
+ GST_TIME_ARGS (dts), abs_ts, delta_32);
+
+out:
+ *out_ts = delta_32;
+ return TRUE;
+}
+
+static ChunkType
+select_chunk_type (GstRtmpChunkStream * cstream, GstBuffer * buffer)
+{
+ GstBuffer *old_buffer = cstream->buffer;
+ GstRtmpMeta *meta, *old_meta;
+
+ g_return_val_if_fail (buffer, -1);
+
+ meta = gst_buffer_get_rtmp_meta (buffer);
+
+ g_return_val_if_fail (meta, -1);
+ g_return_val_if_fail (gst_rtmp_message_type_is_valid (meta->type), -1);
+
+ meta->size = gst_buffer_get_size (buffer);
+ meta->cstream = cstream->id;
+
+ if (!old_buffer) {
+ GST_TRACE ("Picking header 0: no previous header");
+ meta->ts_delta = dts_to_abs_ts (buffer);
+ return CHUNK_TYPE_0;
+ }
+
+ old_meta = gst_buffer_get_rtmp_meta (old_buffer);
+
+ if (old_meta->mstream != meta->mstream) {
+ GST_TRACE ("Picking header 0: stream mismatch; "
+ "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
+ old_meta->mstream, meta->mstream);
+ meta->ts_delta = dts_to_abs_ts (buffer);
+ return CHUNK_TYPE_0;
+ }
+
+ if (!dts_diff_to_delta_ts (old_buffer, buffer, &meta->ts_delta)) {
+ GST_TRACE ("Picking header 0: timestamp delta overflow");
+ meta->ts_delta = dts_to_abs_ts (buffer);
+ return CHUNK_TYPE_0;
+ }
+
+ /* now at least type 1 */
+
+ if (old_meta->type != meta->type) {
+ GST_TRACE ("Picking header 1: type mismatch; want %d got %d",
+ old_meta->type, meta->type);
+ return CHUNK_TYPE_1;
+ }
+
+ if (old_meta->size != meta->size) {
+ GST_TRACE ("Picking header 1: size mismatch; "
+ "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
+ old_meta->size, meta->size);
+ return CHUNK_TYPE_1;
+ }
+
+ /* now at least type 2 */
+
+ if (old_meta->ts_delta != meta->ts_delta) {
+ GST_TRACE ("Picking header 2: timestamp delta mismatch; "
+ "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
+ old_meta->ts_delta, meta->ts_delta);
+ return CHUNK_TYPE_2;
+ }
+
+ /* now at least type 3 */
+
+ GST_TRACE ("Picking header 3");
+ return CHUNK_TYPE_3;
+}
+
+static GstBuffer *
+serialize_next (GstRtmpChunkStream * cstream, guint32 chunk_size,
+ ChunkType type)
+{
+ GstRtmpMeta *meta = cstream->meta;
+ guint8 small_stream_id;
+ gsize header_size = chunk_header_sizes[type], offset;
+ gboolean ext_ts;
+ GstBuffer *ret;
+ GstMapInfo map;
+
+ GST_TRACE ("Serializing a chunk of type %d, offset %" G_GUINT32_FORMAT,
+ type, cstream->offset);
+
+ if (cstream->id < CHUNK_STREAM_MIN_TWOBYTE) {
+ small_stream_id = cstream->id;
+ header_size += 1;
+ } else if (cstream->id < CHUNK_STREAM_MIN_THREEBYTE) {
+ small_stream_id = CHUNK_BYTE_TWOBYTE;
+ header_size += 2;
+ } else {
+ small_stream_id = CHUNK_BYTE_THREEBYTE;
+ header_size += 3;
+ }
+
+ ext_ts = needs_ext_ts (meta);
+ if (ext_ts) {
+ header_size += 4;
+ }
+
+ GST_TRACE ("Allocating buffer, header size %" G_GSIZE_FORMAT, header_size);
+
+ ret = gst_buffer_new_allocate (NULL, header_size, NULL);
+ if (!ret) {
+ GST_ERROR ("Failed to allocate chunk buffer");
+ return NULL;
+ }
+
+ if (!gst_buffer_map (ret, &map, GST_MAP_WRITE)) {
+ GST_ERROR ("Failed to map %" GST_PTR_FORMAT, ret);
+ gst_buffer_unref (ret);
+ return NULL;
+ }
+
+ /* Chunk Basic Header */
+ GST_WRITE_UINT8 (map.data, (type << 6) | small_stream_id);
+ offset = 1;
+
+ switch (small_stream_id) {
+ case CHUNK_BYTE_TWOBYTE:
+ GST_WRITE_UINT8 (map.data + 1, cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
+ offset += 1;
+ break;
+
+ case CHUNK_BYTE_THREEBYTE:
+ GST_WRITE_UINT16_LE (map.data + 1,
+ cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
+ offset += 2;
+ break;
+ }
+
+ switch (type) {
+ case CHUNK_TYPE_0:
+ /* SRSLY: "Message stream ID is stored in little-endian format." */
+ GST_WRITE_UINT32_LE (map.data + offset + 7, meta->mstream);
+ /* no break */
+ case CHUNK_TYPE_1:
+ GST_WRITE_UINT24_BE (map.data + offset + 3, meta->size);
+ GST_WRITE_UINT8 (map.data + offset + 6, meta->type);
+ /* no break */
+ case CHUNK_TYPE_2:
+ GST_WRITE_UINT24_BE (map.data + offset,
+ ext_ts ? 0xffffff : meta->ts_delta);
+ /* no break */
+ case CHUNK_TYPE_3:
+ offset += chunk_header_sizes[type];
+
+ if (ext_ts) {
+ GST_WRITE_UINT32_BE (map.data + offset, meta->ts_delta);
+ offset += 4;
+ }
+ }
+
+ g_assert (offset == header_size);
+ GST_MEMDUMP (">>> chunk header", map.data, offset);
+
+ gst_buffer_unmap (ret, &map);
+
+ GST_BUFFER_OFFSET (ret) = GST_BUFFER_OFFSET_IS_VALID (cstream->buffer) ?
+ GST_BUFFER_OFFSET (cstream->buffer) + cstream->offset : cstream->bytes;
+ GST_BUFFER_OFFSET_END (ret) = GST_BUFFER_OFFSET (ret);
+
+ if (meta->size > 0) {
+ guint32 payload_size = chunk_stream_next_size (cstream, chunk_size);
+
+ GST_TRACE ("Appending %" G_GUINT32_FORMAT " bytes of payload",
+ payload_size);
+
+ gst_buffer_copy_into (ret, cstream->buffer, GST_BUFFER_COPY_MEMORY,
+ cstream->offset, payload_size);
+
+ GST_BUFFER_OFFSET_END (ret) += payload_size;
+ cstream->offset += payload_size;
+ cstream->bytes += payload_size;
+ } else {
+ GST_TRACE ("Chunk has no payload");
+ }
+
+ gst_rtmp_buffer_dump (ret, ">>> chunk");
+
+ return ret;
+}
+
+void
+gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream)
+{
+ g_return_if_fail (cstream);
+ GST_LOG ("Clearing chunk stream %" G_GUINT32_FORMAT, cstream->id);
+ chunk_stream_clear (cstream);
+}
+
+guint32
+gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size)
+{
+ guint32 ret;
+
+ if (size < 1) {
+ GST_TRACE ("Not enough bytes to read ID");
+ return 0;
+ }
+
+ ret = GST_READ_UINT8 (data) & CHUNK_BYTE_MASK;
+
+ switch (ret) {
+ case CHUNK_BYTE_TWOBYTE:
+ if (size < 2) {
+ GST_TRACE ("Not enough bytes to read two-byte ID");
+ return 0;
+ }
+
+ ret = GST_READ_UINT8 (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
+ break;
+
+ case CHUNK_BYTE_THREEBYTE:
+ if (size < 3) {
+ GST_TRACE ("Not enough bytes to read three-byte ID");
+ return 0;
+ }
+
+ ret = GST_READ_UINT16_LE (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
+ break;
+ }
+
+ GST_TRACE ("Parsed chunk stream ID %" G_GUINT32_FORMAT, ret);
+ return ret;
+}
+
+guint32
+gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream,
+ const guint8 * data, gsize size)
+{
+ GstBuffer *buffer;
+ GstRtmpMeta *meta;
+ const guint8 *message_header;
+ guint32 header_size;
+ ChunkType type;
+ gboolean has_abs_timestamp = FALSE;
+
+ g_return_val_if_fail (cstream, 0);
+ g_return_val_if_fail (cstream->id == gst_rtmp_chunk_stream_parse_id (data,
+ size), 0);
+
+ type = GST_READ_UINT8 (data) >> 6;
+ GST_TRACE ("Parsing chunk stream %" G_GUINT32_FORMAT " header type %d",
+ cstream->id, type);
+
+ switch (GST_READ_UINT8 (data) & CHUNK_BYTE_MASK) {
+ case CHUNK_BYTE_TWOBYTE:
+ header_size = 2;
+ break;
+ case CHUNK_BYTE_THREEBYTE:
+ header_size = 3;
+ break;
+ default:
+ header_size = 1;
+ break;
+ }
+
+ message_header = data + header_size;
+ header_size += chunk_header_sizes[type];
+
+ if (cstream->buffer) {
+ buffer = cstream->buffer;
+ meta = cstream->meta;
+ g_assert (meta->cstream == cstream->id);
+ } else {
+ buffer = gst_buffer_new ();
+ GST_BUFFER_DTS (buffer) = 0;
+ GST_BUFFER_OFFSET (buffer) = cstream->bytes;
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+
+ meta = gst_buffer_add_rtmp_meta (buffer);
+ meta->cstream = cstream->id;
+
+ chunk_stream_take_buffer (cstream, buffer);
+ GST_DEBUG ("Starting parse with new %" GST_PTR_FORMAT, buffer);
+ }
+
+ if (size < header_size) {
+ GST_TRACE ("not enough bytes to read header");
+ return header_size;
+ }
+
+ switch (type) {
+ case CHUNK_TYPE_0:
+ has_abs_timestamp = TRUE;
+ /* SRSLY: "Message stream ID is stored in little-endian format." */
+ meta->mstream = GST_READ_UINT32_LE (message_header + 7);
+ /* no break */
+ case CHUNK_TYPE_1:
+ meta->type = GST_READ_UINT8 (message_header + 6);
+ meta->size = GST_READ_UINT24_BE (message_header + 3);
+ /* no break */
+ case CHUNK_TYPE_2:
+ meta->ts_delta = GST_READ_UINT24_BE (message_header);
+ /* no break */
+ case CHUNK_TYPE_3:
+ if (needs_ext_ts (meta)) {
+ guint32 timestamp;
+
+ if (size < header_size + 4) {
+ GST_TRACE ("not enough bytes to read extended timestamp");
+ return header_size + 4;
+ }
+
+ GST_TRACE ("Reading extended timestamp");
+ timestamp = GST_READ_UINT32_BE (data + header_size);
+
+ if (type == 3 && meta->ts_delta != timestamp) {
+ GST_WARNING ("Type 3 extended timestamp does not match expected"
+ " timestamp (want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT
+ "); assuming it's not present", meta->ts_delta, timestamp);
+ } else {
+ meta->ts_delta = timestamp;
+ header_size += 4;
+ }
+ }
+ }
+
+ GST_MEMDUMP ("<<< chunk header", data, header_size);
+
+ if (!chunk_stream_is_open (cstream)) {
+ GstClockTime dts = GST_BUFFER_DTS (buffer);
+ guint32 delta_32, abs_32;
+ gint64 delta_64;
+
+ if (has_abs_timestamp) {
+ abs_32 = meta->ts_delta;
+ delta_32 = abs_32 - dts / GST_MSECOND;
+ } else {
+ delta_32 = meta->ts_delta;
+ abs_32 = delta_32 + dts / GST_MSECOND;
+ }
+
+ GST_TRACE ("Timestamp delta is %" G_GUINT32_FORMAT " (absolute %"
+ G_GUINT32_FORMAT ")", delta_32, abs_32);
+
+ /* emulate signed overflow */
+ delta_64 = delta_32;
+ if (delta_64 > G_MAXINT32) {
+ delta_64 -= G_MAXUINT32;
+ delta_64 -= 1;
+ }
+
+ delta_64 *= GST_MSECOND;
+
+ if (G_LIKELY (delta_64 >= 0)) {
+ /* Normal advancement */
+ } else if (G_LIKELY ((guint64) (-delta_64) <= dts)) {
+ /* In-bounds regression */
+ GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (delta_64));
+ } else {
+ /* Out-of-bounds regression */
+ GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT ", offsetting",
+ GST_STIME_ARGS (delta_64));
+ delta_64 = delta_32 * GST_MSECOND;
+ }
+
+ GST_BUFFER_DTS (buffer) += delta_64;
+
+ GST_TRACE ("Adjusted buffer DTS (%" GST_TIME_FORMAT ") by %"
+ GST_STIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (dts),
+ GST_STIME_ARGS (delta_64), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
+ } else {
+ GST_TRACE ("Message payload already started; not touching timestamp");
+ }
+
+ return header_size;
+}
+
+guint32
+gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream,
+ guint32 chunk_size, guint8 ** data)
+{
+ GstMemory *mem;
+
+ g_return_val_if_fail (cstream, 0);
+ g_return_val_if_fail (cstream->buffer, 0);
+
+ if (!chunk_stream_is_open (cstream)) {
+ guint32 size = cstream->meta->size;
+
+ GST_TRACE ("Allocating buffer, payload size %" G_GUINT32_FORMAT, size);
+
+ mem = gst_allocator_alloc (NULL, size, 0);
+ if (!mem) {
+ GST_ERROR ("Failed to allocate buffer for payload size %"
+ G_GUINT32_FORMAT, size);
+ return 0;
+ }
+
+ gst_buffer_append_memory (cstream->buffer, mem);
+ gst_buffer_map (cstream->buffer, &cstream->map, GST_MAP_WRITE);
+ }
+
+ g_return_val_if_fail (cstream->map.size == cstream->meta->size, 0);
+
+ if (data) {
+ *data = cstream->map.data + cstream->offset;
+ }
+
+ return chunk_stream_next_size (cstream, chunk_size);
+}
+
+guint32
+gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream,
+ guint32 chunk_size)
+{
+ guint32 size;
+
+ g_return_val_if_fail (cstream, FALSE);
+ g_return_val_if_fail (chunk_stream_is_open (cstream), FALSE);
+
+ size = chunk_stream_next_size (cstream, chunk_size);
+ cstream->offset += size;
+ cstream->bytes += size;
+
+ return chunk_stream_next_size (cstream, chunk_size);
+}
+
+GstBuffer *
+gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream)
+{
+ GstBuffer *buffer, *empty;
+
+ g_return_val_if_fail (cstream, NULL);
+ g_return_val_if_fail (cstream->buffer, NULL);
+
+ buffer = gst_buffer_ref (cstream->buffer);
+ GST_BUFFER_OFFSET_END (buffer) = cstream->bytes;
+
+ gst_rtmp_buffer_dump (buffer, "<<< message");
+
+ chunk_stream_clear (cstream);
+
+ empty = gst_buffer_new ();
+
+ if (!gst_buffer_copy_into (empty, buffer, GST_BUFFER_COPY_META, 0, 0)) {
+ GST_ERROR ("copy_into failed");
+ return NULL;
+ }
+
+ GST_BUFFER_DTS (empty) = GST_BUFFER_DTS (buffer);
+ GST_BUFFER_OFFSET (empty) = GST_BUFFER_OFFSET_END (buffer);
+
+ chunk_stream_take_buffer (cstream, empty);
+
+ return buffer;
+}
+
+GstBuffer *
+gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
+ GstBuffer * buffer, guint32 chunk_size)
+{
+ ChunkType type;
+
+ g_return_val_if_fail (cstream, NULL);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL);
+
+ type = select_chunk_type (cstream, buffer);
+ g_return_val_if_fail (type >= 0, NULL);
+
+ GST_TRACE ("Starting serialization of message %" GST_PTR_FORMAT
+ " into stream %" G_GUINT32_FORMAT, buffer, cstream->id);
+
+ gst_rtmp_buffer_dump (buffer, ">>> message");
+
+ chunk_stream_clear (cstream);
+ chunk_stream_take_buffer (cstream, buffer);
+
+ return serialize_next (cstream, chunk_size, type);
+}
+
+GstBuffer *
+gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream,
+ guint32 chunk_size)
+{
+ g_return_val_if_fail (cstream, NULL);
+ g_return_val_if_fail (cstream->buffer, NULL);
+
+ if (chunk_stream_next_size (cstream, chunk_size) == 0) {
+ GST_TRACE ("Message serialization finished");
+ return NULL;
+ }
+
+ GST_TRACE ("Continuing serialization of message %" GST_PTR_FORMAT
+ " into stream %" G_GUINT32_FORMAT, cstream->buffer, cstream->id);
+
+ return serialize_next (cstream, chunk_size, CHUNK_TYPE_3);
+}
+
+GstRtmpChunkStreams *
+gst_rtmp_chunk_streams_new (void)
+{
+ GstRtmpChunkStreams *cstreams;
+
+ init_debug ();
+
+ cstreams = g_slice_new (GstRtmpChunkStreams);
+ cstreams->array = g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkStream));
+ g_array_set_clear_func (cstreams->array,
+ (GDestroyNotify) gst_rtmp_chunk_stream_clear);
+ return cstreams;
+}
+
+void
+gst_rtmp_chunk_streams_free (gpointer ptr)
+{
+ GstRtmpChunkStreams *cstreams = ptr;
+ g_clear_pointer (&cstreams->array, g_array_unref);
+ g_slice_free (GstRtmpChunkStreams, cstreams);
+}
+
+GstRtmpChunkStream *
+gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id)
+{
+ GArray *array;
+ GstRtmpChunkStream *entry;
+ guint i;
+
+ g_return_val_if_fail (cstreams, NULL);
+ g_return_val_if_fail (id > CHUNK_BYTE_THREEBYTE, NULL);
+ g_return_val_if_fail (id <= CHUNK_STREAM_MAX_THREEBYTE, NULL);
+
+ array = cstreams->array;
+
+ for (i = 0; i < array->len; i++) {
+ entry = &g_array_index (array, GstRtmpChunkStream, i);
+ if (entry->id == id) {
+ GST_TRACE ("Obtaining chunk stream %" G_GUINT32_FORMAT, id);
+ return entry;
+ }
+ }
+
+ GST_DEBUG ("Allocating chunk stream %" G_GUINT32_FORMAT, id);
+
+ g_array_set_size (array, i + 1);
+ entry = &g_array_index (array, GstRtmpChunkStream, i);
+ entry->id = id;
+ return entry;
+}
diff --git a/gst/rtmp2/rtmp/rtmpchunkstream.h b/gst/rtmp2/rtmp/rtmpchunkstream.h
new file mode 100644
index 000000000..bcc9b7d4a
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpchunkstream.h
@@ -0,0 +1,59 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_CHUNK_STREAM_H_
+#define _GST_RTMP_CHUNK_STREAM_H_
+
+#include "rtmpmessage.h"
+
+G_BEGIN_DECLS
+
+#define GST_RTMP_DEFAULT_CHUNK_SIZE 128
+#define GST_RTMP_CHUNK_STREAM_PROTOCOL 2
+
+typedef struct _GstRtmpChunkStream GstRtmpChunkStream;
+typedef struct _GstRtmpChunkStreams GstRtmpChunkStreams;
+
+void gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream);
+
+guint32 gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size);
+guint32 gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream,
+ const guint8 * data, gsize size);
+guint32 gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream,
+ guint32 chunk_size, guint8 ** data);
+guint32 gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream,
+ guint32 chunk_size);
+GstBuffer * gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream);
+
+GstBuffer * gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
+ GstBuffer * buffer, guint32 chunk_size);
+GstBuffer * gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream,
+ guint32 chunk_size);
+
+GstRtmpChunkStreams * gst_rtmp_chunk_streams_new (void);
+void gst_rtmp_chunk_streams_free (gpointer ptr);
+
+GstRtmpChunkStream *
+gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id);
+
+
+G_END_DECLS
+
+#endif
diff --git a/gst/rtmp2/rtmp/rtmpclient.c b/gst/rtmp2/rtmp/rtmpclient.c
new file mode 100644
index 000000000..657482c10
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpclient.c
@@ -0,0 +1,1240 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include <gio/gio.h>
+#include <string.h>
+#include "rtmpclient.h"
+#include "rtmphandshake.h"
+#include "rtmpmessage.h"
+#include "rtmputils.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_client_debug_category
+
+static void send_connect_done (const gchar * command_name, GPtrArray * args,
+ gpointer user_data);
+static void create_stream_done (const gchar * command_name, GPtrArray * args,
+ gpointer user_data);
+static void on_publish_or_play_status (const gchar * command_name,
+ GPtrArray * args, gpointer user_data);
+
+static void
+init_debug (void)
+{
+ static volatile gsize done = 0;
+ if (g_once_init_enter (&done)) {
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
+ "rtmpclient", 0, "debug category for the rtmp client");
+ GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
+ GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
+ GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);
+ g_once_init_leave (&done, 1);
+ }
+}
+
+static const gchar *scheme_strings[] = {
+ "rtmp",
+ "rtmps",
+ NULL
+};
+
+#define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
+
+GType
+gst_rtmp_scheme_get_type (void)
+{
+ static volatile gsize scheme_type = 0;
+ static const GEnumValue scheme[] = {
+ {GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
+ {GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
+ {0, NULL, NULL},
+ };
+
+ if (g_once_init_enter (&scheme_type)) {
+ GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
+ g_once_init_leave (&scheme_type, tmp);
+ }
+
+ return (GType) scheme_type;
+}
+
+GstRtmpScheme
+gst_rtmp_scheme_from_string (const gchar * string)
+{
+ if (string) {
+ gint value;
+
+ for (value = 0; value < NUM_SCHEMES; value++) {
+ if (strcmp (scheme_strings[value], string) == 0) {
+ return value;
+ }
+ }
+ }
+
+ return -1;
+}
+
+GstRtmpScheme
+gst_rtmp_scheme_from_uri (const GstUri * uri)
+{
+ const gchar *scheme = gst_uri_get_scheme (uri);
+ if (!scheme) {
+ return GST_RTMP_SCHEME_RTMP;
+ }
+
+ return gst_rtmp_scheme_from_string (scheme);
+}
+
+const gchar *
+gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
+{
+ if (scheme >= 0 && scheme < NUM_SCHEMES) {
+ return scheme_strings[scheme];
+ }
+
+ return "invalid";
+}
+
+const gchar *const *
+gst_rtmp_scheme_get_strings (void)
+{
+ return scheme_strings;
+}
+
+guint
+gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
+{
+ switch (scheme) {
+ case GST_RTMP_SCHEME_RTMP:
+ return 1935;
+
+ case GST_RTMP_SCHEME_RTMPS:
+ return 443;
+
+ default:
+ g_return_val_if_reached (0);
+ }
+}
+
+GType
+gst_rtmp_authmod_get_type (void)
+{
+ static volatile gsize authmod_type = 0;
+ static const GEnumValue authmod[] = {
+ {GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
+ {GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
+ {GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
+ {0, NULL, NULL},
+ };
+
+ if (g_once_init_enter (&authmod_type)) {
+ GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
+ g_once_init_leave (&authmod_type, tmp);
+ }
+
+ return (GType) authmod_type;
+}
+
+static const gchar *
+gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
+{
+ GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
+ GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL;
+ return ev ? ev->value_nick : "(unknown)";
+}
+
+void
+gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
+{
+ g_return_if_fail (dest);
+ g_return_if_fail (src);
+
+ dest->scheme = src->scheme;
+ dest->host = g_strdup (src->host);
+ dest->port = src->port;
+ dest->application = g_strdup (src->application);
+ dest->stream = g_strdup (src->stream);
+ dest->username = g_strdup (src->username);
+ dest->password = g_strdup (src->password);
+ dest->secure_token = g_strdup (src->secure_token);
+ dest->authmod = src->authmod;
+ dest->timeout = src->timeout;
+ dest->tls_flags = src->tls_flags;
+ dest->flash_ver = g_strdup (src->flash_ver);
+}
+
+void
+gst_rtmp_location_clear (GstRtmpLocation * location)
+{
+ g_return_if_fail (location);
+
+ g_clear_pointer (&location->host, g_free);
+ location->port = 0;
+ g_clear_pointer (&location->application, g_free);
+ g_clear_pointer (&location->stream, g_free);
+ g_clear_pointer (&location->username, g_free);
+ g_clear_pointer (&location->password, g_free);
+ g_clear_pointer (&location->secure_token, g_free);
+ g_clear_pointer (&location->flash_ver, g_free);
+}
+
+gchar *
+gst_rtmp_location_get_string (const GstRtmpLocation * location,
+ gboolean with_stream)
+{
+ GstUri *uri;
+ gchar *base, *string;
+ const gchar *scheme_string;
+ guint default_port;
+
+ g_return_val_if_fail (location, NULL);
+
+ scheme_string = gst_rtmp_scheme_to_string (location->scheme);
+ default_port = gst_rtmp_scheme_get_default_port (location->scheme);
+
+ uri = gst_uri_new (scheme_string, NULL, location->host,
+ location->port == default_port ? GST_URI_NO_PORT : location->port, "/",
+ NULL, NULL);
+ base = gst_uri_to_string (uri);
+
+ string = g_strconcat (base, location->application, with_stream ? "/" : NULL,
+ location->stream, NULL);
+
+ g_free (base);
+ gst_uri_unref (uri);
+
+ return string;
+}
+
+static void socket_connect (GTask * task);
+static void socket_connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void handshake_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void send_connect (GTask * task);
+static void send_secure_token_response (GTask * task,
+ GstRtmpConnection * connection, const gchar * challenge);
+static void connection_error (GstRtmpConnection * connection,
+ gpointer user_data);
+
+#define DEFAULT_TIMEOUT 5
+
+typedef struct
+{
+ GstRtmpLocation location;
+ gchar *auth_query;
+ GstRtmpConnection *connection;
+ gulong error_handler_id;
+} ConnectTaskData;
+
+static ConnectTaskData *
+connect_task_data_new (const GstRtmpLocation * location)
+{
+ ConnectTaskData *data = g_slice_new0 (ConnectTaskData);
+ gst_rtmp_location_copy (&data->location, location);
+ return data;
+}
+
+static void
+connect_task_data_free (gpointer ptr)
+{
+ ConnectTaskData *data = ptr;
+ gst_rtmp_location_clear (&data->location);
+ g_clear_pointer (&data->auth_query, g_free);
+ if (data->error_handler_id) {
+ g_signal_handler_disconnect (data->connection, data->error_handler_id);
+ }
+ g_clear_object (&data->connection);
+ g_slice_free (ConnectTaskData, data);
+}
+
+static GRegex *auth_regex = NULL;
+
+void
+gst_rtmp_client_connect_async (const GstRtmpLocation * location,
+ GCancellable * cancellable, GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+
+ init_debug ();
+
+ if (g_once_init_enter (&auth_regex)) {
+ GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
+ "\\[ *authmod=(?<authmod>.*?) *\\] *: *"
+ "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);
+ g_once_init_leave (&auth_regex, re);
+ }
+
+ task = g_task_new (NULL, cancellable, callback, user_data);
+
+ g_task_set_task_data (task, connect_task_data_new (location),
+ connect_task_data_free);
+
+ socket_connect (task);
+}
+
+static void
+socket_connect (GTask * task)
+{
+ ConnectTaskData *data = g_task_get_task_data (task);
+ GSocketConnectable *addr;
+ GSocketClient *socket_client;
+
+ if (data->location.timeout < 0) {
+ data->location.timeout = DEFAULT_TIMEOUT;
+ }
+
+ if (data->error_handler_id) {
+ g_signal_handler_disconnect (data->connection, data->error_handler_id);
+ data->error_handler_id = 0;
+ }
+
+ if (data->connection) {
+ gst_rtmp_connection_close (data->connection);
+ g_clear_object (&data->connection);
+ }
+
+ if (!data->location.host) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
+ "Host is not set");
+ g_object_unref (task);
+ return;
+ }
+
+ if (!data->location.port) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
+ "Port is not set");
+ g_object_unref (task);
+ return;
+ }
+
+ socket_client = g_socket_client_new ();
+ g_socket_client_set_timeout (socket_client, data->location.timeout);
+
+ switch (data->location.scheme) {
+ case GST_RTMP_SCHEME_RTMP:
+ break;
+
+ case GST_RTMP_SCHEME_RTMPS:
+ GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
+ data->location.tls_flags);
+ g_socket_client_set_tls (socket_client, TRUE);
+ g_socket_client_set_tls_validation_flags (socket_client,
+ data->location.tls_flags);
+ break;
+
+ default:
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ "Invalid scheme ID %d", data->location.scheme);
+ g_object_unref (socket_client);
+ g_object_unref (task);
+ return;
+ }
+
+ addr = g_network_address_new (data->location.host, data->location.port);
+
+ GST_DEBUG ("Starting socket connection");
+
+ g_socket_client_connect_async (socket_client, addr,
+ g_task_get_cancellable (task), socket_connect_done, task);
+ g_object_unref (addr);
+ g_object_unref (socket_client);
+}
+
+static void
+socket_connect_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GSocketClient *socket_client = G_SOCKET_CLIENT (source);
+ GSocketConnection *socket_connection;
+ GTask *task = user_data;
+ GError *error = NULL;
+
+ socket_connection =
+ g_socket_client_connect_finish (socket_client, result, &error);
+
+ if (g_task_return_error_if_cancelled (task)) {
+ GST_DEBUG ("Socket connection was cancelled");
+ g_object_unref (task);
+ return;
+ }
+
+ if (socket_connection == NULL) {
+ GST_ERROR ("Socket connection error");
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ GST_DEBUG ("Socket connection established");
+
+ gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
+ g_task_get_cancellable (task), handshake_done, task);
+ g_object_unref (socket_connection);
+}
+
+
+static void
+handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
+{
+ GIOStream *stream = G_IO_STREAM (source);
+ GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream);
+ GTask *task = user_data;
+ ConnectTaskData *data = g_task_get_task_data (task);
+ GError *error = NULL;
+ gboolean res;
+
+ res = gst_rtmp_client_handshake_finish (stream, result, &error);
+ if (!res) {
+ g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ data->connection = gst_rtmp_connection_new (socket_connection);
+ data->error_handler_id = g_signal_connect (data->connection,
+ "error", G_CALLBACK (connection_error), task);
+
+ send_connect (task);
+}
+
+static void
+connection_error (GstRtmpConnection * connection, gpointer user_data)
+{
+ GTask *task = user_data;
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "error during connection attempt");
+}
+
+static gchar *
+do_adobe_auth (const gchar * username, const gchar * password,
+ const gchar * salt, const gchar * opaque, const gchar * challenge)
+{
+ guint8 hash[16]; /* MD5 digest */
+ gsize hashlen = sizeof hash;
+ gchar *challenge2, *auth_query;
+ GChecksum *md5;
+
+ g_return_val_if_fail (username, NULL);
+ g_return_val_if_fail (password, NULL);
+ g_return_val_if_fail (salt, NULL);
+
+ md5 = g_checksum_new (G_CHECKSUM_MD5);
+ g_checksum_update (md5, (guchar *) username, -1);
+ g_checksum_update (md5, (guchar *) salt, -1);
+ g_checksum_update (md5, (guchar *) password, -1);
+
+ g_checksum_get_digest (md5, hash, &hashlen);
+ g_warn_if_fail (hashlen == sizeof hash);
+
+ {
+ gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
+ g_checksum_reset (md5);
+ g_checksum_update (md5, (guchar *) hashstr, -1);
+ g_free (hashstr);
+ }
+
+ if (opaque)
+ g_checksum_update (md5, (guchar *) opaque, -1);
+ else if (challenge)
+ g_checksum_update (md5, (guchar *) challenge, -1);
+
+ challenge2 = g_strdup_printf ("%08x", g_random_int ());
+ g_checksum_update (md5, (guchar *) challenge2, -1);
+
+ g_checksum_get_digest (md5, hash, &hashlen);
+ g_warn_if_fail (hashlen == sizeof hash);
+
+ {
+ gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
+
+ if (opaque) {
+ auth_query =
+ g_strdup_printf
+ ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
+ username, challenge2, hashstr, opaque);
+ } else {
+ auth_query =
+ g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
+ "adobe", username, challenge2, hashstr);
+ }
+ g_free (hashstr);
+ }
+
+ g_checksum_free (md5);
+ g_free (challenge2);
+
+ return auth_query;
+}
+
+static void
+send_connect (GTask * task)
+{
+ ConnectTaskData *data = g_task_get_task_data (task);
+ GstAmfNode *node;
+ const gchar *app, *flash_ver;
+ gchar *uri, *appstr = NULL, *uristr = NULL;
+
+ node = gst_amf_node_new_object ();
+ app = data->location.application;
+ flash_ver = data->location.flash_ver;
+ uri = gst_rtmp_location_get_string (&data->location, FALSE);
+
+ if (!app) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
+ "Application is not set");
+ g_object_unref (task);
+ goto out;
+ }
+
+ if (!flash_ver) {
+ flash_ver = "LNX 10,0,32,18";
+ }
+
+ if (data->auth_query) {
+ const gchar *query = data->auth_query;
+ appstr = g_strdup_printf ("%s?%s", app, query);
+ uristr = g_strdup_printf ("%s?%s", uri, query);
+ } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
+ const gchar *user = data->location.username;
+ const gchar *authmod = "adobe";
+
+ if (!user) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "no username for adobe authentication");
+ g_object_unref (task);
+ goto out;
+ }
+
+ if (!data->location.password) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "no password for adobe authentication");
+ g_object_unref (task);
+ goto out;
+ }
+
+ appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
+ uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
+ } else {
+ appstr = g_strdup (app);
+ uristr = g_strdup (uri);
+ }
+
+ gst_amf_node_append_field_take_string (node, "app", appstr, -1);
+ gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);
+ gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
+ gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
+
+ gst_rtmp_connection_send_command (data->connection, send_connect_done,
+ task, 0, "connect", node, NULL);
+
+out:
+ gst_amf_node_free (node);
+ g_free (uri);
+}
+
+static void
+send_connect_done (const gchar * command_name, GPtrArray * args,
+ gpointer user_data)
+{
+ GTask *task = G_TASK (user_data);
+ ConnectTaskData *data = g_task_get_task_data (task);
+ const GstAmfNode *node, *optional_args;
+ const gchar *code;
+
+ if (g_task_return_error_if_cancelled (task)) {
+ g_object_unref (task);
+ return;
+ }
+
+ if (!args) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "connect failed: %s", command_name);
+ g_object_unref (task);
+ return;
+ }
+
+ if (args->len < 2) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "connect failed; not enough return arguments");
+ g_object_unref (task);
+ return;
+ }
+
+ optional_args = g_ptr_array_index (args, 1);
+
+ node = gst_amf_node_get_field (optional_args, "code");
+ if (!node) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "result code missing from connect cmd result");
+ g_object_unref (task);
+ return;
+ }
+
+ code = gst_amf_node_peek_string (node, NULL);
+ GST_INFO ("connect result: %s", GST_STR_NULL (code));
+
+ if (g_str_equal (code, "NetConnection.Connect.Success")) {
+ node = gst_amf_node_get_field (optional_args, "secureToken");
+ send_secure_token_response (task, data->connection,
+ node ? gst_amf_node_peek_string (node, NULL) : NULL);
+ return;
+ }
+
+ if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
+ GstRtmpAuthmod authmod = data->location.authmod;
+ GMatchInfo *match_info;
+ const gchar *desc;
+ GstUri *query;
+
+ node = gst_amf_node_get_field (optional_args, "description");
+ if (!node) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "Connect rejected; no description");
+ g_object_unref (task);
+ return;
+ }
+
+ desc = gst_amf_node_peek_string (node, NULL);
+ GST_DEBUG ("connect result desc: %s", GST_STR_NULL (desc));
+
+ if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
+ if (strstr (desc, "authmod=adobe")) {
+ GST_INFO ("Reconnecting with authmod=adobe");
+ data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
+ socket_connect (task);
+ return;
+ }
+
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "unhandled authentication mode: %s", desc);
+ g_object_unref (task);
+ return;
+ }
+
+ if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "failed to parse auth rejection: %s", desc);
+ g_object_unref (task);
+ return;
+ }
+
+ {
+ gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
+ gchar *query_str = g_match_info_fetch_named (match_info, "query");
+ gboolean matches;
+
+ GST_INFO ("regex parsed auth: authmod=%s, query=%s",
+ GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
+ g_match_info_free (match_info);
+
+ switch (authmod) {
+ case GST_RTMP_AUTHMOD_ADOBE:
+ matches = g_str_equal (authmod_str, "adobe");
+ break;
+
+ default:
+ matches = FALSE;
+ break;
+ }
+
+ if (!matches) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "server uses wrong authentication mode '%s'; expected %s",
+ GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
+ g_object_unref (task);
+ g_free (authmod_str);
+ g_free (query_str);
+ return;
+ }
+ g_free (authmod_str);
+
+ query = gst_uri_from_string (query_str);
+ if (!query) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "failed to parse authentication query '%s'",
+ GST_STR_NULL (query_str));
+ g_object_unref (task);
+ g_free (query_str);
+ return;
+ }
+ g_free (query_str);
+ }
+
+ {
+ const gchar *reason = gst_uri_get_query_value (query, "reason");
+
+ if (g_str_equal (reason, "authfailed")) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "authentication failed! wrong credentials?");
+ g_object_unref (task);
+ gst_uri_unref (query);
+ return;
+ }
+
+ if (!g_str_equal (reason, "needauth")) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "unhandled rejection reason '%s'", reason ? reason : "");
+ g_object_unref (task);
+ gst_uri_unref (query);
+ return;
+ }
+ }
+
+ g_warn_if_fail (!data->auth_query);
+ data->auth_query = do_adobe_auth (data->location.username,
+ data->location.password, gst_uri_get_query_value (query, "salt"),
+ gst_uri_get_query_value (query, "opaque"),
+ gst_uri_get_query_value (query, "challenge"));
+
+ gst_uri_unref (query);
+
+ if (!data->auth_query) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "couldn't generate adobe style authentication query");
+ g_object_unref (task);
+ return;
+ }
+
+ socket_connect (task);
+ return;
+ }
+
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "unhandled connect result code: %s", GST_STR_NULL (code));
+ g_object_unref (task);
+}
+
+/* prep key: pack 1st 16 chars into 4 LittleEndian ints */
+static void
+rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
+{
+ gchar copy[17];
+
+ g_return_if_fail (key);
+ g_return_if_fail (out);
+
+ /* ensure we can read 16 bytes */
+ strncpy (copy, key, 16);
+ /* placate GCC 8 -Wstringop-truncation */
+ copy[16] = 0;
+
+ out[0] = GST_READ_UINT32_LE (copy);
+ out[1] = GST_READ_UINT32_LE (copy + 4);
+ out[2] = GST_READ_UINT32_LE (copy + 8);
+ out[3] = GST_READ_UINT32_LE (copy + 12);
+}
+
+/* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */
+static GArray *
+rtmp_tea_decode_prep_text (const gchar * text)
+{
+ GArray *arr;
+ gsize len, i;
+
+ g_return_val_if_fail (text, NULL);
+
+ len = strlen (text);
+ arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8);
+
+ for (i = 0; i < len; i += 8) {
+ gchar copy[9];
+ guchar chars[4];
+ gsize j;
+ guint32 val;
+
+ /* ensure we can read 8 bytes */
+ strncpy (copy, text + i, 8);
+ /* placate GCC 8 -Wstringop-truncation */
+ copy[8] = 0;
+
+ for (j = 0; j < 4; j++) {
+ gint hi, lo;
+
+ hi = g_ascii_xdigit_value (copy[2 * j]);
+ lo = g_ascii_xdigit_value (copy[2 * j + 1]);
+
+ chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0);
+ }
+
+ val = GST_READ_UINT32_LE (chars);
+ g_array_append_val (arr, val);
+ }
+
+ return arr;
+}
+
+/* return text from uint32s to chars */
+static gchar *
+rtmp_tea_decode_return_text (GArray * arr)
+{
+#if G_BYTE_ORDER != G_LITTLE_ENDIAN
+ gsize i;
+
+ g_return_val_if_fail (arr, NULL);
+
+ for (i = 0; i < arr->len; i++) {
+ guint32 *val = &g_array_index (arr, guint32, i);
+ *val = GUINT32_TO_LE (*val);
+ }
+#endif
+
+ /* array is alredy zero-terminated */
+ return g_array_free (arr, FALSE);
+}
+
+/* http://www.movable-type.co.uk/scripts/tea-block.html */
+static void
+rtmp_tea_decode_btea (GArray * text, guint32 key[4])
+{
+ guint32 *v, n, *k;
+ guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
+ guint32 p, q;
+
+ g_return_if_fail (text);
+ g_return_if_fail (text->len > 0);
+ g_return_if_fail (key);
+
+ v = (guint32 *) text->data;
+ n = text->len;
+ k = key;
+ z = v[n - 1];
+ y = v[0];
+ q = 6 + 52 / n;
+ sum = q * DELTA;
+
+#define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));
+
+ while (sum != 0) {
+ e = sum >> 2 & 3;
+ for (p = n - 1; p > 0; p--)
+ z = v[p - 1], y = v[p] -= MX;
+ z = v[n - 1];
+ y = v[0] -= MX;
+ sum -= DELTA;
+ }
+
+#undef MX
+}
+
+/* taken from librtmp */
+static gchar *
+rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
+{
+ guint32 key[4];
+ GArray *text;
+
+ rtmp_tea_decode_prep_key (bin_key, key);
+ text = rtmp_tea_decode_prep_text (hex_text);
+ rtmp_tea_decode_btea (text, key);
+ return rtmp_tea_decode_return_text (text);
+}
+
+static void
+send_secure_token_response (GTask * task, GstRtmpConnection * connection,
+ const gchar * challenge)
+{
+ ConnectTaskData *data = g_task_get_task_data (task);
+ if (challenge) {
+ GstAmfNode *node1;
+ GstAmfNode *node2;
+ gchar *response;
+
+ if (!data->location.secure_token || !data->location.secure_token[0]) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
+ "server requires secure token authentication");
+ g_object_unref (task);
+ return;
+ }
+
+ response = rtmp_tea_decode (data->location.secure_token, challenge);
+
+ GST_DEBUG ("response: %s", response);
+
+ node1 = gst_amf_node_new_null ();
+ node2 = gst_amf_node_new_take_string (response, -1);
+ gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
+ "secureTokenResponse", node1, node2, NULL);
+ gst_amf_node_free (node1);
+ gst_amf_node_free (node2);
+ }
+
+ g_signal_handler_disconnect (connection, data->error_handler_id);
+ data->error_handler_id = 0;
+
+ g_task_return_pointer (task, g_object_ref (connection),
+ gst_rtmp_connection_close_and_unref);
+ g_object_unref (task);
+}
+
+GstRtmpConnection *
+gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
+{
+ GTask *task = G_TASK (result);
+ return g_task_propagate_pointer (task, error);
+}
+
+static void send_create_stream (GTask * task);
+static void send_publish_or_play (GTask * task);
+
+typedef struct
+{
+ GstRtmpConnection *connection;
+ gulong error_handler_id;
+ gchar *stream;
+ gboolean publish;
+ guint32 id;
+} StreamTaskData;
+
+static StreamTaskData *
+stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
+ gboolean publish)
+{
+ StreamTaskData *data = g_slice_new0 (StreamTaskData);
+ data->connection = g_object_ref (connection);
+ data->stream = g_strdup (stream);
+ data->publish = publish;
+ return data;
+}
+
+static void
+stream_task_data_free (gpointer ptr)
+{
+ StreamTaskData *data = ptr;
+ g_clear_pointer (&data->stream, g_free);
+ if (data->error_handler_id) {
+ g_signal_handler_disconnect (data->connection, data->error_handler_id);
+ }
+ g_clear_object (&data->connection);
+ g_slice_free (StreamTaskData, data);
+}
+
+static void
+start_stream (GstRtmpConnection * connection, const gchar * stream,
+ gboolean publish, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data)
+{
+ GTask *task;
+ StreamTaskData *data;
+
+ init_debug ();
+
+ task = g_task_new (connection, cancellable, callback, user_data);
+
+ if (!stream) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
+ "Stream is not set");
+ g_object_unref (task);
+ return;
+ }
+
+ data = stream_task_data_new (connection, stream, publish);
+ g_task_set_task_data (task, data, stream_task_data_free);
+
+ data->error_handler_id = g_signal_connect (connection,
+ "error", G_CALLBACK (connection_error), task);
+
+ send_create_stream (task);
+}
+
+void
+gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
+ const gchar * stream, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data)
+{
+ start_stream (connection, stream, TRUE, cancellable, callback, user_data);
+}
+
+void
+gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
+ const gchar * stream, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data)
+{
+ start_stream (connection, stream, FALSE, cancellable, callback, user_data);
+}
+
+static void
+send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
+ guint32 ms)
+{
+ GstRtmpUserControl uc = {
+ .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
+ .param = stream,
+ .param2 = ms,
+ };
+
+ gst_rtmp_connection_queue_message (connection,
+ gst_rtmp_message_new_user_control (&uc));
+}
+
+static void
+send_create_stream (GTask * task)
+{
+ GstRtmpConnection *connection = g_task_get_source_object (task);
+ StreamTaskData *data = g_task_get_task_data (task);
+ GstAmfNode *command_object, *stream_name;
+
+ command_object = gst_amf_node_new_null ();
+ stream_name = gst_amf_node_new_string (data->stream, -1);
+
+ if (data->publish) {
+ /* Not part of RTMP documentation */
+ GST_DEBUG ("Releasing stream '%s'", data->stream);
+ gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
+ "releaseStream", command_object, stream_name, NULL);
+ gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
+ "FCPublish", command_object, stream_name, NULL);
+ } else {
+ /* Matches librtmp */
+ gst_rtmp_connection_request_window_size (connection, 2500000);
+ send_set_buffer_length (connection, 0, 300);
+ }
+
+ GST_INFO ("Creating stream '%s'", data->stream);
+ gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
+ "createStream", command_object, NULL);
+
+ gst_amf_node_free (stream_name);
+ gst_amf_node_free (command_object);
+}
+
+static void
+create_stream_done (const gchar * command_name, GPtrArray * args,
+ gpointer user_data)
+{
+ GTask *task = G_TASK (user_data);
+ StreamTaskData *data = g_task_get_task_data (task);
+ GstAmfNode *result;
+
+ if (g_task_return_error_if_cancelled (task)) {
+ g_object_unref (task);
+ return;
+ }
+
+ if (!args) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "createStream failed: %s", command_name);
+ g_object_unref (task);
+ return;
+ }
+
+ if (args->len < 2) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "createStream failed; not enough return arguments");
+ g_object_unref (task);
+ return;
+ }
+
+ result = g_ptr_array_index (args, 1);
+ if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
+ GString *error_dump = g_string_new ("");
+
+ gst_amf_node_dump (result, -1, error_dump);
+
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "createStream failed: %s", error_dump->str);
+ g_object_unref (task);
+
+ g_string_free (error_dump, TRUE);
+ return;
+ }
+
+ data->id = gst_amf_node_get_number (result);
+ GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);
+
+ if (data->id == 0) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
+ "createStream returned ID 0");
+ g_object_unref (task);
+ return;
+ }
+
+ send_publish_or_play (task);
+}
+
+static void
+send_publish_or_play (GTask * task)
+{
+ GstRtmpConnection *connection = g_task_get_source_object (task);
+ StreamTaskData *data = g_task_get_task_data (task);
+ const gchar *command = data->publish ? "publish" : "play";
+ GstAmfNode *command_object, *stream_name, *argument;
+
+ command_object = gst_amf_node_new_null ();
+ stream_name = gst_amf_node_new_string (data->stream, -1);
+
+ if (data->publish) {
+ /* publishing type (live, record, append) */
+ argument = gst_amf_node_new_string ("live", -1);
+ } else {
+ /* "Start" argument: -2 = live or recording, -1 = only live
+ 0 or positive = only recording, seek to X seconds */
+ argument = gst_amf_node_new_number (-2);
+ }
+
+ GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
+ command, data->stream, data->id);
+ gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
+ task, data->id, "onStatus");
+ gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
+ command, command_object, stream_name, argument, NULL);
+
+ if (!data->publish) {
+ /* Matches librtmp */
+ send_set_buffer_length (connection, data->id, 30000);
+ }
+
+ gst_amf_node_free (command_object);
+ gst_amf_node_free (stream_name);
+ gst_amf_node_free (argument);
+}
+
+static void
+on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
+ gpointer user_data)
+{
+ GTask *task = G_TASK (user_data);
+ GstRtmpConnection *connection = g_task_get_source_object (task);
+ StreamTaskData *data = g_task_get_task_data (task);
+ const gchar *command = data->publish ? "publish" : "play", *code = NULL;
+ GString *info_dump;
+
+ if (g_task_return_error_if_cancelled (task)) {
+ g_object_unref (task);
+ return;
+ }
+
+ if (!args) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "%s failed: %s", command, command_name);
+ g_object_unref (task);
+ return;
+ }
+
+ if (args->len < 2) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "%s failed; not enough return arguments", command);
+ g_object_unref (task);
+ return;
+ }
+
+ {
+ const GstAmfNode *info_object, *code_object;
+ info_object = g_ptr_array_index (args, 1);
+ code_object = gst_amf_node_get_field (info_object, "code");
+
+ if (code_object) {
+ code = gst_amf_node_peek_string (code_object, NULL);
+ }
+
+ info_dump = g_string_new ("");
+ gst_amf_node_dump (info_object, -1, info_dump);
+ }
+
+ if (data->publish) {
+ if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) {
+ GST_INFO ("publish success: %s", info_dump->str);
+ g_task_return_boolean (task, TRUE);
+ goto out;
+ }
+
+ if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
+ "publish denied: stream already exists: %s", info_dump->str);
+ goto out;
+ }
+
+ if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) {
+ g_task_return_new_error (task, G_IO_ERROR,
+ G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str);
+ goto out;
+ }
+ } else {
+ if (g_strcmp0 (code, "NetStream.Play.Start") == 0 ||
+ g_strcmp0 (code, "NetStream.Play.Reset") == 0) {
+ GST_INFO ("play success: %s", info_dump->str);
+ g_task_return_boolean (task, TRUE);
+ goto out;
+ }
+
+ if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
+ "play denied: stream not found: %s", info_dump->str);
+ goto out;
+ }
+ }
+
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "unhandled %s result: %s", command, info_dump->str);
+
+out:
+ g_string_free (info_dump, TRUE);
+
+ g_signal_handler_disconnect (connection, data->error_handler_id);
+ data->error_handler_id = 0;
+
+ g_object_unref (task);
+}
+
+static gboolean
+start_stream_finish (GstRtmpConnection * connection,
+ GAsyncResult * result, guint32 * stream_id, GError ** error)
+{
+ GTask *task;
+ StreamTaskData *data;
+
+ g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);
+
+ task = G_TASK (result);
+
+ if (!g_task_propagate_boolean (G_TASK (result), error)) {
+ return FALSE;
+ }
+
+ data = g_task_get_task_data (task);
+
+ if (stream_id) {
+ *stream_id = data->id;
+ }
+
+ return TRUE;
+}
+
+gboolean
+gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
+ GAsyncResult * result, guint32 * stream_id, GError ** error)
+{
+ return start_stream_finish (connection, result, stream_id, error);
+}
+
+gboolean
+gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
+ GAsyncResult * result, guint32 * stream_id, GError ** error)
+{
+ return start_stream_finish (connection, result, stream_id, error);
+}
diff --git a/gst/rtmp2/rtmp/rtmpclient.h b/gst/rtmp2/rtmp/rtmpclient.h
new file mode 100644
index 000000000..ec75d43f9
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpclient.h
@@ -0,0 +1,101 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_CLIENT_H_
+#define _GST_RTMP_CLIENT_H_
+
+#include "rtmpconnection.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_RTMP_SCHEME (gst_rtmp_scheme_get_type ())
+
+typedef enum
+{
+ GST_RTMP_SCHEME_RTMP = 0,
+ GST_RTMP_SCHEME_RTMPS,
+} GstRtmpScheme;
+
+GType gst_rtmp_scheme_get_type (void);
+
+GstRtmpScheme gst_rtmp_scheme_from_string (const gchar * string);
+GstRtmpScheme gst_rtmp_scheme_from_uri (const GstUri * uri);
+const gchar * gst_rtmp_scheme_to_string (GstRtmpScheme scheme);
+const gchar * const * gst_rtmp_scheme_get_strings (void);
+guint gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme);
+
+
+
+#define GST_TYPE_RTMP_AUTHMOD (gst_rtmp_authmod_get_type ())
+
+typedef enum
+{
+ GST_RTMP_AUTHMOD_NONE = 0,
+ GST_RTMP_AUTHMOD_AUTO,
+ GST_RTMP_AUTHMOD_ADOBE,
+} GstRtmpAuthmod;
+
+GType gst_rtmp_authmod_get_type (void);
+
+
+
+typedef struct _GstRtmpLocation
+{
+ GstRtmpScheme scheme;
+ gchar *host;
+ guint port;
+ gchar *application;
+ gchar *stream;
+ gchar *username;
+ gchar *password;
+ gchar *secure_token;
+ GstRtmpAuthmod authmod;
+ gint timeout;
+ GTlsCertificateFlags tls_flags;
+ gchar *flash_ver;
+} GstRtmpLocation;
+
+void gst_rtmp_location_copy (GstRtmpLocation * dest,
+ const GstRtmpLocation * src);
+void gst_rtmp_location_clear (GstRtmpLocation * uri);
+gchar *gst_rtmp_location_get_string (const GstRtmpLocation * location,
+ gboolean with_stream);
+
+
+
+void gst_rtmp_client_connect_async (const GstRtmpLocation * location,
+ GCancellable * cancellable, GAsyncReadyCallback callback,
+ gpointer user_data);
+GstRtmpConnection *gst_rtmp_client_connect_finish (GAsyncResult * result,
+ GError ** error);
+void gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
+ const gchar * stream, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data);
+gboolean gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
+ GAsyncResult * result, guint * stream_id, GError ** error);
+
+void gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
+ const gchar * stream, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data);
+gboolean gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
+ GAsyncResult * result, guint * stream_id, GError ** error);
+
+G_END_DECLS
+#endif
diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c
new file mode 100644
index 000000000..937adcbab
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpconnection.c
@@ -0,0 +1,996 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include <string.h>
+#include <math.h>
+#include "rtmpconnection.h"
+#include "rtmpchunkstream.h"
+#include "rtmpmessage.h"
+#include "rtmputils.h"
+#include "amf.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
+
+#define READ_SIZE 8192
+
+typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
+
+struct _GstRtmpConnection
+{
+ GObject parent_instance;
+
+ /* should be properties */
+ gboolean input_paused;
+ gboolean error;
+
+ /* private */
+ GThread *thread;
+ GSocketConnection *connection;
+ GCancellable *cancellable;
+ GSocketClient *socket_client;
+ GAsyncQueue *output_queue;
+ GMainContext *main_context;
+
+ GSource *input_source;
+ GByteArray *input_bytes;
+ guint input_needed_bytes;
+ GstRtmpChunkStreams *input_streams, *output_streams;
+ GList *transactions;
+ GList *expected_commands;
+ guint transaction_count;
+
+ GstRtmpConnectionMessageFunc input_handler;
+ gpointer input_handler_user_data;
+ GDestroyNotify input_handler_user_data_destroy;
+
+ GstRtmpConnectionFunc output_handler;
+ gpointer output_handler_user_data;
+ GDestroyNotify output_handler_user_data_destroy;
+
+ gboolean writing;
+
+ /* RTMP configuration */
+ gsize in_chunk_size;
+ gsize out_chunk_size;
+ gsize in_window_ack_size;
+ gsize out_window_ack_size;
+ gsize total_input_bytes;
+ gsize bytes_since_ack;
+};
+
+
+typedef struct
+{
+ GObjectClass parent_class;
+} GstRtmpConnectionClass;
+
+/* prototypes */
+
+static void gst_rtmp_connection_dispose (GObject * object);
+static void gst_rtmp_connection_finalize (GObject * object);
+static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
+static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
+ gpointer user_data);
+static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
+static void gst_rtmp_connection_write_bytes_done (GObject * obj,
+ GAsyncResult * result, gpointer user_data);
+static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
+ guint needed_bytes);
+static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
+static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
+static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
+ connection, GstBuffer * buffer);
+static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
+ GstBuffer * buffer);
+static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
+ GstBuffer * buffer);
+static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
+ GstBuffer * buffer);
+
+static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
+static void
+gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
+ guint32 event_data);
+
+typedef struct
+{
+ gdouble transaction_id;
+ GstRtmpCommandCallback func;
+ gpointer user_data;
+} Transaction;
+
+static Transaction *
+transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
+ gpointer user_data)
+{
+ Transaction *data = g_slice_new (Transaction);
+ data->transaction_id = transaction_id;
+ data->func = func;
+ data->user_data = user_data;
+ return data;
+}
+
+static void
+transaction_free (gpointer ptr)
+{
+ Transaction *data = ptr;
+ g_slice_free (Transaction, data);
+}
+
+typedef struct
+{
+ guint32 stream_id;
+ gchar *command_name;
+ GstRtmpCommandCallback func;
+ gpointer user_data;
+} ExpectedCommand;
+
+static ExpectedCommand *
+expected_command_new (guint32 stream_id, const gchar * command_name,
+ GstRtmpCommandCallback func, gpointer user_data)
+{
+ ExpectedCommand *data = g_slice_new (ExpectedCommand);
+ data->stream_id = stream_id;
+ data->command_name = g_strdup (command_name);
+ data->func = func;
+ data->user_data = user_data;
+ return data;
+}
+
+static void
+expected_command_free (gpointer ptr)
+{
+ ExpectedCommand *data = ptr;
+ g_free (data->command_name);
+ g_slice_free (ExpectedCommand, data);
+}
+
+enum
+{
+ SIGNAL_ERROR,
+ SIGNAL_STREAM_CONTROL,
+
+ N_SIGNALS
+};
+
+static guint signals[N_SIGNALS] = { 0, };
+
+/* class initialization */
+
+G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
+ G_TYPE_OBJECT,
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
+ "rtmpconnection", 0, "debug category for GstRtmpConnection class"));
+
+static void
+gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+ gobject_class->dispose = gst_rtmp_connection_dispose;
+ gobject_class->finalize = gst_rtmp_connection_finalize;
+
+ signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
+
+ signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
+ G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
+ G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
+
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
+}
+
+static void
+gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
+{
+ rtmpconnection->cancellable = g_cancellable_new ();
+ rtmpconnection->output_queue =
+ g_async_queue_new_full ((GDestroyNotify) g_bytes_unref);
+ rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
+ rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
+
+ rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
+ rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
+
+ rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
+ rtmpconnection->input_needed_bytes = 1;
+}
+
+void
+gst_rtmp_connection_dispose (GObject * object)
+{
+ GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
+ GST_DEBUG_OBJECT (rtmpconnection, "dispose");
+
+ /* clean up as possible. may be called multiple times */
+
+ gst_rtmp_connection_close (rtmpconnection);
+ g_cancellable_cancel (rtmpconnection->cancellable);
+ gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
+ gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
+
+ G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
+}
+
+void
+gst_rtmp_connection_finalize (GObject * object)
+{
+ GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
+ GST_DEBUG_OBJECT (rtmpconnection, "finalize");
+
+ /* clean up object here */
+
+ g_clear_object (&rtmpconnection->cancellable);
+ g_clear_object (&rtmpconnection->connection);
+ g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
+ g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
+ g_clear_pointer (&rtmpconnection->output_streams,
+ gst_rtmp_chunk_streams_free);
+ g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
+ g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
+ g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
+
+ G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
+}
+
+GSocket *
+gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
+{
+ return g_socket_connection_get_socket (sc->connection);
+}
+
+static void
+gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
+ GSocketConnection * connection)
+{
+ GInputStream *is;
+
+ sc->thread = g_thread_ref (g_thread_self ());
+ sc->main_context = g_main_context_ref_thread_default ();
+ sc->connection = g_object_ref (connection);
+
+ /* refs the socket because it's creating an input stream, which holds a ref */
+ is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
+ /* refs the socket because it's creating a socket source */
+ g_warn_if_fail (!sc->input_source);
+ sc->input_source =
+ g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
+ sc->cancellable);
+ g_source_set_callback (sc->input_source,
+ (GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
+ g_object_unref);
+ g_source_attach (sc->input_source, sc->main_context);
+}
+
+GstRtmpConnection *
+gst_rtmp_connection_new (GSocketConnection * connection)
+{
+ GstRtmpConnection *sc;
+
+ sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
+
+ gst_rtmp_connection_set_socket_connection (sc, connection);
+
+ return sc;
+}
+
+static void
+cancel_all_commands (GstRtmpConnection * self)
+{
+ GList *l;
+
+ for (l = self->transactions; l; l = g_list_next (l)) {
+ Transaction *cc = l->data;
+ GST_LOG ("calling transaction callback %s",
+ GST_DEBUG_FUNCPTR_NAME (cc->func));
+ cc->func ("<cancelled>", NULL, cc->user_data);
+ }
+ g_list_free_full (self->transactions, transaction_free);
+ self->transactions = NULL;
+
+ for (l = self->expected_commands; l; l = g_list_next (l)) {
+ ExpectedCommand *cc = l->data;
+ GST_LOG ("calling expected command callback %s",
+ GST_DEBUG_FUNCPTR_NAME (cc->func));
+ cc->func ("<cancelled>", NULL, cc->user_data);
+ }
+ g_list_free_full (self->expected_commands, expected_command_free);
+ self->expected_commands = NULL;
+}
+
+void
+gst_rtmp_connection_close (GstRtmpConnection * self)
+{
+ if (self->thread != g_thread_self ()) {
+ GST_ERROR ("Called from wrong thread");
+ }
+
+ g_cancellable_cancel (self->cancellable);
+ cancel_all_commands (self);
+
+ if (self->input_source) {
+ g_source_destroy (self->input_source);
+ g_clear_pointer (&self->input_source, g_source_unref);
+ }
+
+ if (self->connection) {
+ g_io_stream_close_async (G_IO_STREAM (self->connection),
+ G_PRIORITY_DEFAULT, NULL, NULL, NULL);
+ }
+}
+
+void
+gst_rtmp_connection_close_and_unref (gpointer ptr)
+{
+ GstRtmpConnection *connection;
+
+ g_return_if_fail (ptr);
+
+ connection = GST_RTMP_CONNECTION (ptr);
+ gst_rtmp_connection_close (connection);
+ g_object_unref (connection);
+}
+
+void
+gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
+ GstRtmpConnectionMessageFunc callback, gpointer user_data,
+ GDestroyNotify user_data_destroy)
+{
+ if (sc->input_handler_user_data_destroy) {
+ sc->input_handler_user_data_destroy (sc->input_handler_user_data);
+ }
+
+ sc->input_handler = callback;
+ sc->input_handler_user_data = user_data;
+ sc->input_handler_user_data_destroy = user_data_destroy;
+}
+
+void
+gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
+ GstRtmpConnectionFunc callback, gpointer user_data,
+ GDestroyNotify user_data_destroy)
+{
+ if (sc->output_handler_user_data_destroy) {
+ sc->output_handler_user_data_destroy (sc->output_handler_user_data);
+ }
+
+ sc->output_handler = callback;
+ sc->output_handler_user_data = user_data;
+ sc->output_handler_user_data_destroy = user_data_destroy;
+}
+
+static gboolean
+gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
+{
+ GstRtmpConnection *sc = user_data;
+ gssize ret;
+ guint oldsize;
+ GError *error = NULL;
+
+ GST_TRACE ("input ready");
+
+ oldsize = sc->input_bytes->len;
+ g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
+ ret =
+ g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
+ sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
+ g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
+
+ if (ret < 0) {
+ gint code = error->code;
+
+ if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
+ code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
+ /* should retry */
+ GST_DEBUG ("read IO error %d %s, continuing", code, error->message);
+ g_error_free (error);
+ return G_SOURCE_CONTINUE;
+ }
+
+ GST_ERROR ("read error: %s %d %s", g_quark_to_string (error->domain),
+ code, error->message);
+ g_error_free (error);
+ } else if (ret == 0) {
+ GST_INFO ("read EOF");
+ }
+
+ if (ret <= 0) {
+ gst_rtmp_connection_emit_error (sc);
+ return G_SOURCE_REMOVE;
+ }
+
+ GST_TRACE ("read %" G_GSIZE_FORMAT " bytes", ret);
+
+ sc->total_input_bytes += ret;
+ sc->bytes_since_ack += ret;
+ if (sc->bytes_since_ack >= sc->in_window_ack_size) {
+ gst_rtmp_connection_send_ack (sc);
+ }
+
+ gst_rtmp_connection_try_read (sc);
+ return G_SOURCE_CONTINUE;
+}
+
+static void
+gst_rtmp_connection_start_write (GstRtmpConnection * self)
+{
+ GOutputStream *os;
+ GBytes *bytes;
+
+ if (self->writing) {
+ return;
+ }
+
+ bytes = g_async_queue_try_pop (self->output_queue);
+ if (!bytes) {
+ return;
+ }
+
+ self->writing = TRUE;
+ if (self->output_handler) {
+ self->output_handler (self, self->output_handler_user_data);
+ }
+
+ os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
+ gst_rtmp_output_stream_write_all_bytes_async (os, bytes,
+ G_PRIORITY_DEFAULT, self->cancellable,
+ gst_rtmp_connection_write_bytes_done, g_object_ref (self));
+ g_bytes_unref (bytes);
+}
+
+static void
+gst_rtmp_connection_emit_error (GstRtmpConnection * self)
+{
+ if (self->error) {
+ return;
+ }
+
+ GST_INFO ("connection error");
+ self->error = TRUE;
+
+ cancel_all_commands (self);
+
+ g_signal_emit (self, signals[SIGNAL_ERROR], 0);
+}
+
+static void
+gst_rtmp_connection_write_bytes_done (GObject * obj,
+ GAsyncResult * result, gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (obj);
+ GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
+ GError *error = NULL;
+ gboolean res;
+
+ self->writing = FALSE;
+
+ res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error);
+ if (!res) {
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ GST_INFO ("write cancelled");
+ } else {
+ GST_ERROR ("write error: %s", error->message);
+ }
+ gst_rtmp_connection_emit_error (self);
+ g_error_free (error);
+ g_object_unref (self);
+ return;
+ }
+
+ GST_LOG ("write completed");
+ gst_rtmp_connection_start_write (self);
+ g_object_unref (self);
+}
+
+static void
+gst_rtmp_connection_start_read (GstRtmpConnection * connection,
+ guint needed_bytes)
+{
+ g_return_if_fail (needed_bytes > 0);
+ connection->input_needed_bytes = needed_bytes;
+ gst_rtmp_connection_try_read (connection);
+}
+
+static void
+gst_rtmp_connection_try_read (GstRtmpConnection * connection)
+{
+ guint need = connection->input_needed_bytes,
+ len = connection->input_bytes->len;
+
+ if (len < need) {
+ GST_TRACE ("got %u < %u bytes, need more", len, need);
+ return;
+ }
+
+ GST_TRACE ("got %u >= %u bytes, proceeding", len, need);
+ gst_rtmp_connection_do_read (connection);
+}
+
+static void
+gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
+ GBytes ** outbytes)
+{
+ g_return_if_fail (size <= sc->input_bytes->len);
+
+ if (outbytes) {
+ *outbytes = g_bytes_new (sc->input_bytes->data, size);
+ }
+
+ g_byte_array_remove_range (sc->input_bytes, 0, size);
+}
+
+static void
+gst_rtmp_connection_do_read (GstRtmpConnection * sc)
+{
+ GByteArray *input_bytes = sc->input_bytes;
+ gsize needed_bytes = 1;
+
+ while (1) {
+ GstRtmpChunkStream *cstream;
+ guint32 chunk_stream_id, header_size, next_size;
+ guint8 *data;
+
+ chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
+ input_bytes->len);
+
+ if (!chunk_stream_id) {
+ needed_bytes = input_bytes->len + 1;
+ break;
+ }
+
+ cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
+ header_size = gst_rtmp_chunk_stream_parse_header (cstream,
+ input_bytes->data, input_bytes->len);
+
+ if (input_bytes->len < header_size) {
+ needed_bytes = header_size;
+ break;
+ }
+
+ next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
+ sc->in_chunk_size, &data);
+
+ if (input_bytes->len < header_size + next_size) {
+ needed_bytes = header_size + next_size;
+ break;
+ }
+
+ memcpy (data, input_bytes->data + header_size, next_size);
+ gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
+
+ next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
+ sc->in_chunk_size);
+
+ if (next_size == 0) {
+ GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
+ gst_rtmp_connection_handle_message (sc, buffer);
+ gst_buffer_unref (buffer);
+ }
+ }
+
+ gst_rtmp_connection_start_read (sc, needed_bytes);
+}
+
+static void
+gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
+{
+ if (gst_rtmp_message_is_protocol_control (buffer)) {
+ gst_rtmp_connection_handle_protocol_control (sc, buffer);
+ return;
+ }
+
+ if (gst_rtmp_message_is_user_control (buffer)) {
+ gst_rtmp_connection_handle_user_control (sc, buffer);
+ return;
+ }
+
+ switch (gst_rtmp_message_get_type (buffer)) {
+ case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
+ gst_rtmp_connection_handle_cm (sc, buffer);
+ return;
+
+ default:
+ if (sc->input_handler) {
+ sc->input_handler (sc, buffer, sc->input_handler_user_data);
+ }
+ return;
+ }
+}
+
+static void
+gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
+ GstBuffer * buffer)
+{
+ GstRtmpProtocolControl pc;
+
+ if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
+ GST_ERROR ("can't parse protocol control message");
+ return;
+ }
+
+ GST_LOG ("got protocol control message %d:%s", pc.type,
+ gst_rtmp_message_type_get_nick (pc.type));
+
+ switch (pc.type) {
+ case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
+ GST_INFO ("new chunk size %" G_GUINT32_FORMAT, pc.param);
+ connection->in_chunk_size = pc.param;
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
+ GST_ERROR ("unimplemented: chunk abort, stream_id = %" G_GUINT32_FORMAT,
+ pc.param);
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
+ /* We don't really send ack requests that we care about, so ignore */
+ GST_DEBUG ("acknowledgement %" G_GUINT32_FORMAT, pc.param);
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
+ GST_INFO ("window ack size: %" G_GUINT32_FORMAT, pc.param);
+ connection->in_window_ack_size = pc.param;
+ break;
+
+ case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
+ GST_FIXME ("set peer bandwidth: %" G_GUINT32_FORMAT ", %"
+ G_GUINT32_FORMAT, pc.param, pc.param2);
+ /* FIXME this is not correct, but close enough */
+ gst_rtmp_connection_request_window_size (connection, pc.param);
+ break;
+
+ default:
+ GST_ERROR ("unimplemented protocol control type %d:%s", pc.type,
+ gst_rtmp_message_type_get_nick (pc.type));
+ break;
+ }
+}
+
+static void
+gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
+ GstBuffer * buffer)
+{
+ GstRtmpUserControl uc;
+
+ if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
+ GST_ERROR ("can't parse user control message");
+ return;
+ }
+
+ GST_LOG ("got user control message %d:%s", uc.type,
+ gst_rtmp_user_control_type_get_nick (uc.type));
+
+ switch (uc.type) {
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
+ GST_INFO ("stream %u got %s", uc.param,
+ gst_rtmp_user_control_type_get_nick (uc.type));
+ g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
+ uc.type, uc.param);
+ break;
+
+ case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
+ GST_FIXME ("ignoring set buffer length: %" G_GUINT32_FORMAT ", %"
+ G_GUINT32_FORMAT " ms", uc.param, uc.param2);
+ break;
+
+ case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
+ GST_DEBUG ("ping request: %" G_GUINT32_FORMAT, uc.param);
+ gst_rtmp_connection_send_ping_response (connection, uc.param);
+ break;
+
+ case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
+ GST_DEBUG ("ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
+ break;
+
+ case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
+ GST_LOG ("ignoring buffer empty: %" G_GUINT32_FORMAT, uc.param);
+ break;
+
+ case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
+ GST_LOG ("ignoring buffer ready: %" G_GUINT32_FORMAT, uc.param);
+ break;
+
+ default:
+ GST_ERROR ("unimplemented user control type %d:%s", uc.type,
+ gst_rtmp_user_control_type_get_nick (uc.type));
+ break;
+ }
+}
+
+static gboolean
+is_command_response (const gchar * command_name)
+{
+ return g_strcmp0 (command_name, "_result") == 0 ||
+ g_strcmp0 (command_name, "_error") == 0;
+}
+
+static void
+gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+ gchar *command_name;
+ gdouble transaction_id;
+ GPtrArray *args;
+
+ {
+ GstMapInfo map;
+ gst_buffer_map (buffer, &map, GST_MAP_READ);
+ args = gst_amf_parse_command (map.data, map.size, &transaction_id,
+ &command_name);
+ gst_buffer_unmap (buffer, &map);
+ }
+
+ if (!args) {
+ return;
+ }
+
+ if (!isfinite (transaction_id) || transaction_id < 0 ||
+ transaction_id > G_MAXUINT) {
+ GST_WARNING ("Server sent command \"%s\" with extreme transaction ID %.0f",
+ GST_STR_NULL (command_name), transaction_id);
+ } else if (transaction_id > sc->transaction_count) {
+ GST_WARNING ("Server sent command \"%s\" with unused transaction ID "
+ "(%.0f > %u)", GST_STR_NULL (command_name), transaction_id,
+ sc->transaction_count);
+ sc->transaction_count = transaction_id;
+ }
+
+ GST_DEBUG ("got control message \"%s\" transaction %.0f size %"
+ G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
+ meta->size);
+
+ if (is_command_response (command_name)) {
+ if (transaction_id != 0) {
+ GList *l;
+
+ for (l = sc->transactions; l; l = g_list_next (l)) {
+ Transaction *t = l->data;
+
+ if (t->transaction_id != transaction_id) {
+ continue;
+ }
+
+ GST_LOG ("calling transaction callback %s",
+ GST_DEBUG_FUNCPTR_NAME (t->func));
+ sc->transactions = g_list_remove_link (sc->transactions, l);
+ t->func (command_name, args, t->user_data);
+ g_list_free_full (l, transaction_free);
+ break;
+ }
+ } else {
+ GST_WARNING ("Server sent response \"%s\" without transaction",
+ GST_STR_NULL (command_name));
+ }
+ } else {
+ GList *l;
+
+ if (transaction_id != 0) {
+ GST_FIXME ("Server sent command \"%s\" expecting reply",
+ GST_STR_NULL (command_name));
+ }
+
+ for (l = sc->expected_commands; l; l = g_list_next (l)) {
+ ExpectedCommand *ec = l->data;
+
+ if (ec->stream_id != meta->mstream) {
+ continue;
+ }
+
+ if (g_strcmp0 (ec->command_name, command_name)) {
+ continue;
+ }
+
+ GST_LOG ("calling expected command callback %s",
+ GST_DEBUG_FUNCPTR_NAME (ec->func));
+ sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
+ ec->func (command_name, args, ec->user_data);
+ g_list_free_full (l, expected_command_free);
+ break;
+ }
+ }
+
+ g_free (command_name);
+ g_ptr_array_unref (args);
+}
+
+static gboolean
+start_write (gpointer user_data)
+{
+ GstRtmpConnection *sc = user_data;
+ gst_rtmp_connection_start_write (sc);
+ g_object_unref (sc);
+ return G_SOURCE_REMOVE;
+}
+
+static void
+byte_array_take_buffer (GByteArray * byte_array, GstBuffer * buffer)
+{
+ GstMapInfo map;
+ gboolean ret;
+ ret = gst_buffer_map (buffer, &map, GST_MAP_READ);
+ g_assert (ret);
+ g_assert (byte_array->len + map.size <= (guint64) G_MAXUINT);
+ g_byte_array_append (byte_array, map.data, map.size);
+ gst_buffer_unmap (buffer, &map);
+ gst_buffer_unref (buffer);
+}
+
+void
+gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
+{
+ GstRtmpMeta *meta;
+ GstRtmpChunkStream *cstream;
+ GstBuffer *out_buffer;
+ GByteArray *out_ba;
+
+ g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
+ g_return_if_fail (GST_IS_BUFFER (buffer));
+
+ meta = gst_buffer_get_rtmp_meta (buffer);
+ g_return_if_fail (meta);
+
+ cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
+ g_return_if_fail (cstream);
+
+ out_buffer = gst_rtmp_chunk_stream_serialize_start (cstream, buffer,
+ self->out_chunk_size);
+ g_return_if_fail (out_buffer);
+
+ out_ba = g_byte_array_new ();
+
+ while (out_buffer) {
+ byte_array_take_buffer (out_ba, out_buffer);
+
+ out_buffer = gst_rtmp_chunk_stream_serialize_next (cstream,
+ self->out_chunk_size);
+ }
+
+ g_async_queue_push (self->output_queue, g_byte_array_free_to_bytes (out_ba));
+ g_main_context_invoke (self->main_context, start_write, g_object_ref (self));
+}
+
+guint
+gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
+{
+ return g_async_queue_length (connection->output_queue);
+}
+
+guint
+gst_rtmp_connection_send_command (GstRtmpConnection * connection,
+ GstRtmpCommandCallback response_command, gpointer user_data,
+ guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
+ ...)
+{
+ GstBuffer *buffer;
+ gdouble transaction_id = 0;
+ va_list ap;
+ GBytes *payload;
+ guint8 *data;
+ gsize size;
+
+ if (connection->thread != g_thread_self ()) {
+ GST_ERROR ("Called from wrong thread");
+ }
+
+ GST_DEBUG ("Sending command '%s' on stream id %" G_GUINT32_FORMAT,
+ command_name, stream_id);
+
+ if (response_command) {
+ Transaction *t;
+
+ transaction_id = ++connection->transaction_count;
+
+ GST_LOG ("Registering %s for transid %.0f",
+ GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
+
+ t = transaction_new (transaction_id, response_command, user_data);
+
+ connection->transactions = g_list_append (connection->transactions, t);
+ }
+
+ va_start (ap, argument);
+ payload = gst_amf_serialize_command_valist (transaction_id,
+ command_name, argument, ap);
+ va_end (ap);
+
+ data = g_bytes_unref_to_data (payload, &size);
+ buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
+ 3, stream_id, data, size);
+
+ gst_rtmp_connection_queue_message (connection, buffer);
+ return transaction_id;
+}
+
+void
+gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
+ GstRtmpCommandCallback response_command, gpointer user_data,
+ guint32 stream_id, const gchar * command_name)
+{
+ ExpectedCommand *ec;
+
+ g_return_if_fail (response_command);
+ g_return_if_fail (command_name);
+ g_return_if_fail (!is_command_response (command_name));
+
+ GST_LOG ("Registering %s for stream id %" G_GUINT32_FORMAT
+ " name \"%s\"", GST_DEBUG_FUNCPTR_NAME (response_command),
+ stream_id, command_name);
+
+ ec = expected_command_new (stream_id, command_name, response_command,
+ user_data);
+
+ connection->expected_commands =
+ g_list_append (connection->expected_commands, ec);
+}
+
+static void
+gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
+{
+ GstRtmpProtocolControl pc = {
+ .type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
+ .param = (guint32) connection->total_input_bytes,
+ };
+
+ gst_rtmp_connection_queue_message (connection,
+ gst_rtmp_message_new_protocol_control (&pc));
+
+ connection->bytes_since_ack = 0;
+}
+
+static void
+gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
+ guint32 event_data)
+{
+ GstRtmpUserControl uc = {
+ .type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
+ .param = event_data,
+ };
+
+ gst_rtmp_connection_queue_message (connection,
+ gst_rtmp_message_new_user_control (&uc));
+}
+
+void
+gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
+ guint32 window_ack_size)
+{
+ GstRtmpProtocolControl pc = {
+ .type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
+ .param = window_ack_size,
+ };
+
+ if (connection->out_window_ack_size == window_ack_size)
+ return;
+
+ connection->out_window_ack_size = window_ack_size;
+
+ gst_rtmp_connection_queue_message (connection,
+ gst_rtmp_message_new_protocol_control (&pc));
+}
diff --git a/gst/rtmp2/rtmp/rtmpconnection.h b/gst/rtmp2/rtmp/rtmpconnection.h
new file mode 100644
index 000000000..7b5eb6c09
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpconnection.h
@@ -0,0 +1,82 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_CONNECTION_H_
+#define _GST_RTMP_CONNECTION_H_
+
+#include <gio/gio.h>
+#include <gst/gst.h>
+#include "amf.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_RTMP_CONNECTION (gst_rtmp_connection_get_type())
+#define GST_RTMP_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP_CONNECTION,GstRtmpConnection))
+#define GST_IS_RTMP_CONNECTION(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP_CONNECTION))
+
+typedef struct _GstRtmpConnection GstRtmpConnection;
+
+typedef void (*GstRtmpConnectionFunc)
+ (GstRtmpConnection * connection, gpointer user_data);
+typedef void (*GstRtmpConnectionMessageFunc)
+ (GstRtmpConnection * connection, GstBuffer * buffer, gpointer user_data);
+
+typedef void (*GstRtmpCommandCallback) (const gchar * command_name,
+ GPtrArray * arguments, gpointer user_data);
+
+GType gst_rtmp_connection_get_type (void);
+
+GstRtmpConnection *gst_rtmp_connection_new (GSocketConnection * connection);
+
+GSocket *gst_rtmp_connection_get_socket (GstRtmpConnection * connection);
+
+void gst_rtmp_connection_close (GstRtmpConnection * connection);
+void gst_rtmp_connection_close_and_unref (gpointer ptr);
+
+void gst_rtmp_connection_set_input_handler (GstRtmpConnection * connection,
+ GstRtmpConnectionMessageFunc callback, gpointer user_data,
+ GDestroyNotify user_data_destroy);
+
+void gst_rtmp_connection_set_output_handler (GstRtmpConnection * connection,
+ GstRtmpConnectionFunc callback, gpointer user_data,
+ GDestroyNotify user_data_destroy);
+
+void gst_rtmp_connection_queue_bytes (GstRtmpConnection *self,
+ GBytes * bytes);
+void gst_rtmp_connection_queue_message (GstRtmpConnection * connection,
+ GstBuffer * buffer);
+guint gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection);
+
+guint gst_rtmp_connection_send_command (GstRtmpConnection * connection,
+ GstRtmpCommandCallback response_command, gpointer user_data,
+ guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
+ ...) G_GNUC_NULL_TERMINATED;
+
+void gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
+ GstRtmpCommandCallback response_command, gpointer user_data,
+ guint32 stream_id, const gchar * command_name);
+
+void gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
+ guint32 window_ack_size);
+
+G_END_DECLS
+
+#endif
diff --git a/gst/rtmp2/rtmp/rtmphandshake.c b/gst/rtmp2/rtmp/rtmphandshake.c
new file mode 100644
index 000000000..0d821c3d8
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmphandshake.c
@@ -0,0 +1,311 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "rtmphandshake.h"
+#include "rtmputils.h"
+
+#include <gst/gst.h>
+#include <string.h>
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_handshake_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_handshake_debug_category
+
+static void
+init_debug (void)
+{
+ static volatile gsize done = 0;
+ if (g_once_init_enter (&done)) {
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_handshake_debug_category, "rtmphandshake",
+ 0, "debug category for the rtmp connection handshake");
+ g_once_init_leave (&done, 1);
+ }
+}
+
+static void client_handshake1_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void client_handshake2_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void client_handshake3_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+
+static inline void
+serialize_u8 (GByteArray * array, guint8 value)
+{
+ g_byte_array_append (array, (guint8 *) & value, sizeof value);
+}
+
+static inline void
+serialize_u32 (GByteArray * array, guint32 value)
+{
+ value = GUINT32_TO_BE (value);
+ g_byte_array_append (array, (guint8 *) & value, sizeof value);
+}
+
+#define SIZE_P0 1
+#define SIZE_P1 1536
+#define SIZE_P2 SIZE_P1
+#define SIZE_P0P1 (SIZE_P0 + SIZE_P1)
+#define SIZE_P0P1P2 (SIZE_P0P1 + SIZE_P2)
+#define SIZE_RANDOM (SIZE_P1 - 8)
+
+typedef struct
+{
+ GBytes *random_bytes;
+ gboolean strict;
+} HandshakeData;
+
+static GBytes *
+handshake_random_data (void)
+{
+ G_STATIC_ASSERT (SIZE_RANDOM % 4 == 0);
+
+ GByteArray *ba = g_byte_array_sized_new (SIZE_RANDOM);
+ gint i;
+
+ for (i = 0; i < SIZE_RANDOM; i += 4) {
+ serialize_u32 (ba, g_random_int ());
+ }
+
+ return g_byte_array_free_to_bytes (ba);
+}
+
+static HandshakeData *
+handshake_data_new (gboolean strict)
+{
+ HandshakeData *data = g_slice_new0 (HandshakeData);
+ data->random_bytes = handshake_random_data ();
+ data->strict = strict;
+ return data;
+}
+
+static void
+handshake_data_free (gpointer ptr)
+{
+ HandshakeData *data = ptr;
+ g_clear_pointer (&data->random_bytes, g_bytes_unref);
+ g_slice_free (HandshakeData, data);
+}
+
+static gboolean
+handshake_data_check (HandshakeData * data, const guint8 * p2)
+{
+ const guint8 *ourrandom = g_bytes_get_data (data->random_bytes, NULL);
+ return memcmp (ourrandom, p2 + 8, SIZE_P2 - 8) == 0;
+}
+
+static GBytes *
+create_c0c1 (GBytes * random_bytes)
+{
+ GByteArray *ba = g_byte_array_sized_new (SIZE_P0P1);
+
+ /* C0 version */
+ serialize_u8 (ba, 3);
+
+ /* C1 time */
+ serialize_u32 (ba, g_get_monotonic_time () / 1000);
+
+ /* C1 zero */
+ serialize_u32 (ba, 0);
+
+ /* C1 random data */
+ gst_rtmp_byte_array_append_bytes (ba, random_bytes);
+
+ GST_DEBUG ("Sending C0+C1");
+ GST_MEMDUMP (">>> C0", ba->data, SIZE_P0);
+ GST_MEMDUMP (">>> C1", ba->data + SIZE_P0, SIZE_P1);
+
+ return g_byte_array_free_to_bytes (ba);
+}
+
+void
+gst_rtmp_client_handshake (GIOStream * stream, gboolean strict,
+ GCancellable * cancellable, GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+ HandshakeData *data;
+
+ g_return_if_fail (G_IS_IO_STREAM (stream));
+
+ init_debug ();
+ GST_INFO ("Starting client handshake");
+
+ task = g_task_new (stream, cancellable, callback, user_data);
+ data = handshake_data_new (strict);
+ g_task_set_task_data (task, data, handshake_data_free);
+
+ {
+ GOutputStream *os = g_io_stream_get_output_stream (stream);
+ GBytes *bytes = create_c0c1 (data->random_bytes);
+
+ gst_rtmp_output_stream_write_all_bytes_async (os,
+ bytes, G_PRIORITY_DEFAULT,
+ g_task_get_cancellable (task), client_handshake1_done, task);
+
+ g_bytes_unref (bytes);
+ }
+}
+
+static void
+client_handshake1_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (source);
+ GTask *task = user_data;
+ GIOStream *stream = g_task_get_source_object (task);
+ GInputStream *is = g_io_stream_get_input_stream (stream);
+ GError *error = NULL;
+ gboolean res;
+
+ res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error);
+ if (!res) {
+ GST_ERROR ("Failed to send C0+C1: %s", error->message);
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ GST_DEBUG ("Sent C0+C1, waiting for S0+S1+S2");
+ gst_rtmp_input_stream_read_all_bytes_async (is, SIZE_P0P1P2,
+ G_PRIORITY_DEFAULT, g_task_get_cancellable (task),
+ client_handshake2_done, task);
+}
+
+static GBytes *
+create_c2 (const guint8 * s0s1s2)
+{
+ G_STATIC_ASSERT (SIZE_P1 == SIZE_P2);
+
+ GByteArray *ba = g_byte_array_sized_new (SIZE_P2);
+ gint64 c2time = g_get_monotonic_time ();
+
+ /* Copy S1 to C2 */
+ g_byte_array_set_size (ba, SIZE_P2);
+ memcpy (ba->data, s0s1s2 + SIZE_P0, SIZE_P1);
+
+ /* C2 time2 */
+ GST_WRITE_UINT32_BE (ba->data + 4, c2time / 1000);
+
+ GST_DEBUG ("Sending C2");
+ GST_MEMDUMP (">>> C2", ba->data, SIZE_P2);
+
+ return g_byte_array_free_to_bytes (ba);
+}
+
+static void
+client_handshake2_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (source);
+ GTask *task = user_data;
+ GIOStream *stream = g_task_get_source_object (task);
+ HandshakeData *data = g_task_get_task_data (task);
+ GError *error = NULL;
+ GBytes *res;
+ const guint8 *s0s1s2;
+ gsize size;
+
+ res = gst_rtmp_input_stream_read_all_bytes_finish (is, result, &error);
+ if (!res) {
+ GST_ERROR ("Failed to read S0+S1+S2: %s", error->message);
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ s0s1s2 = g_bytes_get_data (res, &size);
+ if (size < SIZE_P0P1P2) {
+ GST_ERROR ("Short read (want %d have %" G_GSIZE_FORMAT ")", SIZE_P0P1P2,
+ size);
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
+ "Short read (want %d have %" G_GSIZE_FORMAT ")", SIZE_P0P1P2, size);
+ g_object_unref (task);
+ goto out;
+ }
+
+ GST_DEBUG ("Got S0+S1+S2");
+ GST_MEMDUMP ("<<< S0", s0s1s2, SIZE_P0);
+ GST_MEMDUMP ("<<< S1", s0s1s2 + SIZE_P0, SIZE_P1);
+ GST_MEMDUMP ("<<< S2", s0s1s2 + SIZE_P0P1, SIZE_P2);
+
+ if (handshake_data_check (data, s0s1s2 + SIZE_P0P1)) {
+ GST_DEBUG ("S2 random data matches C1");
+ } else {
+ if (data->strict) {
+ GST_ERROR ("Handshake response data did not match");
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
+ "Handshake response data did not match");
+ g_object_unref (task);
+ goto out;
+ }
+
+ GST_WARNING ("Handshake reponse data did not match; continuing anyway");
+ }
+
+ {
+ GOutputStream *os = g_io_stream_get_output_stream (stream);
+ GBytes *bytes = create_c2 (s0s1s2);
+
+ gst_rtmp_output_stream_write_all_bytes_async (os,
+ bytes, G_PRIORITY_DEFAULT,
+ g_task_get_cancellable (task), client_handshake3_done, task);
+
+ g_bytes_unref (bytes);
+ }
+
+out:
+ g_bytes_unref (res);
+}
+
+static void
+client_handshake3_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (source);
+ GTask *task = user_data;
+ GError *error = NULL;
+ gboolean res;
+
+ res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error);
+ if (!res) {
+ GST_ERROR ("Failed to send C2: %s", error->message);
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ GST_DEBUG ("Sent C2");
+ GST_INFO ("Client handshake finished");
+
+ g_task_return_boolean (task, TRUE);
+ g_object_unref (task);
+}
+
+gboolean
+gst_rtmp_client_handshake_finish (GIOStream * stream, GAsyncResult * result,
+ GError ** error)
+{
+ g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+ return g_task_propagate_boolean (G_TASK (result), error);
+}
diff --git a/gst/rtmp2/rtmp/rtmphandshake.h b/gst/rtmp2/rtmp/rtmphandshake.h
new file mode 100644
index 000000000..1147948d4
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmphandshake.h
@@ -0,0 +1,35 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_HANDSHAKE_H_
+#define _GST_RTMP_HANDSHAKE_H_
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+void gst_rtmp_client_handshake (GIOStream * stream, gboolean strict,
+ GCancellable * cancellable, GAsyncReadyCallback callback,
+ gpointer user_data);
+gboolean gst_rtmp_client_handshake_finish (GIOStream * stream,
+ GAsyncResult * result, GError ** error);
+
+G_END_DECLS
+#endif
diff --git a/gst/rtmp2/rtmp/rtmpmessage.c b/gst/rtmp2/rtmp/rtmpmessage.c
new file mode 100644
index 000000000..1fba1d47b
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpmessage.c
@@ -0,0 +1,494 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "rtmpmessage.h"
+#include "rtmpchunkstream.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtmp_message_debug_category);
+#define GST_CAT_DEFAULT gst_rtmp_message_debug_category
+
+gboolean
+gst_rtmp_message_type_is_valid (GstRtmpMessageType type)
+{
+ switch (type) {
+ case GST_RTMP_MESSAGE_TYPE_INVALID:
+ case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
+ case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
+ case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
+ case GST_RTMP_MESSAGE_TYPE_USER_CONTROL:
+ case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
+ case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
+ case GST_RTMP_MESSAGE_TYPE_AUDIO:
+ case GST_RTMP_MESSAGE_TYPE_VIDEO:
+ case GST_RTMP_MESSAGE_TYPE_DATA_AMF3:
+ case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF3:
+ case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF3:
+ case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
+ case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF0:
+ case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
+ case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
+ return TRUE;
+ default:
+ return FALSE;
+ }
+}
+
+gboolean
+gst_rtmp_message_type_is_protocol_control (GstRtmpMessageType type)
+{
+ switch (type) {
+ case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
+ case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
+ case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
+ case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
+ case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
+ return TRUE;
+
+ default:
+ return FALSE;
+ }
+}
+
+const gchar *
+gst_rtmp_message_type_get_nick (GstRtmpMessageType type)
+{
+ switch (type) {
+ case GST_RTMP_MESSAGE_TYPE_INVALID:
+ return "invalid";
+ case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
+ return "set-chunk-size";
+ case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
+ return "abort-message";
+ case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
+ return "acknowledgement";
+ case GST_RTMP_MESSAGE_TYPE_USER_CONTROL:
+ return "user-control";
+ case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
+ return "window-ack-size";
+ case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
+ return "set-peer-bandwidth";
+ case GST_RTMP_MESSAGE_TYPE_AUDIO:
+ return "audio";
+ case GST_RTMP_MESSAGE_TYPE_VIDEO:
+ return "video";
+ case GST_RTMP_MESSAGE_TYPE_DATA_AMF3:
+ return "data-amf3";
+ case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF3:
+ return "shared-object-amf3";
+ case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF3:
+ return "command-amf3";
+ case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
+ return "data-amf0";
+ case GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF0:
+ return "shared-object-amf0";
+ case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
+ return "command-amf0";
+ case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
+ return "aggregate";
+ default:
+ return "unknown";
+ }
+}
+
+const gchar *
+gst_rtmp_user_control_type_get_nick (GstRtmpUserControlType type)
+{
+ switch (type) {
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
+ return "stream-begin";
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
+ return "stream-eof";
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
+ return "stream-dry";
+ case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
+ return "set-buffer-length";
+ case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
+ return "stream-is-recorded";
+ case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
+ return "ping-request";
+ case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
+ return "ping-response";
+ case GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_REQUEST:
+ return "swf-verification-request";
+ case GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_RESPONSE:
+ return "swf-verification-response";
+ case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
+ return "buffer-empty";
+ case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
+ return "buffer-ready";
+ default:
+ return "unknown";
+ }
+}
+
+GType
+gst_rtmp_meta_api_get_type (void)
+{
+ static volatile GType type = 0;
+ static const gchar *tags[] = {
+ NULL
+ };
+
+ if (g_once_init_enter (&type)) {
+ GType _type = gst_meta_api_type_register ("GstRtmpMetaAPI", tags);
+ GST_DEBUG_CATEGORY_INIT (gst_rtmp_message_debug_category,
+ "rtmpmessage", 0, "debug category for rtmp messages");
+ g_once_init_leave (&type, _type);
+ }
+ return type;
+}
+
+static gboolean
+gst_rtmp_meta_init (GstMeta * meta, gpointer params, GstBuffer * buffer)
+{
+ GstRtmpMeta *emeta = (GstRtmpMeta *) meta;
+
+ emeta->cstream = 0;
+ emeta->ts_delta = 0;
+ emeta->size = 0;
+ emeta->type = GST_RTMP_MESSAGE_TYPE_INVALID;
+ emeta->mstream = 0;
+
+ return TRUE;
+}
+
+static gboolean
+gst_rtmp_meta_transform (GstBuffer * dest, GstMeta * meta, GstBuffer * buffer,
+ GQuark type, gpointer data)
+{
+ GstRtmpMeta *smeta, *dmeta;
+
+ if (!GST_META_TRANSFORM_IS_COPY (type)) {
+ /* We only support copy transforms */
+ return FALSE;
+ }
+
+ smeta = (GstRtmpMeta *) meta;
+ dmeta = gst_buffer_get_rtmp_meta (dest);
+ if (!dmeta) {
+ dmeta = gst_buffer_add_rtmp_meta (dest);
+ }
+
+ dmeta->cstream = smeta->cstream;
+ dmeta->ts_delta = smeta->ts_delta;
+ dmeta->size = smeta->size;
+ dmeta->type = smeta->type;
+ dmeta->mstream = smeta->mstream;
+
+ return dmeta != NULL;
+}
+
+const GstMetaInfo *
+gst_rtmp_meta_get_info (void)
+{
+ static const GstMetaInfo *rtmp_meta_info = NULL;
+
+ if (g_once_init_enter (&rtmp_meta_info)) {
+ const GstMetaInfo *meta = gst_meta_register (GST_RTMP_META_API_TYPE,
+ "GstRtmpMeta", sizeof *meta, gst_rtmp_meta_init, NULL,
+ gst_rtmp_meta_transform);
+ g_once_init_leave (&rtmp_meta_info, meta);
+ }
+ return rtmp_meta_info;
+}
+
+GstRtmpMeta *
+gst_buffer_add_rtmp_meta (GstBuffer * buffer)
+{
+ GstRtmpMeta *meta;
+
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL);
+
+ meta = (GstRtmpMeta *) gst_buffer_add_meta (buffer, GST_RTMP_META_INFO, NULL);
+
+ return meta;
+}
+
+GstBuffer *
+gst_rtmp_message_new (GstRtmpMessageType type, guint32 cstream, guint32 mstream)
+{
+ GstBuffer *buffer = gst_buffer_new ();
+ GstRtmpMeta *meta = gst_buffer_add_rtmp_meta (buffer);
+
+ meta->type = type;
+ meta->cstream = cstream;
+ meta->mstream = mstream;
+
+ return buffer;
+}
+
+GstBuffer *
+gst_rtmp_message_new_wrapped (GstRtmpMessageType type, guint32 cstream,
+ guint32 mstream, guint8 * data, gsize size)
+{
+ GstBuffer *message = gst_rtmp_message_new (type, cstream, mstream);
+
+ gst_buffer_append_memory (message,
+ gst_memory_new_wrapped (0, data, size, 0, size, data, g_free));
+
+ return message;
+}
+
+void
+gst_rtmp_buffer_dump (GstBuffer * buffer, const gchar * prefix)
+{
+ GstRtmpMeta *meta;
+ GstMapInfo map;
+
+ if (G_LIKELY (GST_LEVEL_LOG > _gst_debug_min || GST_LEVEL_LOG >
+ gst_debug_category_get_threshold (GST_CAT_DEFAULT))) {
+ return;
+ }
+
+ g_return_if_fail (GST_IS_BUFFER (buffer));
+ g_return_if_fail (prefix);
+
+ GST_LOG ("%s %" GST_PTR_FORMAT, prefix, buffer);
+
+ meta = gst_buffer_get_rtmp_meta (buffer);
+ if (meta) {
+ GST_LOG ("%s cstream:%-4" G_GUINT32_FORMAT " mstream:%-4" G_GUINT32_FORMAT
+ " ts:%-8" G_GUINT32_FORMAT " len:%-6" G_GUINT32_FORMAT " type:%s",
+ prefix, meta->cstream, meta->mstream, meta->ts_delta, meta->size,
+ gst_rtmp_message_type_get_nick (meta->type));
+ }
+
+ if (G_LIKELY (GST_LEVEL_MEMDUMP > _gst_debug_min || GST_LEVEL_MEMDUMP >
+ gst_debug_category_get_threshold (GST_CAT_DEFAULT))) {
+ return;
+ }
+
+ if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
+ GST_ERROR ("Failed to map %" GST_PTR_FORMAT " for memdump", buffer);
+ return;
+ }
+
+ if (map.size > 0) {
+ GST_MEMDUMP (prefix, map.data, map.size);
+ }
+
+ gst_buffer_unmap (buffer, &map);
+}
+
+GstRtmpMessageType
+gst_rtmp_message_get_type (GstBuffer * buffer)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+ g_return_val_if_fail (meta, GST_RTMP_MESSAGE_TYPE_INVALID);
+ return meta->type;
+}
+
+gboolean
+gst_rtmp_message_is_protocol_control (GstBuffer * buffer)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+
+ g_return_val_if_fail (meta, FALSE);
+
+ if (!gst_rtmp_message_type_is_protocol_control (meta->type)) {
+ return FALSE;
+ }
+
+ if (meta->cstream != GST_RTMP_CHUNK_STREAM_PROTOCOL) {
+ GST_WARNING ("Protocol control message on chunk stream %"
+ G_GUINT32_FORMAT ", not 2", meta->cstream);
+ }
+
+ if (meta->mstream != 0) {
+ GST_WARNING ("Protocol control message on message stream %"
+ G_GUINT32_FORMAT ", not 0", meta->mstream);
+ }
+
+ return TRUE;
+}
+
+gboolean
+gst_rtmp_message_is_user_control (GstBuffer * buffer)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+
+ g_return_val_if_fail (meta, FALSE);
+
+ if (meta->type != GST_RTMP_MESSAGE_TYPE_USER_CONTROL) {
+ return FALSE;
+ }
+
+ if (meta->cstream != GST_RTMP_CHUNK_STREAM_PROTOCOL) {
+ GST_WARNING ("User control message on chunk stream %"
+ G_GUINT32_FORMAT ", not 2", meta->cstream);
+ }
+
+ if (meta->mstream != 0) {
+ GST_WARNING ("User control message on message stream %"
+ G_GUINT32_FORMAT ", not 0", meta->mstream);
+ }
+
+ return TRUE;
+}
+
+static inline gboolean
+pc_has_param2 (GstRtmpMessageType type)
+{
+ return type == GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH;
+}
+
+gboolean
+gst_rtmp_message_parse_protocol_control (GstBuffer * buffer,
+ GstRtmpProtocolControl * out)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+ GstMapInfo map;
+ GstRtmpProtocolControl pc;
+ gsize pc_size = 4;
+ gboolean ret = FALSE;
+
+ g_return_val_if_fail (meta, FALSE);
+ g_return_val_if_fail (gst_rtmp_message_type_is_protocol_control (meta->type),
+ FALSE);
+
+ if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
+ GST_ERROR ("can't map protocol control message");
+ return FALSE;
+ }
+
+ pc.type = meta->type;
+ pc_size = pc_has_param2 (pc.type) ? 5 : 4;
+
+ if (map.size < pc_size) {
+ GST_ERROR ("can't read protocol control param");
+ goto err;
+ } else if (map.size > pc_size) {
+ GST_WARNING ("overlength protocol control: %" G_GSIZE_FORMAT " > %"
+ G_GSIZE_FORMAT, map.size, pc_size);
+ }
+
+ pc.param = GST_READ_UINT32_BE (map.data);
+ pc.param2 = pc_has_param2 (pc.type) ? GST_READ_UINT8 (map.data + 4) : 0;
+
+ ret = TRUE;
+ if (out) {
+ *out = pc;
+ }
+
+err:
+ gst_buffer_unmap (buffer, &map);
+ return ret;
+}
+
+GstBuffer *
+gst_rtmp_message_new_protocol_control (GstRtmpProtocolControl * pc)
+{
+ guint8 *data;
+ gsize size;
+
+ g_return_val_if_fail (pc, NULL);
+ g_return_val_if_fail (gst_rtmp_message_type_is_protocol_control (pc->type),
+ NULL);
+
+ size = pc_has_param2 (pc->type) ? 5 : 4;
+
+ data = g_malloc (size);
+ GST_WRITE_UINT32_BE (data, pc->param);
+ if (pc_has_param2 (pc->type)) {
+ GST_WRITE_UINT32_BE (data + 4, pc->param2);
+ }
+
+ return gst_rtmp_message_new_wrapped (pc->type,
+ GST_RTMP_CHUNK_STREAM_PROTOCOL, 0, data, size);
+}
+
+static inline gboolean
+uc_has_param2 (GstRtmpUserControlType type)
+{
+ return type == GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH;
+}
+
+gboolean
+gst_rtmp_message_parse_user_control (GstBuffer * buffer,
+ GstRtmpUserControl * out)
+{
+ GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
+ GstMapInfo map;
+ GstRtmpUserControl uc;
+ gsize uc_size;
+ gboolean ret = FALSE;
+
+ g_return_val_if_fail (meta, FALSE);
+ g_return_val_if_fail (meta->type == GST_RTMP_MESSAGE_TYPE_USER_CONTROL,
+ FALSE);
+
+ if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
+ GST_ERROR ("can't map user control message");
+ return FALSE;
+ }
+
+ if (map.size < 2) {
+ GST_ERROR ("can't read user control type");
+ goto err;
+ }
+
+ uc.type = GST_READ_UINT16_BE (map.data);
+ uc_size = uc_has_param2 (uc.type) ? 10 : 6;
+
+ if (map.size < uc_size) {
+ GST_ERROR ("can't read user control param");
+ goto err;
+ } else if (map.size > uc_size) {
+ GST_WARNING ("overlength user control: %" G_GSIZE_FORMAT " > %"
+ G_GSIZE_FORMAT, map.size, uc_size);
+ }
+
+ uc.param = GST_READ_UINT32_BE (map.data + 2);
+ uc.param2 = uc_has_param2 (uc.type) ? GST_READ_UINT32_BE (map.data + 6) : 0;
+
+ ret = TRUE;
+ if (out) {
+ *out = uc;
+ }
+
+err:
+ gst_buffer_unmap (buffer, &map);
+ return ret;
+}
+
+GstBuffer *
+gst_rtmp_message_new_user_control (GstRtmpUserControl * uc)
+{
+ guint8 *data;
+ gsize size;
+
+ g_return_val_if_fail (uc, NULL);
+
+ size = uc_has_param2 (uc->type) ? 10 : 6;
+
+ data = g_malloc (size);
+ GST_WRITE_UINT16_BE (data, uc->type);
+ GST_WRITE_UINT32_BE (data + 2, uc->param);
+ if (uc_has_param2 (uc->type)) {
+ GST_WRITE_UINT32_BE (data + 6, uc->param2);
+ }
+
+ return gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_USER_CONTROL,
+ GST_RTMP_CHUNK_STREAM_PROTOCOL, 0, data, size);
+}
diff --git a/gst/rtmp2/rtmp/rtmpmessage.h b/gst/rtmp2/rtmp/rtmpmessage.h
new file mode 100644
index 000000000..92f3fdc95
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmpmessage.h
@@ -0,0 +1,142 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_MESSAGE_H_
+#define _GST_RTMP_MESSAGE_H_
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+typedef enum {
+ GST_RTMP_MESSAGE_TYPE_INVALID = 0,
+ GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE = 1,
+ GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE = 2,
+ GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT = 3,
+ GST_RTMP_MESSAGE_TYPE_USER_CONTROL = 4,
+ GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE = 5,
+ GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH = 6,
+ GST_RTMP_MESSAGE_TYPE_AUDIO = 8,
+ GST_RTMP_MESSAGE_TYPE_VIDEO = 9,
+ GST_RTMP_MESSAGE_TYPE_DATA_AMF3 = 15,
+ GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF3 = 16,
+ GST_RTMP_MESSAGE_TYPE_COMMAND_AMF3 = 17,
+ GST_RTMP_MESSAGE_TYPE_DATA_AMF0 = 18,
+ GST_RTMP_MESSAGE_TYPE_SHARED_OBJECT_AMF0 = 19,
+ GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0 = 20,
+ GST_RTMP_MESSAGE_TYPE_AGGREGATE = 22,
+} GstRtmpMessageType;
+
+gboolean gst_rtmp_message_type_is_valid (GstRtmpMessageType type);
+gboolean gst_rtmp_message_type_is_protocol_control (GstRtmpMessageType type);
+const gchar * gst_rtmp_message_type_get_nick (GstRtmpMessageType type);
+
+typedef enum
+{
+ GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN = 0,
+ GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF = 1,
+ GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY = 2,
+ GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH = 3,
+ GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED = 4,
+ GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST = 6,
+ GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE = 7,
+
+ /* undocumented */
+ GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_REQUEST = 26,
+ GST_RTMP_USER_CONTROL_TYPE_SWF_VERIFICATION_RESPONSE = 27,
+ GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY = 31,
+ GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY = 32,
+} GstRtmpUserControlType;
+
+const gchar * gst_rtmp_user_control_type_get_nick (
+ GstRtmpUserControlType type);
+
+#define GST_RTMP_META_API_TYPE (gst_rtmp_meta_api_get_type())
+#define GST_RTMP_META_INFO (gst_rtmp_meta_get_info())
+typedef struct _GstRtmpMeta GstRtmpMeta;
+
+struct _GstRtmpMeta {
+ GstMeta meta;
+ guint32 cstream;
+ guint32 ts_delta;
+ guint32 size;
+ GstRtmpMessageType type;
+ guint32 mstream;
+};
+
+GType gst_rtmp_meta_api_get_type (void);
+const GstMetaInfo * gst_rtmp_meta_get_info (void);
+
+GstRtmpMeta * gst_buffer_add_rtmp_meta (GstBuffer * buffer);
+
+static inline GstRtmpMeta *
+gst_buffer_get_rtmp_meta (GstBuffer * buffer)
+{
+ return (GstRtmpMeta *) gst_buffer_get_meta (buffer, GST_RTMP_META_API_TYPE);
+}
+
+GstBuffer * gst_rtmp_message_new (GstRtmpMessageType type, guint32 cstream,
+ guint32 mstream);
+GstBuffer * gst_rtmp_message_new_wrapped (GstRtmpMessageType type, guint32 cstream,
+ guint32 mstream, guint8 * data, gsize size);
+
+void gst_rtmp_buffer_dump (GstBuffer * buffer, const gchar * prefix);
+
+GstRtmpMessageType gst_rtmp_message_get_type (GstBuffer * buffer);
+gboolean gst_rtmp_message_is_protocol_control (GstBuffer * buffer);
+gboolean gst_rtmp_message_is_user_control (GstBuffer * buffer);
+
+typedef struct {
+ GstRtmpMessageType type;
+
+ /* for SET_CHUNK_SIZE: chunk size */
+ /* for ABORT_MESSAGE: chunk stream ID */
+ /* for ACKNOWLEDGEMENT: acknowledged byte count */
+ /* for WINDOW_ACK_SIZE and SET_PEER_BANDWIDTH: acknowledgement window size */
+ guint32 param;
+
+ /* for SET_PEER_BANDWIDTH: limit type */
+ guint8 param2;
+} GstRtmpProtocolControl;
+
+gboolean gst_rtmp_message_parse_protocol_control (GstBuffer * buffer,
+ GstRtmpProtocolControl * out);
+
+GstBuffer * gst_rtmp_message_new_protocol_control (GstRtmpProtocolControl * pc);
+
+typedef struct {
+ GstRtmpUserControlType type;
+
+ /* for STREAM_BEGIN to STREAM_IS_RECORDED: message stream ID */
+ /* for PING_REQUEST and PING_RESPONSE: timestamp of request */
+ guint32 param;
+
+ /* for SET_BUFFER_LENGTH: buffer length in ms */
+ guint32 param2;
+} GstRtmpUserControl;
+
+gboolean gst_rtmp_message_parse_user_control (GstBuffer * buffer,
+ GstRtmpUserControl * out);
+
+GstBuffer * gst_rtmp_message_new_user_control (GstRtmpUserControl * uc);
+
+G_END_DECLS
+
+#endif
diff --git a/gst/rtmp2/rtmp/rtmputils.c b/gst/rtmp2/rtmp/rtmputils.c
new file mode 100644
index 000000000..c9bbc3a98
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmputils.c
@@ -0,0 +1,246 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
+ * Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "rtmputils.h"
+#include <string.h>
+
+static void read_all_bytes_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+static void write_all_bytes_done (GObject * source, GAsyncResult * result,
+ gpointer user_data);
+
+void
+gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes)
+{
+ const guint8 *data;
+ gsize size;
+ guint offset;
+
+ g_return_if_fail (bytearray);
+
+ offset = bytearray->len;
+ data = g_bytes_get_data (bytes, &size);
+
+ g_return_if_fail (data);
+
+ g_byte_array_set_size (bytearray, offset + size);
+ memcpy (bytearray->data + offset, data, size);
+}
+
+void
+gst_rtmp_input_stream_read_all_bytes_async (GInputStream * stream, gsize count,
+ int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+ GByteArray *ba;
+
+ g_return_if_fail (G_IS_INPUT_STREAM (stream));
+
+ task = g_task_new (stream, cancellable, callback, user_data);
+
+ ba = g_byte_array_sized_new (count);
+ g_byte_array_set_size (ba, count);
+ g_task_set_task_data (task, ba, (GDestroyNotify) g_byte_array_unref);
+
+ g_input_stream_read_all_async (stream, ba->data, count, io_priority,
+ cancellable, read_all_bytes_done, task);
+}
+
+static void
+read_all_bytes_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GInputStream *is = G_INPUT_STREAM (source);
+ GTask *task = user_data;
+ GByteArray *ba = g_task_get_task_data (task);
+ GError *error = NULL;
+ gboolean res;
+ gsize bytes_read;
+ GBytes *bytes;
+
+ res = g_input_stream_read_all_finish (is, result, &bytes_read, &error);
+ if (!res) {
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ g_byte_array_set_size (ba, bytes_read);
+ bytes = g_byte_array_free_to_bytes (g_byte_array_ref (ba));
+
+ g_task_return_pointer (task, bytes, (GDestroyNotify) g_bytes_unref);
+ g_object_unref (task);
+}
+
+GBytes *
+gst_rtmp_input_stream_read_all_bytes_finish (GInputStream * stream,
+ GAsyncResult * result, GError ** error)
+{
+ g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+ return g_task_propagate_pointer (G_TASK (result), error);
+}
+
+void
+gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream,
+ GBytes * bytes, int io_priority, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data)
+{
+ GTask *task;
+ const void *data;
+ gsize size;
+
+ g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+ g_return_if_fail (bytes);
+
+ data = g_bytes_get_data (bytes, &size);
+ g_return_if_fail (data);
+
+ task = g_task_new (stream, cancellable, callback, user_data);
+ g_task_set_task_data (task, g_bytes_ref (bytes),
+ (GDestroyNotify) g_bytes_unref);
+
+ g_output_stream_write_all_async (stream, data, size, io_priority,
+ cancellable, write_all_bytes_done, task);
+}
+
+static void
+write_all_bytes_done (GObject * source, GAsyncResult * result,
+ gpointer user_data)
+{
+ GOutputStream *os = G_OUTPUT_STREAM (source);
+ GTask *task = user_data;
+ GError *error = NULL;
+ gboolean res;
+
+ res = g_output_stream_write_all_finish (os, result, NULL, &error);
+ if (!res) {
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ g_task_return_boolean (task, TRUE);
+ g_object_unref (task);
+}
+
+gboolean
+gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
+ GAsyncResult * result, GError ** error)
+{
+ g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+ return g_task_propagate_boolean (G_TASK (result), error);
+}
+
+static const gchar ascii_table[128] = {
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ ' ', '!', 0x0, '#', '$', '%', '&', '\'',
+ '(', ')', '*', '+', ',', '-', '.', '/',
+ '0', '1', '2', '3', '4', '5', '6', '7',
+ '8', '9', ':', ';', '<', '=', '>', '?',
+ '@', 'A', 'B', 'C', 'D', 'E', 'F', 'G',
+ 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O',
+ 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W',
+ 'X', 'Y', 'Z', '[', 0x0, ']', '^', '_',
+ '`', 'a', 'b', 'c', 'd', 'e', 'f', 'g',
+ 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
+ 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
+ 'x', 'y', 'z', '{', '|', '}', '~', 0x0,
+};
+
+static const gchar ascii_escapes[128] = {
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 'a',
+ 'b', 't', 'n', 'v', 'f', 'r', 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, '"', 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, '\\', 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
+};
+
+void
+gst_rtmp_string_print_escaped (GString * string, const gchar * data,
+ gssize size)
+{
+ gssize i;
+
+ g_return_if_fail (string);
+
+ if (!data) {
+ g_string_append (string, "(NULL)");
+ return;
+ }
+
+ g_string_append_c (string, '"');
+
+ for (i = 0; size < 0 ? data[i] != 0 : i < size; i++) {
+ guchar c = data[i];
+
+ if (G_LIKELY (c < G_N_ELEMENTS (ascii_table))) {
+ if (ascii_table[c]) {
+ g_string_append_c (string, c);
+ continue;
+ }
+
+ if (ascii_escapes[c]) {
+ g_string_append_c (string, '\\');
+ g_string_append_c (string, ascii_escapes[c]);
+ continue;
+ }
+ } else {
+ gunichar uc = g_utf8_get_char_validated (data + i,
+ size < 0 ? -1 : size - i);
+ if (uc != (gunichar) (-2) && uc != (gunichar) (-1)) {
+ if (g_unichar_isprint (uc)) {
+ g_string_append_unichar (string, uc);
+ } else if (uc <= G_MAXUINT16) {
+ g_string_append_printf (string, "\\u%04X", uc);
+ } else {
+ g_string_append_printf (string, "\\U%08X", uc);
+ }
+
+ i += g_utf8_skip[c] - 1;
+ continue;
+ }
+ }
+
+ g_string_append_printf (string, "\\x%02X", c);
+ }
+
+ g_string_append_c (string, '"');
+
+}
diff --git a/gst/rtmp2/rtmp/rtmputils.h b/gst/rtmp2/rtmp/rtmputils.h
new file mode 100644
index 000000000..b427f37b4
--- /dev/null
+++ b/gst/rtmp2/rtmp/rtmputils.h
@@ -0,0 +1,46 @@
+/* GStreamer RTMP Library
+ * Copyright (C) 2013 David Schleef <ds@schleef.org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef _GST_RTMP_UTILS_H_
+#define _GST_RTMP_UTILS_H_
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+void gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes);
+
+void gst_rtmp_input_stream_read_all_bytes_async (GInputStream * stream,
+ gsize count, int io_priority, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data);
+GBytes * gst_rtmp_input_stream_read_all_bytes_finish (GInputStream * stream,
+ GAsyncResult * result, GError ** error);
+
+void gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream,
+ GBytes * bytes, int io_priority, GCancellable * cancellable,
+ GAsyncReadyCallback callback, gpointer user_data);
+gboolean gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
+ GAsyncResult * result, GError ** error);
+
+void gst_rtmp_string_print_escaped (GString * string, const gchar *data,
+ gssize size);
+
+G_END_DECLS
+
+#endif