summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2017-07-05 16:50:22 +0300
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2017-08-01 14:42:53 +0300
commit3089d142b015d9c8fcf0c9b25ce0c6c705f4be7d (patch)
treea207b4cbeb3ef7815676fd633b0e81109f0c8fb0
parentb89c94b37eda4e9c093b159d6536bbef2ef30e27 (diff)
downloadgstreamer-plugins-bad-3089d142b015d9c8fcf0c9b25ce0c6c705f4be7d.tar.gz
ipcpipeline: introduce new plugin for inter-process pipelines
These elements allow splitting a pipeline across several processes, with communication done by the ipcpipelinesink and ipcpipelinesrc elements. The main use case is to split a playback pipeline into a process that runs networking, parser & demuxer and another process that runs the decoder & sink, for security reasons. https://bugzilla.gnome.org/show_bug.cgi?id=752214
-rw-r--r--configure.ac2
-rw-r--r--gst/ipcpipeline/Makefile.am28
-rw-r--r--gst/ipcpipeline/gstipcpipeline.c48
-rw-r--r--gst/ipcpipeline/gstipcpipelinecomm.c2341
-rw-r--r--gst/ipcpipeline/gstipcpipelinecomm.h132
-rw-r--r--gst/ipcpipeline/gstipcpipelinesink.c722
-rw-r--r--gst/ipcpipeline/gstipcpipelinesink.h67
-rw-r--r--gst/ipcpipeline/gstipcpipelinesrc.c954
-rw-r--r--gst/ipcpipeline/gstipcpipelinesrc.h76
-rw-r--r--gst/ipcpipeline/gstipcslavepipeline.c122
-rw-r--r--gst/ipcpipeline/gstipcslavepipeline.h59
-rw-r--r--gst/ipcpipeline/protocol.txt92
12 files changed, 4643 insertions, 0 deletions
diff --git a/configure.ac b/configure.ac
index 5bd76a0dd..42d54c9d8 100644
--- a/configure.ac
+++ b/configure.ac
@@ -469,6 +469,7 @@ AG_GST_CHECK_PLUGIN(gdp)
AG_GST_CHECK_PLUGIN(id3tag)
AG_GST_CHECK_PLUGIN(inter)
AG_GST_CHECK_PLUGIN(interlace)
+AG_GST_CHECK_PLUGIN(ipcpipeline)
AG_GST_CHECK_PLUGIN(ivfparse)
AG_GST_CHECK_PLUGIN(ivtc)
AG_GST_CHECK_PLUGIN(jp2kdecimator)
@@ -3587,6 +3588,7 @@ gst/gdp/Makefile
gst/id3tag/Makefile
gst/inter/Makefile
gst/interlace/Makefile
+gst/ipcpipeline/Makefile
gst/ivfparse/Makefile
gst/ivtc/Makefile
gst/jp2kdecimator/Makefile
diff --git a/gst/ipcpipeline/Makefile.am b/gst/ipcpipeline/Makefile.am
new file mode 100644
index 000000000..12bbed14b
--- /dev/null
+++ b/gst/ipcpipeline/Makefile.am
@@ -0,0 +1,28 @@
+plugin_LTLIBRARIES = libgstipcpipeline.la
+
+libgstipcpipeline_la_SOURCES = \
+ gstipcpipeline.c \
+ gstipcpipelinecomm.c \
+ gstipcpipelinesink.c \
+ gstipcpipelinesrc.c \
+ gstipcslavepipeline.c
+
+noinst_HEADERS = \
+ gstipcpipelinecomm.h \
+ gstipcpipelinesink.h \
+ gstipcpipelinesrc.h \
+ gstipcslavepipeline.h
+
+libgstipcpipeline_la_CFLAGS = \
+ $(GST_PLUGINS_BAD_CFLAGS) \
+ $(GST_PLUGINS_BASE_CFLAGS) \
+ $(GST_BASE_CFLAGS) \
+ $(GST_CFLAGS)
+
+libgstipcpipeline_la_LIBADD = \
+ $(GST_PLUGINS_BASE_LIBS) \
+ $(GST_BASE_LIBS) \
+ $(GST_LIBS) \
+ $(LIBM)
+
+libgstipcpipeline_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
diff --git a/gst/ipcpipeline/gstipcpipeline.c b/gst/ipcpipeline/gstipcpipeline.c
new file mode 100644
index 000000000..4d647d49d
--- /dev/null
+++ b/gst/ipcpipeline/gstipcpipeline.c
@@ -0,0 +1,48 @@
+/* GStreamer
+ * Copyright (C) 2017 YouView TV Ltd
+ * Author: George Kiagiadakis <george.Kiagiadakis@collabora.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+ * Boston, MA 02110-1335, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstipcpipelinecomm.h"
+#include "gstipcpipelinesink.h"
+#include "gstipcpipelinesrc.h"
+#include "gstipcslavepipeline.h"
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ gst_ipc_pipeline_comm_plugin_init ();
+ gst_element_register (plugin, "ipcpipelinesrc", GST_RANK_NONE,
+ GST_TYPE_IPC_PIPELINE_SRC);
+ gst_element_register (plugin, "ipcpipelinesink", GST_RANK_NONE,
+ GST_TYPE_IPC_PIPELINE_SINK);
+ gst_element_register (plugin, "ipcslavepipeline", GST_RANK_NONE,
+ GST_TYPE_IPC_SLAVE_PIPELINE);
+
+ return TRUE;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ ipcpipeline,
+ "plugin for inter-process pipeline communication",
+ plugin_init, VERSION, "LGPL", PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/gst/ipcpipeline/gstipcpipelinecomm.c b/gst/ipcpipeline/gstipcpipelinecomm.c
new file mode 100644
index 000000000..eee8e528f
--- /dev/null
+++ b/gst/ipcpipeline/gstipcpipelinecomm.c
@@ -0,0 +1,2341 @@
+/* GStreamer
+ * Copyright (C) 2015-2017 YouView TV Ltd
+ * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcpipelinecomm.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <gst/base/gstbytewriter.h>
+#include <gst/gstprotection.h>
+#include "gstipcpipelinecomm.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
+#define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug
+
+#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
+
+GQuark QUARK_ID;
+
+typedef enum
+{
+ ACK_TYPE_NONE,
+ ACK_TYPE_TIMED,
+ ACK_TYPE_BLOCKING
+} AckType;
+
+typedef enum
+{
+ COMM_REQUEST_TYPE_BUFFER,
+ COMM_REQUEST_TYPE_EVENT,
+ COMM_REQUEST_TYPE_QUERY,
+ COMM_REQUEST_TYPE_STATE_CHANGE,
+ COMM_REQUEST_TYPE_MESSAGE,
+} CommRequestType;
+
+typedef struct
+{
+ guint32 id;
+ gboolean replied;
+ gboolean comm_error;
+ guint32 ret;
+ GstQuery *query;
+ CommRequestType type;
+ GCond cond;
+} CommRequest;
+
+static const gchar *comm_request_ret_get_name (CommRequestType type,
+ guint32 ret);
+static guint32 comm_request_ret_get_failure_value (CommRequestType type);
+
+static CommRequest *
+comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
+{
+ CommRequest *req;
+
+ req = g_malloc (sizeof (CommRequest));
+ req->id = id;
+ g_cond_init (&req->cond);
+ req->replied = FALSE;
+ req->comm_error = FALSE;
+ req->query = query;
+ req->ret = comm_request_ret_get_failure_value (type);
+ req->type = type;
+
+ return req;
+}
+
+static guint32
+comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
+ AckType ack_type)
+{
+ guint32 ret = comm_request_ret_get_failure_value (req->type);
+ guint64 end_time;
+
+ if (ack_type == ACK_TYPE_TIMED)
+ end_time = g_get_monotonic_time () + comm->ack_time;
+ else
+ end_time = G_MAXUINT64;
+
+ GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
+ req->id);
+ while (!req->replied) {
+ if (ack_type == ACK_TYPE_TIMED) {
+ g_cond_wait_until (&req->cond, &comm->mutex, end_time);
+ if (g_get_monotonic_time () >= end_time)
+ break;
+ } else
+ g_cond_wait (&req->cond, &comm->mutex);
+ }
+
+ if (req->replied) {
+ ret = req->ret;
+ GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
+ req->id, ret, comm_request_ret_get_name (req->type, ret));
+ } else {
+ req->comm_error = TRUE;
+ GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
+ req->id);
+ }
+
+ return ret;
+}
+
+static void
+comm_request_free (CommRequest * req)
+{
+ g_cond_clear (&req->cond);
+ g_free (req);
+}
+
+static const gchar *
+comm_request_ret_get_name (CommRequestType type, guint32 ret)
+{
+ switch (type) {
+ case COMM_REQUEST_TYPE_BUFFER:
+ return gst_flow_get_name (ret);
+ case COMM_REQUEST_TYPE_EVENT:
+ case COMM_REQUEST_TYPE_QUERY:
+ case COMM_REQUEST_TYPE_MESSAGE:
+ return ret ? "TRUE" : "FALSE";
+ case COMM_REQUEST_TYPE_STATE_CHANGE:
+ return gst_element_state_change_return_get_name (ret);
+ default:
+ g_assert_not_reached ();
+ }
+}
+
+static guint32
+comm_request_ret_get_failure_value (CommRequestType type)
+{
+ switch (type) {
+ case COMM_REQUEST_TYPE_BUFFER:
+ return GST_FLOW_COMM_ERROR;
+ case COMM_REQUEST_TYPE_EVENT:
+ case COMM_REQUEST_TYPE_MESSAGE:
+ case COMM_REQUEST_TYPE_QUERY:
+ return FALSE;
+ case COMM_REQUEST_TYPE_STATE_CHANGE:
+ return GST_STATE_CHANGE_FAILURE;
+ default:
+ g_assert_not_reached ();
+ }
+}
+
+static const gchar *
+gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
+{
+ switch (type) {
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
+ return "ACK";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
+ return "QUERY_RESULT";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
+ return "BUFFER";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
+ return "EVENT";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
+ return "SINK_MESSAGE_EVENT";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
+ return "QUERY";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
+ return "STATE_CHANGE";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
+ return "STATE_LOST";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
+ return "MESSAGE";
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
+ return "GERROR_MESSAGE";
+ default:
+ return "UNKNOWN";
+ }
+}
+
+static gboolean
+gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
+ GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
+{
+ CommRequest *req;
+ gboolean comm_error;
+ GHashTable *waiting_ids;
+
+ if (ack_type == ACK_TYPE_NONE)
+ return TRUE;
+
+ req = comm_request_new (id, type, query);
+ waiting_ids = g_hash_table_ref (comm->waiting_ids);
+ g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
+ *ret = comm_request_wait (comm, req, ack_type);
+ comm_error = req->comm_error;
+ g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
+ g_hash_table_unref (waiting_ids);
+ return !comm_error;
+}
+
+static gboolean
+write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
+{
+ size_t offset;
+ gboolean ret = TRUE;
+
+ offset = 0;
+ GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size);
+ while (size) {
+ ssize_t written =
+ write (comm->fdout, (const unsigned char *) data + offset, size);
+ if (written < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ continue;
+ GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
+ strerror (errno));
+ ret = FALSE;
+ goto done;
+ }
+ size -= written;
+ offset += written;
+ }
+
+done:
+ return ret;
+}
+
+static gboolean
+write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
+{
+ guint8 *data;
+ gboolean ret;
+ guint size;
+
+ size = gst_byte_writer_get_size (bw);
+ data = gst_byte_writer_reset_and_get_data (bw);
+ if (!data)
+ return FALSE;
+ ret = write_to_fd_raw (comm, data, size);
+ g_free (data);
+ return ret;
+}
+
+static void
+gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
+ guint32 ret, CommRequestType type)
+{
+ const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
+ guint32 size;
+ GstByteWriter bw;
+
+ g_mutex_lock (&comm->mutex);
+
+ GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
+ comm_request_ret_get_name (type, ret), ret);
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, id))
+ goto write_failed;
+ size = sizeof (ret);
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, ret))
+ goto write_failed;
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ gst_byte_writer_reset (&bw);
+ return;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ goto done;
+}
+
+void
+gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
+ guint32 id, GstFlowReturn ret)
+{
+ gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
+ COMM_REQUEST_TYPE_BUFFER);
+}
+
+void
+gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
+ guint32 id, gboolean ret)
+{
+ gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
+ COMM_REQUEST_TYPE_EVENT);
+}
+
+void
+gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
+ guint32 id, GstStateChangeReturn ret)
+{
+ gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
+ COMM_REQUEST_TYPE_STATE_CHANGE);
+}
+
+void
+gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
+ guint32 id, gboolean result, GstQuery * query)
+{
+ const unsigned char payload_type =
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
+ guint8 result8 = result;
+ guint32 size;
+ size_t len;
+ char *str = NULL;
+ guint32 type;
+ const GstStructure *structure;
+ GstByteWriter bw;
+
+ g_mutex_lock (&comm->mutex);
+
+ GST_TRACE_OBJECT (comm->element,
+ "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, id))
+ goto write_failed;
+ structure = gst_query_get_structure (query);
+ if (structure) {
+ str = gst_structure_to_string (structure);
+ len = strlen (str);
+ } else {
+ str = NULL;
+ len = 0;
+ }
+ size = 1 + sizeof (guint32) + len + 1;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint8 (&bw, result8))
+ goto write_failed;
+ type = GST_QUERY_TYPE (query);
+ if (!gst_byte_writer_put_uint32_le (&bw, type))
+ goto write_failed;
+ if (str) {
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
+ goto write_failed;
+ } else {
+ if (!gst_byte_writer_put_uint8 (&bw, 0))
+ goto write_failed;
+ }
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ gst_byte_writer_reset (&bw);
+ g_free (str);
+ return;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ goto done;
+}
+
+static gboolean
+gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
+ guint32 size, GstQuery ** query)
+{
+ gchar *end = NULL;
+ GstStructure *structure;
+ guint8 result;
+ guint32 type;
+ const guint8 *payload = NULL;
+ guint32 mapped_size = size;
+
+ /* this should not be called if we don't have enough yet */
+ *query = NULL;
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
+ g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
+
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload)
+ return FALSE;
+ result = *payload++;
+ memcpy (&type, payload, sizeof (type));
+ payload += sizeof (type);
+
+ size -= 1 + sizeof (guint32);
+ if (size == 0)
+ goto done;
+
+ if (payload[size - 1]) {
+ result = FALSE;
+ goto done;
+ }
+ if (*payload) {
+ structure = gst_structure_from_string ((const char *) payload, &end);
+ } else {
+ structure = NULL;
+ }
+ if (!structure) {
+ result = FALSE;
+ goto done;
+ }
+
+ *query = gst_query_new_custom (type, structure);
+
+done:
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+ return result;
+}
+
+typedef struct
+{
+ guint32 bytes;
+
+ guint64 size;
+ guint32 flags;
+ guint64 api;
+ char *str;
+} MetaBuildInfo;
+
+typedef struct
+{
+ GstIpcPipelineComm *comm;
+ guint32 n_meta;
+ guint32 total_bytes;
+ MetaBuildInfo *info;
+} MetaListRepresentation;
+
+static gboolean
+build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
+{
+ MetaListRepresentation *repr = user_data;
+
+ repr->n_meta++;
+ repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
+ repr->info[repr->n_meta - 1].bytes =
+ /* 4 byte bytes */
+ 4
+ /* 4 byte GstMetaFlags */
+ + 4
+ /* GstMetaInfo::api */
+ + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
+ /* GstMetaInfo::size */
+ + 8
+ /* str length */
+ + 4;
+
+ repr->info[repr->n_meta - 1].flags = (*meta)->flags;
+ repr->info[repr->n_meta - 1].api = (*meta)->info->api;
+ repr->info[repr->n_meta - 1].size = (*meta)->info->size;
+ repr->info[repr->n_meta - 1].str = NULL;
+
+ /* GstMeta is a base class, and actual useful classes are all different...
+ So we list a few of them we know we want and ignore the open ended rest */
+ if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
+ GstProtectionMeta *m = (GstProtectionMeta *) * meta;
+ repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
+ repr->info[repr->n_meta - 1].bytes +=
+ strlen (repr->info[repr->n_meta - 1].str) + 1;
+ GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
+ g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
+ } else {
+ GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
+ g_type_name ((*meta)->info->api));
+ }
+ repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
+ return TRUE;
+}
+
+typedef struct
+{
+ guint64 pts;
+ guint64 dts;
+ guint64 duration;
+ guint64 offset;
+ guint64 offset_end;
+ guint64 flags;
+} CommBufferMetadata;
+
+GstFlowReturn
+gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
+ GstBuffer * buffer)
+{
+ const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
+ GstMapInfo map;
+ guint32 ret32 = GST_FLOW_OK;
+ guint32 size, n;
+ CommBufferMetadata meta;
+ GstFlowReturn ret;
+ MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts a 4 for n_meta */
+ GstByteWriter bw;
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
+ comm->send_id, buffer);
+
+ gst_byte_writer_init (&bw);
+
+ meta.pts = GST_BUFFER_PTS (buffer);
+ meta.dts = GST_BUFFER_DTS (buffer);
+ meta.duration = GST_BUFFER_DURATION (buffer);
+ meta.offset = GST_BUFFER_OFFSET (buffer);
+ meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
+ meta.flags = GST_BUFFER_FLAGS (buffer);
+
+ /* work out meta size */
+ gst_buffer_foreach_meta (buffer, build_meta, &repr);
+
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+ size =
+ gst_buffer_get_size (buffer) + sizeof (guint32) +
+ sizeof (CommBufferMetadata) + repr.total_bytes;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
+ goto write_failed;
+ size = gst_buffer_get_size (buffer);
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
+ goto map_failed;
+ ret = write_to_fd_raw (comm, map.data, map.size);
+ gst_buffer_unmap (buffer, &map);
+ if (!ret)
+ goto write_failed;
+
+ /* meta */
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
+ goto write_failed;
+ for (n = 0; n < repr.n_meta; ++n) {
+ const MetaBuildInfo *info = repr.info + n;
+ guint32 len;
+ const char *s;
+
+ if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
+ goto write_failed;
+
+ if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
+ goto write_failed;
+
+ s = g_type_name (info->api);
+ len = strlen (s) + 1;
+ if (!gst_byte_writer_put_uint32_le (&bw, len))
+ goto write_failed;
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
+ goto write_failed;
+
+ if (!gst_byte_writer_put_uint64_le (&bw, info->size))
+ goto write_failed;
+
+ s = info->str;
+ len = s ? (strlen (s) + 1) : 0;
+ if (!gst_byte_writer_put_uint32_le (&bw, len))
+ goto write_failed;
+ if (len)
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
+ goto write_failed;
+ }
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
+ ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
+ goto wait_failed;
+ ret = ret32;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ gst_byte_writer_reset (&bw);
+ for (n = 0; n < repr.n_meta; ++n)
+ g_free (repr.info[n].str);
+ g_free (repr.info);
+ return ret;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ ret = GST_FLOW_COMM_ERROR;
+ goto done;
+
+wait_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to wait for reply on socket"));
+ ret = GST_FLOW_COMM_ERROR;
+ goto done;
+
+map_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
+ ("Failed to map buffer"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+}
+
+static GstBuffer *
+gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
+{
+ GstBuffer *buffer;
+ CommBufferMetadata meta;
+ guint32 n_meta, n;
+ const guint8 *payload = NULL;
+ guint32 mapped_size, buffer_data_size;
+
+ /* this should not be called if we don't have enough yet */
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
+ g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
+
+ mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload)
+ return NULL;
+ memcpy (&meta, payload, sizeof (CommBufferMetadata));
+ payload += sizeof (CommBufferMetadata);
+ memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
+ size -= mapped_size;
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+
+ if (buffer_data_size == 0) {
+ buffer = gst_buffer_new ();
+ } else {
+ buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
+ gst_adapter_flush (comm->adapter, buffer_data_size);
+ }
+ size -= buffer_data_size;
+
+ GST_BUFFER_PTS (buffer) = meta.pts;
+ GST_BUFFER_DTS (buffer) = meta.dts;
+ GST_BUFFER_DURATION (buffer) = meta.duration;
+ GST_BUFFER_OFFSET (buffer) = meta.offset;
+ GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
+ GST_BUFFER_FLAGS (buffer) = meta.flags;
+
+ /* If you don't call that, the GType isn't yet known at the
+ g_type_from_name below */
+ gst_protection_meta_get_info ();
+
+ mapped_size = size;
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload) {
+ gst_buffer_unref (buffer);
+ return NULL;
+ }
+ memcpy (&n_meta, payload, sizeof (n_meta));
+ payload += sizeof (n_meta);
+
+ for (n = 0; n < n_meta; ++n) {
+ guint32 flags, len, bytes;
+ guint64 msize;
+ GType api;
+ GstMeta *meta;
+ GstStructure *structure = NULL;
+
+ memcpy (&bytes, payload, sizeof (bytes));
+ payload += sizeof (bytes);
+
+#define READ_FIELD(f) do { \
+ memcpy (&f, payload, sizeof (f)); \
+ payload += sizeof(f); \
+ } while(0)
+
+ READ_FIELD (flags);
+ READ_FIELD (len);
+ api = g_type_from_name ((const char *) payload);
+ payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
+ READ_FIELD (msize);
+ READ_FIELD (len);
+ if (len) {
+ structure = gst_structure_new_from_string ((const char *) payload);
+ payload += len + 1;
+ }
+
+ /* Seems we can add a meta from the api nor type ? */
+ if (api == GST_PROTECTION_META_API_TYPE) {
+ meta =
+ gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
+ ((GstProtectionMeta *) meta)->info = structure;
+ } else {
+ GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
+ g_type_name (api));
+ }
+
+#undef READ_FIELD
+
+ }
+
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+
+ return buffer;
+}
+
+static gboolean
+gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
+ GstEvent * event)
+{
+ const unsigned char payload_type =
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
+ gboolean ret;
+ guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
+ char *str = NULL;
+ const GstStructure *structure;
+ GstMessage *message = NULL;
+ const char *name;
+ GstByteWriter bw;
+
+ g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
+ FALSE);
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ GST_TRACE_OBJECT (comm->element,
+ "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
+
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+ name = gst_structure_get_name (gst_event_get_structure (event));
+ slen = strlen (name) + 1;
+ gst_event_parse_sink_message (event, &message);
+ structure = gst_message_get_structure (message);
+ if (structure) {
+ str = gst_structure_to_string (structure);
+ structure_slen = strlen (str);
+ } else {
+ str = NULL;
+ structure_slen = 0;
+ }
+ size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
+ strlen (name) + 1 + structure_slen + 1;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+
+ type = GST_MESSAGE_TYPE (message);
+ if (!gst_byte_writer_put_uint32_le (&bw, type))
+ goto write_failed;
+ size -= sizeof (type);
+
+ eseqnum = GST_EVENT_SEQNUM (event);
+ if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
+ goto write_failed;
+ size -= sizeof (eseqnum);
+
+ mseqnum = GST_MESSAGE_SEQNUM (message);
+ if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
+ goto write_failed;
+ size -= sizeof (mseqnum);
+
+ if (!gst_byte_writer_put_uint32_le (&bw, slen))
+ goto write_failed;
+ size -= sizeof (slen);
+
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
+ goto write_failed;
+ size -= slen;
+
+ if (str) {
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
+ goto write_failed;
+ } else {
+ if (!gst_byte_writer_put_uint8 (&bw, 0))
+ goto write_failed;
+ }
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
+ GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
+ COMM_REQUEST_TYPE_EVENT))
+ goto write_failed;
+
+ ret = ret32;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ gst_byte_writer_reset (&bw);
+ g_free (str);
+ if (message)
+ gst_message_unref (message);
+ return ret;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ ret = FALSE;
+ goto done;
+}
+
+static GstEvent *
+gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
+ guint32 size)
+{
+ GstMessage *message;
+ GstEvent *event = NULL;
+ gchar *end = NULL;
+ GstStructure *structure;
+ guint32 type, eseqnum, mseqnum, slen;
+ const char *name;
+ guint32 mapped_size = size;
+ const guint8 *payload;
+
+ /* this should not be called if we don't have enough yet */
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
+ g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
+
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload)
+ return NULL;
+ memcpy (&type, payload, sizeof (type));
+ payload += sizeof (type);
+ size -= sizeof (type);
+ if (size == 0)
+ goto done;
+
+ memcpy (&eseqnum, payload, sizeof (eseqnum));
+ payload += sizeof (eseqnum);
+ size -= sizeof (eseqnum);
+ if (size == 0)
+ goto done;
+
+ memcpy (&mseqnum, payload, sizeof (mseqnum));
+ payload += sizeof (mseqnum);
+ size -= sizeof (mseqnum);
+ if (size == 0)
+ goto done;
+
+ memcpy (&slen, payload, sizeof (slen));
+ payload += sizeof (slen);
+ size -= sizeof (slen);
+ if (size == 0)
+ goto done;
+
+ if (payload[slen - 1])
+ goto done;
+ name = (const char *) payload;
+ payload += slen;
+ size -= slen;
+
+ if ((payload)[size - 1]) {
+ goto done;
+ }
+ if (*payload) {
+ structure = gst_structure_from_string ((const char *) payload, &end);
+ } else {
+ structure = NULL;
+ }
+
+ message =
+ gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
+ gst_message_set_seqnum (message, mseqnum);
+ event = gst_event_new_sink_message (name, message);
+ gst_event_set_seqnum (event, eseqnum);
+ gst_message_unref (message);
+
+done:
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+ return event;
+}
+
+gboolean
+gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
+ gboolean upstream, GstEvent * event)
+{
+ const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
+ gboolean ret;
+ guint32 type, size, ret32 = TRUE, seqnum, slen;
+ char *str = NULL;
+ const GstStructure *structure;
+ GstByteWriter bw;
+
+ /* we special case sink-message event as gst can't serialize/de-serialize it */
+ if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
+ return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
+ comm->send_id, event);
+
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+ structure = gst_event_get_structure (event);
+ if (structure) {
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
+ GstStructure *s = gst_structure_copy (structure);
+ gst_structure_remove_field (s, "stream");
+ str = gst_structure_to_string (s);
+ gst_structure_free (s);
+ } else {
+ str = gst_structure_to_string (structure);
+ }
+
+ slen = strlen (str);
+ } else {
+ str = NULL;
+ slen = 0;
+ }
+ size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+
+ type = GST_EVENT_TYPE (event);
+ if (!gst_byte_writer_put_uint32_le (&bw, type))
+ goto write_failed;
+
+ seqnum = GST_EVENT_SEQNUM (event);
+ if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
+ goto write_failed;
+
+ if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
+ goto write_failed;
+
+ if (str) {
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
+ goto write_failed;
+ } else {
+ if (!gst_byte_writer_put_uint8 (&bw, 0))
+ goto write_failed;
+ }
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ /* Upstream events get serialized, this is required to send seeks only
+ * one at a time. */
+ if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
+ (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
+ ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
+ goto write_failed;
+ ret = ret32;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ g_free (str);
+ gst_byte_writer_reset (&bw);
+ return ret;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ ret = FALSE;
+ goto done;
+}
+
+static GstEvent *
+gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
+ gboolean * upstream)
+{
+ GstEvent *event = NULL;
+ gchar *end = NULL;
+ GstStructure *structure;
+ guint32 type, seqnum;
+ guint32 mapped_size = size;
+ const guint8 *payload;
+
+ /* this should not be called if we don't have enough yet */
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
+ g_return_val_if_fail (size >= sizeof (type), NULL);
+
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload)
+ return NULL;
+
+ memcpy (&type, payload, sizeof (type));
+ payload += sizeof (type);
+ size -= sizeof (type);
+ if (size == 0)
+ goto done;
+
+ memcpy (&seqnum, payload, sizeof (seqnum));
+ payload += sizeof (seqnum);
+ size -= sizeof (seqnum);
+ if (size == 0)
+ goto done;
+
+ *upstream = (*payload) ? TRUE : FALSE;
+ payload += 1;
+ size -= 1;
+ if (size == 0)
+ goto done;
+
+ if (payload[size - 1])
+ goto done;
+ if (*payload) {
+ structure = gst_structure_from_string ((const char *) payload, &end);
+ } else {
+ structure = NULL;
+ }
+
+ event = gst_event_new_custom (type, structure);
+ gst_event_set_seqnum (event, seqnum);
+
+done:
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+ return event;
+}
+
+gboolean
+gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
+ gboolean upstream, GstQuery * query)
+{
+ const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
+ gboolean ret;
+ guint32 type, size, ret32 = TRUE, slen;
+ char *str = NULL;
+ const GstStructure *structure;
+ GstByteWriter bw;
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
+ comm->send_id, query);
+
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+ structure = gst_query_get_structure (query);
+ if (structure) {
+ str = gst_structure_to_string (structure);
+ slen = strlen (str);
+ } else {
+ str = NULL;
+ slen = 0;
+ }
+ size = sizeof (type) + 1 + slen + 1;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+
+ type = GST_QUERY_TYPE (query);
+ if (!gst_byte_writer_put_uint32_le (&bw, type))
+ goto write_failed;
+
+ if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
+ goto write_failed;
+
+ if (str) {
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
+ goto write_failed;
+ } else {
+ if (!gst_byte_writer_put_uint8 (&bw, 0))
+ goto write_failed;
+ }
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
+ GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
+ COMM_REQUEST_TYPE_QUERY))
+ goto write_failed;
+
+ ret = ret32;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ g_free (str);
+ gst_byte_writer_reset (&bw);
+ return ret;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ ret = FALSE;
+ goto done;
+}
+
+static GstQuery *
+gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
+ gboolean * upstream)
+{
+ GstQuery *query = NULL;
+ gchar *end = NULL;
+ GstStructure *structure;
+ guint32 type;
+ guint32 mapped_size = size;
+ const guint8 *payload;
+
+ /* this should not be called if we don't have enough yet */
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
+ g_return_val_if_fail (size >= sizeof (type), NULL);
+
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload)
+ return NULL;
+
+ memcpy (&type, payload, sizeof (type));
+ payload += sizeof (type);
+ size -= sizeof (type);
+ if (size == 0)
+ goto done;
+
+ *upstream = (*payload) ? TRUE : FALSE;
+ payload += 1;
+ size -= 1;
+ if (size == 0)
+ goto done;
+
+ if (payload[size - 1])
+ goto done;
+ if (*payload) {
+ structure = gst_structure_from_string ((const char *) payload, &end);
+ } else {
+ structure = NULL;
+ }
+
+ query = gst_query_new_custom (type, structure);
+
+ /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
+ This does not play well with the serialization/deserialization system,
+ which will give us a non-NULL GstCaps which has a value of NULL. This
+ in turn wreaks havoc with any code that tests whether filter is NULL
+ (which basically means, am I being given an optional GstCaps ?).
+ So we look for non-NULL GstCaps which have NULL contents, and replace
+ them with NULL instead. */
+ if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
+ GstCaps *filter;
+ gst_query_parse_caps (query, &filter);
+ if (filter
+ && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
+ "NULL")) {
+ gst_query_unref (query);
+ query = gst_query_new_caps (NULL);
+ }
+ }
+
+done:
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+ return query;
+}
+
+GstStateChangeReturn
+gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
+ GstStateChange transition)
+{
+ const unsigned char payload_type =
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
+ GstStateChangeReturn ret;
+ guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
+ GstByteWriter bw;
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
+ comm->send_id,
+ gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
+
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+ size = sizeof (transition);
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, transition))
+ goto write_failed;
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
+ ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
+ goto write_failed;
+ ret = ret32;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ gst_byte_writer_reset (&bw);
+ return ret;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ ret = GST_STATE_CHANGE_FAILURE;
+ goto done;
+}
+
+static gboolean
+is_valid_state_change (GstStateChange transition)
+{
+ if (transition == GST_STATE_CHANGE_NULL_TO_READY)
+ return TRUE;
+ if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
+ return TRUE;
+ if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
+ return TRUE;
+ if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
+ return TRUE;
+ if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
+ return TRUE;
+ if (transition == GST_STATE_CHANGE_READY_TO_NULL)
+ return TRUE;
+ if (GST_STATE_TRANSITION_CURRENT (transition) ==
+ GST_STATE_TRANSITION_NEXT (transition))
+ return TRUE;
+ return FALSE;
+}
+
+static gboolean
+gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
+ guint32 size, guint32 * transition)
+{
+ guint32 mapped_size = size;
+ const guint8 *payload;
+
+ /* this should not be called if we don't have enough yet */
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
+ g_return_val_if_fail (size >= sizeof (*transition), FALSE);
+
+ payload = gst_adapter_map (comm->adapter, size);
+ if (!payload)
+ return FALSE;
+ memcpy (transition, payload, sizeof (*transition));
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+ return is_valid_state_change (*transition);
+}
+
+void
+gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
+{
+ const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
+ guint32 size;
+ GstByteWriter bw;
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+ size = 0;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ gst_byte_writer_reset (&bw);
+ return;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ goto done;
+}
+
+static gboolean
+gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
+{
+ /* no payload */
+ return TRUE;
+}
+
+static gboolean
+gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
+ GstMessage * message)
+{
+ const unsigned char payload_type =
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
+ gboolean ret;
+ guint32 code, size, ret32 = TRUE;
+ char *str = NULL;
+ GError *error;
+ char *extra_message;
+ const char *domain_string;
+ unsigned char msgtype;
+ GstByteWriter bw;
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
+ gst_message_parse_error (message, &error, &extra_message);
+ msgtype = 2;
+ } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
+ gst_message_parse_warning (message, &error, &extra_message);
+ msgtype = 1;
+ } else {
+ gst_message_parse_info (message, &error, &extra_message);
+ msgtype = 0;
+ }
+ code = error->code;
+ domain_string = g_quark_to_string (error->domain);
+ GST_TRACE_OBJECT (comm->element,
+ "Writing error %u: domain %s, code %u, message %s, extra message %s",
+ comm->send_id, domain_string, error->code, error->message, extra_message);
+
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+
+ size = sizeof (size);
+ size += 1;
+ size += strlen (domain_string) + 1;
+ size += sizeof (code);
+ size += sizeof (size);
+ size += error->message ? strlen (error->message) + 1 : 0;
+ size += sizeof (size);
+ size += extra_message ? strlen (extra_message) + 1 : 0;
+
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+
+ if (!gst_byte_writer_put_uint8 (&bw, msgtype))
+ goto write_failed;
+ size = strlen (domain_string) + 1;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, code))
+ goto write_failed;
+ size = error->message ? strlen (error->message) + 1 : 0;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (error->message) {
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
+ goto write_failed;
+ }
+ size = extra_message ? strlen (extra_message) + 1 : 0;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+ if (extra_message) {
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
+ goto write_failed;
+ }
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
+ ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
+ goto write_failed;
+
+ ret = ret32;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ g_free (str);
+ g_error_free (error);
+ g_free (extra_message);
+ gst_byte_writer_reset (&bw);
+ return ret;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ ret = FALSE;
+ goto done;
+}
+
+static GstMessage *
+gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
+ guint32 size)
+{
+ GstMessage *message = NULL;
+ guint32 code;
+ GQuark domain;
+ const char *msg, *extra_message;
+ GError *error;
+ unsigned char msgtype;
+ guint32 mapped_size = size;
+ const guint8 *payload;
+
+ /* this should not be called if we don't have enough yet */
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
+ g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
+ NULL);
+
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload)
+ return NULL;
+ msgtype = *payload++;
+ memcpy (&size, payload, sizeof (size));
+ payload += sizeof (size);
+ if (payload[size - 1])
+ goto done;
+ domain = g_quark_from_string ((const char *) payload);
+ payload += size;
+
+ memcpy (&code, payload, sizeof (code));
+ payload += sizeof (code);
+
+ memcpy (&size, payload, sizeof (size));
+ payload += sizeof (size);
+ if (size) {
+ if (payload[size - 1])
+ goto done;
+ msg = (const char *) payload;
+ } else {
+ msg = NULL;
+ }
+ payload += size;
+
+ memcpy (&size, payload, sizeof (size));
+ payload += sizeof (size);
+ if (size) {
+ if (payload[size - 1])
+ goto done;
+ extra_message = (const char *) payload;
+ } else {
+ extra_message = NULL;
+ }
+ payload += size;
+
+ error = g_error_new (domain, code, "%s", msg);
+ if (msgtype == 2)
+ message =
+ gst_message_new_error (GST_OBJECT (comm->element), error,
+ extra_message);
+ else if (msgtype == 1)
+ message =
+ gst_message_new_warning (GST_OBJECT (comm->element), error,
+ extra_message);
+ else
+ message =
+ gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
+ g_error_free (error);
+
+done:
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+
+ return message;
+}
+
+gboolean
+gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
+ GstMessage * message)
+{
+ const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
+ gboolean ret;
+ guint32 type, size, ret32 = TRUE, slen;
+ char *str = NULL;
+ const GstStructure *structure;
+ GstByteWriter bw;
+
+ /* we special case error as gst can't serialize/de-serialize it */
+ if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
+ || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
+ || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
+ return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
+
+ g_mutex_lock (&comm->mutex);
+ ++comm->send_id;
+
+ GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
+ comm->send_id, message);
+
+ gst_byte_writer_init (&bw);
+ if (!gst_byte_writer_put_uint8 (&bw, payload_type))
+ goto write_failed;
+ if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
+ goto write_failed;
+ structure = gst_message_get_structure (message);
+ if (structure) {
+ str = gst_structure_to_string (structure);
+ slen = strlen (str);
+ } else {
+ str = NULL;
+ slen = 0;
+ }
+ size = sizeof (type) + slen + 1;
+ if (!gst_byte_writer_put_uint32_le (&bw, size))
+ goto write_failed;
+
+ type = GST_MESSAGE_TYPE (message);
+ if (!gst_byte_writer_put_uint32_le (&bw, type))
+ goto write_failed;
+ size -= sizeof (type);
+ if (str) {
+ if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
+ goto write_failed;
+ } else {
+ if (!gst_byte_writer_put_uint8 (&bw, 0))
+ goto write_failed;
+ }
+
+ if (!write_byte_writer_to_fd (comm, &bw))
+ goto write_failed;
+
+ if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
+ ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
+ goto write_failed;
+
+ ret = ret32;
+
+done:
+ g_mutex_unlock (&comm->mutex);
+ g_free (str);
+ gst_byte_writer_reset (&bw);
+ return ret;
+
+write_failed:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
+ ("Failed to write to socket"));
+ ret = FALSE;
+ goto done;
+}
+
+static GstMessage *
+gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
+{
+ GstMessage *message = NULL;
+ gchar *end = NULL;
+ GstStructure *structure;
+ guint32 type;
+ guint32 mapped_size = size;
+ const guint8 *payload;
+
+ /* this should not be called if we don't have enough yet */
+ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
+ g_return_val_if_fail (size >= sizeof (type), NULL);
+
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ if (!payload)
+ return NULL;
+ memcpy (&type, payload, sizeof (type));
+ payload += sizeof (type);
+ size -= sizeof (type);
+ if (size == 0)
+ goto done;
+
+ if (payload[size - 1])
+ goto done;
+ if (*payload) {
+ structure = gst_structure_from_string ((const char *) payload, &end);
+ } else {
+ structure = NULL;
+ }
+
+ message =
+ gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
+
+done:
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+
+ return message;
+}
+
+void
+gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
+{
+ g_mutex_init (&comm->mutex);
+ comm->element = element;
+ comm->fdin = comm->fdout = -1;
+ comm->ack_time = DEFAULT_ACK_TIME;
+ comm->waiting_ids =
+ g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
+ (GDestroyNotify) comm_request_free);
+ comm->adapter = gst_adapter_new ();
+ g_atomic_int_set (&comm->thread_running, 0);
+}
+
+void
+gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
+{
+ g_assert (!g_atomic_int_get (&comm->thread_running));
+ g_hash_table_destroy (comm->waiting_ids);
+ gst_object_unref (comm->adapter);
+ g_mutex_clear (&comm->mutex);
+}
+
+static void
+cancel_request (gpointer key, gpointer value, gpointer user_data,
+ GstFlowReturn fret)
+{
+ GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
+ guint32 id = GPOINTER_TO_INT (key);
+ CommRequest *req = (CommRequest *) value;
+
+ GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
+ req->type);
+ req->ret = fret;
+ req->replied = TRUE;
+ g_cond_signal (&req->cond);
+}
+
+static void
+cancel_request_error (gpointer key, gpointer value, gpointer user_data)
+{
+ CommRequest *req = (CommRequest *) value;
+ GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
+
+ cancel_request (key, value, user_data, fret);
+}
+
+void
+gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
+{
+ g_mutex_lock (&comm->mutex);
+ g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
+ if (cleanup) {
+ g_hash_table_unref (comm->waiting_ids);
+ comm->waiting_ids =
+ g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
+ (GDestroyNotify) comm_request_free);
+ }
+ g_mutex_unlock (&comm->mutex);
+}
+
+static gboolean
+set_field (GQuark field_id, const GValue * value, gpointer user_data)
+{
+ GstStructure *structure = user_data;
+
+ gst_structure_id_set_value (structure, field_id, value);
+
+ return TRUE;
+}
+
+static gboolean
+gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
+ GstFlowReturn ret, GstQuery * query)
+{
+ CommRequest *req;
+
+ req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
+ if (!req) {
+ GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
+ return FALSE;
+ }
+
+ GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
+ comm_request_ret_get_name (req->type, ret), req->id);
+ req->replied = TRUE;
+ req->ret = ret;
+ if (query) {
+ if (req->query) {
+ /* We need to update the original query in place, as the caller
+ will expect the object to be the same */
+ GstStructure *structure = gst_query_writable_structure (req->query);
+ gst_structure_remove_all_fields (structure);
+ gst_structure_foreach (gst_query_get_structure (query), set_field,
+ structure);
+ } else {
+ GST_WARNING_OBJECT (comm->element,
+ "Got query reply, but no query was in the request");
+ }
+ }
+ g_cond_signal (&req->cond);
+ return TRUE;
+}
+
+static gboolean
+update_adapter (GstIpcPipelineComm * comm)
+{
+ GstMemory *mem = NULL;
+
+ for (;;) {
+ fd_set set;
+ struct timeval tv;
+ int sret;
+ ssize_t sz;
+ GstBuffer *buf;
+ GstMapInfo map;
+ int fdin = comm->fdin;
+ int fdclose = comm->reader_thread_stopping_pipe[0];
+ int fdmax;
+
+ FD_ZERO (&set);
+ FD_SET (fdclose, &set);
+ fdmax = fdclose;
+ if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) {
+ FD_SET (fdin, &set);
+ if (fdin > fdmax)
+ fdmax = fdin;
+ }
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000;
+ sret = select (fdmax + 1, &set, NULL, NULL, &tv);
+ if (sret < 0) {
+ if (errno == EAGAIN)
+ continue;
+ if (errno == EINTR)
+ break;
+ if (mem)
+ gst_memory_unref (mem);
+ return FALSE;
+ }
+ if (FD_ISSET (fdclose, &set)) {
+ GST_INFO_OBJECT (comm->element, "data received on close notify pipe");
+ comm->reader_thread_stopping = TRUE;
+ break;
+ }
+ if (fdin < 0)
+ break;
+ if (!FD_ISSET (fdin, &set))
+ break;
+ if (mem == NULL)
+ mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
+ gst_memory_map (mem, &map, GST_MAP_WRITE);
+ sz = read (fdin, map.data, map.size);
+ gst_memory_unmap (mem, &map);
+ if (sz < 0) {
+ if (errno == EAGAIN)
+ continue;
+ mem = NULL;
+ if (errno == EINTR)
+ break;
+ gst_memory_unref (mem);
+ return FALSE;
+ }
+ if (sz == 0) {
+ GST_INFO_OBJECT (comm->element, "fd closed");
+ comm->reader_thread_stopping = TRUE;
+ break;
+ }
+ gst_memory_resize (mem, 0, sz);
+ buf = gst_buffer_new ();
+ gst_buffer_append_memory (buf, mem);
+ mem = NULL;
+ GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
+ gst_adapter_push (comm->adapter, buf);
+
+ /* If we have more data, we loop, otherwise we break */
+ FD_ZERO (&set);
+ if (fdin >= 0)
+ FD_SET (comm->fdin, &set);
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ sret = select (fdin + 1, &set, NULL, NULL, &tv);
+ if (sret < 0 || !FD_ISSET (fdin, &set))
+ break;
+ }
+ if (mem)
+ gst_memory_unref (mem);
+
+ return TRUE;
+}
+
+static gboolean
+read_many (GstIpcPipelineComm * comm)
+{
+ gboolean ret = TRUE;
+ gsize available;
+ const guint8 *payload;
+
+ while (1)
+ switch (comm->state) {
+ case GST_IPC_PIPELINE_COMM_STATE_TYPE:
+ {
+ guint8 type;
+ guint32 mapped_size;
+
+ available = gst_adapter_available (comm->adapter);
+ mapped_size = 1 + sizeof (gint32) * 2;
+ if (available < mapped_size)
+ goto done;
+
+ payload = gst_adapter_map (comm->adapter, mapped_size);
+ type = *payload++;
+ g_mutex_lock (&comm->mutex);
+ memcpy (&comm->id, payload, sizeof (guint32));
+ memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
+ g_mutex_unlock (&comm->mutex);
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, mapped_size);
+ GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
+ comm->id, type, comm->payload_length);
+ switch (type) {
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
+ GST_TRACE_OBJECT (comm->element, "switching to state %s",
+ gst_ipc_pipeline_comm_data_type_get_name (type));
+ comm->state = type;
+ break;
+ default:
+ goto out_of_sync;
+ }
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
+ {
+ const guint8 *rets;
+ guint32 ret32;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ if (available < sizeof (guint32))
+ goto ack_failed;
+
+ rets = gst_adapter_map (comm->adapter, sizeof (guint32));
+ memcpy (&ret32, rets, sizeof (ret32));
+ gst_adapter_unmap (comm->adapter);
+ gst_adapter_flush (comm->adapter, sizeof (guint32));
+ GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
+ gst_flow_get_name (ret32), comm->id);
+
+ g_mutex_lock (&comm->mutex);
+ gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
+ g_mutex_unlock (&comm->mutex);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
+ {
+ GstQuery *query = NULL;
+ gboolean qret;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ qret =
+ gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
+ &query);
+
+ GST_TRACE_OBJECT (comm->element,
+ "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
+ query);
+
+ g_mutex_lock (&comm->mutex);
+ gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
+ g_mutex_unlock (&comm->mutex);
+
+ gst_query_unref (query);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
+ {
+ GstBuffer *buf;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
+ if (!buf)
+ goto buffer_failed;
+
+ /* set caps and push */
+ GST_TRACE_OBJECT (comm->element,
+ "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
+ ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
+ ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
+ ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
+ GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
+ GST_BUFFER_FLAGS (buf));
+
+ gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
+ GINT_TO_POINTER (comm->id), NULL);
+
+ if (comm->on_buffer)
+ (*comm->on_buffer) (comm->id, buf, comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
+ {
+ GstEvent *event;
+ gboolean upstream;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
+ &upstream);
+ if (!event)
+ goto event_failed;
+
+ GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
+ event, gst_event_type_get_name (event->type));
+
+ gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
+ GINT_TO_POINTER (comm->id), NULL);
+
+ if (comm->on_event)
+ (*comm->on_event) (comm->id, event, upstream, comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
+ {
+ GstEvent *event;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
+ comm->payload_length);
+ if (!event)
+ goto event_failed;
+
+ GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
+ event);
+
+ gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
+ GINT_TO_POINTER (comm->id), NULL);
+
+ if (comm->on_event)
+ (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
+ {
+ GstQuery *query;
+ gboolean upstream;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
+ &upstream);
+ if (!query)
+ goto query_failed;
+
+ GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
+ query, gst_query_type_get_name (query->type));
+
+ gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
+ GINT_TO_POINTER (comm->id), NULL);
+
+ if (comm->on_query)
+ (*comm->on_query) (comm->id, query, upstream, comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
+ {
+ guint32 transition;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ if (!gst_ipc_pipeline_comm_read_state_change (comm,
+ comm->payload_length, &transition))
+ goto state_change_failed;
+
+ GST_TRACE_OBJECT (comm->element,
+ "deserialized state change request: %s -> %s",
+ gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
+ (transition)),
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
+ (transition)));
+
+ if (comm->on_state_change)
+ (*comm->on_state_change) (comm->id, transition, comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
+ {
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
+ goto event_failed;
+
+ GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
+
+ if (comm->on_state_lost)
+ (*comm->on_state_lost) (comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
+ {
+ GstMessage *message;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ message = gst_ipc_pipeline_comm_read_message (comm,
+ comm->payload_length);
+ if (!message)
+ goto message_failed;
+
+ GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
+ message, gst_message_type_get_name (message->type));
+
+ if (comm->on_message)
+ (*comm->on_message) (comm->id, message, comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
+ {
+ GstMessage *message;
+
+ available = gst_adapter_available (comm->adapter);
+ if (available < comm->payload_length)
+ goto done;
+
+ message = gst_ipc_pipeline_comm_read_gerror_message (comm,
+ comm->payload_length);
+ if (!message)
+ goto message_failed;
+
+ GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
+ message, gst_message_type_get_name (message->type));
+
+ if (comm->on_message)
+ (*comm->on_message) (comm->id, message, comm->user_data);
+
+ GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ break;
+ }
+ }
+
+done:
+ return ret;
+
+ /* ERRORS */
+out_of_sync:
+ {
+ GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
+ ("Socket out of sync"));
+ ret = FALSE;
+ goto done;
+ }
+state_change_failed:
+ {
+ GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
+ ("could not read state change from fd"));
+ ret = FALSE;
+ goto done;
+ }
+ack_failed:
+ {
+ GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
+ ("could not read ack from fd"));
+ ret = FALSE;
+ goto done;
+ }
+buffer_failed:
+ {
+ GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
+ ("could not read buffer from fd"));
+ ret = FALSE;
+ goto done;
+ }
+event_failed:
+ {
+ GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
+ ("could not read event from fd"));
+ ret = FALSE;
+ goto done;
+ }
+message_failed:
+ {
+ GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
+ ("could not read message from fd"));
+ ret = FALSE;
+ goto done;
+ }
+query_failed:
+ {
+ GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
+ ("could not read query from fd"));
+ ret = FALSE;
+ goto done;
+ }
+}
+
+static gpointer
+reader_thread (gpointer data)
+{
+ GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
+
+ g_atomic_int_set (&comm->thread_running, 1);
+ while (!comm->reader_thread_stopping) {
+ if (!update_adapter (comm)) {
+ if (comm->reader_thread_stopping) {
+ GST_INFO_OBJECT (comm->element, "We're stopping, all good");
+ break;
+ }
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
+ ("Failed to read from socket"));
+ break;
+ }
+ read_many (comm);
+ }
+
+ GST_INFO_OBJECT (comm->element, "Reader thread ending");
+ g_atomic_int_set (&comm->thread_running, 0);
+
+ return NULL;
+}
+
+gboolean
+gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
+ void (*on_buffer) (guint32, GstBuffer *, gpointer),
+ void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
+ void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
+ void (*on_state_change) (guint32, GstStateChange, gpointer),
+ void (*on_state_lost) (gpointer),
+ void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
+{
+ if (comm->reader_thread)
+ return FALSE;
+
+ comm->reader_thread_stopping = FALSE;
+ comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
+ comm->on_buffer = on_buffer;
+ comm->on_event = on_event;
+ comm->on_query = on_query;
+ comm->on_state_change = on_state_change;
+ comm->on_state_lost = on_state_lost;
+ comm->on_message = on_message;
+ comm->user_data = user_data;
+ if (pipe (comm->reader_thread_stopping_pipe) < 0) {
+ GST_WARNING_OBJECT (comm->element, "Failed to create pipes");
+ return FALSE;
+ }
+ comm->reader_thread =
+ g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
+ return TRUE;
+}
+
+void
+gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
+{
+ char dummy = 0;
+
+ if (!comm->reader_thread)
+ return;
+ while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0
+ && errno == EINTR);
+ g_thread_join (comm->reader_thread);
+ close (comm->reader_thread_stopping_pipe[0]);
+ close (comm->reader_thread_stopping_pipe[1]);
+ comm->reader_thread = NULL;
+}
+
+static gchar *
+gst_value_serialize_event (const GValue * value)
+{
+ const GstStructure *structure;
+ GstEvent *ev;
+ gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
+ GValue val = G_VALUE_INIT;
+
+ ev = g_value_get_boxed (value);
+
+ g_value_init (&val, gst_event_type_get_type ());
+ g_value_set_enum (&val, ev->type);
+ type = gst_value_serialize (&val);
+ g_value_unset (&val);
+
+ g_value_init (&val, G_TYPE_UINT64);
+ g_value_set_uint64 (&val, ev->timestamp);
+ ts = gst_value_serialize (&val);
+ g_value_unset (&val);
+
+ g_value_init (&val, G_TYPE_UINT);
+ g_value_set_uint (&val, ev->seqnum);
+ seqnum = gst_value_serialize (&val);
+ g_value_unset (&val);
+
+ g_value_init (&val, G_TYPE_INT64);
+ g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
+ rt_offset = gst_value_serialize (&val);
+ g_value_unset (&val);
+
+ structure = gst_event_get_structure (ev);
+ str = gst_structure_to_string (structure);
+ str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
+ g_strdelimit (str64, "=", '_');
+ g_free (str);
+
+ s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
+ NULL);
+
+ g_free (type);
+ g_free (ts);
+ g_free (seqnum);
+ g_free (rt_offset);
+ g_free (str64);
+
+ return s;
+}
+
+static gboolean
+gst_value_deserialize_event (GValue * dest, const gchar * s)
+{
+ GstEvent *ev = NULL;
+ GValue val = G_VALUE_INIT;
+ gboolean ret = FALSE;
+ gchar **fields;
+ gsize len;
+
+ fields = g_strsplit (s, ":", -1);
+ if (g_strv_length (fields) != 5)
+ goto wrong_length;
+
+ g_strdelimit (fields[4], "_", '=');
+ g_base64_decode_inplace (fields[4], &len);
+
+ g_value_init (&val, gst_event_type_get_type ());
+ if (!gst_value_deserialize (&val, fields[0]))
+ goto fail;
+ ev = gst_event_new_custom (g_value_get_enum (&val),
+ gst_structure_new_from_string (fields[4]));
+
+ g_value_unset (&val);
+ g_value_init (&val, G_TYPE_UINT64);
+ if (!gst_value_deserialize (&val, fields[1]))
+ goto fail;
+ ev->timestamp = g_value_get_uint64 (&val);
+
+ g_value_unset (&val);
+ g_value_init (&val, G_TYPE_UINT);
+ if (!gst_value_deserialize (&val, fields[2]))
+ goto fail;
+ ev->seqnum = g_value_get_uint (&val);
+
+ g_value_unset (&val);
+ g_value_init (&val, G_TYPE_INT64);
+ if (!gst_value_deserialize (&val, fields[3]))
+ goto fail;
+ gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
+
+ g_value_take_boxed (dest, g_steal_pointer (&ev));
+ ret = TRUE;
+
+fail:
+ g_clear_pointer (&ev, gst_event_unref);
+ g_value_unset (&val);
+
+wrong_length:
+ g_strfreev (fields);
+ return ret;
+}
+
+#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type) \
+G_STMT_START { \
+ static GstValueTable gst_value = \
+ { 0, NULL, \
+ gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type }; \
+ gst_value.type = _gtype; \
+ gst_value_register (&gst_value); \
+} G_STMT_END
+
+void
+gst_ipc_pipeline_comm_plugin_init (void)
+{
+ static volatile gsize once = 0;
+
+ if (g_once_init_enter (&once)) {
+ GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
+ "ipc pipeline comm");
+ QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
+ REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
+ g_once_init_leave (&once, (gsize) 1);
+ }
+}
diff --git a/gst/ipcpipeline/gstipcpipelinecomm.h b/gst/ipcpipeline/gstipcpipelinecomm.h
new file mode 100644
index 000000000..bd7335e9e
--- /dev/null
+++ b/gst/ipcpipeline/gstipcpipelinecomm.h
@@ -0,0 +1,132 @@
+/* GStreamer
+ * Copyright (C) 2015-2017 YouView TV Ltd
+ * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcpipelinecomm.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+
+#ifndef __GST_IPC_PIPELINE_COMM_H__
+#define __GST_IPC_PIPELINE_COMM_H__
+
+#include <glib.h>
+#include <gst/gst.h>
+#include <gst/base/gstadapter.h>
+
+G_BEGIN_DECLS
+
+#define GST_FLOW_COMM_ERROR GST_FLOW_CUSTOM_ERROR_1
+
+extern GQuark QUARK_ID;
+
+typedef enum {
+ GST_IPC_PIPELINE_COMM_STATE_TYPE = 0,
+ /* for the rest of the states we use directly the data type enums below */
+} GstIpcPipelineCommState;
+
+typedef enum {
+ /* reply types */
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK = 1,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT,
+ /* data send types */
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE,
+ GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE,
+} GstIpcPipelineCommDataType;
+
+typedef struct
+{
+ GstElement *element;
+
+ GMutex mutex;
+ int fdin;
+ int fdout;
+ GHashTable *waiting_ids;
+
+ GThread *reader_thread;
+ gboolean reader_thread_stopping;
+ volatile gint thread_running;
+ int reader_thread_stopping_pipe[2];
+ GstAdapter *adapter;
+ guint8 state;
+ guint32 send_id;
+
+ guint32 payload_length;
+ guint32 id;
+
+ guint read_chunk_size;
+ GstClockTime ack_time;
+
+ void (*on_buffer) (guint32, GstBuffer *, gpointer);
+ void (*on_event) (guint32, GstEvent *, gboolean, gpointer);
+ void (*on_query) (guint32, GstQuery *, gboolean, gpointer);
+ void (*on_state_change) (guint32, GstStateChange, gpointer);
+ void (*on_state_lost) (gpointer);
+ void (*on_message) (guint32, GstMessage *, gpointer);
+ gpointer user_data;
+
+} GstIpcPipelineComm;
+
+void gst_ipc_pipeline_comm_plugin_init (void);
+
+void gst_ipc_pipeline_comm_init (GstIpcPipelineComm *comm, GstElement *e);
+void gst_ipc_pipeline_comm_clear (GstIpcPipelineComm *comm);
+void gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm,
+ gboolean flushing);
+
+void gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
+ guint32 id, GstFlowReturn ret);
+void gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
+ guint32 id, gboolean ret);
+void gst_ipc_pipeline_comm_write_state_change_ack_to_fd (
+ GstIpcPipelineComm * comm, guint32 id, GstStateChangeReturn ret);
+
+void gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
+ guint32 id, gboolean result, GstQuery *query);
+
+GstFlowReturn gst_ipc_pipeline_comm_write_buffer_to_fd (
+ GstIpcPipelineComm * comm, GstBuffer * buffer);
+gboolean gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
+ gboolean upstream, GstEvent * event);
+gboolean gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
+ gboolean upstream, GstQuery * query);
+GstStateChangeReturn gst_ipc_pipeline_comm_write_state_change_to_fd (
+ GstIpcPipelineComm * comm, GstStateChange transition);
+void gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm);
+gboolean gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
+ GstMessage *message);
+
+gboolean gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
+ void (*on_buffer) (guint32, GstBuffer *, gpointer),
+ void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
+ void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
+ void (*on_state_change) (guint32, GstStateChange, gpointer),
+ void (*on_state_lost) (gpointer),
+ void (*on_message) (guint32, GstMessage *, gpointer),
+ gpointer user_data);
+void gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm);
+
+G_END_DECLS
+
+#endif
+
diff --git a/gst/ipcpipeline/gstipcpipelinesink.c b/gst/ipcpipeline/gstipcpipelinesink.c
new file mode 100644
index 000000000..f1f02bfcb
--- /dev/null
+++ b/gst/ipcpipeline/gstipcpipelinesink.c
@@ -0,0 +1,722 @@
+/* GStreamer
+ * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
+ * 2005 Wim Taymans <wim@fluendo.com>
+ * 2006 Thomas Vander Stichele <thomas at apestaart dot org>
+ * 2014 Tim-Philipp Müller <tim centricular com>
+ * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcpipelinesink.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * SECTION:element-ipcpipelinesink
+ * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
+ *
+ * Communicates with an ipcpipelinesrc element in another process via a socket.
+ *
+ * This element, together with ipcpipelinesrc and ipcslavepipeline form a
+ * mechanism that allows splitting a single pipeline in different processes.
+ * The main use-case for it is a playback pipeline split in two parts, where the
+ * first part contains the networking, parsing and demuxing and the second part
+ * contains the decoding and display. The intention of this split is to improve
+ * security of an application, by letting the networking, parsing and demuxing
+ * parts run in a less privileged process than the process that accesses the
+ * decoder and display.
+ *
+ * Once the pipelines in those different processes have been created, the
+ * playback can be controlled entirely from the first pipeline, which is the
+ * one that contains ipcpipelinesink. We call this pipeline the “master”.
+ * All relevant events and queries sent from the application are sent to
+ * the master pipeline and messages to the application are sent from the master
+ * pipeline. The second pipeline, in the other process, is transparently slaved.
+ *
+ * ipcpipelinesink can work only in push mode and does not synchronize buffers
+ * to the clock. Synchronization is meant to happen either at the real sink at
+ * the end of the remote slave pipeline, or not to happen at all, if the
+ * pipeline is live.
+ *
+ * A master pipeline may contain more than one ipcpipelinesink elements, which
+ * can be connected either to the same slave pipeline or to different ones.
+ *
+ * Communication with ipcpipelinesrc on the slave happens via a socket, using a
+ * custom protocol. Each buffer, event, query, message or state change is
+ * serialized in a "packet" and sent over the socket. The sender then
+ * performs a blocking wait for a reply, if a return code is needed.
+ *
+ * All objects that contan a GstStructure (messages, queries, events) are
+ * serialized by serializing the GstStructure to a string
+ * (gst_structure_to_string). This implies some limitations, of course.
+ * All fields of this structures that are not serializable to strings (ex.
+ * object pointers) are ignored, except for some cases where custom
+ * serialization may occur (ex error/warning/info messages that contain a
+ * GError are serialized differently).
+ *
+ * Buffers are transported by writing their content directly on the socket.
+ * More efficient ways for memory sharing could be implemented in the future.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <unistd.h>
+#include "gstipcpipelinesink.h"
+#include <string.h>
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
+#define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
+
+enum
+{
+ SIGNAL_DISCONNECT,
+ /* FILL ME */
+ LAST_SIGNAL
+};
+static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
+
+enum
+{
+ PROP_0,
+ PROP_FDIN,
+ PROP_FDOUT,
+ PROP_READ_CHUNK_SIZE,
+ PROP_ACK_TIME,
+};
+
+
+#define DEFAULT_READ_CHUNK_SIZE 4096
+#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
+
+#define _do_init \
+ GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
+#define gst_ipc_pipeline_sink_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
+ GST_TYPE_ELEMENT, _do_init);
+
+static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_ipc_pipeline_sink_dispose (GObject * obj);
+static void gst_ipc_pipeline_sink_finalize (GObject * obj);
+static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
+ sink);
+static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
+ sink);
+
+static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
+ element, GstStateChange transition);
+
+static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
+ GstObject * parent, GstBuffer * buffer);
+static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
+ GstQuery * query);
+static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
+ GstEvent * event);
+static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
+static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
+
+
+static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
+static void pusher (gpointer data, gpointer user_data);
+
+
+static void
+gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = G_OBJECT_CLASS (klass);
+ gstelement_class = GST_ELEMENT_CLASS (klass);
+
+ gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
+ gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
+ gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
+ gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
+
+ g_object_class_install_property (gobject_class, PROP_FDIN,
+ g_param_spec_int ("fdin", "Input file descriptor",
+ "File descriptor to received data from",
+ -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_FDOUT,
+ g_param_spec_int ("fdout", "Output file descriptor",
+ "File descriptor to send data through",
+ -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
+ g_param_spec_uint ("read-chunk-size", "Read chunk size",
+ "Read chunk size",
+ 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_ACK_TIME,
+ g_param_spec_uint64 ("ack-time", "Ack time",
+ "Maximum time to wait for a response to a message",
+ 0, G_MAXUINT64, DEFAULT_ACK_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
+ g_signal_new ("disconnect",
+ G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
+ NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "Inter-process Pipeline Sink",
+ "Sink",
+ "Allows splitting and continuing a pipeline in another process",
+ "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&sinktemplate));
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
+ gstelement_class->query =
+ GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
+ gstelement_class->send_event =
+ GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
+
+ klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
+}
+
+static void
+gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
+{
+ GstPadTemplate *pad_template;
+
+ GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
+
+ gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
+ sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
+ sink->comm.ack_time = DEFAULT_ACK_TIME;
+ sink->comm.fdin = -1;
+ sink->comm.fdout = -1;
+ sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
+ gst_ipc_pipeline_sink_start_reader_thread (sink);
+
+ pad_template =
+ gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
+ g_return_if_fail (pad_template != NULL);
+
+ sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
+
+ gst_pad_set_activatemode_function (sink->sinkpad,
+ gst_ipc_pipeline_sink_pad_activate_mode);
+ gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
+ gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
+ gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
+ gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
+
+}
+
+static void
+gst_ipc_pipeline_sink_dispose (GObject * obj)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
+
+ gst_ipc_pipeline_sink_stop_reader_thread (sink);
+ gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
+
+ G_OBJECT_CLASS (parent_class)->dispose (obj);
+}
+
+static void
+gst_ipc_pipeline_sink_finalize (GObject * obj)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
+
+ gst_ipc_pipeline_comm_clear (&sink->comm);
+ g_thread_pool_free (sink->threads, TRUE, TRUE);
+
+ G_OBJECT_CLASS (parent_class)->finalize (obj);
+}
+
+static void
+gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
+
+ switch (prop_id) {
+ case PROP_FDIN:
+ sink->comm.fdin = g_value_get_int (value);
+ break;
+ case PROP_FDOUT:
+ sink->comm.fdout = g_value_get_int (value);
+ break;
+ case PROP_READ_CHUNK_SIZE:
+ sink->comm.read_chunk_size = g_value_get_uint (value);
+ break;
+ case PROP_ACK_TIME:
+ sink->comm.ack_time = g_value_get_uint64 (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
+
+ switch (prop_id) {
+ case PROP_FDIN:
+ g_value_set_int (value, sink->comm.fdin);
+ break;
+ case PROP_FDOUT:
+ g_value_set_int (value, sink->comm.fdout);
+ break;
+ case PROP_READ_CHUNK_SIZE:
+ g_value_set_uint (value, sink->comm.read_chunk_size);
+ break;
+ case PROP_ACK_TIME:
+ g_value_set_uint64 (value, sink->comm.ack_time);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gboolean
+gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
+ gboolean ret;
+
+ GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
+ event, gst_event_type_get_name (event->type), event->type);
+
+ ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
+ gst_event_unref (event);
+ return ret;
+}
+
+static GstFlowReturn
+gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
+ GstFlowReturn ret;
+
+ GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
+
+ ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
+ if (ret != GST_FLOW_OK)
+ GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
+
+ gst_buffer_unref (buffer);
+ return ret;
+}
+
+static gboolean
+gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
+ gboolean ret;
+
+ GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
+ GST_QUERY_TYPE_NAME (query), query);
+
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_ALLOCATION:
+ GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
+ return FALSE;
+ case GST_QUERY_CAPS:
+ {
+ /* caps queries occur even while linking the pipeline.
+ * It is possible that the ipcpipelinesrc may not be connected at this
+ * point, so let's avoid a couple of errors... */
+ GstState state;
+ GST_OBJECT_LOCK (sink);
+ state = GST_STATE (sink);
+ GST_OBJECT_UNLOCK (sink);
+ if (state == GST_STATE_NULL)
+ return FALSE;
+ }
+ default:
+ break;
+ }
+ ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
+
+ return ret;
+}
+
+static gboolean
+gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
+ gboolean ret;
+
+ GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
+ GST_QUERY_TYPE_NAME (query), query);
+
+ ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
+ GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
+ return ret;
+}
+
+static gboolean
+gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
+ gboolean ret;
+
+ GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
+ GST_EVENT_TYPE_NAME (event), event);
+
+ ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
+ GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
+
+ gst_event_unref (event);
+ return ret;
+}
+
+
+static gboolean
+gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active)
+{
+ if (mode == GST_PAD_MODE_PULL)
+ return FALSE;
+ return TRUE;
+}
+
+static void
+on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
+ GST_ERROR_OBJECT (sink,
+ "Got buffer id %u! I never knew buffers could go upstream...", id);
+ gst_buffer_unref (buffer);
+}
+
+static void
+pusher (gpointer data, gpointer user_data)
+{
+ GstIpcPipelineSink *sink = user_data;
+ gboolean ret;
+ guint32 id;
+
+ id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
+ QUARK_ID));
+
+ if (GST_IS_EVENT (data)) {
+ GstEvent *event = GST_EVENT (data);
+ GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
+ ret = gst_pad_push_event (sink->sinkpad, event);
+ GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
+ } else if (GST_IS_QUERY (data)) {
+ GstQuery *query = GST_QUERY (data);
+ GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
+ ret = gst_pad_peer_query (sink->sinkpad, query);
+ GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
+ gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
+ query);
+ gst_query_unref (query);
+ } else {
+ GST_ERROR_OBJECT (sink, "Unsupported object type");
+ }
+ gst_object_unref (sink);
+}
+
+static void
+on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
+
+ if (!upstream) {
+ GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
+ id);
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
+ gst_event_unref (event);
+ return;
+ }
+
+ GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
+ gst_object_ref (sink);
+ g_thread_pool_push (sink->threads, event, NULL);
+}
+
+static void
+on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
+
+ if (!upstream) {
+ GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
+ id);
+ gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
+ query);
+ gst_query_unref (query);
+ return;
+ }
+
+ GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
+ gst_object_ref (sink);
+ g_thread_pool_push (sink->threads, query, NULL);
+}
+
+static void
+on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
+ GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
+}
+
+static void
+on_state_lost (gpointer user_data)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
+
+ GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
+
+ GST_OBJECT_LOCK (sink);
+ sink->pass_next_async_done = TRUE;
+ GST_OBJECT_UNLOCK (sink);
+
+ gst_element_lost_state (GST_ELEMENT (sink));
+}
+
+static void
+do_async_done (GstElement * element, gpointer user_data)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
+ GstMessage *message = user_data;
+
+ GST_STATE_LOCK (sink);
+ GST_OBJECT_LOCK (sink);
+ if (sink->pass_next_async_done) {
+ sink->pass_next_async_done = FALSE;
+ GST_OBJECT_UNLOCK (sink);
+ gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
+ GST_STATE_UNLOCK (sink);
+ gst_element_post_message (element, gst_message_ref (message));
+
+ } else {
+ GST_OBJECT_UNLOCK (sink);
+ GST_STATE_UNLOCK (sink);
+ }
+}
+
+static void
+on_message (guint32 id, GstMessage * message, gpointer user_data)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
+
+ GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
+
+ switch (GST_MESSAGE_TYPE (message)) {
+ case GST_MESSAGE_ASYNC_DONE:
+ GST_OBJECT_LOCK (sink);
+ if (sink->pass_next_async_done) {
+ GST_OBJECT_UNLOCK (sink);
+ gst_element_call_async (GST_ELEMENT (sink), do_async_done,
+ message, (GDestroyNotify) gst_message_unref);
+ } else {
+ GST_OBJECT_UNLOCK (sink);
+ gst_message_unref (message);
+ }
+ return;
+ default:
+ break;
+ }
+
+ gst_element_post_message (GST_ELEMENT (sink), message);
+}
+
+static gboolean
+gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
+{
+ if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
+ on_event, on_query, on_state_change, on_state_lost, on_message,
+ sink)) {
+ GST_ERROR_OBJECT (sink, "Failed to start reader thread");
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static void
+gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
+{
+ gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
+}
+
+
+static void
+gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
+{
+ GST_DEBUG_OBJECT (sink, "Disconnecting");
+ gst_ipc_pipeline_sink_stop_reader_thread (sink);
+ sink->comm.fdin = -1;
+ sink->comm.fdout = -1;
+ gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
+ gst_ipc_pipeline_sink_start_reader_thread (sink);
+}
+
+static GstStateChangeReturn
+gst_ipc_pipeline_sink_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+ GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
+ gboolean async = FALSE;
+ gboolean down = FALSE;
+
+ GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
+ gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ if (sink->comm.fdin < 0) {
+ GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ if (sink->comm.fdout < 0) {
+ GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ if (!sink->comm.reader_thread) {
+ GST_ERROR_OBJECT (element, "Failed to start reader thread");
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ /* In these transitions, it is possible that the peer returns ASYNC.
+ * We don't know that in advance, but we post async-start anyway because
+ * it needs to be delivered *before* async-done, and async-done may
+ * arrive at any point in time after we've set the state of the peer.
+ * In case the peer doesn't return ASYNC, we just post async-done
+ * ourselves and the parent GstBin takes care of matching and deleting
+ * them, so the app never gets any of these. */
+ async = TRUE;
+ break;
+ default:
+ break;
+ }
+
+ /* downwards state change */
+ down = (GST_STATE_TRANSITION_CURRENT (transition) >=
+ GST_STATE_TRANSITION_NEXT (transition));
+
+ if (async) {
+ GST_DEBUG_OBJECT (sink,
+ "Posting async-start for %s, will need state-change-done",
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
+
+ gst_element_post_message (GST_ELEMENT (sink),
+ gst_message_new_async_start (GST_OBJECT (sink)));
+
+ GST_OBJECT_LOCK (sink);
+ sink->pass_next_async_done = TRUE;
+ GST_OBJECT_UNLOCK (sink);
+ }
+
+ /* change the state of the peer first */
+ /* If the fd out is -1, we do not actually call the peer. This will happen
+ when we explicitely disconnected, and in that case we want to be able
+ to bring the element down to NULL, so it can be restarted with a new
+ slave pipeline. */
+ if (sink->comm.fdout >= 0) {
+ GST_DEBUG_OBJECT (sink, "Calling peer with state change");
+ peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
+ transition);
+ if (ret == GST_STATE_CHANGE_FAILURE && down) {
+ GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
+ "but ignoring because we are going down");
+ peer_ret = GST_STATE_CHANGE_SUCCESS;
+ }
+ } else {
+ if (down) {
+ GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
+ sink->comm.fdout);
+ peer_ret = GST_STATE_CHANGE_SUCCESS;
+ } else {
+ GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
+ sink->comm.fdout);
+ peer_ret = GST_STATE_CHANGE_FAILURE;
+ }
+ }
+
+ /* chain up to the parent class to change our state, if the peer succeeded */
+ if (peer_ret != GST_STATE_CHANGE_FAILURE) {
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
+ GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
+ "but ignoring because we are going down");
+ ret = GST_STATE_CHANGE_SUCCESS;
+ }
+ }
+
+ GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
+ gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
+ gst_element_state_change_return_get_name (peer_ret),
+ gst_element_state_change_return_get_name (ret));
+
+ /* now interpret the return codes */
+ if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
+ GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
+
+ GST_OBJECT_LOCK (sink);
+ sink->pass_next_async_done = FALSE;
+ GST_OBJECT_UNLOCK (sink);
+
+ gst_element_post_message (GST_ELEMENT (sink),
+ gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
+ } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
+ GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
+ peer_ret = GST_STATE_CHANGE_SUCCESS;
+ }
+
+ if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
+ if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
+ /* only the parent's ret was FAILURE - revert remote changes */
+ GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
+ "returned failure");
+ gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
+ GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
+ GST_STATE_TRANSITION_CURRENT (transition)));
+ }
+ return GST_STATE_CHANGE_FAILURE;
+ }
+
+ /* the parent's (GstElement) state change func won't return ASYNC or
+ * NO_PREROLL, so unless it has returned FAILURE, which we have catched above,
+ * we are not interested in its return code... just return the peer's */
+ return peer_ret;
+}
diff --git a/gst/ipcpipeline/gstipcpipelinesink.h b/gst/ipcpipeline/gstipcpipelinesink.h
new file mode 100644
index 000000000..18c880eee
--- /dev/null
+++ b/gst/ipcpipeline/gstipcpipelinesink.h
@@ -0,0 +1,67 @@
+/* GStreamer
+ * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
+ * 2000 Wim Taymans <wtay@chello.be>
+ * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcpipelinesink.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+
+#ifndef __GST_IPC_PIPELINE_SINK_H__
+#define __GST_IPC_PIPELINE_SINK_H__
+
+#include <gst/gst.h>
+#include "gstipcpipelinecomm.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_IPC_PIPELINE_SINK \
+ (gst_ipc_pipeline_sink_get_type())
+#define GST_IPC_PIPELINE_SINK(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSink))
+#define GST_IPC_PIPELINE_SINK_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSinkClass))
+#define GST_IS_IPC_PIPELINE_SINK(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SINK))
+#define GST_IS_IPC_PIPELINE_SINK_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SINK))
+#define GST_IPC_PIPELINE_SINK_CAST(obj) ((GstIpcPipelineSink *)obj)
+
+typedef struct _GstIpcPipelineSink GstIpcPipelineSink;
+typedef struct _GstIpcPipelineSinkClass GstIpcPipelineSinkClass;
+
+struct _GstIpcPipelineSink {
+ GstElement element;
+
+ GstIpcPipelineComm comm;
+ GThreadPool *threads;
+ gboolean pass_next_async_done;
+ GstPad *sinkpad;
+};
+
+struct _GstIpcPipelineSinkClass {
+ GstElementClass parent_class;
+
+ void (*disconnect) (GstIpcPipelineSink * sink);
+};
+
+G_GNUC_INTERNAL GType gst_ipc_pipeline_sink_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_IPC_PIPELINE_SINK_H__ */
diff --git a/gst/ipcpipeline/gstipcpipelinesrc.c b/gst/ipcpipeline/gstipcpipelinesrc.c
new file mode 100644
index 000000000..656bd5a71
--- /dev/null
+++ b/gst/ipcpipeline/gstipcpipelinesrc.c
@@ -0,0 +1,954 @@
+/* GStreamer
+ * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
+ * 2000 Wim Taymans <wim@fluendo.com>
+ * 2006 Thomas Vander Stichele <thomas at apestaart dot org>
+ * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcpipelinesrc.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * SECTION:element-ipcpipelinesrc
+ * @see_also: #GstIpcPipelineSink, #GstIpcSlavePipeline
+ *
+ * Communicates with an ipcpipelinesink element in another process via a socket.
+ *
+ * The ipcpipelinesrc element allows 2-way communication with an ipcpipelinesink
+ * element on another process/pipeline. It is meant to run inside an
+ * interslavepipeline and when paired with an ipcpipelinesink, it will slave its
+ * whole parent pipeline to the "master" one, which contains the ipcpipelinesink.
+ *
+ * For more details about this mechanism and its uses, see the documentation
+ * of the ipcpipelinesink element.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "gstipcpipelinesrc.h"
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_src_debug);
+#define GST_CAT_DEFAULT gst_ipc_pipeline_src_debug
+
+enum
+{
+ /* FILL ME */
+ SIGNAL_FORWARD_MESSAGE,
+ SIGNAL_DISCONNECT,
+ LAST_SIGNAL
+};
+static guint gst_ipc_pipeline_src_signals[LAST_SIGNAL] = { 0 };
+
+enum
+{
+ PROP_0,
+ PROP_FDIN,
+ PROP_FDOUT,
+ PROP_READ_CHUNK_SIZE,
+ PROP_ACK_TIME,
+ PROP_LAST,
+};
+
+static GQuark QUARK_UPSTREAM;
+
+#define DEFAULT_READ_CHUNK_SIZE 65536
+#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
+
+#define _do_init \
+ GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_src_debug, "ipcpipelinesrc", 0, "ipcpipelinesrc element");
+#define gst_ipc_pipeline_src_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSrc, gst_ipc_pipeline_src,
+ GST_TYPE_ELEMENT, _do_init);
+
+static void gst_ipc_pipeline_src_finalize (GObject * object);
+static void gst_ipc_pipeline_src_dispose (GObject * object);
+static void gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+static void gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src);
+
+static gboolean gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc *
+ src);
+static void gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src);
+
+static gboolean gst_ipc_pipeline_src_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
+static gboolean gst_ipc_pipeline_src_srcpad_event (GstPad * pad,
+ GstObject * parent, GstEvent * event);
+static gboolean gst_ipc_pipeline_src_srcpad_query (GstPad * pad,
+ GstObject * parent, GstQuery * query);
+static void gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src);
+
+static gboolean gst_ipc_pipeline_src_send_event (GstElement * element,
+ GstEvent * event);
+static gboolean gst_ipc_pipeline_src_query (GstElement * element,
+ GstQuery * query);
+static GstStateChangeReturn gst_ipc_pipeline_src_change_state (GstElement *
+ element, GstStateChange transition);
+
+static gboolean gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src,
+ GstMessage * msg);
+static void gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src);
+
+static void
+gst_ipc_pipeline_src_class_init (GstIpcPipelineSrcClass * klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+
+ QUARK_UPSTREAM = g_quark_from_static_string ("ipcpipeline-upstream");
+
+ gobject_class->dispose = gst_ipc_pipeline_src_dispose;
+ gobject_class->finalize = gst_ipc_pipeline_src_finalize;
+
+ gobject_class->set_property = gst_ipc_pipeline_src_set_property;
+ gobject_class->get_property = gst_ipc_pipeline_src_get_property;
+
+ gstelement_class->send_event =
+ GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_send_event);
+ gstelement_class->query = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_query);
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_change_state);
+
+ klass->forward_message =
+ GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_forward_message);
+ klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_disconnect);
+
+ GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_activate_mode);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_event);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_query);
+
+ g_object_class_install_property (gobject_class, PROP_FDIN,
+ g_param_spec_int ("fdin", "Input file descriptor",
+ "File descriptor to read data from",
+ -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_FDOUT,
+ g_param_spec_int ("fdout", "Output file descriptor",
+ "File descriptor to write data through",
+ -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
+ g_param_spec_uint ("read-chunk-size", "Read chunk size",
+ "Read chunk size",
+ 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_ACK_TIME,
+ g_param_spec_uint64 ("ack-time", "Ack time",
+ "Maximum time to wait for a response to a message",
+ 0, G_MAXUINT64, DEFAULT_ACK_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ gst_ipc_pipeline_src_signals[SIGNAL_FORWARD_MESSAGE] =
+ g_signal_new ("forward-message", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstIpcPipelineSrcClass, forward_message), NULL, NULL,
+ g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, GST_TYPE_MESSAGE);
+
+ gst_ipc_pipeline_src_signals[SIGNAL_DISCONNECT] =
+ g_signal_new ("disconnect", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+ G_STRUCT_OFFSET (GstIpcPipelineSrcClass, disconnect),
+ NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "Inter-process Pipeline Source",
+ "Source",
+ "Continues a split pipeline from another process",
+ "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&srctemplate));
+}
+
+static void
+gst_ipc_pipeline_src_init (GstIpcPipelineSrc * src)
+{
+ GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
+
+ gst_ipc_pipeline_comm_init (&src->comm, GST_ELEMENT (src));
+ src->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
+ src->comm.ack_time = DEFAULT_ACK_TIME;
+ src->flushing = TRUE;
+ src->last_ret = GST_FLOW_FLUSHING;
+ src->queued = NULL;
+ g_cond_init (&src->create_cond);
+
+ src->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
+ gst_pad_set_activatemode_function (src->srcpad,
+ gst_ipc_pipeline_src_activate_mode);
+ gst_pad_set_event_function (src->srcpad, gst_ipc_pipeline_src_srcpad_event);
+ gst_pad_set_query_function (src->srcpad, gst_ipc_pipeline_src_srcpad_query);
+ gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
+
+ gst_ipc_pipeline_src_start_reader_thread (src);
+}
+
+static void
+gst_ipc_pipeline_src_dispose (GObject * object)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
+
+ gst_ipc_pipeline_src_stop_reader_thread (src);
+ gst_ipc_pipeline_src_cancel_queued (src);
+ gst_ipc_pipeline_comm_cancel (&src->comm, TRUE);
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static void
+gst_ipc_pipeline_src_finalize (GObject * object)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
+
+ gst_ipc_pipeline_comm_clear (&src->comm);
+ g_cond_clear (&src->create_cond);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
+
+ switch (prop_id) {
+ case PROP_FDIN:
+ src->comm.fdin = g_value_get_int (value);
+ break;
+ case PROP_FDOUT:
+ src->comm.fdout = g_value_get_int (value);
+ break;
+ case PROP_READ_CHUNK_SIZE:
+ src->comm.read_chunk_size = g_value_get_uint (value);
+ break;
+ case PROP_ACK_TIME:
+ src->comm.ack_time = g_value_get_uint64 (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
+
+ g_return_if_fail (GST_IS_IPC_PIPELINE_SRC (object));
+
+ switch (prop_id) {
+ case PROP_FDIN:
+ g_value_set_int (value, src->comm.fdin);
+ break;
+ case PROP_FDOUT:
+ g_value_set_int (value, src->comm.fdout);
+ break;
+ case PROP_READ_CHUNK_SIZE:
+ g_value_set_uint (value, src->comm.read_chunk_size);
+ break;
+ case PROP_ACK_TIME:
+ g_value_set_uint64 (value, src->comm.ack_time);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_ipc_pipeline_src_log_queue (GstIpcPipelineSrc * src)
+{
+ GList *queued;
+ guint n;
+
+ queued = src->queued;
+ n = 0;
+ GST_LOG_OBJECT (src, "There are %u objects in the queue",
+ g_list_length (queued));
+ while (queued) {
+ void *object = queued->data;
+ if (GST_IS_EVENT (object)) {
+ GST_LOG_OBJECT (src, " #%u: %s event", n, GST_EVENT_TYPE_NAME (object));
+ } else if (GST_IS_QUERY (object)) {
+ GST_LOG_OBJECT (src, " #%u: %s query", n, GST_QUERY_TYPE_NAME (object));
+ } else if (GST_IS_BUFFER (object)) {
+ GST_LOG_OBJECT (src, " #%u: %zu bytes buffer", n,
+ (size_t) gst_buffer_get_size (object));
+ } else {
+ GST_LOG_OBJECT (src, " #%u: unknown item in queue", n);
+ }
+ queued = queued->next;
+ ++n;
+ }
+}
+
+static void
+gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src)
+{
+ GList *queued;
+ guint32 id;
+
+ g_mutex_lock (&src->comm.mutex);
+ queued = src->queued;
+ src->queued = NULL;
+ g_cond_broadcast (&src->create_cond);
+ g_mutex_unlock (&src->comm.mutex);
+
+ while (queued) {
+ void *object = queued->data;
+
+ id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
+ QUARK_ID));
+
+ queued = g_list_delete_link (queued, queued);
+ if (GST_IS_EVENT (object)) {
+ GstEvent *event = GST_EVENT (object);
+ GST_DEBUG_OBJECT (src, "Cancelling queued event: %" GST_PTR_FORMAT,
+ event);
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
+ gst_event_unref (event);
+ } else if (GST_IS_BUFFER (object)) {
+ GstBuffer *buffer = GST_BUFFER (object);
+ GST_DEBUG_OBJECT (src, "Cancelling queued buffer: %" GST_PTR_FORMAT,
+ buffer);
+ gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
+ GST_FLOW_FLUSHING);
+ gst_buffer_unref (buffer);
+ } else if (GST_IS_QUERY (object)) {
+ GstQuery *query = GST_QUERY (object);
+ GST_DEBUG_OBJECT (src, "Cancelling queued query: %" GST_PTR_FORMAT,
+ query);
+ gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, FALSE,
+ query);
+ gst_query_unref (query);
+ }
+ }
+
+}
+
+static void
+gst_ipc_pipeline_src_start_loop (GstIpcPipelineSrc * src)
+{
+ g_mutex_lock (&src->comm.mutex);
+ src->flushing = FALSE;
+ src->last_ret = GST_FLOW_OK;
+ g_mutex_unlock (&src->comm.mutex);
+
+ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_ipc_pipeline_src_loop,
+ src, NULL);
+}
+
+static void
+gst_ipc_pipeline_src_stop_loop (GstIpcPipelineSrc * src, gboolean stop)
+{
+ g_mutex_lock (&src->comm.mutex);
+ src->flushing = TRUE;
+ g_cond_broadcast (&src->create_cond);
+ g_mutex_unlock (&src->comm.mutex);
+
+ if (stop)
+ gst_pad_stop_task (src->srcpad);
+}
+
+static gboolean
+gst_ipc_pipeline_src_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
+
+ switch (mode) {
+ case GST_PAD_MODE_PUSH:
+ GST_DEBUG_OBJECT (pad, "%s in push mode", active ? "activating" :
+ "deactivating");
+ if (active) {
+ gst_ipc_pipeline_src_start_loop (src);
+ } else {
+ gst_ipc_pipeline_src_stop_loop (src, TRUE);
+ gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
+ }
+ return TRUE;
+ default:
+ GST_DEBUG_OBJECT (pad, "unsupported activation mode");
+ return FALSE;
+ }
+}
+
+static gboolean
+gst_ipc_pipeline_src_srcpad_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
+ gboolean ret;
+
+ GST_DEBUG_OBJECT (src, "Got upstream event %s", GST_EVENT_TYPE_NAME (event));
+
+ ret = gst_ipc_pipeline_comm_write_event_to_fd (&src->comm, TRUE, event);
+ gst_event_unref (event);
+
+ GST_DEBUG_OBJECT (src, "Returning event result: %d", ret);
+ return ret;
+}
+
+static gboolean
+gst_ipc_pipeline_src_srcpad_query (GstPad * pad, GstObject * parent,
+ GstQuery * query)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
+ gboolean ret;
+
+ /* answer some queries that do not make sense to be forwarded */
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_LATENCY:
+ return TRUE;
+ case GST_QUERY_CONTEXT:
+ return FALSE;
+ case GST_QUERY_CAPS:
+ {
+ /* caps queries occur even while linking the pipeline.
+ * It is possible that the ipcpipelinesink may not be connected at this
+ * point, so let's avoid a couple of errors... */
+ GstState state;
+ GST_OBJECT_LOCK (src);
+ state = GST_STATE (src);
+ GST_OBJECT_UNLOCK (src);
+ if (state == GST_STATE_NULL)
+ return FALSE;
+ }
+ default:
+ break;
+ }
+
+ GST_DEBUG_OBJECT (src, "Got upstream query %s: %" GST_PTR_FORMAT,
+ GST_QUERY_TYPE_NAME (query), query);
+
+ ret = gst_ipc_pipeline_comm_write_query_to_fd (&src->comm, TRUE, query);
+
+ GST_DEBUG_OBJECT (src, "Returning query result: %d, %" GST_PTR_FORMAT,
+ ret, query);
+ return ret;
+}
+
+static void
+gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src)
+{
+ gpointer object;
+ guint32 id;
+ gboolean ok;
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ g_mutex_lock (&src->comm.mutex);
+
+ while (!src->queued && !src->flushing)
+ g_cond_wait (&src->create_cond, &src->comm.mutex);
+
+ if (src->flushing)
+ goto out;
+
+ object = src->queued->data;
+ src->queued = g_list_delete_link (src->queued, src->queued);
+ g_mutex_unlock (&src->comm.mutex);
+
+ id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
+ QUARK_ID));
+
+ if (GST_IS_BUFFER (object)) {
+ GstBuffer *buf = GST_BUFFER (object);
+ GST_DEBUG_OBJECT (src, "Pushing queued buffer: %" GST_PTR_FORMAT, buf);
+ ret = gst_pad_push (src->srcpad, buf);
+ GST_DEBUG_OBJECT (src, "pushed id %u, ret: %s", id,
+ gst_flow_get_name (ret));
+ gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, ret);
+ } else if (GST_IS_EVENT (object)) {
+ GstEvent *event = GST_EVENT (object);
+ GST_DEBUG_OBJECT (src, "Pushing queued event: %" GST_PTR_FORMAT, event);
+ ok = gst_pad_push_event (src->srcpad, event);
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
+ } else if (GST_IS_QUERY (object)) {
+ GstQuery *query = GST_QUERY (object);
+ GST_DEBUG_OBJECT (src, "Pushing queued query: %" GST_PTR_FORMAT, query);
+ ok = gst_pad_peer_query (src->srcpad, query);
+ gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ok, query);
+ gst_query_unref (query);
+ } else {
+ GST_WARNING_OBJECT (src, "Unknown data type queued");
+ }
+
+ g_mutex_lock (&src->comm.mutex);
+ if (!src->queued)
+ g_cond_broadcast (&src->create_cond);
+out:
+ if (src->flushing)
+ ret = GST_FLOW_FLUSHING;
+ if (ret != GST_FLOW_OK)
+ src->last_ret = ret;
+ g_mutex_unlock (&src->comm.mutex);
+
+ if (ret == GST_FLOW_FLUSHING) {
+ gst_ipc_pipeline_src_cancel_queued (src);
+ gst_pad_pause_task (src->srcpad);
+ }
+}
+
+static gboolean
+gst_ipc_pipeline_src_send_event (GstElement * element, GstEvent * event)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
+ return gst_pad_push_event (src->srcpad, event);
+}
+
+static gboolean
+gst_ipc_pipeline_src_query (GstElement * element, GstQuery * query)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
+ return gst_pad_query (src->srcpad, query);
+}
+
+static GstElement *
+find_pipeline (GstElement * element)
+{
+ GstElement *pipeline = element;
+ while (GST_ELEMENT_PARENT (pipeline)) {
+ pipeline = GST_ELEMENT_PARENT (pipeline);
+ if (GST_IS_PIPELINE (pipeline))
+ break;
+ }
+ if (!pipeline || !GST_IS_PIPELINE (pipeline)) {
+ pipeline = NULL;
+ }
+ return pipeline;
+}
+
+static gboolean
+gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, GstMessage * msg)
+{
+ gboolean skip = FALSE;
+
+ GST_DEBUG_OBJECT (src, "Message to forward: %" GST_PTR_FORMAT, msg);
+
+ switch (GST_MESSAGE_TYPE (msg)) {
+ case GST_MESSAGE_STATE_CHANGED:
+ {
+ GstState old, new, pending;
+ GstElement *pipeline = find_pipeline (GST_ELEMENT (src));
+
+ gst_message_parse_state_changed (msg, &old, &new, &pending);
+
+ if (GST_MESSAGE_SRC (msg) == GST_OBJECT (pipeline) &&
+ old == new && new == pending) {
+ GST_DEBUG_OBJECT (src, "Detected lost state, notifying master");
+ gst_ipc_pipeline_comm_write_state_lost_to_fd (&src->comm);
+ }
+ /* fall through & skip */
+ }
+ case GST_MESSAGE_ASYNC_START:
+ case GST_MESSAGE_CLOCK_PROVIDE:
+ case GST_MESSAGE_CLOCK_LOST:
+ case GST_MESSAGE_NEW_CLOCK:
+ case GST_MESSAGE_STREAM_STATUS:
+ case GST_MESSAGE_NEED_CONTEXT:
+ case GST_MESSAGE_HAVE_CONTEXT:
+ case GST_MESSAGE_STRUCTURE_CHANGE:
+ skip = TRUE;
+ break;
+ case GST_MESSAGE_RESET_TIME:
+ {
+ GQuark ipcpipelinesrc_posted = g_quark_from_static_string
+ ("gstinterslavepipeline-message-already-posted");
+
+ skip = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (msg),
+ ipcpipelinesrc_posted));
+ if (!skip) {
+ gst_mini_object_set_qdata (GST_MINI_OBJECT (msg), ipcpipelinesrc_posted,
+ GUINT_TO_POINTER (1), NULL);
+ }
+ break;
+ }
+ case GST_MESSAGE_ERROR:
+ {
+ GError *error = NULL;
+
+ /* skip forwarding a RESOURCE/WRITE error message that originated from
+ * ipcpipelinesrc; we post this error when writing to the comm fd fails,
+ * so if we try to forward it here, we will likely get another one posted
+ * immediately and end up doing an endless loop */
+ gst_message_parse_error (msg, &error, NULL);
+ skip = (GST_MESSAGE_SRC (msg) == GST_OBJECT_CAST (src)
+ && error->domain == gst_resource_error_quark ()
+ && error->code == GST_RESOURCE_ERROR_WRITE);
+ g_error_free (error);
+ break;
+ }
+ default:
+ break;
+ }
+
+ if (skip) {
+ GST_DEBUG_OBJECT (src, "message will not be forwarded");
+ return TRUE;
+ }
+
+ return gst_ipc_pipeline_comm_write_message_to_fd (&src->comm, msg);
+}
+
+static void
+on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
+ GST_DEBUG_OBJECT (src, "Got buffer id %u, queueing: %" GST_PTR_FORMAT, id,
+ buffer);
+ g_mutex_lock (&src->comm.mutex);
+ if (!GST_PAD_IS_ACTIVE (src->srcpad) || src->flushing) {
+ g_mutex_unlock (&src->comm.mutex);
+ GST_INFO_OBJECT (src, "We're not started or flushing, buffer ignored");
+ gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
+ GST_FLOW_FLUSHING);
+ gst_buffer_unref (buffer);
+ return;
+ }
+ if (src->last_ret != GST_FLOW_OK) {
+ GstFlowReturn last_ret = src->last_ret;
+ g_mutex_unlock (&src->comm.mutex);
+ GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting buffer",
+ gst_flow_get_name (last_ret));
+ gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, last_ret);
+ gst_buffer_unref (buffer);
+ return;
+ }
+ src->queued = g_list_append (src->queued, buffer); /* keep the ref */
+ gst_ipc_pipeline_src_log_queue (src);
+ g_cond_broadcast (&src->create_cond);
+ g_mutex_unlock (&src->comm.mutex);
+}
+
+static void
+do_oob_event (GstElement * element, gpointer user_data)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
+ GstEvent *event = user_data;
+ gboolean ret, upstream;
+ guint32 id;
+
+ id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
+ (event), QUARK_ID));
+ upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
+ (event), QUARK_UPSTREAM));
+
+ if (upstream) {
+ GstElement *pipeline;
+ gboolean ok = FALSE;
+
+ if (!(pipeline = find_pipeline (element))) {
+ GST_ERROR_OBJECT (src, "No pipeline found");
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
+ } else {
+ GST_DEBUG_OBJECT (src, "Posting upstream event on pipeline: %"
+ GST_PTR_FORMAT, event);
+ ok = gst_element_send_event (pipeline, gst_event_ref (event));
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
+ }
+ } else {
+ GST_DEBUG_OBJECT (src, "Pushing event async: %" GST_PTR_FORMAT, event);
+ ret = gst_element_send_event (element, gst_event_ref (event));
+ GST_DEBUG_OBJECT (src, "Event pushed, return %d", ret);
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ret);
+ }
+}
+
+static void
+on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
+ GstFlowReturn last_ret = GST_FLOW_OK;
+
+ GST_DEBUG_OBJECT (src, "Got event id %u, queueing: %" GST_PTR_FORMAT, id,
+ event);
+
+ gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_UPSTREAM,
+ GINT_TO_POINTER (upstream), NULL);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_START:
+ gst_ipc_pipeline_src_stop_loop (src, FALSE);
+ break;
+ case GST_EVENT_FLUSH_STOP:
+ gst_ipc_pipeline_src_start_loop (src);
+ break;
+ default:
+ g_mutex_lock (&src->comm.mutex);
+ last_ret = src->last_ret;
+ g_mutex_unlock (&src->comm.mutex);
+ break;
+ }
+
+ if (GST_EVENT_IS_SERIALIZED (event) && !upstream) {
+ if (last_ret != GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
+ gst_flow_get_name (last_ret));
+ gst_event_unref (event);
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
+ } else {
+ GST_DEBUG_OBJECT (src, "This is a serialized event, adding to queue %p",
+ src->queued);
+ g_mutex_lock (&src->comm.mutex);
+ src->queued = g_list_append (src->queued, event); /* keep the ref */
+ gst_ipc_pipeline_src_log_queue (src);
+ g_cond_broadcast (&src->create_cond);
+ g_mutex_unlock (&src->comm.mutex);
+ }
+ } else {
+ if (last_ret != GST_FLOW_OK && !upstream) {
+ GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
+ gst_flow_get_name (last_ret));
+ gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
+ gst_event_unref (event);
+ } else {
+ GST_DEBUG_OBJECT (src,
+ "This is not a serialized event, pushing in a thread");
+ gst_element_call_async (GST_ELEMENT (src), do_oob_event, event,
+ (GDestroyNotify) gst_event_unref);
+ }
+ }
+}
+
+static void
+do_oob_query (GstElement * element, gpointer user_data)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
+ GstQuery *query = GST_QUERY (user_data);
+ guint32 id;
+ gboolean upstream;
+ gboolean ret;
+
+ id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
+ (query), QUARK_ID));
+ upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
+ (query), QUARK_UPSTREAM));
+
+ if (upstream) {
+ GstElement *pipeline;
+
+ if (!(pipeline = find_pipeline (element))) {
+ GST_ERROR_OBJECT (src, "No pipeline found");
+ ret = FALSE;
+ } else {
+ GST_DEBUG_OBJECT (src, "Posting query on pipeline: %" GST_PTR_FORMAT,
+ query);
+ ret = gst_element_query (pipeline, query);
+ }
+ } else {
+ GST_DEBUG_OBJECT (src, "Pushing query async: %" GST_PTR_FORMAT, query);
+ ret = gst_pad_peer_query (src->srcpad, query);
+ GST_DEBUG_OBJECT (src, "Query pushed, return %d", ret);
+ }
+ gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ret, query);
+}
+
+static void
+on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
+
+ GST_DEBUG_OBJECT (src, "Got query id %u, queueing: %" GST_PTR_FORMAT, id,
+ query);
+
+ if (GST_QUERY_IS_SERIALIZED (query) && !upstream) {
+ g_mutex_lock (&src->comm.mutex);
+ src->queued = g_list_append (src->queued, query); /* keep the ref */
+ gst_ipc_pipeline_src_log_queue (src);
+ g_cond_broadcast (&src->create_cond);
+ g_mutex_unlock (&src->comm.mutex);
+ } else {
+ gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_UPSTREAM,
+ GINT_TO_POINTER (upstream), NULL);
+ gst_element_call_async (GST_ELEMENT (src), do_oob_query, query,
+ (GDestroyNotify) gst_query_unref);
+ }
+}
+
+struct StateChangeData
+{
+ guint32 id;
+ GstStateChange transition;
+};
+
+static void
+do_state_change (GstElement * element, gpointer data)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
+ GstElement *pipeline;
+ GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
+ GstState state, pending, effective;
+ struct StateChangeData *d = data;
+ guint32 id = d->id;
+ GstStateChange transition = d->transition;
+ gboolean down;
+
+ GST_DEBUG_OBJECT (src, "Doing state change id %u, %s -> %s", id,
+ gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
+
+ if (!(pipeline = find_pipeline (element))) {
+ GST_ERROR_OBJECT (src, "No pipeline found");
+ ret = GST_STATE_CHANGE_FAILURE;
+ goto done_nolock;
+ }
+
+ down = (GST_STATE_TRANSITION_CURRENT (transition) >=
+ GST_STATE_TRANSITION_NEXT (transition));
+
+ GST_STATE_LOCK (pipeline);
+ ret = gst_element_get_state (pipeline, &state, &pending, 0);
+
+ /* if we are pending a state change, count the pending state as
+ * the current one */
+ effective = pending == GST_STATE_VOID_PENDING ? state : pending;
+
+ GST_DEBUG_OBJECT (src, "Current element state: ret:%s state:%s pending:%s "
+ "effective:%s", gst_element_state_change_return_get_name (ret),
+ gst_element_state_get_name (state),
+ gst_element_state_get_name (pending),
+ gst_element_state_get_name (effective));
+
+ if ((GST_STATE_TRANSITION_NEXT (transition) <= effective && !down) ||
+ (GST_STATE_TRANSITION_NEXT (transition) > effective && down)) {
+ /* if the request was to transition to a state that we have already
+ * transitioned to in the same direction, then we just silently return */
+ GST_DEBUG_OBJECT (src, "State transition to %s is unnecessary",
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
+ /* make sure we return SUCCESS if the transition is to NULL or READY,
+ * even if our current ret is ASYNC for example; also, make sure not
+ * to return FAILURE, since our state is already committed */
+ if (GST_STATE_TRANSITION_NEXT (transition) <= GST_STATE_READY ||
+ ret == GST_STATE_CHANGE_FAILURE) {
+ ret = GST_STATE_CHANGE_SUCCESS;
+ }
+ } else if (ret != GST_STATE_CHANGE_FAILURE || down) {
+ /* if the request was to transition to a state that we haven't already
+ * transitioned to in the same direction, then we need to request a state
+ * change in the pipeline, *unless* we are going upwards and the last ret
+ * was FAILURE, in which case we should just return FAILURE and stop.
+ * We don't stop a downwards state change though in case of FAILURE, since
+ * we need to be able to bring the pipeline down to NULL. Note that
+ * GST_MESSAGE_ERROR will cause ret to be GST_STATE_CHANGE_FAILURE */
+ ret = gst_element_set_state (pipeline,
+ GST_STATE_TRANSITION_NEXT (transition));
+ GST_DEBUG_OBJECT (src, "gst_element_set_state returned %s",
+ gst_element_state_change_return_get_name (ret));
+ }
+
+ GST_STATE_UNLOCK (pipeline);
+
+done_nolock:
+ GST_DEBUG_OBJECT (src, "sending state change ack, ret = %s",
+ gst_element_state_change_return_get_name (ret));
+ gst_ipc_pipeline_comm_write_state_change_ack_to_fd (&src->comm, id, ret);
+}
+
+static void
+on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
+{
+ struct StateChangeData *d;
+ GstElement *ipcpipelinesrc = GST_ELEMENT (user_data);
+
+ GST_DEBUG_OBJECT (ipcpipelinesrc, "Got state change id %u, %s -> %s", id,
+ gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
+ gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
+
+ d = g_new (struct StateChangeData, 1);
+ d->id = id;
+ d->transition = transition;
+
+ gst_element_call_async (ipcpipelinesrc, do_state_change, d, g_free);
+}
+
+static void
+on_message (guint32 id, GstMessage * message, gpointer user_data)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
+
+ GST_ERROR_OBJECT (src, "Got message id %u, not supposed to: %" GST_PTR_FORMAT,
+ id, message);
+ gst_message_unref (message);
+}
+
+static gboolean
+gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * src)
+{
+ if (!gst_ipc_pipeline_comm_start_reader_thread (&src->comm, on_buffer,
+ on_event, on_query, on_state_change, NULL, on_message, src)) {
+ GST_ERROR_OBJECT (src, "Failed to start reader thread");
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static void
+gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src)
+{
+ gst_ipc_pipeline_comm_stop_reader_thread (&src->comm);
+}
+
+static void
+gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src)
+{
+ GST_DEBUG_OBJECT (src, "Disconnecting");
+ gst_ipc_pipeline_src_stop_reader_thread (src);
+ src->comm.fdin = -1;
+ src->comm.fdout = -1;
+ gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
+ gst_ipc_pipeline_src_start_reader_thread (src);
+}
+
+static GstStateChangeReturn
+gst_ipc_pipeline_src_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ if (src->comm.fdin < 0) {
+ GST_ERROR_OBJECT (element, "Invalid fdin: %d", src->comm.fdin);
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ if (src->comm.fdout < 0) {
+ GST_ERROR_OBJECT (element, "Invalid fdout: %d", src->comm.fdout);
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ if (!src->comm.reader_thread) {
+ GST_ERROR_OBJECT (element, "Failed to start reader thread");
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ break;
+ default:
+ break;
+ }
+ return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+}
diff --git a/gst/ipcpipeline/gstipcpipelinesrc.h b/gst/ipcpipeline/gstipcpipelinesrc.h
new file mode 100644
index 000000000..221fccdd1
--- /dev/null
+++ b/gst/ipcpipeline/gstipcpipelinesrc.h
@@ -0,0 +1,76 @@
+/* GStreamer
+ * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
+ * 2000 Wim Taymans <wtay@chello.be>
+ * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcpipelinesrc.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+
+#ifndef __GST_IPC_PIPELINE_SRC_H__
+#define __GST_IPC_PIPELINE_SRC_H__
+
+#include <gst/gst.h>
+#include "gstipcpipelinecomm.h"
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_IPC_PIPELINE_SRC \
+ (gst_ipc_pipeline_src_get_type())
+#define GST_IPC_PIPELINE_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrc))
+#define GST_IPC_PIPELINE_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrcClass))
+#define GST_IS_IPC_PIPELINE_SRC(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SRC))
+#define GST_IS_IPC_PIPELINE_SRC_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SRC))
+
+typedef struct _GstIpcPipelineSrc GstIpcPipelineSrc;
+typedef struct _GstIpcPipelineSrcClass GstIpcPipelineSrcClass;
+
+/**
+ * GstIpcPipelineSrc:
+ *
+ * Opaque #GstIpcPipelineSrc data structure.
+ */
+struct _GstIpcPipelineSrc {
+ GstElement element;
+
+ GstIpcPipelineComm comm;
+ GstPad *srcpad;
+
+ gboolean flushing;
+ GList *queued;
+ GstFlowReturn last_ret;
+
+ GCond create_cond;
+};
+
+struct _GstIpcPipelineSrcClass {
+ GstElementClass parent_class;
+
+ gboolean (*forward_message) (GstIpcPipelineSrc *, GstMessage *);
+ void (*disconnect) (GstIpcPipelineSrc * src);
+};
+
+G_GNUC_INTERNAL GType gst_ipc_pipeline_src_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_IPC_PIPELINE_SRC_H__ */
diff --git a/gst/ipcpipeline/gstipcslavepipeline.c b/gst/ipcpipeline/gstipcslavepipeline.c
new file mode 100644
index 000000000..2c9dd1516
--- /dev/null
+++ b/gst/ipcpipeline/gstipcslavepipeline.c
@@ -0,0 +1,122 @@
+/* GStreamer
+ * Copyright (C) 2015-2017 YouView TV Ltd
+ * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcslavepipeline.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * SECTION:element-ipcslavepipeline
+ * @see_also: #GstIpcPipelineSink, #GstIpcPipelineSrc
+ *
+ * This is a GstPipeline subclass meant to embed one ore more ipcpipelinesrc
+ * elements, and be slaved transparently to the master pipeline, using one ore
+ * more ipcpipelinesink elements on the master.
+ *
+ * The actual pipeline slaving logic happens in ipcpipelinesrc. The only thing
+ * that this class actually does is that it watches the pipeline bus for
+ * messages and forwards them to the master pipeline through the ipcpipelinesrc
+ * elements that it contains.
+ *
+ * For more details about this mechanism and its uses, see the documentation
+ * of the ipcpipelinesink element.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+
+#include "gstipcpipelinesrc.h"
+#include "gstipcslavepipeline.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_ipcslavepipeline_debug);
+#define GST_CAT_DEFAULT gst_ipcslavepipeline_debug
+
+#define _do_init \
+ GST_DEBUG_CATEGORY_INIT (gst_ipcslavepipeline_debug, "ipcslavepipeline", 0, "ipcslavepipeline element");
+#define gst_ipc_slave_pipeline_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstIpcSlavePipeline, gst_ipc_slave_pipeline,
+ GST_TYPE_PIPELINE, _do_init);
+
+static gboolean gst_ipc_slave_pipeline_post_message (GstElement * element,
+ GstMessage * message);
+
+static void
+gst_ipc_slave_pipeline_class_init (GstIpcSlavePipelineClass * klass)
+{
+ GstElementClass *element_class;
+
+ element_class = GST_ELEMENT_CLASS (klass);
+
+ element_class->post_message = gst_ipc_slave_pipeline_post_message;
+
+ gst_element_class_set_static_metadata (element_class,
+ "Inter-process slave pipeline",
+ "Generic/Bin/Slave",
+ "Contains the slave part of an inter-process pipeline",
+ "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk");
+}
+
+static void
+gst_ipc_slave_pipeline_init (GstIpcSlavePipeline * isp)
+{
+}
+
+static gboolean
+send_message_if_ipcpipelinesrc (const GValue * v, GValue * r,
+ gpointer user_data)
+{
+ GstElement *e;
+ GType et;
+ gboolean ret;
+ GstMessage *message = user_data;
+
+ e = g_value_get_object (v);
+ et = gst_element_factory_get_element_type (gst_element_get_factory (e));
+ if (et == GST_TYPE_IPC_PIPELINE_SRC) {
+ g_signal_emit_by_name (G_OBJECT (e), "forward-message", message, &ret);
+
+ /* if we succesfully sent this to the master and it's not ASYNC_DONE or EOS,
+ * we can skip sending it again through the other ipcpipelinesrcs */
+ if (ret && GST_MESSAGE_TYPE (message) != GST_MESSAGE_ASYNC_DONE &&
+ GST_MESSAGE_TYPE (message) != GST_MESSAGE_EOS)
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static void
+gst_ipc_slave_pipeline_forward_message (GstIpcSlavePipeline * pipeline,
+ GstMessage * message)
+{
+ GstIterator *it;
+
+ it = gst_bin_iterate_sources (GST_BIN (pipeline));
+ gst_iterator_fold (it, send_message_if_ipcpipelinesrc, NULL, message);
+ gst_iterator_free (it);
+}
+
+static gboolean
+gst_ipc_slave_pipeline_post_message (GstElement * element, GstMessage * message)
+{
+ gst_ipc_slave_pipeline_forward_message (GST_IPC_SLAVE_PIPELINE
+ (element), message);
+
+ return GST_ELEMENT_CLASS (parent_class)->post_message (element, message);
+}
diff --git a/gst/ipcpipeline/gstipcslavepipeline.h b/gst/ipcpipeline/gstipcslavepipeline.h
new file mode 100644
index 000000000..354dfacf0
--- /dev/null
+++ b/gst/ipcpipeline/gstipcslavepipeline.h
@@ -0,0 +1,59 @@
+/* GStreamer
+ * Copyright (C) 2015-2017 YouView TV Ltd
+ * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
+ *
+ * gstipcslavepipeline.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+#ifndef _GST_IPC_SLAVE_PIPELINE_H_
+#define _GST_IPC_SLAVE_PIPELINE_H_
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_IPC_SLAVE_PIPELINE \
+ (gst_ipc_slave_pipeline_get_type())
+#define GST_IPC_SLAVE_PIPELINE(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipeline))
+#define GST_IPC_SLAVE_PIPELINE_CAST(obj) \
+ ((GstIpcSlavePipeline *) obj)
+#define GST_IPC_SLAVE_PIPELINE_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipelineClass))
+#define GST_IS_IPC_SLAVE_PIPELINE(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_SLAVE_PIPELINE))
+#define GST_IS_IPC_SLAVE_PIPELINE_CLASS(obj) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_SLAVE_PIPELINE))
+
+typedef struct _GstIpcSlavePipeline GstIpcSlavePipeline;
+typedef struct _GstIpcSlavePipelineClass GstIpcSlavePipelineClass;
+
+struct _GstIpcSlavePipeline
+{
+ GstPipeline pipeline;
+};
+
+struct _GstIpcSlavePipelineClass
+{
+ GstPipelineClass pipeline_class;
+};
+
+G_GNUC_INTERNAL GType gst_ipc_slave_pipeline_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/gst/ipcpipeline/protocol.txt b/gst/ipcpipeline/protocol.txt
new file mode 100644
index 000000000..77f6e568e
--- /dev/null
+++ b/gst/ipcpipeline/protocol.txt
@@ -0,0 +1,92 @@
+This documents the protocol used to pass data over fds between ipcpipelinesrc
+and ipcpipelinesink.
+
+The protocol is used in both directions. However, some combinations do
+not make sense (eg, a buffer going from ipcpipelinesrc to ipcpipelinesink).
+
+The protocol consists of an arbitrary number of variable sized chunks
+with a type. Each chunk has a request ID which can be used to match a
+request with its reply (ack / query result).
+
+Each chunk consists of:
+ - a type (byte):
+ 1: ack
+ 2: query result
+ 3: buffer
+ 4: event
+ 5: sink message event
+ 6: query
+ 7: state change
+ 8: state lost
+ 9: message
+ 10: error/warning/info message
+ - a request ID, 4 bytes, little endian
+ - the payload size, 4 bytes, little endian
+ - N bytes payload
+
+Depending on the type, the payload can contain:
+
+ - 1: ack
+ result: 4 bytes, little endian
+ interpreted as GstFlowReturn for buffers, boolean for events and
+ GstStateChangeReturn for state changes
+ - 2: query result
+ result boolean: 1 byte
+ query type: 4 bytes, little endian
+ returned query string representation, NUL terminated
+ - 3: buffer:
+ pts: 8 bytes, little endian
+ dts: 8 bytes, little endian
+ duration: 8 bytes, little endian
+ offset: 8 bytes, little endian
+ offset end: 8 bytes, little endian
+ flags: 8 bytes, little endian
+ buffer size: 4 bytes, little endian
+ data: contents of the buffer data, size specified in "buffer size"
+ number of GstMeta: 4 bytes, little endian
+ For each GstMeta:
+ bytes: 4 bytes, little endian
+ this is the number of bytes before the string representation
+ at the end of this block, including the 4 bytes of itself
+ flags: 4 bytes, little endian
+ length of the GstMetaInfo::api name: 4 bytes, little endian
+ GstMetaInfo::api name: string, NUL terminated
+ GstMetaInfo::size: 8 bytes, little endian
+ length of the string representation: 4 bytes, little endian
+ string representation, NUL terminated
+ - 4: event
+ event type: 4 bytes, little endian
+ sequence number: 4 bytes, little endian
+ direction: 1 byte
+ whether the event is going upstream (1) or downstream (0)
+ string representation, NUL terminated
+ - 5: sink message event
+ message type: 4 bytes, little endian
+ event sequence number: 4 bytes, little endian
+ message sequence number: 4 bytes, little endian
+ length: 4 bytes, little endian
+ event structure name: length bytes, NUL terminated
+ message structure string representation: remaining bytes, NUL terminated
+ - 6: query
+ query type: 4 bytes, little endian
+ direction: 1 byte
+ whether the query is going upstream (1) or downstream (0)
+ string representation, NUL terminated
+ - 7: state change
+ GstStateChange: 4 bytes, little endian
+ - 8: state lost
+ no payload
+ - 9: message
+ message type: 4 bytes, little endian
+ string representation, NUL terminated
+ - 10: error/warning/info message
+ message type (2 = error, 1 = warning, 0 = info): 1 byte
+ error domain string length: 4 bytes, little endian
+ string representation of the error domain, NUL terminated
+ error code: 4 bytes, little endian
+ length: 4 bytes, little endian
+ if zero: no error message
+ if non zero: As many bytes as this length: the error message, NUL terminated
+ length: 4 bytes, little endian
+ if zero: no extra message
+ if non zero: As many bytes as this length: the error extra debug message, NUL terminated