summaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2017-08-01 17:07:59 +0300
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2017-08-02 10:40:24 +0300
commit30f5abc32c7c8d8517f5089a54756efa4ab63428 (patch)
treef43544b9c7d88cebb82ce24e8e3447d1eba7d47a /gst
parenta2053380821bbadbd44768cd231b4b68fa94d0a0 (diff)
downloadgstreamer-plugins-bad-30f5abc32c7c8d8517f5089a54756efa4ab63428.tar.gz
ipcpipeline: move to sys/ and make it dependent on platform support for unix sockets
Diffstat (limited to 'gst')
-rw-r--r--gst/ipcpipeline/Makefile.am28
-rw-r--r--gst/ipcpipeline/gstipcpipeline.c48
-rw-r--r--gst/ipcpipeline/gstipcpipelinecomm.c2341
-rw-r--r--gst/ipcpipeline/gstipcpipelinecomm.h132
-rw-r--r--gst/ipcpipeline/gstipcpipelinesink.c722
-rw-r--r--gst/ipcpipeline/gstipcpipelinesink.h67
-rw-r--r--gst/ipcpipeline/gstipcpipelinesrc.c954
-rw-r--r--gst/ipcpipeline/gstipcpipelinesrc.h76
-rw-r--r--gst/ipcpipeline/gstipcslavepipeline.c122
-rw-r--r--gst/ipcpipeline/gstipcslavepipeline.h59
-rw-r--r--gst/ipcpipeline/meson.build16
-rw-r--r--gst/ipcpipeline/protocol.txt92
-rw-r--r--gst/meson.build1
13 files changed, 0 insertions, 4658 deletions
diff --git a/gst/ipcpipeline/Makefile.am b/gst/ipcpipeline/Makefile.am
deleted file mode 100644
index 12bbed14b..000000000
--- a/gst/ipcpipeline/Makefile.am
+++ /dev/null
@@ -1,28 +0,0 @@
-plugin_LTLIBRARIES = libgstipcpipeline.la
-
-libgstipcpipeline_la_SOURCES = \
- gstipcpipeline.c \
- gstipcpipelinecomm.c \
- gstipcpipelinesink.c \
- gstipcpipelinesrc.c \
- gstipcslavepipeline.c
-
-noinst_HEADERS = \
- gstipcpipelinecomm.h \
- gstipcpipelinesink.h \
- gstipcpipelinesrc.h \
- gstipcslavepipeline.h
-
-libgstipcpipeline_la_CFLAGS = \
- $(GST_PLUGINS_BAD_CFLAGS) \
- $(GST_PLUGINS_BASE_CFLAGS) \
- $(GST_BASE_CFLAGS) \
- $(GST_CFLAGS)
-
-libgstipcpipeline_la_LIBADD = \
- $(GST_PLUGINS_BASE_LIBS) \
- $(GST_BASE_LIBS) \
- $(GST_LIBS) \
- $(LIBM)
-
-libgstipcpipeline_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
diff --git a/gst/ipcpipeline/gstipcpipeline.c b/gst/ipcpipeline/gstipcpipeline.c
deleted file mode 100644
index 4d647d49d..000000000
--- a/gst/ipcpipeline/gstipcpipeline.c
+++ /dev/null
@@ -1,48 +0,0 @@
-/* GStreamer
- * Copyright (C) 2017 YouView TV Ltd
- * Author: George Kiagiadakis <george.Kiagiadakis@collabora.com>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
- * Boston, MA 02110-1335, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "gstipcpipelinecomm.h"
-#include "gstipcpipelinesink.h"
-#include "gstipcpipelinesrc.h"
-#include "gstipcslavepipeline.h"
-
-static gboolean
-plugin_init (GstPlugin * plugin)
-{
- gst_ipc_pipeline_comm_plugin_init ();
- gst_element_register (plugin, "ipcpipelinesrc", GST_RANK_NONE,
- GST_TYPE_IPC_PIPELINE_SRC);
- gst_element_register (plugin, "ipcpipelinesink", GST_RANK_NONE,
- GST_TYPE_IPC_PIPELINE_SINK);
- gst_element_register (plugin, "ipcslavepipeline", GST_RANK_NONE,
- GST_TYPE_IPC_SLAVE_PIPELINE);
-
- return TRUE;
-}
-
-GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
- GST_VERSION_MINOR,
- ipcpipeline,
- "plugin for inter-process pipeline communication",
- plugin_init, VERSION, "LGPL", PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/gst/ipcpipeline/gstipcpipelinecomm.c b/gst/ipcpipeline/gstipcpipelinecomm.c
deleted file mode 100644
index eee8e528f..000000000
--- a/gst/ipcpipeline/gstipcpipelinecomm.c
+++ /dev/null
@@ -1,2341 +0,0 @@
-/* GStreamer
- * Copyright (C) 2015-2017 YouView TV Ltd
- * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
- *
- * gstipcpipelinecomm.c:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <unistd.h>
-#include <errno.h>
-#include <string.h>
-#include <stdlib.h>
-#include <gst/base/gstbytewriter.h>
-#include <gst/gstprotection.h>
-#include "gstipcpipelinecomm.h"
-
-GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
-#define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug
-
-#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
-
-GQuark QUARK_ID;
-
-typedef enum
-{
- ACK_TYPE_NONE,
- ACK_TYPE_TIMED,
- ACK_TYPE_BLOCKING
-} AckType;
-
-typedef enum
-{
- COMM_REQUEST_TYPE_BUFFER,
- COMM_REQUEST_TYPE_EVENT,
- COMM_REQUEST_TYPE_QUERY,
- COMM_REQUEST_TYPE_STATE_CHANGE,
- COMM_REQUEST_TYPE_MESSAGE,
-} CommRequestType;
-
-typedef struct
-{
- guint32 id;
- gboolean replied;
- gboolean comm_error;
- guint32 ret;
- GstQuery *query;
- CommRequestType type;
- GCond cond;
-} CommRequest;
-
-static const gchar *comm_request_ret_get_name (CommRequestType type,
- guint32 ret);
-static guint32 comm_request_ret_get_failure_value (CommRequestType type);
-
-static CommRequest *
-comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
-{
- CommRequest *req;
-
- req = g_malloc (sizeof (CommRequest));
- req->id = id;
- g_cond_init (&req->cond);
- req->replied = FALSE;
- req->comm_error = FALSE;
- req->query = query;
- req->ret = comm_request_ret_get_failure_value (type);
- req->type = type;
-
- return req;
-}
-
-static guint32
-comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
- AckType ack_type)
-{
- guint32 ret = comm_request_ret_get_failure_value (req->type);
- guint64 end_time;
-
- if (ack_type == ACK_TYPE_TIMED)
- end_time = g_get_monotonic_time () + comm->ack_time;
- else
- end_time = G_MAXUINT64;
-
- GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
- req->id);
- while (!req->replied) {
- if (ack_type == ACK_TYPE_TIMED) {
- g_cond_wait_until (&req->cond, &comm->mutex, end_time);
- if (g_get_monotonic_time () >= end_time)
- break;
- } else
- g_cond_wait (&req->cond, &comm->mutex);
- }
-
- if (req->replied) {
- ret = req->ret;
- GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
- req->id, ret, comm_request_ret_get_name (req->type, ret));
- } else {
- req->comm_error = TRUE;
- GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
- req->id);
- }
-
- return ret;
-}
-
-static void
-comm_request_free (CommRequest * req)
-{
- g_cond_clear (&req->cond);
- g_free (req);
-}
-
-static const gchar *
-comm_request_ret_get_name (CommRequestType type, guint32 ret)
-{
- switch (type) {
- case COMM_REQUEST_TYPE_BUFFER:
- return gst_flow_get_name (ret);
- case COMM_REQUEST_TYPE_EVENT:
- case COMM_REQUEST_TYPE_QUERY:
- case COMM_REQUEST_TYPE_MESSAGE:
- return ret ? "TRUE" : "FALSE";
- case COMM_REQUEST_TYPE_STATE_CHANGE:
- return gst_element_state_change_return_get_name (ret);
- default:
- g_assert_not_reached ();
- }
-}
-
-static guint32
-comm_request_ret_get_failure_value (CommRequestType type)
-{
- switch (type) {
- case COMM_REQUEST_TYPE_BUFFER:
- return GST_FLOW_COMM_ERROR;
- case COMM_REQUEST_TYPE_EVENT:
- case COMM_REQUEST_TYPE_MESSAGE:
- case COMM_REQUEST_TYPE_QUERY:
- return FALSE;
- case COMM_REQUEST_TYPE_STATE_CHANGE:
- return GST_STATE_CHANGE_FAILURE;
- default:
- g_assert_not_reached ();
- }
-}
-
-static const gchar *
-gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
-{
- switch (type) {
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
- return "ACK";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
- return "QUERY_RESULT";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
- return "BUFFER";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
- return "EVENT";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
- return "SINK_MESSAGE_EVENT";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
- return "QUERY";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
- return "STATE_CHANGE";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
- return "STATE_LOST";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
- return "MESSAGE";
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
- return "GERROR_MESSAGE";
- default:
- return "UNKNOWN";
- }
-}
-
-static gboolean
-gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
- GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
-{
- CommRequest *req;
- gboolean comm_error;
- GHashTable *waiting_ids;
-
- if (ack_type == ACK_TYPE_NONE)
- return TRUE;
-
- req = comm_request_new (id, type, query);
- waiting_ids = g_hash_table_ref (comm->waiting_ids);
- g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
- *ret = comm_request_wait (comm, req, ack_type);
- comm_error = req->comm_error;
- g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
- g_hash_table_unref (waiting_ids);
- return !comm_error;
-}
-
-static gboolean
-write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
-{
- size_t offset;
- gboolean ret = TRUE;
-
- offset = 0;
- GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size);
- while (size) {
- ssize_t written =
- write (comm->fdout, (const unsigned char *) data + offset, size);
- if (written < 0) {
- if (errno == EAGAIN || errno == EINTR)
- continue;
- GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
- strerror (errno));
- ret = FALSE;
- goto done;
- }
- size -= written;
- offset += written;
- }
-
-done:
- return ret;
-}
-
-static gboolean
-write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
-{
- guint8 *data;
- gboolean ret;
- guint size;
-
- size = gst_byte_writer_get_size (bw);
- data = gst_byte_writer_reset_and_get_data (bw);
- if (!data)
- return FALSE;
- ret = write_to_fd_raw (comm, data, size);
- g_free (data);
- return ret;
-}
-
-static void
-gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
- guint32 ret, CommRequestType type)
-{
- const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
- guint32 size;
- GstByteWriter bw;
-
- g_mutex_lock (&comm->mutex);
-
- GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
- comm_request_ret_get_name (type, ret), ret);
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, id))
- goto write_failed;
- size = sizeof (ret);
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, ret))
- goto write_failed;
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
-done:
- g_mutex_unlock (&comm->mutex);
- gst_byte_writer_reset (&bw);
- return;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- goto done;
-}
-
-void
-gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
- guint32 id, GstFlowReturn ret)
-{
- gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
- COMM_REQUEST_TYPE_BUFFER);
-}
-
-void
-gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
- guint32 id, gboolean ret)
-{
- gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
- COMM_REQUEST_TYPE_EVENT);
-}
-
-void
-gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
- guint32 id, GstStateChangeReturn ret)
-{
- gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
- COMM_REQUEST_TYPE_STATE_CHANGE);
-}
-
-void
-gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
- guint32 id, gboolean result, GstQuery * query)
-{
- const unsigned char payload_type =
- GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
- guint8 result8 = result;
- guint32 size;
- size_t len;
- char *str = NULL;
- guint32 type;
- const GstStructure *structure;
- GstByteWriter bw;
-
- g_mutex_lock (&comm->mutex);
-
- GST_TRACE_OBJECT (comm->element,
- "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, id))
- goto write_failed;
- structure = gst_query_get_structure (query);
- if (structure) {
- str = gst_structure_to_string (structure);
- len = strlen (str);
- } else {
- str = NULL;
- len = 0;
- }
- size = 1 + sizeof (guint32) + len + 1;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (!gst_byte_writer_put_uint8 (&bw, result8))
- goto write_failed;
- type = GST_QUERY_TYPE (query);
- if (!gst_byte_writer_put_uint32_le (&bw, type))
- goto write_failed;
- if (str) {
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
- goto write_failed;
- } else {
- if (!gst_byte_writer_put_uint8 (&bw, 0))
- goto write_failed;
- }
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
-done:
- g_mutex_unlock (&comm->mutex);
- gst_byte_writer_reset (&bw);
- g_free (str);
- return;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- goto done;
-}
-
-static gboolean
-gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
- guint32 size, GstQuery ** query)
-{
- gchar *end = NULL;
- GstStructure *structure;
- guint8 result;
- guint32 type;
- const guint8 *payload = NULL;
- guint32 mapped_size = size;
-
- /* this should not be called if we don't have enough yet */
- *query = NULL;
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
- g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
-
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload)
- return FALSE;
- result = *payload++;
- memcpy (&type, payload, sizeof (type));
- payload += sizeof (type);
-
- size -= 1 + sizeof (guint32);
- if (size == 0)
- goto done;
-
- if (payload[size - 1]) {
- result = FALSE;
- goto done;
- }
- if (*payload) {
- structure = gst_structure_from_string ((const char *) payload, &end);
- } else {
- structure = NULL;
- }
- if (!structure) {
- result = FALSE;
- goto done;
- }
-
- *query = gst_query_new_custom (type, structure);
-
-done:
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
- return result;
-}
-
-typedef struct
-{
- guint32 bytes;
-
- guint64 size;
- guint32 flags;
- guint64 api;
- char *str;
-} MetaBuildInfo;
-
-typedef struct
-{
- GstIpcPipelineComm *comm;
- guint32 n_meta;
- guint32 total_bytes;
- MetaBuildInfo *info;
-} MetaListRepresentation;
-
-static gboolean
-build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
-{
- MetaListRepresentation *repr = user_data;
-
- repr->n_meta++;
- repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
- repr->info[repr->n_meta - 1].bytes =
- /* 4 byte bytes */
- 4
- /* 4 byte GstMetaFlags */
- + 4
- /* GstMetaInfo::api */
- + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
- /* GstMetaInfo::size */
- + 8
- /* str length */
- + 4;
-
- repr->info[repr->n_meta - 1].flags = (*meta)->flags;
- repr->info[repr->n_meta - 1].api = (*meta)->info->api;
- repr->info[repr->n_meta - 1].size = (*meta)->info->size;
- repr->info[repr->n_meta - 1].str = NULL;
-
- /* GstMeta is a base class, and actual useful classes are all different...
- So we list a few of them we know we want and ignore the open ended rest */
- if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
- GstProtectionMeta *m = (GstProtectionMeta *) * meta;
- repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
- repr->info[repr->n_meta - 1].bytes +=
- strlen (repr->info[repr->n_meta - 1].str) + 1;
- GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
- g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
- } else {
- GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
- g_type_name ((*meta)->info->api));
- }
- repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
- return TRUE;
-}
-
-typedef struct
-{
- guint64 pts;
- guint64 dts;
- guint64 duration;
- guint64 offset;
- guint64 offset_end;
- guint64 flags;
-} CommBufferMetadata;
-
-GstFlowReturn
-gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
- GstBuffer * buffer)
-{
- const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
- GstMapInfo map;
- guint32 ret32 = GST_FLOW_OK;
- guint32 size, n;
- CommBufferMetadata meta;
- GstFlowReturn ret;
- MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts a 4 for n_meta */
- GstByteWriter bw;
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
- comm->send_id, buffer);
-
- gst_byte_writer_init (&bw);
-
- meta.pts = GST_BUFFER_PTS (buffer);
- meta.dts = GST_BUFFER_DTS (buffer);
- meta.duration = GST_BUFFER_DURATION (buffer);
- meta.offset = GST_BUFFER_OFFSET (buffer);
- meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
- meta.flags = GST_BUFFER_FLAGS (buffer);
-
- /* work out meta size */
- gst_buffer_foreach_meta (buffer, build_meta, &repr);
-
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
- size =
- gst_buffer_get_size (buffer) + sizeof (guint32) +
- sizeof (CommBufferMetadata) + repr.total_bytes;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
- goto write_failed;
- size = gst_buffer_get_size (buffer);
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
- goto map_failed;
- ret = write_to_fd_raw (comm, map.data, map.size);
- gst_buffer_unmap (buffer, &map);
- if (!ret)
- goto write_failed;
-
- /* meta */
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
- goto write_failed;
- for (n = 0; n < repr.n_meta; ++n) {
- const MetaBuildInfo *info = repr.info + n;
- guint32 len;
- const char *s;
-
- if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
- goto write_failed;
-
- if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
- goto write_failed;
-
- s = g_type_name (info->api);
- len = strlen (s) + 1;
- if (!gst_byte_writer_put_uint32_le (&bw, len))
- goto write_failed;
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
- goto write_failed;
-
- if (!gst_byte_writer_put_uint64_le (&bw, info->size))
- goto write_failed;
-
- s = info->str;
- len = s ? (strlen (s) + 1) : 0;
- if (!gst_byte_writer_put_uint32_le (&bw, len))
- goto write_failed;
- if (len)
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
- goto write_failed;
- }
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
- ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
- goto wait_failed;
- ret = ret32;
-
-done:
- g_mutex_unlock (&comm->mutex);
- gst_byte_writer_reset (&bw);
- for (n = 0; n < repr.n_meta; ++n)
- g_free (repr.info[n].str);
- g_free (repr.info);
- return ret;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- ret = GST_FLOW_COMM_ERROR;
- goto done;
-
-wait_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to wait for reply on socket"));
- ret = GST_FLOW_COMM_ERROR;
- goto done;
-
-map_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
- ("Failed to map buffer"));
- ret = GST_FLOW_ERROR;
- goto done;
-}
-
-static GstBuffer *
-gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
-{
- GstBuffer *buffer;
- CommBufferMetadata meta;
- guint32 n_meta, n;
- const guint8 *payload = NULL;
- guint32 mapped_size, buffer_data_size;
-
- /* this should not be called if we don't have enough yet */
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
- g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
-
- mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload)
- return NULL;
- memcpy (&meta, payload, sizeof (CommBufferMetadata));
- payload += sizeof (CommBufferMetadata);
- memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
- size -= mapped_size;
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
-
- if (buffer_data_size == 0) {
- buffer = gst_buffer_new ();
- } else {
- buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
- gst_adapter_flush (comm->adapter, buffer_data_size);
- }
- size -= buffer_data_size;
-
- GST_BUFFER_PTS (buffer) = meta.pts;
- GST_BUFFER_DTS (buffer) = meta.dts;
- GST_BUFFER_DURATION (buffer) = meta.duration;
- GST_BUFFER_OFFSET (buffer) = meta.offset;
- GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
- GST_BUFFER_FLAGS (buffer) = meta.flags;
-
- /* If you don't call that, the GType isn't yet known at the
- g_type_from_name below */
- gst_protection_meta_get_info ();
-
- mapped_size = size;
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload) {
- gst_buffer_unref (buffer);
- return NULL;
- }
- memcpy (&n_meta, payload, sizeof (n_meta));
- payload += sizeof (n_meta);
-
- for (n = 0; n < n_meta; ++n) {
- guint32 flags, len, bytes;
- guint64 msize;
- GType api;
- GstMeta *meta;
- GstStructure *structure = NULL;
-
- memcpy (&bytes, payload, sizeof (bytes));
- payload += sizeof (bytes);
-
-#define READ_FIELD(f) do { \
- memcpy (&f, payload, sizeof (f)); \
- payload += sizeof(f); \
- } while(0)
-
- READ_FIELD (flags);
- READ_FIELD (len);
- api = g_type_from_name ((const char *) payload);
- payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
- READ_FIELD (msize);
- READ_FIELD (len);
- if (len) {
- structure = gst_structure_new_from_string ((const char *) payload);
- payload += len + 1;
- }
-
- /* Seems we can add a meta from the api nor type ? */
- if (api == GST_PROTECTION_META_API_TYPE) {
- meta =
- gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
- ((GstProtectionMeta *) meta)->info = structure;
- } else {
- GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
- g_type_name (api));
- }
-
-#undef READ_FIELD
-
- }
-
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
-
- return buffer;
-}
-
-static gboolean
-gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
- GstEvent * event)
-{
- const unsigned char payload_type =
- GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
- gboolean ret;
- guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
- char *str = NULL;
- const GstStructure *structure;
- GstMessage *message = NULL;
- const char *name;
- GstByteWriter bw;
-
- g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
- FALSE);
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- GST_TRACE_OBJECT (comm->element,
- "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
-
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
- name = gst_structure_get_name (gst_event_get_structure (event));
- slen = strlen (name) + 1;
- gst_event_parse_sink_message (event, &message);
- structure = gst_message_get_structure (message);
- if (structure) {
- str = gst_structure_to_string (structure);
- structure_slen = strlen (str);
- } else {
- str = NULL;
- structure_slen = 0;
- }
- size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
- strlen (name) + 1 + structure_slen + 1;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
-
- type = GST_MESSAGE_TYPE (message);
- if (!gst_byte_writer_put_uint32_le (&bw, type))
- goto write_failed;
- size -= sizeof (type);
-
- eseqnum = GST_EVENT_SEQNUM (event);
- if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
- goto write_failed;
- size -= sizeof (eseqnum);
-
- mseqnum = GST_MESSAGE_SEQNUM (message);
- if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
- goto write_failed;
- size -= sizeof (mseqnum);
-
- if (!gst_byte_writer_put_uint32_le (&bw, slen))
- goto write_failed;
- size -= sizeof (slen);
-
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
- goto write_failed;
- size -= slen;
-
- if (str) {
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
- goto write_failed;
- } else {
- if (!gst_byte_writer_put_uint8 (&bw, 0))
- goto write_failed;
- }
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
- GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
- COMM_REQUEST_TYPE_EVENT))
- goto write_failed;
-
- ret = ret32;
-
-done:
- g_mutex_unlock (&comm->mutex);
- gst_byte_writer_reset (&bw);
- g_free (str);
- if (message)
- gst_message_unref (message);
- return ret;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- ret = FALSE;
- goto done;
-}
-
-static GstEvent *
-gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
- guint32 size)
-{
- GstMessage *message;
- GstEvent *event = NULL;
- gchar *end = NULL;
- GstStructure *structure;
- guint32 type, eseqnum, mseqnum, slen;
- const char *name;
- guint32 mapped_size = size;
- const guint8 *payload;
-
- /* this should not be called if we don't have enough yet */
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
- g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
-
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload)
- return NULL;
- memcpy (&type, payload, sizeof (type));
- payload += sizeof (type);
- size -= sizeof (type);
- if (size == 0)
- goto done;
-
- memcpy (&eseqnum, payload, sizeof (eseqnum));
- payload += sizeof (eseqnum);
- size -= sizeof (eseqnum);
- if (size == 0)
- goto done;
-
- memcpy (&mseqnum, payload, sizeof (mseqnum));
- payload += sizeof (mseqnum);
- size -= sizeof (mseqnum);
- if (size == 0)
- goto done;
-
- memcpy (&slen, payload, sizeof (slen));
- payload += sizeof (slen);
- size -= sizeof (slen);
- if (size == 0)
- goto done;
-
- if (payload[slen - 1])
- goto done;
- name = (const char *) payload;
- payload += slen;
- size -= slen;
-
- if ((payload)[size - 1]) {
- goto done;
- }
- if (*payload) {
- structure = gst_structure_from_string ((const char *) payload, &end);
- } else {
- structure = NULL;
- }
-
- message =
- gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
- gst_message_set_seqnum (message, mseqnum);
- event = gst_event_new_sink_message (name, message);
- gst_event_set_seqnum (event, eseqnum);
- gst_message_unref (message);
-
-done:
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
- return event;
-}
-
-gboolean
-gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
- gboolean upstream, GstEvent * event)
-{
- const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
- gboolean ret;
- guint32 type, size, ret32 = TRUE, seqnum, slen;
- char *str = NULL;
- const GstStructure *structure;
- GstByteWriter bw;
-
- /* we special case sink-message event as gst can't serialize/de-serialize it */
- if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
- return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
- comm->send_id, event);
-
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
- structure = gst_event_get_structure (event);
- if (structure) {
-
- if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
- GstStructure *s = gst_structure_copy (structure);
- gst_structure_remove_field (s, "stream");
- str = gst_structure_to_string (s);
- gst_structure_free (s);
- } else {
- str = gst_structure_to_string (structure);
- }
-
- slen = strlen (str);
- } else {
- str = NULL;
- slen = 0;
- }
- size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
-
- type = GST_EVENT_TYPE (event);
- if (!gst_byte_writer_put_uint32_le (&bw, type))
- goto write_failed;
-
- seqnum = GST_EVENT_SEQNUM (event);
- if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
- goto write_failed;
-
- if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
- goto write_failed;
-
- if (str) {
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
- goto write_failed;
- } else {
- if (!gst_byte_writer_put_uint8 (&bw, 0))
- goto write_failed;
- }
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- /* Upstream events get serialized, this is required to send seeks only
- * one at a time. */
- if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
- (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
- ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
- goto write_failed;
- ret = ret32;
-
-done:
- g_mutex_unlock (&comm->mutex);
- g_free (str);
- gst_byte_writer_reset (&bw);
- return ret;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- ret = FALSE;
- goto done;
-}
-
-static GstEvent *
-gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
- gboolean * upstream)
-{
- GstEvent *event = NULL;
- gchar *end = NULL;
- GstStructure *structure;
- guint32 type, seqnum;
- guint32 mapped_size = size;
- const guint8 *payload;
-
- /* this should not be called if we don't have enough yet */
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
- g_return_val_if_fail (size >= sizeof (type), NULL);
-
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload)
- return NULL;
-
- memcpy (&type, payload, sizeof (type));
- payload += sizeof (type);
- size -= sizeof (type);
- if (size == 0)
- goto done;
-
- memcpy (&seqnum, payload, sizeof (seqnum));
- payload += sizeof (seqnum);
- size -= sizeof (seqnum);
- if (size == 0)
- goto done;
-
- *upstream = (*payload) ? TRUE : FALSE;
- payload += 1;
- size -= 1;
- if (size == 0)
- goto done;
-
- if (payload[size - 1])
- goto done;
- if (*payload) {
- structure = gst_structure_from_string ((const char *) payload, &end);
- } else {
- structure = NULL;
- }
-
- event = gst_event_new_custom (type, structure);
- gst_event_set_seqnum (event, seqnum);
-
-done:
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
- return event;
-}
-
-gboolean
-gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
- gboolean upstream, GstQuery * query)
-{
- const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
- gboolean ret;
- guint32 type, size, ret32 = TRUE, slen;
- char *str = NULL;
- const GstStructure *structure;
- GstByteWriter bw;
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
- comm->send_id, query);
-
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
- structure = gst_query_get_structure (query);
- if (structure) {
- str = gst_structure_to_string (structure);
- slen = strlen (str);
- } else {
- str = NULL;
- slen = 0;
- }
- size = sizeof (type) + 1 + slen + 1;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
-
- type = GST_QUERY_TYPE (query);
- if (!gst_byte_writer_put_uint32_le (&bw, type))
- goto write_failed;
-
- if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
- goto write_failed;
-
- if (str) {
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
- goto write_failed;
- } else {
- if (!gst_byte_writer_put_uint8 (&bw, 0))
- goto write_failed;
- }
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
- GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
- COMM_REQUEST_TYPE_QUERY))
- goto write_failed;
-
- ret = ret32;
-
-done:
- g_mutex_unlock (&comm->mutex);
- g_free (str);
- gst_byte_writer_reset (&bw);
- return ret;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- ret = FALSE;
- goto done;
-}
-
-static GstQuery *
-gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
- gboolean * upstream)
-{
- GstQuery *query = NULL;
- gchar *end = NULL;
- GstStructure *structure;
- guint32 type;
- guint32 mapped_size = size;
- const guint8 *payload;
-
- /* this should not be called if we don't have enough yet */
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
- g_return_val_if_fail (size >= sizeof (type), NULL);
-
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload)
- return NULL;
-
- memcpy (&type, payload, sizeof (type));
- payload += sizeof (type);
- size -= sizeof (type);
- if (size == 0)
- goto done;
-
- *upstream = (*payload) ? TRUE : FALSE;
- payload += 1;
- size -= 1;
- if (size == 0)
- goto done;
-
- if (payload[size - 1])
- goto done;
- if (*payload) {
- structure = gst_structure_from_string ((const char *) payload, &end);
- } else {
- structure = NULL;
- }
-
- query = gst_query_new_custom (type, structure);
-
- /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
- This does not play well with the serialization/deserialization system,
- which will give us a non-NULL GstCaps which has a value of NULL. This
- in turn wreaks havoc with any code that tests whether filter is NULL
- (which basically means, am I being given an optional GstCaps ?).
- So we look for non-NULL GstCaps which have NULL contents, and replace
- them with NULL instead. */
- if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
- GstCaps *filter;
- gst_query_parse_caps (query, &filter);
- if (filter
- && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
- "NULL")) {
- gst_query_unref (query);
- query = gst_query_new_caps (NULL);
- }
- }
-
-done:
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
- return query;
-}
-
-GstStateChangeReturn
-gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
- GstStateChange transition)
-{
- const unsigned char payload_type =
- GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
- GstStateChangeReturn ret;
- guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
- GstByteWriter bw;
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
- comm->send_id,
- gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
-
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
- size = sizeof (transition);
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, transition))
- goto write_failed;
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
- ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
- goto write_failed;
- ret = ret32;
-
-done:
- g_mutex_unlock (&comm->mutex);
- gst_byte_writer_reset (&bw);
- return ret;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- ret = GST_STATE_CHANGE_FAILURE;
- goto done;
-}
-
-static gboolean
-is_valid_state_change (GstStateChange transition)
-{
- if (transition == GST_STATE_CHANGE_NULL_TO_READY)
- return TRUE;
- if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
- return TRUE;
- if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
- return TRUE;
- if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
- return TRUE;
- if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
- return TRUE;
- if (transition == GST_STATE_CHANGE_READY_TO_NULL)
- return TRUE;
- if (GST_STATE_TRANSITION_CURRENT (transition) ==
- GST_STATE_TRANSITION_NEXT (transition))
- return TRUE;
- return FALSE;
-}
-
-static gboolean
-gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
- guint32 size, guint32 * transition)
-{
- guint32 mapped_size = size;
- const guint8 *payload;
-
- /* this should not be called if we don't have enough yet */
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
- g_return_val_if_fail (size >= sizeof (*transition), FALSE);
-
- payload = gst_adapter_map (comm->adapter, size);
- if (!payload)
- return FALSE;
- memcpy (transition, payload, sizeof (*transition));
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
- return is_valid_state_change (*transition);
-}
-
-void
-gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
-{
- const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
- guint32 size;
- GstByteWriter bw;
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
- size = 0;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
-done:
- g_mutex_unlock (&comm->mutex);
- gst_byte_writer_reset (&bw);
- return;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- goto done;
-}
-
-static gboolean
-gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
-{
- /* no payload */
- return TRUE;
-}
-
-static gboolean
-gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
- GstMessage * message)
-{
- const unsigned char payload_type =
- GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
- gboolean ret;
- guint32 code, size, ret32 = TRUE;
- char *str = NULL;
- GError *error;
- char *extra_message;
- const char *domain_string;
- unsigned char msgtype;
- GstByteWriter bw;
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
- gst_message_parse_error (message, &error, &extra_message);
- msgtype = 2;
- } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
- gst_message_parse_warning (message, &error, &extra_message);
- msgtype = 1;
- } else {
- gst_message_parse_info (message, &error, &extra_message);
- msgtype = 0;
- }
- code = error->code;
- domain_string = g_quark_to_string (error->domain);
- GST_TRACE_OBJECT (comm->element,
- "Writing error %u: domain %s, code %u, message %s, extra message %s",
- comm->send_id, domain_string, error->code, error->message, extra_message);
-
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
-
- size = sizeof (size);
- size += 1;
- size += strlen (domain_string) + 1;
- size += sizeof (code);
- size += sizeof (size);
- size += error->message ? strlen (error->message) + 1 : 0;
- size += sizeof (size);
- size += extra_message ? strlen (extra_message) + 1 : 0;
-
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
-
- if (!gst_byte_writer_put_uint8 (&bw, msgtype))
- goto write_failed;
- size = strlen (domain_string) + 1;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, code))
- goto write_failed;
- size = error->message ? strlen (error->message) + 1 : 0;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (error->message) {
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
- goto write_failed;
- }
- size = extra_message ? strlen (extra_message) + 1 : 0;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
- if (extra_message) {
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
- goto write_failed;
- }
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
- ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
- goto write_failed;
-
- ret = ret32;
-
-done:
- g_mutex_unlock (&comm->mutex);
- g_free (str);
- g_error_free (error);
- g_free (extra_message);
- gst_byte_writer_reset (&bw);
- return ret;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- ret = FALSE;
- goto done;
-}
-
-static GstMessage *
-gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
- guint32 size)
-{
- GstMessage *message = NULL;
- guint32 code;
- GQuark domain;
- const char *msg, *extra_message;
- GError *error;
- unsigned char msgtype;
- guint32 mapped_size = size;
- const guint8 *payload;
-
- /* this should not be called if we don't have enough yet */
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
- g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
- NULL);
-
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload)
- return NULL;
- msgtype = *payload++;
- memcpy (&size, payload, sizeof (size));
- payload += sizeof (size);
- if (payload[size - 1])
- goto done;
- domain = g_quark_from_string ((const char *) payload);
- payload += size;
-
- memcpy (&code, payload, sizeof (code));
- payload += sizeof (code);
-
- memcpy (&size, payload, sizeof (size));
- payload += sizeof (size);
- if (size) {
- if (payload[size - 1])
- goto done;
- msg = (const char *) payload;
- } else {
- msg = NULL;
- }
- payload += size;
-
- memcpy (&size, payload, sizeof (size));
- payload += sizeof (size);
- if (size) {
- if (payload[size - 1])
- goto done;
- extra_message = (const char *) payload;
- } else {
- extra_message = NULL;
- }
- payload += size;
-
- error = g_error_new (domain, code, "%s", msg);
- if (msgtype == 2)
- message =
- gst_message_new_error (GST_OBJECT (comm->element), error,
- extra_message);
- else if (msgtype == 1)
- message =
- gst_message_new_warning (GST_OBJECT (comm->element), error,
- extra_message);
- else
- message =
- gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
- g_error_free (error);
-
-done:
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
-
- return message;
-}
-
-gboolean
-gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
- GstMessage * message)
-{
- const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
- gboolean ret;
- guint32 type, size, ret32 = TRUE, slen;
- char *str = NULL;
- const GstStructure *structure;
- GstByteWriter bw;
-
- /* we special case error as gst can't serialize/de-serialize it */
- if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
- || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
- || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
- return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
-
- g_mutex_lock (&comm->mutex);
- ++comm->send_id;
-
- GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
- comm->send_id, message);
-
- gst_byte_writer_init (&bw);
- if (!gst_byte_writer_put_uint8 (&bw, payload_type))
- goto write_failed;
- if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
- goto write_failed;
- structure = gst_message_get_structure (message);
- if (structure) {
- str = gst_structure_to_string (structure);
- slen = strlen (str);
- } else {
- str = NULL;
- slen = 0;
- }
- size = sizeof (type) + slen + 1;
- if (!gst_byte_writer_put_uint32_le (&bw, size))
- goto write_failed;
-
- type = GST_MESSAGE_TYPE (message);
- if (!gst_byte_writer_put_uint32_le (&bw, type))
- goto write_failed;
- size -= sizeof (type);
- if (str) {
- if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
- goto write_failed;
- } else {
- if (!gst_byte_writer_put_uint8 (&bw, 0))
- goto write_failed;
- }
-
- if (!write_byte_writer_to_fd (comm, &bw))
- goto write_failed;
-
- if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
- ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
- goto write_failed;
-
- ret = ret32;
-
-done:
- g_mutex_unlock (&comm->mutex);
- g_free (str);
- gst_byte_writer_reset (&bw);
- return ret;
-
-write_failed:
- GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
- ("Failed to write to socket"));
- ret = FALSE;
- goto done;
-}
-
-static GstMessage *
-gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
-{
- GstMessage *message = NULL;
- gchar *end = NULL;
- GstStructure *structure;
- guint32 type;
- guint32 mapped_size = size;
- const guint8 *payload;
-
- /* this should not be called if we don't have enough yet */
- g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
- g_return_val_if_fail (size >= sizeof (type), NULL);
-
- payload = gst_adapter_map (comm->adapter, mapped_size);
- if (!payload)
- return NULL;
- memcpy (&type, payload, sizeof (type));
- payload += sizeof (type);
- size -= sizeof (type);
- if (size == 0)
- goto done;
-
- if (payload[size - 1])
- goto done;
- if (*payload) {
- structure = gst_structure_from_string ((const char *) payload, &end);
- } else {
- structure = NULL;
- }
-
- message =
- gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
-
-done:
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
-
- return message;
-}
-
-void
-gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
-{
- g_mutex_init (&comm->mutex);
- comm->element = element;
- comm->fdin = comm->fdout = -1;
- comm->ack_time = DEFAULT_ACK_TIME;
- comm->waiting_ids =
- g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
- (GDestroyNotify) comm_request_free);
- comm->adapter = gst_adapter_new ();
- g_atomic_int_set (&comm->thread_running, 0);
-}
-
-void
-gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
-{
- g_assert (!g_atomic_int_get (&comm->thread_running));
- g_hash_table_destroy (comm->waiting_ids);
- gst_object_unref (comm->adapter);
- g_mutex_clear (&comm->mutex);
-}
-
-static void
-cancel_request (gpointer key, gpointer value, gpointer user_data,
- GstFlowReturn fret)
-{
- GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
- guint32 id = GPOINTER_TO_INT (key);
- CommRequest *req = (CommRequest *) value;
-
- GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
- req->type);
- req->ret = fret;
- req->replied = TRUE;
- g_cond_signal (&req->cond);
-}
-
-static void
-cancel_request_error (gpointer key, gpointer value, gpointer user_data)
-{
- CommRequest *req = (CommRequest *) value;
- GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
-
- cancel_request (key, value, user_data, fret);
-}
-
-void
-gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
-{
- g_mutex_lock (&comm->mutex);
- g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
- if (cleanup) {
- g_hash_table_unref (comm->waiting_ids);
- comm->waiting_ids =
- g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
- (GDestroyNotify) comm_request_free);
- }
- g_mutex_unlock (&comm->mutex);
-}
-
-static gboolean
-set_field (GQuark field_id, const GValue * value, gpointer user_data)
-{
- GstStructure *structure = user_data;
-
- gst_structure_id_set_value (structure, field_id, value);
-
- return TRUE;
-}
-
-static gboolean
-gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
- GstFlowReturn ret, GstQuery * query)
-{
- CommRequest *req;
-
- req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
- if (!req) {
- GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
- return FALSE;
- }
-
- GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
- comm_request_ret_get_name (req->type, ret), req->id);
- req->replied = TRUE;
- req->ret = ret;
- if (query) {
- if (req->query) {
- /* We need to update the original query in place, as the caller
- will expect the object to be the same */
- GstStructure *structure = gst_query_writable_structure (req->query);
- gst_structure_remove_all_fields (structure);
- gst_structure_foreach (gst_query_get_structure (query), set_field,
- structure);
- } else {
- GST_WARNING_OBJECT (comm->element,
- "Got query reply, but no query was in the request");
- }
- }
- g_cond_signal (&req->cond);
- return TRUE;
-}
-
-static gboolean
-update_adapter (GstIpcPipelineComm * comm)
-{
- GstMemory *mem = NULL;
-
- for (;;) {
- fd_set set;
- struct timeval tv;
- int sret;
- ssize_t sz;
- GstBuffer *buf;
- GstMapInfo map;
- int fdin = comm->fdin;
- int fdclose = comm->reader_thread_stopping_pipe[0];
- int fdmax;
-
- FD_ZERO (&set);
- FD_SET (fdclose, &set);
- fdmax = fdclose;
- if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) {
- FD_SET (fdin, &set);
- if (fdin > fdmax)
- fdmax = fdin;
- }
- tv.tv_sec = 0;
- tv.tv_usec = 100000;
- sret = select (fdmax + 1, &set, NULL, NULL, &tv);
- if (sret < 0) {
- if (errno == EAGAIN)
- continue;
- if (errno == EINTR)
- break;
- if (mem)
- gst_memory_unref (mem);
- return FALSE;
- }
- if (FD_ISSET (fdclose, &set)) {
- GST_INFO_OBJECT (comm->element, "data received on close notify pipe");
- comm->reader_thread_stopping = TRUE;
- break;
- }
- if (fdin < 0)
- break;
- if (!FD_ISSET (fdin, &set))
- break;
- if (mem == NULL)
- mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
- gst_memory_map (mem, &map, GST_MAP_WRITE);
- sz = read (fdin, map.data, map.size);
- gst_memory_unmap (mem, &map);
- if (sz < 0) {
- if (errno == EAGAIN)
- continue;
- mem = NULL;
- if (errno == EINTR)
- break;
- gst_memory_unref (mem);
- return FALSE;
- }
- if (sz == 0) {
- GST_INFO_OBJECT (comm->element, "fd closed");
- comm->reader_thread_stopping = TRUE;
- break;
- }
- gst_memory_resize (mem, 0, sz);
- buf = gst_buffer_new ();
- gst_buffer_append_memory (buf, mem);
- mem = NULL;
- GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
- gst_adapter_push (comm->adapter, buf);
-
- /* If we have more data, we loop, otherwise we break */
- FD_ZERO (&set);
- if (fdin >= 0)
- FD_SET (comm->fdin, &set);
- tv.tv_sec = 0;
- tv.tv_usec = 0;
- sret = select (fdin + 1, &set, NULL, NULL, &tv);
- if (sret < 0 || !FD_ISSET (fdin, &set))
- break;
- }
- if (mem)
- gst_memory_unref (mem);
-
- return TRUE;
-}
-
-static gboolean
-read_many (GstIpcPipelineComm * comm)
-{
- gboolean ret = TRUE;
- gsize available;
- const guint8 *payload;
-
- while (1)
- switch (comm->state) {
- case GST_IPC_PIPELINE_COMM_STATE_TYPE:
- {
- guint8 type;
- guint32 mapped_size;
-
- available = gst_adapter_available (comm->adapter);
- mapped_size = 1 + sizeof (gint32) * 2;
- if (available < mapped_size)
- goto done;
-
- payload = gst_adapter_map (comm->adapter, mapped_size);
- type = *payload++;
- g_mutex_lock (&comm->mutex);
- memcpy (&comm->id, payload, sizeof (guint32));
- memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
- g_mutex_unlock (&comm->mutex);
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, mapped_size);
- GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
- comm->id, type, comm->payload_length);
- switch (type) {
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
- GST_TRACE_OBJECT (comm->element, "switching to state %s",
- gst_ipc_pipeline_comm_data_type_get_name (type));
- comm->state = type;
- break;
- default:
- goto out_of_sync;
- }
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
- {
- const guint8 *rets;
- guint32 ret32;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- if (available < sizeof (guint32))
- goto ack_failed;
-
- rets = gst_adapter_map (comm->adapter, sizeof (guint32));
- memcpy (&ret32, rets, sizeof (ret32));
- gst_adapter_unmap (comm->adapter);
- gst_adapter_flush (comm->adapter, sizeof (guint32));
- GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
- gst_flow_get_name (ret32), comm->id);
-
- g_mutex_lock (&comm->mutex);
- gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
- g_mutex_unlock (&comm->mutex);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
- {
- GstQuery *query = NULL;
- gboolean qret;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- qret =
- gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
- &query);
-
- GST_TRACE_OBJECT (comm->element,
- "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
- query);
-
- g_mutex_lock (&comm->mutex);
- gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
- g_mutex_unlock (&comm->mutex);
-
- gst_query_unref (query);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
- {
- GstBuffer *buf;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
- if (!buf)
- goto buffer_failed;
-
- /* set caps and push */
- GST_TRACE_OBJECT (comm->element,
- "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
- ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
- ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
- ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
- GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
- GST_BUFFER_FLAGS (buf));
-
- gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
- GINT_TO_POINTER (comm->id), NULL);
-
- if (comm->on_buffer)
- (*comm->on_buffer) (comm->id, buf, comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
- {
- GstEvent *event;
- gboolean upstream;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
- &upstream);
- if (!event)
- goto event_failed;
-
- GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
- event, gst_event_type_get_name (event->type));
-
- gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
- GINT_TO_POINTER (comm->id), NULL);
-
- if (comm->on_event)
- (*comm->on_event) (comm->id, event, upstream, comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
- {
- GstEvent *event;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
- comm->payload_length);
- if (!event)
- goto event_failed;
-
- GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
- event);
-
- gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
- GINT_TO_POINTER (comm->id), NULL);
-
- if (comm->on_event)
- (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
- {
- GstQuery *query;
- gboolean upstream;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
- &upstream);
- if (!query)
- goto query_failed;
-
- GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
- query, gst_query_type_get_name (query->type));
-
- gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
- GINT_TO_POINTER (comm->id), NULL);
-
- if (comm->on_query)
- (*comm->on_query) (comm->id, query, upstream, comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
- {
- guint32 transition;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- if (!gst_ipc_pipeline_comm_read_state_change (comm,
- comm->payload_length, &transition))
- goto state_change_failed;
-
- GST_TRACE_OBJECT (comm->element,
- "deserialized state change request: %s -> %s",
- gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
- (transition)),
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
- (transition)));
-
- if (comm->on_state_change)
- (*comm->on_state_change) (comm->id, transition, comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
- {
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
- goto event_failed;
-
- GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
-
- if (comm->on_state_lost)
- (*comm->on_state_lost) (comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
- {
- GstMessage *message;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- message = gst_ipc_pipeline_comm_read_message (comm,
- comm->payload_length);
- if (!message)
- goto message_failed;
-
- GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
- message, gst_message_type_get_name (message->type));
-
- if (comm->on_message)
- (*comm->on_message) (comm->id, message, comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
- {
- GstMessage *message;
-
- available = gst_adapter_available (comm->adapter);
- if (available < comm->payload_length)
- goto done;
-
- message = gst_ipc_pipeline_comm_read_gerror_message (comm,
- comm->payload_length);
- if (!message)
- goto message_failed;
-
- GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
- message, gst_message_type_get_name (message->type));
-
- if (comm->on_message)
- (*comm->on_message) (comm->id, message, comm->user_data);
-
- GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- break;
- }
- }
-
-done:
- return ret;
-
- /* ERRORS */
-out_of_sync:
- {
- GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
- ("Socket out of sync"));
- ret = FALSE;
- goto done;
- }
-state_change_failed:
- {
- GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
- ("could not read state change from fd"));
- ret = FALSE;
- goto done;
- }
-ack_failed:
- {
- GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
- ("could not read ack from fd"));
- ret = FALSE;
- goto done;
- }
-buffer_failed:
- {
- GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
- ("could not read buffer from fd"));
- ret = FALSE;
- goto done;
- }
-event_failed:
- {
- GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
- ("could not read event from fd"));
- ret = FALSE;
- goto done;
- }
-message_failed:
- {
- GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
- ("could not read message from fd"));
- ret = FALSE;
- goto done;
- }
-query_failed:
- {
- GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
- ("could not read query from fd"));
- ret = FALSE;
- goto done;
- }
-}
-
-static gpointer
-reader_thread (gpointer data)
-{
- GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
-
- g_atomic_int_set (&comm->thread_running, 1);
- while (!comm->reader_thread_stopping) {
- if (!update_adapter (comm)) {
- if (comm->reader_thread_stopping) {
- GST_INFO_OBJECT (comm->element, "We're stopping, all good");
- break;
- }
- GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
- ("Failed to read from socket"));
- break;
- }
- read_many (comm);
- }
-
- GST_INFO_OBJECT (comm->element, "Reader thread ending");
- g_atomic_int_set (&comm->thread_running, 0);
-
- return NULL;
-}
-
-gboolean
-gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
- void (*on_buffer) (guint32, GstBuffer *, gpointer),
- void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
- void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
- void (*on_state_change) (guint32, GstStateChange, gpointer),
- void (*on_state_lost) (gpointer),
- void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
-{
- if (comm->reader_thread)
- return FALSE;
-
- comm->reader_thread_stopping = FALSE;
- comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
- comm->on_buffer = on_buffer;
- comm->on_event = on_event;
- comm->on_query = on_query;
- comm->on_state_change = on_state_change;
- comm->on_state_lost = on_state_lost;
- comm->on_message = on_message;
- comm->user_data = user_data;
- if (pipe (comm->reader_thread_stopping_pipe) < 0) {
- GST_WARNING_OBJECT (comm->element, "Failed to create pipes");
- return FALSE;
- }
- comm->reader_thread =
- g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
- return TRUE;
-}
-
-void
-gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
-{
- char dummy = 0;
-
- if (!comm->reader_thread)
- return;
- while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0
- && errno == EINTR);
- g_thread_join (comm->reader_thread);
- close (comm->reader_thread_stopping_pipe[0]);
- close (comm->reader_thread_stopping_pipe[1]);
- comm->reader_thread = NULL;
-}
-
-static gchar *
-gst_value_serialize_event (const GValue * value)
-{
- const GstStructure *structure;
- GstEvent *ev;
- gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
- GValue val = G_VALUE_INIT;
-
- ev = g_value_get_boxed (value);
-
- g_value_init (&val, gst_event_type_get_type ());
- g_value_set_enum (&val, ev->type);
- type = gst_value_serialize (&val);
- g_value_unset (&val);
-
- g_value_init (&val, G_TYPE_UINT64);
- g_value_set_uint64 (&val, ev->timestamp);
- ts = gst_value_serialize (&val);
- g_value_unset (&val);
-
- g_value_init (&val, G_TYPE_UINT);
- g_value_set_uint (&val, ev->seqnum);
- seqnum = gst_value_serialize (&val);
- g_value_unset (&val);
-
- g_value_init (&val, G_TYPE_INT64);
- g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
- rt_offset = gst_value_serialize (&val);
- g_value_unset (&val);
-
- structure = gst_event_get_structure (ev);
- str = gst_structure_to_string (structure);
- str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
- g_strdelimit (str64, "=", '_');
- g_free (str);
-
- s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
- NULL);
-
- g_free (type);
- g_free (ts);
- g_free (seqnum);
- g_free (rt_offset);
- g_free (str64);
-
- return s;
-}
-
-static gboolean
-gst_value_deserialize_event (GValue * dest, const gchar * s)
-{
- GstEvent *ev = NULL;
- GValue val = G_VALUE_INIT;
- gboolean ret = FALSE;
- gchar **fields;
- gsize len;
-
- fields = g_strsplit (s, ":", -1);
- if (g_strv_length (fields) != 5)
- goto wrong_length;
-
- g_strdelimit (fields[4], "_", '=');
- g_base64_decode_inplace (fields[4], &len);
-
- g_value_init (&val, gst_event_type_get_type ());
- if (!gst_value_deserialize (&val, fields[0]))
- goto fail;
- ev = gst_event_new_custom (g_value_get_enum (&val),
- gst_structure_new_from_string (fields[4]));
-
- g_value_unset (&val);
- g_value_init (&val, G_TYPE_UINT64);
- if (!gst_value_deserialize (&val, fields[1]))
- goto fail;
- ev->timestamp = g_value_get_uint64 (&val);
-
- g_value_unset (&val);
- g_value_init (&val, G_TYPE_UINT);
- if (!gst_value_deserialize (&val, fields[2]))
- goto fail;
- ev->seqnum = g_value_get_uint (&val);
-
- g_value_unset (&val);
- g_value_init (&val, G_TYPE_INT64);
- if (!gst_value_deserialize (&val, fields[3]))
- goto fail;
- gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
-
- g_value_take_boxed (dest, g_steal_pointer (&ev));
- ret = TRUE;
-
-fail:
- g_clear_pointer (&ev, gst_event_unref);
- g_value_unset (&val);
-
-wrong_length:
- g_strfreev (fields);
- return ret;
-}
-
-#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type) \
-G_STMT_START { \
- static GstValueTable gst_value = \
- { 0, NULL, \
- gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type }; \
- gst_value.type = _gtype; \
- gst_value_register (&gst_value); \
-} G_STMT_END
-
-void
-gst_ipc_pipeline_comm_plugin_init (void)
-{
- static volatile gsize once = 0;
-
- if (g_once_init_enter (&once)) {
- GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
- "ipc pipeline comm");
- QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
- REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
- g_once_init_leave (&once, (gsize) 1);
- }
-}
diff --git a/gst/ipcpipeline/gstipcpipelinecomm.h b/gst/ipcpipeline/gstipcpipelinecomm.h
deleted file mode 100644
index bd7335e9e..000000000
--- a/gst/ipcpipeline/gstipcpipelinecomm.h
+++ /dev/null
@@ -1,132 +0,0 @@
-/* GStreamer
- * Copyright (C) 2015-2017 YouView TV Ltd
- * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
- *
- * gstipcpipelinecomm.h:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-
-#ifndef __GST_IPC_PIPELINE_COMM_H__
-#define __GST_IPC_PIPELINE_COMM_H__
-
-#include <glib.h>
-#include <gst/gst.h>
-#include <gst/base/gstadapter.h>
-
-G_BEGIN_DECLS
-
-#define GST_FLOW_COMM_ERROR GST_FLOW_CUSTOM_ERROR_1
-
-extern GQuark QUARK_ID;
-
-typedef enum {
- GST_IPC_PIPELINE_COMM_STATE_TYPE = 0,
- /* for the rest of the states we use directly the data type enums below */
-} GstIpcPipelineCommState;
-
-typedef enum {
- /* reply types */
- GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK = 1,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT,
- /* data send types */
- GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE,
- GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE,
-} GstIpcPipelineCommDataType;
-
-typedef struct
-{
- GstElement *element;
-
- GMutex mutex;
- int fdin;
- int fdout;
- GHashTable *waiting_ids;
-
- GThread *reader_thread;
- gboolean reader_thread_stopping;
- volatile gint thread_running;
- int reader_thread_stopping_pipe[2];
- GstAdapter *adapter;
- guint8 state;
- guint32 send_id;
-
- guint32 payload_length;
- guint32 id;
-
- guint read_chunk_size;
- GstClockTime ack_time;
-
- void (*on_buffer) (guint32, GstBuffer *, gpointer);
- void (*on_event) (guint32, GstEvent *, gboolean, gpointer);
- void (*on_query) (guint32, GstQuery *, gboolean, gpointer);
- void (*on_state_change) (guint32, GstStateChange, gpointer);
- void (*on_state_lost) (gpointer);
- void (*on_message) (guint32, GstMessage *, gpointer);
- gpointer user_data;
-
-} GstIpcPipelineComm;
-
-void gst_ipc_pipeline_comm_plugin_init (void);
-
-void gst_ipc_pipeline_comm_init (GstIpcPipelineComm *comm, GstElement *e);
-void gst_ipc_pipeline_comm_clear (GstIpcPipelineComm *comm);
-void gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm,
- gboolean flushing);
-
-void gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
- guint32 id, GstFlowReturn ret);
-void gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
- guint32 id, gboolean ret);
-void gst_ipc_pipeline_comm_write_state_change_ack_to_fd (
- GstIpcPipelineComm * comm, guint32 id, GstStateChangeReturn ret);
-
-void gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
- guint32 id, gboolean result, GstQuery *query);
-
-GstFlowReturn gst_ipc_pipeline_comm_write_buffer_to_fd (
- GstIpcPipelineComm * comm, GstBuffer * buffer);
-gboolean gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
- gboolean upstream, GstEvent * event);
-gboolean gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
- gboolean upstream, GstQuery * query);
-GstStateChangeReturn gst_ipc_pipeline_comm_write_state_change_to_fd (
- GstIpcPipelineComm * comm, GstStateChange transition);
-void gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm);
-gboolean gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
- GstMessage *message);
-
-gboolean gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
- void (*on_buffer) (guint32, GstBuffer *, gpointer),
- void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
- void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
- void (*on_state_change) (guint32, GstStateChange, gpointer),
- void (*on_state_lost) (gpointer),
- void (*on_message) (guint32, GstMessage *, gpointer),
- gpointer user_data);
-void gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm);
-
-G_END_DECLS
-
-#endif
-
diff --git a/gst/ipcpipeline/gstipcpipelinesink.c b/gst/ipcpipeline/gstipcpipelinesink.c
deleted file mode 100644
index f1f02bfcb..000000000
--- a/gst/ipcpipeline/gstipcpipelinesink.c
+++ /dev/null
@@ -1,722 +0,0 @@
-/* GStreamer
- * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
- * 2005 Wim Taymans <wim@fluendo.com>
- * 2006 Thomas Vander Stichele <thomas at apestaart dot org>
- * 2014 Tim-Philipp Müller <tim centricular com>
- * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
- *
- * gstipcpipelinesink.c:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-/**
- * SECTION:element-ipcpipelinesink
- * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
- *
- * Communicates with an ipcpipelinesrc element in another process via a socket.
- *
- * This element, together with ipcpipelinesrc and ipcslavepipeline form a
- * mechanism that allows splitting a single pipeline in different processes.
- * The main use-case for it is a playback pipeline split in two parts, where the
- * first part contains the networking, parsing and demuxing and the second part
- * contains the decoding and display. The intention of this split is to improve
- * security of an application, by letting the networking, parsing and demuxing
- * parts run in a less privileged process than the process that accesses the
- * decoder and display.
- *
- * Once the pipelines in those different processes have been created, the
- * playback can be controlled entirely from the first pipeline, which is the
- * one that contains ipcpipelinesink. We call this pipeline the “master”.
- * All relevant events and queries sent from the application are sent to
- * the master pipeline and messages to the application are sent from the master
- * pipeline. The second pipeline, in the other process, is transparently slaved.
- *
- * ipcpipelinesink can work only in push mode and does not synchronize buffers
- * to the clock. Synchronization is meant to happen either at the real sink at
- * the end of the remote slave pipeline, or not to happen at all, if the
- * pipeline is live.
- *
- * A master pipeline may contain more than one ipcpipelinesink elements, which
- * can be connected either to the same slave pipeline or to different ones.
- *
- * Communication with ipcpipelinesrc on the slave happens via a socket, using a
- * custom protocol. Each buffer, event, query, message or state change is
- * serialized in a "packet" and sent over the socket. The sender then
- * performs a blocking wait for a reply, if a return code is needed.
- *
- * All objects that contan a GstStructure (messages, queries, events) are
- * serialized by serializing the GstStructure to a string
- * (gst_structure_to_string). This implies some limitations, of course.
- * All fields of this structures that are not serializable to strings (ex.
- * object pointers) are ignored, except for some cases where custom
- * serialization may occur (ex error/warning/info messages that contain a
- * GError are serialized differently).
- *
- * Buffers are transported by writing their content directly on the socket.
- * More efficient ways for memory sharing could be implemented in the future.
- */
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <unistd.h>
-#include "gstipcpipelinesink.h"
-#include <string.h>
-
-static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
- GST_PAD_SINK,
- GST_PAD_ALWAYS,
- GST_STATIC_CAPS_ANY);
-
-GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
-#define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
-
-enum
-{
- SIGNAL_DISCONNECT,
- /* FILL ME */
- LAST_SIGNAL
-};
-static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
-
-enum
-{
- PROP_0,
- PROP_FDIN,
- PROP_FDOUT,
- PROP_READ_CHUNK_SIZE,
- PROP_ACK_TIME,
-};
-
-
-#define DEFAULT_READ_CHUNK_SIZE 4096
-#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
-
-#define _do_init \
- GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
-#define gst_ipc_pipeline_sink_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
- GST_TYPE_ELEMENT, _do_init);
-
-static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec);
-static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec);
-static void gst_ipc_pipeline_sink_dispose (GObject * obj);
-static void gst_ipc_pipeline_sink_finalize (GObject * obj);
-static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
- sink);
-static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
- sink);
-
-static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
- element, GstStateChange transition);
-
-static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
- GstObject * parent, GstBuffer * buffer);
-static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
- GstEvent * event);
-static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
- GstQuery * query);
-static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
- GstEvent * event);
-static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
- GstQuery * query);
-static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
- GstObject * parent, GstPadMode mode, gboolean active);
-
-
-static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
-static void pusher (gpointer data, gpointer user_data);
-
-
-static void
-gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
-{
- GObjectClass *gobject_class;
- GstElementClass *gstelement_class;
-
- gobject_class = G_OBJECT_CLASS (klass);
- gstelement_class = GST_ELEMENT_CLASS (klass);
-
- gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
- gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
- gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
- gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
-
- g_object_class_install_property (gobject_class, PROP_FDIN,
- g_param_spec_int ("fdin", "Input file descriptor",
- "File descriptor to received data from",
- -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_FDOUT,
- g_param_spec_int ("fdout", "Output file descriptor",
- "File descriptor to send data through",
- -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
- g_param_spec_uint ("read-chunk-size", "Read chunk size",
- "Read chunk size",
- 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_ACK_TIME,
- g_param_spec_uint64 ("ack-time", "Ack time",
- "Maximum time to wait for a response to a message",
- 0, G_MAXUINT64, DEFAULT_ACK_TIME,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
- g_signal_new ("disconnect",
- G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
- G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
- NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
-
- gst_element_class_set_static_metadata (gstelement_class,
- "Inter-process Pipeline Sink",
- "Sink",
- "Allows splitting and continuing a pipeline in another process",
- "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&sinktemplate));
-
- gstelement_class->change_state =
- GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
- gstelement_class->query =
- GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
- gstelement_class->send_event =
- GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
-
- klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
-}
-
-static void
-gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
-{
- GstPadTemplate *pad_template;
-
- GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
-
- gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
- sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
- sink->comm.ack_time = DEFAULT_ACK_TIME;
- sink->comm.fdin = -1;
- sink->comm.fdout = -1;
- sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
- gst_ipc_pipeline_sink_start_reader_thread (sink);
-
- pad_template =
- gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
- g_return_if_fail (pad_template != NULL);
-
- sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
-
- gst_pad_set_activatemode_function (sink->sinkpad,
- gst_ipc_pipeline_sink_pad_activate_mode);
- gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
- gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
- gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
- gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
-
-}
-
-static void
-gst_ipc_pipeline_sink_dispose (GObject * obj)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
-
- gst_ipc_pipeline_sink_stop_reader_thread (sink);
- gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
-
- G_OBJECT_CLASS (parent_class)->dispose (obj);
-}
-
-static void
-gst_ipc_pipeline_sink_finalize (GObject * obj)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
-
- gst_ipc_pipeline_comm_clear (&sink->comm);
- g_thread_pool_free (sink->threads, TRUE, TRUE);
-
- G_OBJECT_CLASS (parent_class)->finalize (obj);
-}
-
-static void
-gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
-
- switch (prop_id) {
- case PROP_FDIN:
- sink->comm.fdin = g_value_get_int (value);
- break;
- case PROP_FDOUT:
- sink->comm.fdout = g_value_get_int (value);
- break;
- case PROP_READ_CHUNK_SIZE:
- sink->comm.read_chunk_size = g_value_get_uint (value);
- break;
- case PROP_ACK_TIME:
- sink->comm.ack_time = g_value_get_uint64 (value);
- break;
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
-}
-
-static void
-gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
-
- switch (prop_id) {
- case PROP_FDIN:
- g_value_set_int (value, sink->comm.fdin);
- break;
- case PROP_FDOUT:
- g_value_set_int (value, sink->comm.fdout);
- break;
- case PROP_READ_CHUNK_SIZE:
- g_value_set_uint (value, sink->comm.read_chunk_size);
- break;
- case PROP_ACK_TIME:
- g_value_set_uint64 (value, sink->comm.ack_time);
- break;
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
-}
-
-static gboolean
-gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
- gboolean ret;
-
- GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
- event, gst_event_type_get_name (event->type), event->type);
-
- ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
- gst_event_unref (event);
- return ret;
-}
-
-static GstFlowReturn
-gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
- GstBuffer * buffer)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
- GstFlowReturn ret;
-
- GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
-
- ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
- if (ret != GST_FLOW_OK)
- GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
-
- gst_buffer_unref (buffer);
- return ret;
-}
-
-static gboolean
-gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
- gboolean ret;
-
- GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
- GST_QUERY_TYPE_NAME (query), query);
-
- switch (GST_QUERY_TYPE (query)) {
- case GST_QUERY_ALLOCATION:
- GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
- return FALSE;
- case GST_QUERY_CAPS:
- {
- /* caps queries occur even while linking the pipeline.
- * It is possible that the ipcpipelinesrc may not be connected at this
- * point, so let's avoid a couple of errors... */
- GstState state;
- GST_OBJECT_LOCK (sink);
- state = GST_STATE (sink);
- GST_OBJECT_UNLOCK (sink);
- if (state == GST_STATE_NULL)
- return FALSE;
- }
- default:
- break;
- }
- ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
-
- return ret;
-}
-
-static gboolean
-gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
- gboolean ret;
-
- GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
- GST_QUERY_TYPE_NAME (query), query);
-
- ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
- GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
- return ret;
-}
-
-static gboolean
-gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
- gboolean ret;
-
- GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
- GST_EVENT_TYPE_NAME (event), event);
-
- ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
- GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
-
- gst_event_unref (event);
- return ret;
-}
-
-
-static gboolean
-gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
- GstObject * parent, GstPadMode mode, gboolean active)
-{
- if (mode == GST_PAD_MODE_PULL)
- return FALSE;
- return TRUE;
-}
-
-static void
-on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
- GST_ERROR_OBJECT (sink,
- "Got buffer id %u! I never knew buffers could go upstream...", id);
- gst_buffer_unref (buffer);
-}
-
-static void
-pusher (gpointer data, gpointer user_data)
-{
- GstIpcPipelineSink *sink = user_data;
- gboolean ret;
- guint32 id;
-
- id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
- QUARK_ID));
-
- if (GST_IS_EVENT (data)) {
- GstEvent *event = GST_EVENT (data);
- GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
- ret = gst_pad_push_event (sink->sinkpad, event);
- GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
- } else if (GST_IS_QUERY (data)) {
- GstQuery *query = GST_QUERY (data);
- GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
- ret = gst_pad_peer_query (sink->sinkpad, query);
- GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
- gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
- query);
- gst_query_unref (query);
- } else {
- GST_ERROR_OBJECT (sink, "Unsupported object type");
- }
- gst_object_unref (sink);
-}
-
-static void
-on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
-
- if (!upstream) {
- GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
- id);
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
- gst_event_unref (event);
- return;
- }
-
- GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
- gst_object_ref (sink);
- g_thread_pool_push (sink->threads, event, NULL);
-}
-
-static void
-on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
-
- if (!upstream) {
- GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
- id);
- gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
- query);
- gst_query_unref (query);
- return;
- }
-
- GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
- gst_object_ref (sink);
- g_thread_pool_push (sink->threads, query, NULL);
-}
-
-static void
-on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
- GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
-}
-
-static void
-on_state_lost (gpointer user_data)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
-
- GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
-
- GST_OBJECT_LOCK (sink);
- sink->pass_next_async_done = TRUE;
- GST_OBJECT_UNLOCK (sink);
-
- gst_element_lost_state (GST_ELEMENT (sink));
-}
-
-static void
-do_async_done (GstElement * element, gpointer user_data)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
- GstMessage *message = user_data;
-
- GST_STATE_LOCK (sink);
- GST_OBJECT_LOCK (sink);
- if (sink->pass_next_async_done) {
- sink->pass_next_async_done = FALSE;
- GST_OBJECT_UNLOCK (sink);
- gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
- GST_STATE_UNLOCK (sink);
- gst_element_post_message (element, gst_message_ref (message));
-
- } else {
- GST_OBJECT_UNLOCK (sink);
- GST_STATE_UNLOCK (sink);
- }
-}
-
-static void
-on_message (guint32 id, GstMessage * message, gpointer user_data)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
-
- GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
-
- switch (GST_MESSAGE_TYPE (message)) {
- case GST_MESSAGE_ASYNC_DONE:
- GST_OBJECT_LOCK (sink);
- if (sink->pass_next_async_done) {
- GST_OBJECT_UNLOCK (sink);
- gst_element_call_async (GST_ELEMENT (sink), do_async_done,
- message, (GDestroyNotify) gst_message_unref);
- } else {
- GST_OBJECT_UNLOCK (sink);
- gst_message_unref (message);
- }
- return;
- default:
- break;
- }
-
- gst_element_post_message (GST_ELEMENT (sink), message);
-}
-
-static gboolean
-gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
-{
- if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
- on_event, on_query, on_state_change, on_state_lost, on_message,
- sink)) {
- GST_ERROR_OBJECT (sink, "Failed to start reader thread");
- return FALSE;
- }
- return TRUE;
-}
-
-static void
-gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
-{
- gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
-}
-
-
-static void
-gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
-{
- GST_DEBUG_OBJECT (sink, "Disconnecting");
- gst_ipc_pipeline_sink_stop_reader_thread (sink);
- sink->comm.fdin = -1;
- sink->comm.fdout = -1;
- gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
- gst_ipc_pipeline_sink_start_reader_thread (sink);
-}
-
-static GstStateChangeReturn
-gst_ipc_pipeline_sink_change_state (GstElement * element,
- GstStateChange transition)
-{
- GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
- GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
- GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
- gboolean async = FALSE;
- gboolean down = FALSE;
-
- GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
- gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
-
- switch (transition) {
- case GST_STATE_CHANGE_NULL_TO_READY:
- if (sink->comm.fdin < 0) {
- GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
- return GST_STATE_CHANGE_FAILURE;
- }
- if (sink->comm.fdout < 0) {
- GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
- return GST_STATE_CHANGE_FAILURE;
- }
- if (!sink->comm.reader_thread) {
- GST_ERROR_OBJECT (element, "Failed to start reader thread");
- return GST_STATE_CHANGE_FAILURE;
- }
- break;
- case GST_STATE_CHANGE_READY_TO_PAUSED:
- case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
- case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
- /* In these transitions, it is possible that the peer returns ASYNC.
- * We don't know that in advance, but we post async-start anyway because
- * it needs to be delivered *before* async-done, and async-done may
- * arrive at any point in time after we've set the state of the peer.
- * In case the peer doesn't return ASYNC, we just post async-done
- * ourselves and the parent GstBin takes care of matching and deleting
- * them, so the app never gets any of these. */
- async = TRUE;
- break;
- default:
- break;
- }
-
- /* downwards state change */
- down = (GST_STATE_TRANSITION_CURRENT (transition) >=
- GST_STATE_TRANSITION_NEXT (transition));
-
- if (async) {
- GST_DEBUG_OBJECT (sink,
- "Posting async-start for %s, will need state-change-done",
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
-
- gst_element_post_message (GST_ELEMENT (sink),
- gst_message_new_async_start (GST_OBJECT (sink)));
-
- GST_OBJECT_LOCK (sink);
- sink->pass_next_async_done = TRUE;
- GST_OBJECT_UNLOCK (sink);
- }
-
- /* change the state of the peer first */
- /* If the fd out is -1, we do not actually call the peer. This will happen
- when we explicitely disconnected, and in that case we want to be able
- to bring the element down to NULL, so it can be restarted with a new
- slave pipeline. */
- if (sink->comm.fdout >= 0) {
- GST_DEBUG_OBJECT (sink, "Calling peer with state change");
- peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
- transition);
- if (ret == GST_STATE_CHANGE_FAILURE && down) {
- GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
- "but ignoring because we are going down");
- peer_ret = GST_STATE_CHANGE_SUCCESS;
- }
- } else {
- if (down) {
- GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
- sink->comm.fdout);
- peer_ret = GST_STATE_CHANGE_SUCCESS;
- } else {
- GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
- sink->comm.fdout);
- peer_ret = GST_STATE_CHANGE_FAILURE;
- }
- }
-
- /* chain up to the parent class to change our state, if the peer succeeded */
- if (peer_ret != GST_STATE_CHANGE_FAILURE) {
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-
- if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
- GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
- "but ignoring because we are going down");
- ret = GST_STATE_CHANGE_SUCCESS;
- }
- }
-
- GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
- gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
- gst_element_state_change_return_get_name (peer_ret),
- gst_element_state_change_return_get_name (ret));
-
- /* now interpret the return codes */
- if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
- GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
-
- GST_OBJECT_LOCK (sink);
- sink->pass_next_async_done = FALSE;
- GST_OBJECT_UNLOCK (sink);
-
- gst_element_post_message (GST_ELEMENT (sink),
- gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
- } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
- GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
- peer_ret = GST_STATE_CHANGE_SUCCESS;
- }
-
- if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
- if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
- /* only the parent's ret was FAILURE - revert remote changes */
- GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
- "returned failure");
- gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
- GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
- GST_STATE_TRANSITION_CURRENT (transition)));
- }
- return GST_STATE_CHANGE_FAILURE;
- }
-
- /* the parent's (GstElement) state change func won't return ASYNC or
- * NO_PREROLL, so unless it has returned FAILURE, which we have catched above,
- * we are not interested in its return code... just return the peer's */
- return peer_ret;
-}
diff --git a/gst/ipcpipeline/gstipcpipelinesink.h b/gst/ipcpipeline/gstipcpipelinesink.h
deleted file mode 100644
index 18c880eee..000000000
--- a/gst/ipcpipeline/gstipcpipelinesink.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/* GStreamer
- * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
- * 2000 Wim Taymans <wtay@chello.be>
- * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
- *
- * gstipcpipelinesink.h:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-
-#ifndef __GST_IPC_PIPELINE_SINK_H__
-#define __GST_IPC_PIPELINE_SINK_H__
-
-#include <gst/gst.h>
-#include "gstipcpipelinecomm.h"
-
-G_BEGIN_DECLS
-
-#define GST_TYPE_IPC_PIPELINE_SINK \
- (gst_ipc_pipeline_sink_get_type())
-#define GST_IPC_PIPELINE_SINK(obj) \
- (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSink))
-#define GST_IPC_PIPELINE_SINK_CLASS(klass) \
- (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSinkClass))
-#define GST_IS_IPC_PIPELINE_SINK(obj) \
- (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SINK))
-#define GST_IS_IPC_PIPELINE_SINK_CLASS(klass) \
- (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SINK))
-#define GST_IPC_PIPELINE_SINK_CAST(obj) ((GstIpcPipelineSink *)obj)
-
-typedef struct _GstIpcPipelineSink GstIpcPipelineSink;
-typedef struct _GstIpcPipelineSinkClass GstIpcPipelineSinkClass;
-
-struct _GstIpcPipelineSink {
- GstElement element;
-
- GstIpcPipelineComm comm;
- GThreadPool *threads;
- gboolean pass_next_async_done;
- GstPad *sinkpad;
-};
-
-struct _GstIpcPipelineSinkClass {
- GstElementClass parent_class;
-
- void (*disconnect) (GstIpcPipelineSink * sink);
-};
-
-G_GNUC_INTERNAL GType gst_ipc_pipeline_sink_get_type (void);
-
-G_END_DECLS
-
-#endif /* __GST_IPC_PIPELINE_SINK_H__ */
diff --git a/gst/ipcpipeline/gstipcpipelinesrc.c b/gst/ipcpipeline/gstipcpipelinesrc.c
deleted file mode 100644
index 656bd5a71..000000000
--- a/gst/ipcpipeline/gstipcpipelinesrc.c
+++ /dev/null
@@ -1,954 +0,0 @@
-/* GStreamer
- * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
- * 2000 Wim Taymans <wim@fluendo.com>
- * 2006 Thomas Vander Stichele <thomas at apestaart dot org>
- * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
- *
- * gstipcpipelinesrc.c:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-/**
- * SECTION:element-ipcpipelinesrc
- * @see_also: #GstIpcPipelineSink, #GstIpcSlavePipeline
- *
- * Communicates with an ipcpipelinesink element in another process via a socket.
- *
- * The ipcpipelinesrc element allows 2-way communication with an ipcpipelinesink
- * element on another process/pipeline. It is meant to run inside an
- * interslavepipeline and when paired with an ipcpipelinesink, it will slave its
- * whole parent pipeline to the "master" one, which contains the ipcpipelinesink.
- *
- * For more details about this mechanism and its uses, see the documentation
- * of the ipcpipelinesink element.
- */
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <unistd.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "gstipcpipelinesrc.h"
-
-static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
- GST_PAD_SRC,
- GST_PAD_ALWAYS,
- GST_STATIC_CAPS_ANY);
-
-GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_src_debug);
-#define GST_CAT_DEFAULT gst_ipc_pipeline_src_debug
-
-enum
-{
- /* FILL ME */
- SIGNAL_FORWARD_MESSAGE,
- SIGNAL_DISCONNECT,
- LAST_SIGNAL
-};
-static guint gst_ipc_pipeline_src_signals[LAST_SIGNAL] = { 0 };
-
-enum
-{
- PROP_0,
- PROP_FDIN,
- PROP_FDOUT,
- PROP_READ_CHUNK_SIZE,
- PROP_ACK_TIME,
- PROP_LAST,
-};
-
-static GQuark QUARK_UPSTREAM;
-
-#define DEFAULT_READ_CHUNK_SIZE 65536
-#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
-
-#define _do_init \
- GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_src_debug, "ipcpipelinesrc", 0, "ipcpipelinesrc element");
-#define gst_ipc_pipeline_src_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSrc, gst_ipc_pipeline_src,
- GST_TYPE_ELEMENT, _do_init);
-
-static void gst_ipc_pipeline_src_finalize (GObject * object);
-static void gst_ipc_pipeline_src_dispose (GObject * object);
-static void gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec);
-static void gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec);
-
-static void gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src);
-
-static gboolean gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc *
- src);
-static void gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src);
-
-static gboolean gst_ipc_pipeline_src_activate_mode (GstPad * pad,
- GstObject * parent, GstPadMode mode, gboolean active);
-static gboolean gst_ipc_pipeline_src_srcpad_event (GstPad * pad,
- GstObject * parent, GstEvent * event);
-static gboolean gst_ipc_pipeline_src_srcpad_query (GstPad * pad,
- GstObject * parent, GstQuery * query);
-static void gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src);
-
-static gboolean gst_ipc_pipeline_src_send_event (GstElement * element,
- GstEvent * event);
-static gboolean gst_ipc_pipeline_src_query (GstElement * element,
- GstQuery * query);
-static GstStateChangeReturn gst_ipc_pipeline_src_change_state (GstElement *
- element, GstStateChange transition);
-
-static gboolean gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src,
- GstMessage * msg);
-static void gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src);
-
-static void
-gst_ipc_pipeline_src_class_init (GstIpcPipelineSrcClass * klass)
-{
- GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
-
- QUARK_UPSTREAM = g_quark_from_static_string ("ipcpipeline-upstream");
-
- gobject_class->dispose = gst_ipc_pipeline_src_dispose;
- gobject_class->finalize = gst_ipc_pipeline_src_finalize;
-
- gobject_class->set_property = gst_ipc_pipeline_src_set_property;
- gobject_class->get_property = gst_ipc_pipeline_src_get_property;
-
- gstelement_class->send_event =
- GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_send_event);
- gstelement_class->query = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_query);
- gstelement_class->change_state =
- GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_change_state);
-
- klass->forward_message =
- GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_forward_message);
- klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_disconnect);
-
- GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_activate_mode);
- GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_event);
- GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_query);
-
- g_object_class_install_property (gobject_class, PROP_FDIN,
- g_param_spec_int ("fdin", "Input file descriptor",
- "File descriptor to read data from",
- -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_FDOUT,
- g_param_spec_int ("fdout", "Output file descriptor",
- "File descriptor to write data through",
- -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
- g_param_spec_uint ("read-chunk-size", "Read chunk size",
- "Read chunk size",
- 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_ACK_TIME,
- g_param_spec_uint64 ("ack-time", "Ack time",
- "Maximum time to wait for a response to a message",
- 0, G_MAXUINT64, DEFAULT_ACK_TIME,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- gst_ipc_pipeline_src_signals[SIGNAL_FORWARD_MESSAGE] =
- g_signal_new ("forward-message", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
- G_STRUCT_OFFSET (GstIpcPipelineSrcClass, forward_message), NULL, NULL,
- g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, GST_TYPE_MESSAGE);
-
- gst_ipc_pipeline_src_signals[SIGNAL_DISCONNECT] =
- g_signal_new ("disconnect", G_TYPE_FROM_CLASS (klass),
- G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
- G_STRUCT_OFFSET (GstIpcPipelineSrcClass, disconnect),
- NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
-
- gst_element_class_set_static_metadata (gstelement_class,
- "Inter-process Pipeline Source",
- "Source",
- "Continues a split pipeline from another process",
- "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&srctemplate));
-}
-
-static void
-gst_ipc_pipeline_src_init (GstIpcPipelineSrc * src)
-{
- GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
-
- gst_ipc_pipeline_comm_init (&src->comm, GST_ELEMENT (src));
- src->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
- src->comm.ack_time = DEFAULT_ACK_TIME;
- src->flushing = TRUE;
- src->last_ret = GST_FLOW_FLUSHING;
- src->queued = NULL;
- g_cond_init (&src->create_cond);
-
- src->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
- gst_pad_set_activatemode_function (src->srcpad,
- gst_ipc_pipeline_src_activate_mode);
- gst_pad_set_event_function (src->srcpad, gst_ipc_pipeline_src_srcpad_event);
- gst_pad_set_query_function (src->srcpad, gst_ipc_pipeline_src_srcpad_query);
- gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
-
- gst_ipc_pipeline_src_start_reader_thread (src);
-}
-
-static void
-gst_ipc_pipeline_src_dispose (GObject * object)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
-
- gst_ipc_pipeline_src_stop_reader_thread (src);
- gst_ipc_pipeline_src_cancel_queued (src);
- gst_ipc_pipeline_comm_cancel (&src->comm, TRUE);
-
- G_OBJECT_CLASS (parent_class)->dispose (object);
-}
-
-static void
-gst_ipc_pipeline_src_finalize (GObject * object)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
-
- gst_ipc_pipeline_comm_clear (&src->comm);
- g_cond_clear (&src->create_cond);
-
- G_OBJECT_CLASS (parent_class)->finalize (object);
-}
-
-static void
-gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
-
- switch (prop_id) {
- case PROP_FDIN:
- src->comm.fdin = g_value_get_int (value);
- break;
- case PROP_FDOUT:
- src->comm.fdout = g_value_get_int (value);
- break;
- case PROP_READ_CHUNK_SIZE:
- src->comm.read_chunk_size = g_value_get_uint (value);
- break;
- case PROP_ACK_TIME:
- src->comm.ack_time = g_value_get_uint64 (value);
- break;
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
-}
-
-static void
-gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
-
- g_return_if_fail (GST_IS_IPC_PIPELINE_SRC (object));
-
- switch (prop_id) {
- case PROP_FDIN:
- g_value_set_int (value, src->comm.fdin);
- break;
- case PROP_FDOUT:
- g_value_set_int (value, src->comm.fdout);
- break;
- case PROP_READ_CHUNK_SIZE:
- g_value_set_uint (value, src->comm.read_chunk_size);
- break;
- case PROP_ACK_TIME:
- g_value_set_uint64 (value, src->comm.ack_time);
- break;
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
-}
-
-static void
-gst_ipc_pipeline_src_log_queue (GstIpcPipelineSrc * src)
-{
- GList *queued;
- guint n;
-
- queued = src->queued;
- n = 0;
- GST_LOG_OBJECT (src, "There are %u objects in the queue",
- g_list_length (queued));
- while (queued) {
- void *object = queued->data;
- if (GST_IS_EVENT (object)) {
- GST_LOG_OBJECT (src, " #%u: %s event", n, GST_EVENT_TYPE_NAME (object));
- } else if (GST_IS_QUERY (object)) {
- GST_LOG_OBJECT (src, " #%u: %s query", n, GST_QUERY_TYPE_NAME (object));
- } else if (GST_IS_BUFFER (object)) {
- GST_LOG_OBJECT (src, " #%u: %zu bytes buffer", n,
- (size_t) gst_buffer_get_size (object));
- } else {
- GST_LOG_OBJECT (src, " #%u: unknown item in queue", n);
- }
- queued = queued->next;
- ++n;
- }
-}
-
-static void
-gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src)
-{
- GList *queued;
- guint32 id;
-
- g_mutex_lock (&src->comm.mutex);
- queued = src->queued;
- src->queued = NULL;
- g_cond_broadcast (&src->create_cond);
- g_mutex_unlock (&src->comm.mutex);
-
- while (queued) {
- void *object = queued->data;
-
- id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
- QUARK_ID));
-
- queued = g_list_delete_link (queued, queued);
- if (GST_IS_EVENT (object)) {
- GstEvent *event = GST_EVENT (object);
- GST_DEBUG_OBJECT (src, "Cancelling queued event: %" GST_PTR_FORMAT,
- event);
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
- gst_event_unref (event);
- } else if (GST_IS_BUFFER (object)) {
- GstBuffer *buffer = GST_BUFFER (object);
- GST_DEBUG_OBJECT (src, "Cancelling queued buffer: %" GST_PTR_FORMAT,
- buffer);
- gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
- GST_FLOW_FLUSHING);
- gst_buffer_unref (buffer);
- } else if (GST_IS_QUERY (object)) {
- GstQuery *query = GST_QUERY (object);
- GST_DEBUG_OBJECT (src, "Cancelling queued query: %" GST_PTR_FORMAT,
- query);
- gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, FALSE,
- query);
- gst_query_unref (query);
- }
- }
-
-}
-
-static void
-gst_ipc_pipeline_src_start_loop (GstIpcPipelineSrc * src)
-{
- g_mutex_lock (&src->comm.mutex);
- src->flushing = FALSE;
- src->last_ret = GST_FLOW_OK;
- g_mutex_unlock (&src->comm.mutex);
-
- gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_ipc_pipeline_src_loop,
- src, NULL);
-}
-
-static void
-gst_ipc_pipeline_src_stop_loop (GstIpcPipelineSrc * src, gboolean stop)
-{
- g_mutex_lock (&src->comm.mutex);
- src->flushing = TRUE;
- g_cond_broadcast (&src->create_cond);
- g_mutex_unlock (&src->comm.mutex);
-
- if (stop)
- gst_pad_stop_task (src->srcpad);
-}
-
-static gboolean
-gst_ipc_pipeline_src_activate_mode (GstPad * pad, GstObject * parent,
- GstPadMode mode, gboolean active)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
-
- switch (mode) {
- case GST_PAD_MODE_PUSH:
- GST_DEBUG_OBJECT (pad, "%s in push mode", active ? "activating" :
- "deactivating");
- if (active) {
- gst_ipc_pipeline_src_start_loop (src);
- } else {
- gst_ipc_pipeline_src_stop_loop (src, TRUE);
- gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
- }
- return TRUE;
- default:
- GST_DEBUG_OBJECT (pad, "unsupported activation mode");
- return FALSE;
- }
-}
-
-static gboolean
-gst_ipc_pipeline_src_srcpad_event (GstPad * pad, GstObject * parent,
- GstEvent * event)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
- gboolean ret;
-
- GST_DEBUG_OBJECT (src, "Got upstream event %s", GST_EVENT_TYPE_NAME (event));
-
- ret = gst_ipc_pipeline_comm_write_event_to_fd (&src->comm, TRUE, event);
- gst_event_unref (event);
-
- GST_DEBUG_OBJECT (src, "Returning event result: %d", ret);
- return ret;
-}
-
-static gboolean
-gst_ipc_pipeline_src_srcpad_query (GstPad * pad, GstObject * parent,
- GstQuery * query)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
- gboolean ret;
-
- /* answer some queries that do not make sense to be forwarded */
- switch (GST_QUERY_TYPE (query)) {
- case GST_QUERY_LATENCY:
- return TRUE;
- case GST_QUERY_CONTEXT:
- return FALSE;
- case GST_QUERY_CAPS:
- {
- /* caps queries occur even while linking the pipeline.
- * It is possible that the ipcpipelinesink may not be connected at this
- * point, so let's avoid a couple of errors... */
- GstState state;
- GST_OBJECT_LOCK (src);
- state = GST_STATE (src);
- GST_OBJECT_UNLOCK (src);
- if (state == GST_STATE_NULL)
- return FALSE;
- }
- default:
- break;
- }
-
- GST_DEBUG_OBJECT (src, "Got upstream query %s: %" GST_PTR_FORMAT,
- GST_QUERY_TYPE_NAME (query), query);
-
- ret = gst_ipc_pipeline_comm_write_query_to_fd (&src->comm, TRUE, query);
-
- GST_DEBUG_OBJECT (src, "Returning query result: %d, %" GST_PTR_FORMAT,
- ret, query);
- return ret;
-}
-
-static void
-gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src)
-{
- gpointer object;
- guint32 id;
- gboolean ok;
- GstFlowReturn ret = GST_FLOW_OK;
-
- g_mutex_lock (&src->comm.mutex);
-
- while (!src->queued && !src->flushing)
- g_cond_wait (&src->create_cond, &src->comm.mutex);
-
- if (src->flushing)
- goto out;
-
- object = src->queued->data;
- src->queued = g_list_delete_link (src->queued, src->queued);
- g_mutex_unlock (&src->comm.mutex);
-
- id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
- QUARK_ID));
-
- if (GST_IS_BUFFER (object)) {
- GstBuffer *buf = GST_BUFFER (object);
- GST_DEBUG_OBJECT (src, "Pushing queued buffer: %" GST_PTR_FORMAT, buf);
- ret = gst_pad_push (src->srcpad, buf);
- GST_DEBUG_OBJECT (src, "pushed id %u, ret: %s", id,
- gst_flow_get_name (ret));
- gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, ret);
- } else if (GST_IS_EVENT (object)) {
- GstEvent *event = GST_EVENT (object);
- GST_DEBUG_OBJECT (src, "Pushing queued event: %" GST_PTR_FORMAT, event);
- ok = gst_pad_push_event (src->srcpad, event);
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
- } else if (GST_IS_QUERY (object)) {
- GstQuery *query = GST_QUERY (object);
- GST_DEBUG_OBJECT (src, "Pushing queued query: %" GST_PTR_FORMAT, query);
- ok = gst_pad_peer_query (src->srcpad, query);
- gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ok, query);
- gst_query_unref (query);
- } else {
- GST_WARNING_OBJECT (src, "Unknown data type queued");
- }
-
- g_mutex_lock (&src->comm.mutex);
- if (!src->queued)
- g_cond_broadcast (&src->create_cond);
-out:
- if (src->flushing)
- ret = GST_FLOW_FLUSHING;
- if (ret != GST_FLOW_OK)
- src->last_ret = ret;
- g_mutex_unlock (&src->comm.mutex);
-
- if (ret == GST_FLOW_FLUSHING) {
- gst_ipc_pipeline_src_cancel_queued (src);
- gst_pad_pause_task (src->srcpad);
- }
-}
-
-static gboolean
-gst_ipc_pipeline_src_send_event (GstElement * element, GstEvent * event)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
- return gst_pad_push_event (src->srcpad, event);
-}
-
-static gboolean
-gst_ipc_pipeline_src_query (GstElement * element, GstQuery * query)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
- return gst_pad_query (src->srcpad, query);
-}
-
-static GstElement *
-find_pipeline (GstElement * element)
-{
- GstElement *pipeline = element;
- while (GST_ELEMENT_PARENT (pipeline)) {
- pipeline = GST_ELEMENT_PARENT (pipeline);
- if (GST_IS_PIPELINE (pipeline))
- break;
- }
- if (!pipeline || !GST_IS_PIPELINE (pipeline)) {
- pipeline = NULL;
- }
- return pipeline;
-}
-
-static gboolean
-gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, GstMessage * msg)
-{
- gboolean skip = FALSE;
-
- GST_DEBUG_OBJECT (src, "Message to forward: %" GST_PTR_FORMAT, msg);
-
- switch (GST_MESSAGE_TYPE (msg)) {
- case GST_MESSAGE_STATE_CHANGED:
- {
- GstState old, new, pending;
- GstElement *pipeline = find_pipeline (GST_ELEMENT (src));
-
- gst_message_parse_state_changed (msg, &old, &new, &pending);
-
- if (GST_MESSAGE_SRC (msg) == GST_OBJECT (pipeline) &&
- old == new && new == pending) {
- GST_DEBUG_OBJECT (src, "Detected lost state, notifying master");
- gst_ipc_pipeline_comm_write_state_lost_to_fd (&src->comm);
- }
- /* fall through & skip */
- }
- case GST_MESSAGE_ASYNC_START:
- case GST_MESSAGE_CLOCK_PROVIDE:
- case GST_MESSAGE_CLOCK_LOST:
- case GST_MESSAGE_NEW_CLOCK:
- case GST_MESSAGE_STREAM_STATUS:
- case GST_MESSAGE_NEED_CONTEXT:
- case GST_MESSAGE_HAVE_CONTEXT:
- case GST_MESSAGE_STRUCTURE_CHANGE:
- skip = TRUE;
- break;
- case GST_MESSAGE_RESET_TIME:
- {
- GQuark ipcpipelinesrc_posted = g_quark_from_static_string
- ("gstinterslavepipeline-message-already-posted");
-
- skip = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (msg),
- ipcpipelinesrc_posted));
- if (!skip) {
- gst_mini_object_set_qdata (GST_MINI_OBJECT (msg), ipcpipelinesrc_posted,
- GUINT_TO_POINTER (1), NULL);
- }
- break;
- }
- case GST_MESSAGE_ERROR:
- {
- GError *error = NULL;
-
- /* skip forwarding a RESOURCE/WRITE error message that originated from
- * ipcpipelinesrc; we post this error when writing to the comm fd fails,
- * so if we try to forward it here, we will likely get another one posted
- * immediately and end up doing an endless loop */
- gst_message_parse_error (msg, &error, NULL);
- skip = (GST_MESSAGE_SRC (msg) == GST_OBJECT_CAST (src)
- && error->domain == gst_resource_error_quark ()
- && error->code == GST_RESOURCE_ERROR_WRITE);
- g_error_free (error);
- break;
- }
- default:
- break;
- }
-
- if (skip) {
- GST_DEBUG_OBJECT (src, "message will not be forwarded");
- return TRUE;
- }
-
- return gst_ipc_pipeline_comm_write_message_to_fd (&src->comm, msg);
-}
-
-static void
-on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
- GST_DEBUG_OBJECT (src, "Got buffer id %u, queueing: %" GST_PTR_FORMAT, id,
- buffer);
- g_mutex_lock (&src->comm.mutex);
- if (!GST_PAD_IS_ACTIVE (src->srcpad) || src->flushing) {
- g_mutex_unlock (&src->comm.mutex);
- GST_INFO_OBJECT (src, "We're not started or flushing, buffer ignored");
- gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
- GST_FLOW_FLUSHING);
- gst_buffer_unref (buffer);
- return;
- }
- if (src->last_ret != GST_FLOW_OK) {
- GstFlowReturn last_ret = src->last_ret;
- g_mutex_unlock (&src->comm.mutex);
- GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting buffer",
- gst_flow_get_name (last_ret));
- gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, last_ret);
- gst_buffer_unref (buffer);
- return;
- }
- src->queued = g_list_append (src->queued, buffer); /* keep the ref */
- gst_ipc_pipeline_src_log_queue (src);
- g_cond_broadcast (&src->create_cond);
- g_mutex_unlock (&src->comm.mutex);
-}
-
-static void
-do_oob_event (GstElement * element, gpointer user_data)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
- GstEvent *event = user_data;
- gboolean ret, upstream;
- guint32 id;
-
- id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
- (event), QUARK_ID));
- upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
- (event), QUARK_UPSTREAM));
-
- if (upstream) {
- GstElement *pipeline;
- gboolean ok = FALSE;
-
- if (!(pipeline = find_pipeline (element))) {
- GST_ERROR_OBJECT (src, "No pipeline found");
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
- } else {
- GST_DEBUG_OBJECT (src, "Posting upstream event on pipeline: %"
- GST_PTR_FORMAT, event);
- ok = gst_element_send_event (pipeline, gst_event_ref (event));
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
- }
- } else {
- GST_DEBUG_OBJECT (src, "Pushing event async: %" GST_PTR_FORMAT, event);
- ret = gst_element_send_event (element, gst_event_ref (event));
- GST_DEBUG_OBJECT (src, "Event pushed, return %d", ret);
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ret);
- }
-}
-
-static void
-on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
- GstFlowReturn last_ret = GST_FLOW_OK;
-
- GST_DEBUG_OBJECT (src, "Got event id %u, queueing: %" GST_PTR_FORMAT, id,
- event);
-
- gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_UPSTREAM,
- GINT_TO_POINTER (upstream), NULL);
-
- switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_FLUSH_START:
- gst_ipc_pipeline_src_stop_loop (src, FALSE);
- break;
- case GST_EVENT_FLUSH_STOP:
- gst_ipc_pipeline_src_start_loop (src);
- break;
- default:
- g_mutex_lock (&src->comm.mutex);
- last_ret = src->last_ret;
- g_mutex_unlock (&src->comm.mutex);
- break;
- }
-
- if (GST_EVENT_IS_SERIALIZED (event) && !upstream) {
- if (last_ret != GST_FLOW_OK) {
- GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
- gst_flow_get_name (last_ret));
- gst_event_unref (event);
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
- } else {
- GST_DEBUG_OBJECT (src, "This is a serialized event, adding to queue %p",
- src->queued);
- g_mutex_lock (&src->comm.mutex);
- src->queued = g_list_append (src->queued, event); /* keep the ref */
- gst_ipc_pipeline_src_log_queue (src);
- g_cond_broadcast (&src->create_cond);
- g_mutex_unlock (&src->comm.mutex);
- }
- } else {
- if (last_ret != GST_FLOW_OK && !upstream) {
- GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
- gst_flow_get_name (last_ret));
- gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
- gst_event_unref (event);
- } else {
- GST_DEBUG_OBJECT (src,
- "This is not a serialized event, pushing in a thread");
- gst_element_call_async (GST_ELEMENT (src), do_oob_event, event,
- (GDestroyNotify) gst_event_unref);
- }
- }
-}
-
-static void
-do_oob_query (GstElement * element, gpointer user_data)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
- GstQuery *query = GST_QUERY (user_data);
- guint32 id;
- gboolean upstream;
- gboolean ret;
-
- id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
- (query), QUARK_ID));
- upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
- (query), QUARK_UPSTREAM));
-
- if (upstream) {
- GstElement *pipeline;
-
- if (!(pipeline = find_pipeline (element))) {
- GST_ERROR_OBJECT (src, "No pipeline found");
- ret = FALSE;
- } else {
- GST_DEBUG_OBJECT (src, "Posting query on pipeline: %" GST_PTR_FORMAT,
- query);
- ret = gst_element_query (pipeline, query);
- }
- } else {
- GST_DEBUG_OBJECT (src, "Pushing query async: %" GST_PTR_FORMAT, query);
- ret = gst_pad_peer_query (src->srcpad, query);
- GST_DEBUG_OBJECT (src, "Query pushed, return %d", ret);
- }
- gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ret, query);
-}
-
-static void
-on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
-
- GST_DEBUG_OBJECT (src, "Got query id %u, queueing: %" GST_PTR_FORMAT, id,
- query);
-
- if (GST_QUERY_IS_SERIALIZED (query) && !upstream) {
- g_mutex_lock (&src->comm.mutex);
- src->queued = g_list_append (src->queued, query); /* keep the ref */
- gst_ipc_pipeline_src_log_queue (src);
- g_cond_broadcast (&src->create_cond);
- g_mutex_unlock (&src->comm.mutex);
- } else {
- gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_UPSTREAM,
- GINT_TO_POINTER (upstream), NULL);
- gst_element_call_async (GST_ELEMENT (src), do_oob_query, query,
- (GDestroyNotify) gst_query_unref);
- }
-}
-
-struct StateChangeData
-{
- guint32 id;
- GstStateChange transition;
-};
-
-static void
-do_state_change (GstElement * element, gpointer data)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
- GstElement *pipeline;
- GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
- GstState state, pending, effective;
- struct StateChangeData *d = data;
- guint32 id = d->id;
- GstStateChange transition = d->transition;
- gboolean down;
-
- GST_DEBUG_OBJECT (src, "Doing state change id %u, %s -> %s", id,
- gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
-
- if (!(pipeline = find_pipeline (element))) {
- GST_ERROR_OBJECT (src, "No pipeline found");
- ret = GST_STATE_CHANGE_FAILURE;
- goto done_nolock;
- }
-
- down = (GST_STATE_TRANSITION_CURRENT (transition) >=
- GST_STATE_TRANSITION_NEXT (transition));
-
- GST_STATE_LOCK (pipeline);
- ret = gst_element_get_state (pipeline, &state, &pending, 0);
-
- /* if we are pending a state change, count the pending state as
- * the current one */
- effective = pending == GST_STATE_VOID_PENDING ? state : pending;
-
- GST_DEBUG_OBJECT (src, "Current element state: ret:%s state:%s pending:%s "
- "effective:%s", gst_element_state_change_return_get_name (ret),
- gst_element_state_get_name (state),
- gst_element_state_get_name (pending),
- gst_element_state_get_name (effective));
-
- if ((GST_STATE_TRANSITION_NEXT (transition) <= effective && !down) ||
- (GST_STATE_TRANSITION_NEXT (transition) > effective && down)) {
- /* if the request was to transition to a state that we have already
- * transitioned to in the same direction, then we just silently return */
- GST_DEBUG_OBJECT (src, "State transition to %s is unnecessary",
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
- /* make sure we return SUCCESS if the transition is to NULL or READY,
- * even if our current ret is ASYNC for example; also, make sure not
- * to return FAILURE, since our state is already committed */
- if (GST_STATE_TRANSITION_NEXT (transition) <= GST_STATE_READY ||
- ret == GST_STATE_CHANGE_FAILURE) {
- ret = GST_STATE_CHANGE_SUCCESS;
- }
- } else if (ret != GST_STATE_CHANGE_FAILURE || down) {
- /* if the request was to transition to a state that we haven't already
- * transitioned to in the same direction, then we need to request a state
- * change in the pipeline, *unless* we are going upwards and the last ret
- * was FAILURE, in which case we should just return FAILURE and stop.
- * We don't stop a downwards state change though in case of FAILURE, since
- * we need to be able to bring the pipeline down to NULL. Note that
- * GST_MESSAGE_ERROR will cause ret to be GST_STATE_CHANGE_FAILURE */
- ret = gst_element_set_state (pipeline,
- GST_STATE_TRANSITION_NEXT (transition));
- GST_DEBUG_OBJECT (src, "gst_element_set_state returned %s",
- gst_element_state_change_return_get_name (ret));
- }
-
- GST_STATE_UNLOCK (pipeline);
-
-done_nolock:
- GST_DEBUG_OBJECT (src, "sending state change ack, ret = %s",
- gst_element_state_change_return_get_name (ret));
- gst_ipc_pipeline_comm_write_state_change_ack_to_fd (&src->comm, id, ret);
-}
-
-static void
-on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
-{
- struct StateChangeData *d;
- GstElement *ipcpipelinesrc = GST_ELEMENT (user_data);
-
- GST_DEBUG_OBJECT (ipcpipelinesrc, "Got state change id %u, %s -> %s", id,
- gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
- gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
-
- d = g_new (struct StateChangeData, 1);
- d->id = id;
- d->transition = transition;
-
- gst_element_call_async (ipcpipelinesrc, do_state_change, d, g_free);
-}
-
-static void
-on_message (guint32 id, GstMessage * message, gpointer user_data)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
-
- GST_ERROR_OBJECT (src, "Got message id %u, not supposed to: %" GST_PTR_FORMAT,
- id, message);
- gst_message_unref (message);
-}
-
-static gboolean
-gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * src)
-{
- if (!gst_ipc_pipeline_comm_start_reader_thread (&src->comm, on_buffer,
- on_event, on_query, on_state_change, NULL, on_message, src)) {
- GST_ERROR_OBJECT (src, "Failed to start reader thread");
- return FALSE;
- }
- return TRUE;
-}
-
-static void
-gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src)
-{
- gst_ipc_pipeline_comm_stop_reader_thread (&src->comm);
-}
-
-static void
-gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src)
-{
- GST_DEBUG_OBJECT (src, "Disconnecting");
- gst_ipc_pipeline_src_stop_reader_thread (src);
- src->comm.fdin = -1;
- src->comm.fdout = -1;
- gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
- gst_ipc_pipeline_src_start_reader_thread (src);
-}
-
-static GstStateChangeReturn
-gst_ipc_pipeline_src_change_state (GstElement * element,
- GstStateChange transition)
-{
- GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
-
- switch (transition) {
- case GST_STATE_CHANGE_NULL_TO_READY:
- if (src->comm.fdin < 0) {
- GST_ERROR_OBJECT (element, "Invalid fdin: %d", src->comm.fdin);
- return GST_STATE_CHANGE_FAILURE;
- }
- if (src->comm.fdout < 0) {
- GST_ERROR_OBJECT (element, "Invalid fdout: %d", src->comm.fdout);
- return GST_STATE_CHANGE_FAILURE;
- }
- if (!src->comm.reader_thread) {
- GST_ERROR_OBJECT (element, "Failed to start reader thread");
- return GST_STATE_CHANGE_FAILURE;
- }
- break;
- default:
- break;
- }
- return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-}
diff --git a/gst/ipcpipeline/gstipcpipelinesrc.h b/gst/ipcpipeline/gstipcpipelinesrc.h
deleted file mode 100644
index 221fccdd1..000000000
--- a/gst/ipcpipeline/gstipcpipelinesrc.h
+++ /dev/null
@@ -1,76 +0,0 @@
-/* GStreamer
- * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
- * 2000 Wim Taymans <wtay@chello.be>
- * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
- *
- * gstipcpipelinesrc.h:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-
-#ifndef __GST_IPC_PIPELINE_SRC_H__
-#define __GST_IPC_PIPELINE_SRC_H__
-
-#include <gst/gst.h>
-#include "gstipcpipelinecomm.h"
-
-G_BEGIN_DECLS
-
-#define GST_TYPE_IPC_PIPELINE_SRC \
- (gst_ipc_pipeline_src_get_type())
-#define GST_IPC_PIPELINE_SRC(obj) \
- (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrc))
-#define GST_IPC_PIPELINE_SRC_CLASS(klass) \
- (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrcClass))
-#define GST_IS_IPC_PIPELINE_SRC(obj) \
- (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SRC))
-#define GST_IS_IPC_PIPELINE_SRC_CLASS(klass) \
- (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SRC))
-
-typedef struct _GstIpcPipelineSrc GstIpcPipelineSrc;
-typedef struct _GstIpcPipelineSrcClass GstIpcPipelineSrcClass;
-
-/**
- * GstIpcPipelineSrc:
- *
- * Opaque #GstIpcPipelineSrc data structure.
- */
-struct _GstIpcPipelineSrc {
- GstElement element;
-
- GstIpcPipelineComm comm;
- GstPad *srcpad;
-
- gboolean flushing;
- GList *queued;
- GstFlowReturn last_ret;
-
- GCond create_cond;
-};
-
-struct _GstIpcPipelineSrcClass {
- GstElementClass parent_class;
-
- gboolean (*forward_message) (GstIpcPipelineSrc *, GstMessage *);
- void (*disconnect) (GstIpcPipelineSrc * src);
-};
-
-G_GNUC_INTERNAL GType gst_ipc_pipeline_src_get_type (void);
-
-G_END_DECLS
-
-#endif /* __GST_IPC_PIPELINE_SRC_H__ */
diff --git a/gst/ipcpipeline/gstipcslavepipeline.c b/gst/ipcpipeline/gstipcslavepipeline.c
deleted file mode 100644
index 2c9dd1516..000000000
--- a/gst/ipcpipeline/gstipcslavepipeline.c
+++ /dev/null
@@ -1,122 +0,0 @@
-/* GStreamer
- * Copyright (C) 2015-2017 YouView TV Ltd
- * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
- *
- * gstipcslavepipeline.c:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-/**
- * SECTION:element-ipcslavepipeline
- * @see_also: #GstIpcPipelineSink, #GstIpcPipelineSrc
- *
- * This is a GstPipeline subclass meant to embed one ore more ipcpipelinesrc
- * elements, and be slaved transparently to the master pipeline, using one ore
- * more ipcpipelinesink elements on the master.
- *
- * The actual pipeline slaving logic happens in ipcpipelinesrc. The only thing
- * that this class actually does is that it watches the pipeline bus for
- * messages and forwards them to the master pipeline through the ipcpipelinesrc
- * elements that it contains.
- *
- * For more details about this mechanism and its uses, see the documentation
- * of the ipcpipelinesink element.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <string.h>
-
-#include "gstipcpipelinesrc.h"
-#include "gstipcslavepipeline.h"
-
-GST_DEBUG_CATEGORY_STATIC (gst_ipcslavepipeline_debug);
-#define GST_CAT_DEFAULT gst_ipcslavepipeline_debug
-
-#define _do_init \
- GST_DEBUG_CATEGORY_INIT (gst_ipcslavepipeline_debug, "ipcslavepipeline", 0, "ipcslavepipeline element");
-#define gst_ipc_slave_pipeline_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstIpcSlavePipeline, gst_ipc_slave_pipeline,
- GST_TYPE_PIPELINE, _do_init);
-
-static gboolean gst_ipc_slave_pipeline_post_message (GstElement * element,
- GstMessage * message);
-
-static void
-gst_ipc_slave_pipeline_class_init (GstIpcSlavePipelineClass * klass)
-{
- GstElementClass *element_class;
-
- element_class = GST_ELEMENT_CLASS (klass);
-
- element_class->post_message = gst_ipc_slave_pipeline_post_message;
-
- gst_element_class_set_static_metadata (element_class,
- "Inter-process slave pipeline",
- "Generic/Bin/Slave",
- "Contains the slave part of an inter-process pipeline",
- "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk");
-}
-
-static void
-gst_ipc_slave_pipeline_init (GstIpcSlavePipeline * isp)
-{
-}
-
-static gboolean
-send_message_if_ipcpipelinesrc (const GValue * v, GValue * r,
- gpointer user_data)
-{
- GstElement *e;
- GType et;
- gboolean ret;
- GstMessage *message = user_data;
-
- e = g_value_get_object (v);
- et = gst_element_factory_get_element_type (gst_element_get_factory (e));
- if (et == GST_TYPE_IPC_PIPELINE_SRC) {
- g_signal_emit_by_name (G_OBJECT (e), "forward-message", message, &ret);
-
- /* if we succesfully sent this to the master and it's not ASYNC_DONE or EOS,
- * we can skip sending it again through the other ipcpipelinesrcs */
- if (ret && GST_MESSAGE_TYPE (message) != GST_MESSAGE_ASYNC_DONE &&
- GST_MESSAGE_TYPE (message) != GST_MESSAGE_EOS)
- return FALSE;
- }
- return TRUE;
-}
-
-static void
-gst_ipc_slave_pipeline_forward_message (GstIpcSlavePipeline * pipeline,
- GstMessage * message)
-{
- GstIterator *it;
-
- it = gst_bin_iterate_sources (GST_BIN (pipeline));
- gst_iterator_fold (it, send_message_if_ipcpipelinesrc, NULL, message);
- gst_iterator_free (it);
-}
-
-static gboolean
-gst_ipc_slave_pipeline_post_message (GstElement * element, GstMessage * message)
-{
- gst_ipc_slave_pipeline_forward_message (GST_IPC_SLAVE_PIPELINE
- (element), message);
-
- return GST_ELEMENT_CLASS (parent_class)->post_message (element, message);
-}
diff --git a/gst/ipcpipeline/gstipcslavepipeline.h b/gst/ipcpipeline/gstipcslavepipeline.h
deleted file mode 100644
index 354dfacf0..000000000
--- a/gst/ipcpipeline/gstipcslavepipeline.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/* GStreamer
- * Copyright (C) 2015-2017 YouView TV Ltd
- * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
- *
- * gstipcslavepipeline.h:
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-#ifndef _GST_IPC_SLAVE_PIPELINE_H_
-#define _GST_IPC_SLAVE_PIPELINE_H_
-
-#include <gst/gst.h>
-
-G_BEGIN_DECLS
-
-#define GST_TYPE_IPC_SLAVE_PIPELINE \
- (gst_ipc_slave_pipeline_get_type())
-#define GST_IPC_SLAVE_PIPELINE(obj) \
- (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipeline))
-#define GST_IPC_SLAVE_PIPELINE_CAST(obj) \
- ((GstIpcSlavePipeline *) obj)
-#define GST_IPC_SLAVE_PIPELINE_CLASS(klass) \
- (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipelineClass))
-#define GST_IS_IPC_SLAVE_PIPELINE(obj) \
- (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_SLAVE_PIPELINE))
-#define GST_IS_IPC_SLAVE_PIPELINE_CLASS(obj) \
- (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_SLAVE_PIPELINE))
-
-typedef struct _GstIpcSlavePipeline GstIpcSlavePipeline;
-typedef struct _GstIpcSlavePipelineClass GstIpcSlavePipelineClass;
-
-struct _GstIpcSlavePipeline
-{
- GstPipeline pipeline;
-};
-
-struct _GstIpcSlavePipelineClass
-{
- GstPipelineClass pipeline_class;
-};
-
-G_GNUC_INTERNAL GType gst_ipc_slave_pipeline_get_type (void);
-
-G_END_DECLS
-
-#endif
diff --git a/gst/ipcpipeline/meson.build b/gst/ipcpipeline/meson.build
deleted file mode 100644
index 7ce3809f5..000000000
--- a/gst/ipcpipeline/meson.build
+++ /dev/null
@@ -1,16 +0,0 @@
-ipcpipeline_sources = [
- 'gstipcpipeline.c',
- 'gstipcpipelinecomm.c',
- 'gstipcpipelinesink.c',
- 'gstipcpipelinesrc.c',
- 'gstipcslavepipeline.c'
-]
-
-gstipcpipeline = library('gstipcpipeline',
- ipcpipeline_sources,
- c_args : gst_plugins_bad_args,
- include_directories : [configinc],
- dependencies : [gstbase_dep],
- install : true,
- install_dir : plugins_install_dir,
-)
diff --git a/gst/ipcpipeline/protocol.txt b/gst/ipcpipeline/protocol.txt
deleted file mode 100644
index 77f6e568e..000000000
--- a/gst/ipcpipeline/protocol.txt
+++ /dev/null
@@ -1,92 +0,0 @@
-This documents the protocol used to pass data over fds between ipcpipelinesrc
-and ipcpipelinesink.
-
-The protocol is used in both directions. However, some combinations do
-not make sense (eg, a buffer going from ipcpipelinesrc to ipcpipelinesink).
-
-The protocol consists of an arbitrary number of variable sized chunks
-with a type. Each chunk has a request ID which can be used to match a
-request with its reply (ack / query result).
-
-Each chunk consists of:
- - a type (byte):
- 1: ack
- 2: query result
- 3: buffer
- 4: event
- 5: sink message event
- 6: query
- 7: state change
- 8: state lost
- 9: message
- 10: error/warning/info message
- - a request ID, 4 bytes, little endian
- - the payload size, 4 bytes, little endian
- - N bytes payload
-
-Depending on the type, the payload can contain:
-
- - 1: ack
- result: 4 bytes, little endian
- interpreted as GstFlowReturn for buffers, boolean for events and
- GstStateChangeReturn for state changes
- - 2: query result
- result boolean: 1 byte
- query type: 4 bytes, little endian
- returned query string representation, NUL terminated
- - 3: buffer:
- pts: 8 bytes, little endian
- dts: 8 bytes, little endian
- duration: 8 bytes, little endian
- offset: 8 bytes, little endian
- offset end: 8 bytes, little endian
- flags: 8 bytes, little endian
- buffer size: 4 bytes, little endian
- data: contents of the buffer data, size specified in "buffer size"
- number of GstMeta: 4 bytes, little endian
- For each GstMeta:
- bytes: 4 bytes, little endian
- this is the number of bytes before the string representation
- at the end of this block, including the 4 bytes of itself
- flags: 4 bytes, little endian
- length of the GstMetaInfo::api name: 4 bytes, little endian
- GstMetaInfo::api name: string, NUL terminated
- GstMetaInfo::size: 8 bytes, little endian
- length of the string representation: 4 bytes, little endian
- string representation, NUL terminated
- - 4: event
- event type: 4 bytes, little endian
- sequence number: 4 bytes, little endian
- direction: 1 byte
- whether the event is going upstream (1) or downstream (0)
- string representation, NUL terminated
- - 5: sink message event
- message type: 4 bytes, little endian
- event sequence number: 4 bytes, little endian
- message sequence number: 4 bytes, little endian
- length: 4 bytes, little endian
- event structure name: length bytes, NUL terminated
- message structure string representation: remaining bytes, NUL terminated
- - 6: query
- query type: 4 bytes, little endian
- direction: 1 byte
- whether the query is going upstream (1) or downstream (0)
- string representation, NUL terminated
- - 7: state change
- GstStateChange: 4 bytes, little endian
- - 8: state lost
- no payload
- - 9: message
- message type: 4 bytes, little endian
- string representation, NUL terminated
- - 10: error/warning/info message
- message type (2 = error, 1 = warning, 0 = info): 1 byte
- error domain string length: 4 bytes, little endian
- string representation of the error domain, NUL terminated
- error code: 4 bytes, little endian
- length: 4 bytes, little endian
- if zero: no error message
- if non zero: As many bytes as this length: the error message, NUL terminated
- length: 4 bytes, little endian
- if zero: no extra message
- if non zero: As many bytes as this length: the error extra debug message, NUL terminated
diff --git a/gst/meson.build b/gst/meson.build
index 29872f224..1017adf30 100644
--- a/gst/meson.build
+++ b/gst/meson.build
@@ -27,7 +27,6 @@ subdir('geometrictransform')
subdir('id3tag')
subdir('inter')
subdir('interlace')
-subdir('ipcpipeline')
subdir('ivfparse')
subdir('ivtc')
subdir('jp2kdecimator')