diff options
author | George Kiagiadakis <george.kiagiadakis@collabora.com> | 2017-07-05 16:50:22 +0300 |
---|---|---|
committer | George Kiagiadakis <george.kiagiadakis@collabora.com> | 2017-08-01 14:42:53 +0300 |
commit | 3089d142b015d9c8fcf0c9b25ce0c6c705f4be7d (patch) | |
tree | a207b4cbeb3ef7815676fd633b0e81109f0c8fb0 | |
parent | b89c94b37eda4e9c093b159d6536bbef2ef30e27 (diff) | |
download | gstreamer-plugins-bad-3089d142b015d9c8fcf0c9b25ce0c6c705f4be7d.tar.gz |
ipcpipeline: introduce new plugin for inter-process pipelines
These elements allow splitting a pipeline across several processes,
with communication done by the ipcpipelinesink and ipcpipelinesrc
elements. The main use case is to split a playback pipeline into
a process that runs networking, parser & demuxer and another process
that runs the decoder & sink, for security reasons.
https://bugzilla.gnome.org/show_bug.cgi?id=752214
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | gst/ipcpipeline/Makefile.am | 28 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipeline.c | 48 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinecomm.c | 2341 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinecomm.h | 132 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesink.c | 722 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesink.h | 67 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesrc.c | 954 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesrc.h | 76 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcslavepipeline.c | 122 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcslavepipeline.h | 59 | ||||
-rw-r--r-- | gst/ipcpipeline/protocol.txt | 92 |
12 files changed, 4643 insertions, 0 deletions
diff --git a/configure.ac b/configure.ac index 5bd76a0dd..42d54c9d8 100644 --- a/configure.ac +++ b/configure.ac @@ -469,6 +469,7 @@ AG_GST_CHECK_PLUGIN(gdp) AG_GST_CHECK_PLUGIN(id3tag) AG_GST_CHECK_PLUGIN(inter) AG_GST_CHECK_PLUGIN(interlace) +AG_GST_CHECK_PLUGIN(ipcpipeline) AG_GST_CHECK_PLUGIN(ivfparse) AG_GST_CHECK_PLUGIN(ivtc) AG_GST_CHECK_PLUGIN(jp2kdecimator) @@ -3587,6 +3588,7 @@ gst/gdp/Makefile gst/id3tag/Makefile gst/inter/Makefile gst/interlace/Makefile +gst/ipcpipeline/Makefile gst/ivfparse/Makefile gst/ivtc/Makefile gst/jp2kdecimator/Makefile diff --git a/gst/ipcpipeline/Makefile.am b/gst/ipcpipeline/Makefile.am new file mode 100644 index 000000000..12bbed14b --- /dev/null +++ b/gst/ipcpipeline/Makefile.am @@ -0,0 +1,28 @@ +plugin_LTLIBRARIES = libgstipcpipeline.la + +libgstipcpipeline_la_SOURCES = \ + gstipcpipeline.c \ + gstipcpipelinecomm.c \ + gstipcpipelinesink.c \ + gstipcpipelinesrc.c \ + gstipcslavepipeline.c + +noinst_HEADERS = \ + gstipcpipelinecomm.h \ + gstipcpipelinesink.h \ + gstipcpipelinesrc.h \ + gstipcslavepipeline.h + +libgstipcpipeline_la_CFLAGS = \ + $(GST_PLUGINS_BAD_CFLAGS) \ + $(GST_PLUGINS_BASE_CFLAGS) \ + $(GST_BASE_CFLAGS) \ + $(GST_CFLAGS) + +libgstipcpipeline_la_LIBADD = \ + $(GST_PLUGINS_BASE_LIBS) \ + $(GST_BASE_LIBS) \ + $(GST_LIBS) \ + $(LIBM) + +libgstipcpipeline_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) diff --git a/gst/ipcpipeline/gstipcpipeline.c b/gst/ipcpipeline/gstipcpipeline.c new file mode 100644 index 000000000..4d647d49d --- /dev/null +++ b/gst/ipcpipeline/gstipcpipeline.c @@ -0,0 +1,48 @@ +/* GStreamer + * Copyright (C) 2017 YouView TV Ltd + * Author: George Kiagiadakis <george.Kiagiadakis@collabora.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, + * Boston, MA 02110-1335, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstipcpipelinecomm.h" +#include "gstipcpipelinesink.h" +#include "gstipcpipelinesrc.h" +#include "gstipcslavepipeline.h" + +static gboolean +plugin_init (GstPlugin * plugin) +{ + gst_ipc_pipeline_comm_plugin_init (); + gst_element_register (plugin, "ipcpipelinesrc", GST_RANK_NONE, + GST_TYPE_IPC_PIPELINE_SRC); + gst_element_register (plugin, "ipcpipelinesink", GST_RANK_NONE, + GST_TYPE_IPC_PIPELINE_SINK); + gst_element_register (plugin, "ipcslavepipeline", GST_RANK_NONE, + GST_TYPE_IPC_SLAVE_PIPELINE); + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + ipcpipeline, + "plugin for inter-process pipeline communication", + plugin_init, VERSION, "LGPL", PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/gst/ipcpipeline/gstipcpipelinecomm.c b/gst/ipcpipeline/gstipcpipelinecomm.c new file mode 100644 index 000000000..eee8e528f --- /dev/null +++ b/gst/ipcpipeline/gstipcpipelinecomm.c @@ -0,0 +1,2341 @@ +/* GStreamer + * Copyright (C) 2015-2017 YouView TV Ltd + * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk> + * + * gstipcpipelinecomm.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <unistd.h> +#include <errno.h> +#include <string.h> +#include <stdlib.h> +#include <gst/base/gstbytewriter.h> +#include <gst/gstprotection.h> +#include "gstipcpipelinecomm.h" + +GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug); +#define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug + +#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND) + +GQuark QUARK_ID; + +typedef enum +{ + ACK_TYPE_NONE, + ACK_TYPE_TIMED, + ACK_TYPE_BLOCKING +} AckType; + +typedef enum +{ + COMM_REQUEST_TYPE_BUFFER, + COMM_REQUEST_TYPE_EVENT, + COMM_REQUEST_TYPE_QUERY, + COMM_REQUEST_TYPE_STATE_CHANGE, + COMM_REQUEST_TYPE_MESSAGE, +} CommRequestType; + +typedef struct +{ + guint32 id; + gboolean replied; + gboolean comm_error; + guint32 ret; + GstQuery *query; + CommRequestType type; + GCond cond; +} CommRequest; + +static const gchar *comm_request_ret_get_name (CommRequestType type, + guint32 ret); +static guint32 comm_request_ret_get_failure_value (CommRequestType type); + +static CommRequest * +comm_request_new (guint32 id, CommRequestType type, GstQuery * query) +{ + CommRequest *req; + + req = g_malloc (sizeof (CommRequest)); + req->id = id; + g_cond_init (&req->cond); + req->replied = FALSE; + req->comm_error = FALSE; + req->query = query; + req->ret = comm_request_ret_get_failure_value (type); + req->type = type; + + return req; +} + +static guint32 +comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req, + AckType ack_type) +{ + guint32 ret = comm_request_ret_get_failure_value (req->type); + guint64 end_time; + + if (ack_type == ACK_TYPE_TIMED) + end_time = g_get_monotonic_time () + comm->ack_time; + else + end_time = G_MAXUINT64; + + GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u", + req->id); + while (!req->replied) { + if (ack_type == ACK_TYPE_TIMED) { + g_cond_wait_until (&req->cond, &comm->mutex, end_time); + if (g_get_monotonic_time () >= end_time) + break; + } else + g_cond_wait (&req->cond, &comm->mutex); + } + + if (req->replied) { + ret = req->ret; + GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)", + req->id, ret, comm_request_ret_get_name (req->type, ret)); + } else { + req->comm_error = TRUE; + GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u", + req->id); + } + + return ret; +} + +static void +comm_request_free (CommRequest * req) +{ + g_cond_clear (&req->cond); + g_free (req); +} + +static const gchar * +comm_request_ret_get_name (CommRequestType type, guint32 ret) +{ + switch (type) { + case COMM_REQUEST_TYPE_BUFFER: + return gst_flow_get_name (ret); + case COMM_REQUEST_TYPE_EVENT: + case COMM_REQUEST_TYPE_QUERY: + case COMM_REQUEST_TYPE_MESSAGE: + return ret ? "TRUE" : "FALSE"; + case COMM_REQUEST_TYPE_STATE_CHANGE: + return gst_element_state_change_return_get_name (ret); + default: + g_assert_not_reached (); + } +} + +static guint32 +comm_request_ret_get_failure_value (CommRequestType type) +{ + switch (type) { + case COMM_REQUEST_TYPE_BUFFER: + return GST_FLOW_COMM_ERROR; + case COMM_REQUEST_TYPE_EVENT: + case COMM_REQUEST_TYPE_MESSAGE: + case COMM_REQUEST_TYPE_QUERY: + return FALSE; + case COMM_REQUEST_TYPE_STATE_CHANGE: + return GST_STATE_CHANGE_FAILURE; + default: + g_assert_not_reached (); + } +} + +static const gchar * +gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type) +{ + switch (type) { + case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: + return "ACK"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: + return "QUERY_RESULT"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: + return "BUFFER"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: + return "EVENT"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: + return "SINK_MESSAGE_EVENT"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: + return "QUERY"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: + return "STATE_CHANGE"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: + return "STATE_LOST"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: + return "MESSAGE"; + case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: + return "GERROR_MESSAGE"; + default: + return "UNKNOWN"; + } +} + +static gboolean +gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id, + GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type) +{ + CommRequest *req; + gboolean comm_error; + GHashTable *waiting_ids; + + if (ack_type == ACK_TYPE_NONE) + return TRUE; + + req = comm_request_new (id, type, query); + waiting_ids = g_hash_table_ref (comm->waiting_ids); + g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req); + *ret = comm_request_wait (comm, req, ack_type); + comm_error = req->comm_error; + g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id)); + g_hash_table_unref (waiting_ids); + return !comm_error; +} + +static gboolean +write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size) +{ + size_t offset; + gboolean ret = TRUE; + + offset = 0; + GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size); + while (size) { + ssize_t written = + write (comm->fdout, (const unsigned char *) data + offset, size); + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s", + strerror (errno)); + ret = FALSE; + goto done; + } + size -= written; + offset += written; + } + +done: + return ret; +} + +static gboolean +write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw) +{ + guint8 *data; + gboolean ret; + guint size; + + size = gst_byte_writer_get_size (bw); + data = gst_byte_writer_reset_and_get_data (bw); + if (!data) + return FALSE; + ret = write_to_fd_raw (comm, data, size); + g_free (data); + return ret; +} + +static void +gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id, + guint32 ret, CommRequestType type) +{ + const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK; + guint32 size; + GstByteWriter bw; + + g_mutex_lock (&comm->mutex); + + GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id, + comm_request_ret_get_name (type, ret), ret); + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, id)) + goto write_failed; + size = sizeof (ret); + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, ret)) + goto write_failed; + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + +done: + g_mutex_unlock (&comm->mutex); + gst_byte_writer_reset (&bw); + return; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + goto done; +} + +void +gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm, + guint32 id, GstFlowReturn ret) +{ + gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, + COMM_REQUEST_TYPE_BUFFER); +} + +void +gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm, + guint32 id, gboolean ret) +{ + gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, + COMM_REQUEST_TYPE_EVENT); +} + +void +gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm, + guint32 id, GstStateChangeReturn ret) +{ + gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, + COMM_REQUEST_TYPE_STATE_CHANGE); +} + +void +gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm, + guint32 id, gboolean result, GstQuery * query) +{ + const unsigned char payload_type = + GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT; + guint8 result8 = result; + guint32 size; + size_t len; + char *str = NULL; + guint32 type; + const GstStructure *structure; + GstByteWriter bw; + + g_mutex_lock (&comm->mutex); + + GST_TRACE_OBJECT (comm->element, + "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query); + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, id)) + goto write_failed; + structure = gst_query_get_structure (query); + if (structure) { + str = gst_structure_to_string (structure); + len = strlen (str); + } else { + str = NULL; + len = 0; + } + size = 1 + sizeof (guint32) + len + 1; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (!gst_byte_writer_put_uint8 (&bw, result8)) + goto write_failed; + type = GST_QUERY_TYPE (query); + if (!gst_byte_writer_put_uint32_le (&bw, type)) + goto write_failed; + if (str) { + if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1)) + goto write_failed; + } else { + if (!gst_byte_writer_put_uint8 (&bw, 0)) + goto write_failed; + } + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + +done: + g_mutex_unlock (&comm->mutex); + gst_byte_writer_reset (&bw); + g_free (str); + return; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + goto done; +} + +static gboolean +gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm, + guint32 size, GstQuery ** query) +{ + gchar *end = NULL; + GstStructure *structure; + guint8 result; + guint32 type; + const guint8 *payload = NULL; + guint32 mapped_size = size; + + /* this should not be called if we don't have enough yet */ + *query = NULL; + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE); + g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE); + + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) + return FALSE; + result = *payload++; + memcpy (&type, payload, sizeof (type)); + payload += sizeof (type); + + size -= 1 + sizeof (guint32); + if (size == 0) + goto done; + + if (payload[size - 1]) { + result = FALSE; + goto done; + } + if (*payload) { + structure = gst_structure_from_string ((const char *) payload, &end); + } else { + structure = NULL; + } + if (!structure) { + result = FALSE; + goto done; + } + + *query = gst_query_new_custom (type, structure); + +done: + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + return result; +} + +typedef struct +{ + guint32 bytes; + + guint64 size; + guint32 flags; + guint64 api; + char *str; +} MetaBuildInfo; + +typedef struct +{ + GstIpcPipelineComm *comm; + guint32 n_meta; + guint32 total_bytes; + MetaBuildInfo *info; +} MetaListRepresentation; + +static gboolean +build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data) +{ + MetaListRepresentation *repr = user_data; + + repr->n_meta++; + repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo)); + repr->info[repr->n_meta - 1].bytes = + /* 4 byte bytes */ + 4 + /* 4 byte GstMetaFlags */ + + 4 + /* GstMetaInfo::api */ + + 4 + strlen (g_type_name ((*meta)->info->api)) + 1 + /* GstMetaInfo::size */ + + 8 + /* str length */ + + 4; + + repr->info[repr->n_meta - 1].flags = (*meta)->flags; + repr->info[repr->n_meta - 1].api = (*meta)->info->api; + repr->info[repr->n_meta - 1].size = (*meta)->info->size; + repr->info[repr->n_meta - 1].str = NULL; + + /* GstMeta is a base class, and actual useful classes are all different... + So we list a few of them we know we want and ignore the open ended rest */ + if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) { + GstProtectionMeta *m = (GstProtectionMeta *) * meta; + repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info); + repr->info[repr->n_meta - 1].bytes += + strlen (repr->info[repr->n_meta - 1].str) + 1; + GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s", + g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str); + } else { + GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s", + g_type_name ((*meta)->info->api)); + } + repr->total_bytes += repr->info[repr->n_meta - 1].bytes; + return TRUE; +} + +typedef struct +{ + guint64 pts; + guint64 dts; + guint64 duration; + guint64 offset; + guint64 offset_end; + guint64 flags; +} CommBufferMetadata; + +GstFlowReturn +gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm, + GstBuffer * buffer) +{ + const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER; + GstMapInfo map; + guint32 ret32 = GST_FLOW_OK; + guint32 size, n; + CommBufferMetadata meta; + GstFlowReturn ret; + MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts a 4 for n_meta */ + GstByteWriter bw; + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT, + comm->send_id, buffer); + + gst_byte_writer_init (&bw); + + meta.pts = GST_BUFFER_PTS (buffer); + meta.dts = GST_BUFFER_DTS (buffer); + meta.duration = GST_BUFFER_DURATION (buffer); + meta.offset = GST_BUFFER_OFFSET (buffer); + meta.offset_end = GST_BUFFER_OFFSET_END (buffer); + meta.flags = GST_BUFFER_FLAGS (buffer); + + /* work out meta size */ + gst_buffer_foreach_meta (buffer, build_meta, &repr); + + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + size = + gst_buffer_get_size (buffer) + sizeof (guint32) + + sizeof (CommBufferMetadata) + repr.total_bytes; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta))) + goto write_failed; + size = gst_buffer_get_size (buffer); + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) + goto map_failed; + ret = write_to_fd_raw (comm, map.data, map.size); + gst_buffer_unmap (buffer, &map); + if (!ret) + goto write_failed; + + /* meta */ + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta)) + goto write_failed; + for (n = 0; n < repr.n_meta; ++n) { + const MetaBuildInfo *info = repr.info + n; + guint32 len; + const char *s; + + if (!gst_byte_writer_put_uint32_le (&bw, info->bytes)) + goto write_failed; + + if (!gst_byte_writer_put_uint32_le (&bw, info->flags)) + goto write_failed; + + s = g_type_name (info->api); + len = strlen (s) + 1; + if (!gst_byte_writer_put_uint32_le (&bw, len)) + goto write_failed; + if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len)) + goto write_failed; + + if (!gst_byte_writer_put_uint64_le (&bw, info->size)) + goto write_failed; + + s = info->str; + len = s ? (strlen (s) + 1) : 0; + if (!gst_byte_writer_put_uint32_le (&bw, len)) + goto write_failed; + if (len) + if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len)) + goto write_failed; + } + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, + ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER)) + goto wait_failed; + ret = ret32; + +done: + g_mutex_unlock (&comm->mutex); + gst_byte_writer_reset (&bw); + for (n = 0; n < repr.n_meta; ++n) + g_free (repr.info[n].str); + g_free (repr.info); + return ret; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + ret = GST_FLOW_COMM_ERROR; + goto done; + +wait_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to wait for reply on socket")); + ret = GST_FLOW_COMM_ERROR; + goto done; + +map_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), + ("Failed to map buffer")); + ret = GST_FLOW_ERROR; + goto done; +} + +static GstBuffer * +gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size) +{ + GstBuffer *buffer; + CommBufferMetadata meta; + guint32 n_meta, n; + const guint8 *payload = NULL; + guint32 mapped_size, buffer_data_size; + + /* this should not be called if we don't have enough yet */ + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); + g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL); + + mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size); + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) + return NULL; + memcpy (&meta, payload, sizeof (CommBufferMetadata)); + payload += sizeof (CommBufferMetadata); + memcpy (&buffer_data_size, payload, sizeof (buffer_data_size)); + size -= mapped_size; + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + + if (buffer_data_size == 0) { + buffer = gst_buffer_new (); + } else { + buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size); + gst_adapter_flush (comm->adapter, buffer_data_size); + } + size -= buffer_data_size; + + GST_BUFFER_PTS (buffer) = meta.pts; + GST_BUFFER_DTS (buffer) = meta.dts; + GST_BUFFER_DURATION (buffer) = meta.duration; + GST_BUFFER_OFFSET (buffer) = meta.offset; + GST_BUFFER_OFFSET_END (buffer) = meta.offset_end; + GST_BUFFER_FLAGS (buffer) = meta.flags; + + /* If you don't call that, the GType isn't yet known at the + g_type_from_name below */ + gst_protection_meta_get_info (); + + mapped_size = size; + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) { + gst_buffer_unref (buffer); + return NULL; + } + memcpy (&n_meta, payload, sizeof (n_meta)); + payload += sizeof (n_meta); + + for (n = 0; n < n_meta; ++n) { + guint32 flags, len, bytes; + guint64 msize; + GType api; + GstMeta *meta; + GstStructure *structure = NULL; + + memcpy (&bytes, payload, sizeof (bytes)); + payload += sizeof (bytes); + +#define READ_FIELD(f) do { \ + memcpy (&f, payload, sizeof (f)); \ + payload += sizeof(f); \ + } while(0) + + READ_FIELD (flags); + READ_FIELD (len); + api = g_type_from_name ((const char *) payload); + payload = (const guint8 *) strchr ((const char *) payload, 0) + 1; + READ_FIELD (msize); + READ_FIELD (len); + if (len) { + structure = gst_structure_new_from_string ((const char *) payload); + payload += len + 1; + } + + /* Seems we can add a meta from the api nor type ? */ + if (api == GST_PROTECTION_META_API_TYPE) { + meta = + gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL); + ((GstProtectionMeta *) meta)->info = structure; + } else { + GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s", + g_type_name (api)); + } + +#undef READ_FIELD + + } + + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + + return buffer; +} + +static gboolean +gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm, + GstEvent * event) +{ + const unsigned char payload_type = + GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT; + gboolean ret; + guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen; + char *str = NULL; + const GstStructure *structure; + GstMessage *message = NULL; + const char *name; + GstByteWriter bw; + + g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE, + FALSE); + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + GST_TRACE_OBJECT (comm->element, + "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event); + + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + name = gst_structure_get_name (gst_event_get_structure (event)); + slen = strlen (name) + 1; + gst_event_parse_sink_message (event, &message); + structure = gst_message_get_structure (message); + if (structure) { + str = gst_structure_to_string (structure); + structure_slen = strlen (str); + } else { + str = NULL; + structure_slen = 0; + } + size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) + + strlen (name) + 1 + structure_slen + 1; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + + type = GST_MESSAGE_TYPE (message); + if (!gst_byte_writer_put_uint32_le (&bw, type)) + goto write_failed; + size -= sizeof (type); + + eseqnum = GST_EVENT_SEQNUM (event); + if (!gst_byte_writer_put_uint32_le (&bw, eseqnum)) + goto write_failed; + size -= sizeof (eseqnum); + + mseqnum = GST_MESSAGE_SEQNUM (message); + if (!gst_byte_writer_put_uint32_le (&bw, mseqnum)) + goto write_failed; + size -= sizeof (mseqnum); + + if (!gst_byte_writer_put_uint32_le (&bw, slen)) + goto write_failed; + size -= sizeof (slen); + + if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen)) + goto write_failed; + size -= slen; + + if (str) { + if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size)) + goto write_failed; + } else { + if (!gst_byte_writer_put_uint8 (&bw, 0)) + goto write_failed; + } + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, + GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED, + COMM_REQUEST_TYPE_EVENT)) + goto write_failed; + + ret = ret32; + +done: + g_mutex_unlock (&comm->mutex); + gst_byte_writer_reset (&bw); + g_free (str); + if (message) + gst_message_unref (message); + return ret; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + ret = FALSE; + goto done; +} + +static GstEvent * +gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm, + guint32 size) +{ + GstMessage *message; + GstEvent *event = NULL; + gchar *end = NULL; + GstStructure *structure; + guint32 type, eseqnum, mseqnum, slen; + const char *name; + guint32 mapped_size = size; + const guint8 *payload; + + /* this should not be called if we don't have enough yet */ + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); + g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL); + + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) + return NULL; + memcpy (&type, payload, sizeof (type)); + payload += sizeof (type); + size -= sizeof (type); + if (size == 0) + goto done; + + memcpy (&eseqnum, payload, sizeof (eseqnum)); + payload += sizeof (eseqnum); + size -= sizeof (eseqnum); + if (size == 0) + goto done; + + memcpy (&mseqnum, payload, sizeof (mseqnum)); + payload += sizeof (mseqnum); + size -= sizeof (mseqnum); + if (size == 0) + goto done; + + memcpy (&slen, payload, sizeof (slen)); + payload += sizeof (slen); + size -= sizeof (slen); + if (size == 0) + goto done; + + if (payload[slen - 1]) + goto done; + name = (const char *) payload; + payload += slen; + size -= slen; + + if ((payload)[size - 1]) { + goto done; + } + if (*payload) { + structure = gst_structure_from_string ((const char *) payload, &end); + } else { + structure = NULL; + } + + message = + gst_message_new_custom (type, GST_OBJECT (comm->element), structure); + gst_message_set_seqnum (message, mseqnum); + event = gst_event_new_sink_message (name, message); + gst_event_set_seqnum (event, eseqnum); + gst_message_unref (message); + +done: + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + return event; +} + +gboolean +gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm, + gboolean upstream, GstEvent * event) +{ + const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT; + gboolean ret; + guint32 type, size, ret32 = TRUE, seqnum, slen; + char *str = NULL; + const GstStructure *structure; + GstByteWriter bw; + + /* we special case sink-message event as gst can't serialize/de-serialize it */ + if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE) + return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event); + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT, + comm->send_id, event); + + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + structure = gst_event_get_structure (event); + if (structure) { + + if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { + GstStructure *s = gst_structure_copy (structure); + gst_structure_remove_field (s, "stream"); + str = gst_structure_to_string (s); + gst_structure_free (s); + } else { + str = gst_structure_to_string (structure); + } + + slen = strlen (str); + } else { + str = NULL; + slen = 0; + } + size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + + type = GST_EVENT_TYPE (event); + if (!gst_byte_writer_put_uint32_le (&bw, type)) + goto write_failed; + + seqnum = GST_EVENT_SEQNUM (event); + if (!gst_byte_writer_put_uint32_le (&bw, seqnum)) + goto write_failed; + + if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0)) + goto write_failed; + + if (str) { + if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1)) + goto write_failed; + } else { + if (!gst_byte_writer_put_uint8 (&bw, 0)) + goto write_failed; + } + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + /* Upstream events get serialized, this is required to send seeks only + * one at a time. */ + if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, + (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ? + ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT)) + goto write_failed; + ret = ret32; + +done: + g_mutex_unlock (&comm->mutex); + g_free (str); + gst_byte_writer_reset (&bw); + return ret; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + ret = FALSE; + goto done; +} + +static GstEvent * +gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size, + gboolean * upstream) +{ + GstEvent *event = NULL; + gchar *end = NULL; + GstStructure *structure; + guint32 type, seqnum; + guint32 mapped_size = size; + const guint8 *payload; + + /* this should not be called if we don't have enough yet */ + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); + g_return_val_if_fail (size >= sizeof (type), NULL); + + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) + return NULL; + + memcpy (&type, payload, sizeof (type)); + payload += sizeof (type); + size -= sizeof (type); + if (size == 0) + goto done; + + memcpy (&seqnum, payload, sizeof (seqnum)); + payload += sizeof (seqnum); + size -= sizeof (seqnum); + if (size == 0) + goto done; + + *upstream = (*payload) ? TRUE : FALSE; + payload += 1; + size -= 1; + if (size == 0) + goto done; + + if (payload[size - 1]) + goto done; + if (*payload) { + structure = gst_structure_from_string ((const char *) payload, &end); + } else { + structure = NULL; + } + + event = gst_event_new_custom (type, structure); + gst_event_set_seqnum (event, seqnum); + +done: + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + return event; +} + +gboolean +gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm, + gboolean upstream, GstQuery * query) +{ + const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY; + gboolean ret; + guint32 type, size, ret32 = TRUE, slen; + char *str = NULL; + const GstStructure *structure; + GstByteWriter bw; + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT, + comm->send_id, query); + + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + structure = gst_query_get_structure (query); + if (structure) { + str = gst_structure_to_string (structure); + slen = strlen (str); + } else { + str = NULL; + slen = 0; + } + size = sizeof (type) + 1 + slen + 1; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + + type = GST_QUERY_TYPE (query); + if (!gst_byte_writer_put_uint32_le (&bw, type)) + goto write_failed; + + if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0)) + goto write_failed; + + if (str) { + if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1)) + goto write_failed; + } else { + if (!gst_byte_writer_put_uint8 (&bw, 0)) + goto write_failed; + } + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32, + GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED, + COMM_REQUEST_TYPE_QUERY)) + goto write_failed; + + ret = ret32; + +done: + g_mutex_unlock (&comm->mutex); + g_free (str); + gst_byte_writer_reset (&bw); + return ret; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + ret = FALSE; + goto done; +} + +static GstQuery * +gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size, + gboolean * upstream) +{ + GstQuery *query = NULL; + gchar *end = NULL; + GstStructure *structure; + guint32 type; + guint32 mapped_size = size; + const guint8 *payload; + + /* this should not be called if we don't have enough yet */ + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); + g_return_val_if_fail (size >= sizeof (type), NULL); + + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) + return NULL; + + memcpy (&type, payload, sizeof (type)); + payload += sizeof (type); + size -= sizeof (type); + if (size == 0) + goto done; + + *upstream = (*payload) ? TRUE : FALSE; + payload += 1; + size -= 1; + if (size == 0) + goto done; + + if (payload[size - 1]) + goto done; + if (*payload) { + structure = gst_structure_from_string ((const char *) payload, &end); + } else { + structure = NULL; + } + + query = gst_query_new_custom (type, structure); + + /* CAPS queries contain a filter field, of GstCaps type, which can be NULL. + This does not play well with the serialization/deserialization system, + which will give us a non-NULL GstCaps which has a value of NULL. This + in turn wreaks havoc with any code that tests whether filter is NULL + (which basically means, am I being given an optional GstCaps ?). + So we look for non-NULL GstCaps which have NULL contents, and replace + them with NULL instead. */ + if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) { + GstCaps *filter; + gst_query_parse_caps (query, &filter); + if (filter + && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)), + "NULL")) { + gst_query_unref (query); + query = gst_query_new_caps (NULL); + } + } + +done: + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + return query; +} + +GstStateChangeReturn +gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm, + GstStateChange transition) +{ + const unsigned char payload_type = + GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE; + GstStateChangeReturn ret; + guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS; + GstByteWriter bw; + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s", + comm->send_id, + gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + size = sizeof (transition); + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, transition)) + goto write_failed; + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, + ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE)) + goto write_failed; + ret = ret32; + +done: + g_mutex_unlock (&comm->mutex); + gst_byte_writer_reset (&bw); + return ret; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + ret = GST_STATE_CHANGE_FAILURE; + goto done; +} + +static gboolean +is_valid_state_change (GstStateChange transition) +{ + if (transition == GST_STATE_CHANGE_NULL_TO_READY) + return TRUE; + if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) + return TRUE; + if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING) + return TRUE; + if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED) + return TRUE; + if (transition == GST_STATE_CHANGE_PAUSED_TO_READY) + return TRUE; + if (transition == GST_STATE_CHANGE_READY_TO_NULL) + return TRUE; + if (GST_STATE_TRANSITION_CURRENT (transition) == + GST_STATE_TRANSITION_NEXT (transition)) + return TRUE; + return FALSE; +} + +static gboolean +gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm, + guint32 size, guint32 * transition) +{ + guint32 mapped_size = size; + const guint8 *payload; + + /* this should not be called if we don't have enough yet */ + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE); + g_return_val_if_fail (size >= sizeof (*transition), FALSE); + + payload = gst_adapter_map (comm->adapter, size); + if (!payload) + return FALSE; + memcpy (transition, payload, sizeof (*transition)); + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + return is_valid_state_change (*transition); +} + +void +gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm) +{ + const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST; + guint32 size; + GstByteWriter bw; + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id); + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + size = 0; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + +done: + g_mutex_unlock (&comm->mutex); + gst_byte_writer_reset (&bw); + return; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + goto done; +} + +static gboolean +gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size) +{ + /* no payload */ + return TRUE; +} + +static gboolean +gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm, + GstMessage * message) +{ + const unsigned char payload_type = + GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE; + gboolean ret; + guint32 code, size, ret32 = TRUE; + char *str = NULL; + GError *error; + char *extra_message; + const char *domain_string; + unsigned char msgtype; + GstByteWriter bw; + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) { + gst_message_parse_error (message, &error, &extra_message); + msgtype = 2; + } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) { + gst_message_parse_warning (message, &error, &extra_message); + msgtype = 1; + } else { + gst_message_parse_info (message, &error, &extra_message); + msgtype = 0; + } + code = error->code; + domain_string = g_quark_to_string (error->domain); + GST_TRACE_OBJECT (comm->element, + "Writing error %u: domain %s, code %u, message %s, extra message %s", + comm->send_id, domain_string, error->code, error->message, extra_message); + + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + + size = sizeof (size); + size += 1; + size += strlen (domain_string) + 1; + size += sizeof (code); + size += sizeof (size); + size += error->message ? strlen (error->message) + 1 : 0; + size += sizeof (size); + size += extra_message ? strlen (extra_message) + 1 : 0; + + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + + if (!gst_byte_writer_put_uint8 (&bw, msgtype)) + goto write_failed; + size = strlen (domain_string) + 1; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, code)) + goto write_failed; + size = error->message ? strlen (error->message) + 1 : 0; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (error->message) { + if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size)) + goto write_failed; + } + size = extra_message ? strlen (extra_message) + 1 : 0; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + if (extra_message) { + if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size)) + goto write_failed; + } + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, + ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) + goto write_failed; + + ret = ret32; + +done: + g_mutex_unlock (&comm->mutex); + g_free (str); + g_error_free (error); + g_free (extra_message); + gst_byte_writer_reset (&bw); + return ret; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + ret = FALSE; + goto done; +} + +static GstMessage * +gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm, + guint32 size) +{ + GstMessage *message = NULL; + guint32 code; + GQuark domain; + const char *msg, *extra_message; + GError *error; + unsigned char msgtype; + guint32 mapped_size = size; + const guint8 *payload; + + /* this should not be called if we don't have enough yet */ + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); + g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1, + NULL); + + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) + return NULL; + msgtype = *payload++; + memcpy (&size, payload, sizeof (size)); + payload += sizeof (size); + if (payload[size - 1]) + goto done; + domain = g_quark_from_string ((const char *) payload); + payload += size; + + memcpy (&code, payload, sizeof (code)); + payload += sizeof (code); + + memcpy (&size, payload, sizeof (size)); + payload += sizeof (size); + if (size) { + if (payload[size - 1]) + goto done; + msg = (const char *) payload; + } else { + msg = NULL; + } + payload += size; + + memcpy (&size, payload, sizeof (size)); + payload += sizeof (size); + if (size) { + if (payload[size - 1]) + goto done; + extra_message = (const char *) payload; + } else { + extra_message = NULL; + } + payload += size; + + error = g_error_new (domain, code, "%s", msg); + if (msgtype == 2) + message = + gst_message_new_error (GST_OBJECT (comm->element), error, + extra_message); + else if (msgtype == 1) + message = + gst_message_new_warning (GST_OBJECT (comm->element), error, + extra_message); + else + message = + gst_message_new_info (GST_OBJECT (comm->element), error, extra_message); + g_error_free (error); + +done: + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + + return message; +} + +gboolean +gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm, + GstMessage * message) +{ + const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE; + gboolean ret; + guint32 type, size, ret32 = TRUE, slen; + char *str = NULL; + const GstStructure *structure; + GstByteWriter bw; + + /* we special case error as gst can't serialize/de-serialize it */ + if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR + || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING + || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO) + return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message); + + g_mutex_lock (&comm->mutex); + ++comm->send_id; + + GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT, + comm->send_id, message); + + gst_byte_writer_init (&bw); + if (!gst_byte_writer_put_uint8 (&bw, payload_type)) + goto write_failed; + if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) + goto write_failed; + structure = gst_message_get_structure (message); + if (structure) { + str = gst_structure_to_string (structure); + slen = strlen (str); + } else { + str = NULL; + slen = 0; + } + size = sizeof (type) + slen + 1; + if (!gst_byte_writer_put_uint32_le (&bw, size)) + goto write_failed; + + type = GST_MESSAGE_TYPE (message); + if (!gst_byte_writer_put_uint32_le (&bw, type)) + goto write_failed; + size -= sizeof (type); + if (str) { + if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size)) + goto write_failed; + } else { + if (!gst_byte_writer_put_uint8 (&bw, 0)) + goto write_failed; + } + + if (!write_byte_writer_to_fd (comm, &bw)) + goto write_failed; + + if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, + ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) + goto write_failed; + + ret = ret32; + +done: + g_mutex_unlock (&comm->mutex); + g_free (str); + gst_byte_writer_reset (&bw); + return ret; + +write_failed: + GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), + ("Failed to write to socket")); + ret = FALSE; + goto done; +} + +static GstMessage * +gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size) +{ + GstMessage *message = NULL; + gchar *end = NULL; + GstStructure *structure; + guint32 type; + guint32 mapped_size = size; + const guint8 *payload; + + /* this should not be called if we don't have enough yet */ + g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); + g_return_val_if_fail (size >= sizeof (type), NULL); + + payload = gst_adapter_map (comm->adapter, mapped_size); + if (!payload) + return NULL; + memcpy (&type, payload, sizeof (type)); + payload += sizeof (type); + size -= sizeof (type); + if (size == 0) + goto done; + + if (payload[size - 1]) + goto done; + if (*payload) { + structure = gst_structure_from_string ((const char *) payload, &end); + } else { + structure = NULL; + } + + message = + gst_message_new_custom (type, GST_OBJECT (comm->element), structure); + +done: + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + + return message; +} + +void +gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element) +{ + g_mutex_init (&comm->mutex); + comm->element = element; + comm->fdin = comm->fdout = -1; + comm->ack_time = DEFAULT_ACK_TIME; + comm->waiting_ids = + g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + (GDestroyNotify) comm_request_free); + comm->adapter = gst_adapter_new (); + g_atomic_int_set (&comm->thread_running, 0); +} + +void +gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm) +{ + g_assert (!g_atomic_int_get (&comm->thread_running)); + g_hash_table_destroy (comm->waiting_ids); + gst_object_unref (comm->adapter); + g_mutex_clear (&comm->mutex); +} + +static void +cancel_request (gpointer key, gpointer value, gpointer user_data, + GstFlowReturn fret) +{ + GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data; + guint32 id = GPOINTER_TO_INT (key); + CommRequest *req = (CommRequest *) value; + + GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id, + req->type); + req->ret = fret; + req->replied = TRUE; + g_cond_signal (&req->cond); +} + +static void +cancel_request_error (gpointer key, gpointer value, gpointer user_data) +{ + CommRequest *req = (CommRequest *) value; + GstFlowReturn fret = comm_request_ret_get_failure_value (req->type); + + cancel_request (key, value, user_data, fret); +} + +void +gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup) +{ + g_mutex_lock (&comm->mutex); + g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm); + if (cleanup) { + g_hash_table_unref (comm->waiting_ids); + comm->waiting_ids = + g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + (GDestroyNotify) comm_request_free); + } + g_mutex_unlock (&comm->mutex); +} + +static gboolean +set_field (GQuark field_id, const GValue * value, gpointer user_data) +{ + GstStructure *structure = user_data; + + gst_structure_id_set_value (structure, field_id, value); + + return TRUE; +} + +static gboolean +gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id, + GstFlowReturn ret, GstQuery * query) +{ + CommRequest *req; + + req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id)); + if (!req) { + GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id); + return FALSE; + } + + GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret, + comm_request_ret_get_name (req->type, ret), req->id); + req->replied = TRUE; + req->ret = ret; + if (query) { + if (req->query) { + /* We need to update the original query in place, as the caller + will expect the object to be the same */ + GstStructure *structure = gst_query_writable_structure (req->query); + gst_structure_remove_all_fields (structure); + gst_structure_foreach (gst_query_get_structure (query), set_field, + structure); + } else { + GST_WARNING_OBJECT (comm->element, + "Got query reply, but no query was in the request"); + } + } + g_cond_signal (&req->cond); + return TRUE; +} + +static gboolean +update_adapter (GstIpcPipelineComm * comm) +{ + GstMemory *mem = NULL; + + for (;;) { + fd_set set; + struct timeval tv; + int sret; + ssize_t sz; + GstBuffer *buf; + GstMapInfo map; + int fdin = comm->fdin; + int fdclose = comm->reader_thread_stopping_pipe[0]; + int fdmax; + + FD_ZERO (&set); + FD_SET (fdclose, &set); + fdmax = fdclose; + if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) { + FD_SET (fdin, &set); + if (fdin > fdmax) + fdmax = fdin; + } + tv.tv_sec = 0; + tv.tv_usec = 100000; + sret = select (fdmax + 1, &set, NULL, NULL, &tv); + if (sret < 0) { + if (errno == EAGAIN) + continue; + if (errno == EINTR) + break; + if (mem) + gst_memory_unref (mem); + return FALSE; + } + if (FD_ISSET (fdclose, &set)) { + GST_INFO_OBJECT (comm->element, "data received on close notify pipe"); + comm->reader_thread_stopping = TRUE; + break; + } + if (fdin < 0) + break; + if (!FD_ISSET (fdin, &set)) + break; + if (mem == NULL) + mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL); + gst_memory_map (mem, &map, GST_MAP_WRITE); + sz = read (fdin, map.data, map.size); + gst_memory_unmap (mem, &map); + if (sz < 0) { + if (errno == EAGAIN) + continue; + mem = NULL; + if (errno == EINTR) + break; + gst_memory_unref (mem); + return FALSE; + } + if (sz == 0) { + GST_INFO_OBJECT (comm->element, "fd closed"); + comm->reader_thread_stopping = TRUE; + break; + } + gst_memory_resize (mem, 0, sz); + buf = gst_buffer_new (); + gst_buffer_append_memory (buf, mem); + mem = NULL; + GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz); + gst_adapter_push (comm->adapter, buf); + + /* If we have more data, we loop, otherwise we break */ + FD_ZERO (&set); + if (fdin >= 0) + FD_SET (comm->fdin, &set); + tv.tv_sec = 0; + tv.tv_usec = 0; + sret = select (fdin + 1, &set, NULL, NULL, &tv); + if (sret < 0 || !FD_ISSET (fdin, &set)) + break; + } + if (mem) + gst_memory_unref (mem); + + return TRUE; +} + +static gboolean +read_many (GstIpcPipelineComm * comm) +{ + gboolean ret = TRUE; + gsize available; + const guint8 *payload; + + while (1) + switch (comm->state) { + case GST_IPC_PIPELINE_COMM_STATE_TYPE: + { + guint8 type; + guint32 mapped_size; + + available = gst_adapter_available (comm->adapter); + mapped_size = 1 + sizeof (gint32) * 2; + if (available < mapped_size) + goto done; + + payload = gst_adapter_map (comm->adapter, mapped_size); + type = *payload++; + g_mutex_lock (&comm->mutex); + memcpy (&comm->id, payload, sizeof (guint32)); + memcpy (&comm->payload_length, payload + 4, sizeof (guint32)); + g_mutex_unlock (&comm->mutex); + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, mapped_size); + GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u", + comm->id, type, comm->payload_length); + switch (type) { + case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: + case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: + GST_TRACE_OBJECT (comm->element, "switching to state %s", + gst_ipc_pipeline_comm_data_type_get_name (type)); + comm->state = type; + break; + default: + goto out_of_sync; + } + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: + { + const guint8 *rets; + guint32 ret32; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + if (available < sizeof (guint32)) + goto ack_failed; + + rets = gst_adapter_map (comm->adapter, sizeof (guint32)); + memcpy (&ret32, rets, sizeof (ret32)); + gst_adapter_unmap (comm->adapter); + gst_adapter_flush (comm->adapter, sizeof (guint32)); + GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u", + gst_flow_get_name (ret32), comm->id); + + g_mutex_lock (&comm->mutex); + gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL); + g_mutex_unlock (&comm->mutex); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: + { + GstQuery *query = NULL; + gboolean qret; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + qret = + gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length, + &query); + + GST_TRACE_OBJECT (comm->element, + "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret, + query); + + g_mutex_lock (&comm->mutex); + gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query); + g_mutex_unlock (&comm->mutex); + + gst_query_unref (query); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: + { + GstBuffer *buf; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length); + if (!buf) + goto buffer_failed; + + /* set caps and push */ + GST_TRACE_OBJECT (comm->element, + "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT + ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT + ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT + ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf), + GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf), + GST_BUFFER_FLAGS (buf)); + + gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID, + GINT_TO_POINTER (comm->id), NULL); + + if (comm->on_buffer) + (*comm->on_buffer) (comm->id, buf, comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: + { + GstEvent *event; + gboolean upstream; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length, + &upstream); + if (!event) + goto event_failed; + + GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s", + event, gst_event_type_get_name (event->type)); + + gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, + GINT_TO_POINTER (comm->id), NULL); + + if (comm->on_event) + (*comm->on_event) (comm->id, event, upstream, comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: + { + GstEvent *event; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + event = gst_ipc_pipeline_comm_read_sink_message_event (comm, + comm->payload_length); + if (!event) + goto event_failed; + + GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p", + event); + + gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, + GINT_TO_POINTER (comm->id), NULL); + + if (comm->on_event) + (*comm->on_event) (comm->id, event, FALSE, comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: + { + GstQuery *query; + gboolean upstream; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length, + &upstream); + if (!query) + goto query_failed; + + GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s", + query, gst_query_type_get_name (query->type)); + + gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID, + GINT_TO_POINTER (comm->id), NULL); + + if (comm->on_query) + (*comm->on_query) (comm->id, query, upstream, comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: + { + guint32 transition; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + if (!gst_ipc_pipeline_comm_read_state_change (comm, + comm->payload_length, &transition)) + goto state_change_failed; + + GST_TRACE_OBJECT (comm->element, + "deserialized state change request: %s -> %s", + gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT + (transition)), + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT + (transition))); + + if (comm->on_state_change) + (*comm->on_state_change) (comm->id, transition, comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: + { + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length)) + goto event_failed; + + GST_TRACE_OBJECT (comm->element, "deserialized state-lost"); + + if (comm->on_state_lost) + (*comm->on_state_lost) (comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: + { + GstMessage *message; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + message = gst_ipc_pipeline_comm_read_message (comm, + comm->payload_length); + if (!message) + goto message_failed; + + GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s", + message, gst_message_type_get_name (message->type)); + + if (comm->on_message) + (*comm->on_message) (comm->id, message, comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: + { + GstMessage *message; + + available = gst_adapter_available (comm->adapter); + if (available < comm->payload_length) + goto done; + + message = gst_ipc_pipeline_comm_read_gerror_message (comm, + comm->payload_length); + if (!message) + goto message_failed; + + GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s", + message, gst_message_type_get_name (message->type)); + + if (comm->on_message) + (*comm->on_message) (comm->id, message, comm->user_data); + + GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + break; + } + } + +done: + return ret; + + /* ERRORS */ +out_of_sync: + { + GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), + ("Socket out of sync")); + ret = FALSE; + goto done; + } +state_change_failed: + { + GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), + ("could not read state change from fd")); + ret = FALSE; + goto done; + } +ack_failed: + { + GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), + ("could not read ack from fd")); + ret = FALSE; + goto done; + } +buffer_failed: + { + GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), + ("could not read buffer from fd")); + ret = FALSE; + goto done; + } +event_failed: + { + GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), + ("could not read event from fd")); + ret = FALSE; + goto done; + } +message_failed: + { + GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), + ("could not read message from fd")); + ret = FALSE; + goto done; + } +query_failed: + { + GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), + ("could not read query from fd")); + ret = FALSE; + goto done; + } +} + +static gpointer +reader_thread (gpointer data) +{ + GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data; + + g_atomic_int_set (&comm->thread_running, 1); + while (!comm->reader_thread_stopping) { + if (!update_adapter (comm)) { + if (comm->reader_thread_stopping) { + GST_INFO_OBJECT (comm->element, "We're stopping, all good"); + break; + } + GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), + ("Failed to read from socket")); + break; + } + read_many (comm); + } + + GST_INFO_OBJECT (comm->element, "Reader thread ending"); + g_atomic_int_set (&comm->thread_running, 0); + + return NULL; +} + +gboolean +gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, + void (*on_buffer) (guint32, GstBuffer *, gpointer), + void (*on_event) (guint32, GstEvent *, gboolean, gpointer), + void (*on_query) (guint32, GstQuery *, gboolean, gpointer), + void (*on_state_change) (guint32, GstStateChange, gpointer), + void (*on_state_lost) (gpointer), + void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data) +{ + if (comm->reader_thread) + return FALSE; + + comm->reader_thread_stopping = FALSE; + comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; + comm->on_buffer = on_buffer; + comm->on_event = on_event; + comm->on_query = on_query; + comm->on_state_change = on_state_change; + comm->on_state_lost = on_state_lost; + comm->on_message = on_message; + comm->user_data = user_data; + if (pipe (comm->reader_thread_stopping_pipe) < 0) { + GST_WARNING_OBJECT (comm->element, "Failed to create pipes"); + return FALSE; + } + comm->reader_thread = + g_thread_new ("reader", (GThreadFunc) reader_thread, comm); + return TRUE; +} + +void +gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm) +{ + char dummy = 0; + + if (!comm->reader_thread) + return; + while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0 + && errno == EINTR); + g_thread_join (comm->reader_thread); + close (comm->reader_thread_stopping_pipe[0]); + close (comm->reader_thread_stopping_pipe[1]); + comm->reader_thread = NULL; +} + +static gchar * +gst_value_serialize_event (const GValue * value) +{ + const GstStructure *structure; + GstEvent *ev; + gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s; + GValue val = G_VALUE_INIT; + + ev = g_value_get_boxed (value); + + g_value_init (&val, gst_event_type_get_type ()); + g_value_set_enum (&val, ev->type); + type = gst_value_serialize (&val); + g_value_unset (&val); + + g_value_init (&val, G_TYPE_UINT64); + g_value_set_uint64 (&val, ev->timestamp); + ts = gst_value_serialize (&val); + g_value_unset (&val); + + g_value_init (&val, G_TYPE_UINT); + g_value_set_uint (&val, ev->seqnum); + seqnum = gst_value_serialize (&val); + g_value_unset (&val); + + g_value_init (&val, G_TYPE_INT64); + g_value_set_int64 (&val, gst_event_get_running_time_offset (ev)); + rt_offset = gst_value_serialize (&val); + g_value_unset (&val); + + structure = gst_event_get_structure (ev); + str = gst_structure_to_string (structure); + str64 = g_base64_encode ((guchar *) str, strlen (str) + 1); + g_strdelimit (str64, "=", '_'); + g_free (str); + + s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64, + NULL); + + g_free (type); + g_free (ts); + g_free (seqnum); + g_free (rt_offset); + g_free (str64); + + return s; +} + +static gboolean +gst_value_deserialize_event (GValue * dest, const gchar * s) +{ + GstEvent *ev = NULL; + GValue val = G_VALUE_INIT; + gboolean ret = FALSE; + gchar **fields; + gsize len; + + fields = g_strsplit (s, ":", -1); + if (g_strv_length (fields) != 5) + goto wrong_length; + + g_strdelimit (fields[4], "_", '='); + g_base64_decode_inplace (fields[4], &len); + + g_value_init (&val, gst_event_type_get_type ()); + if (!gst_value_deserialize (&val, fields[0])) + goto fail; + ev = gst_event_new_custom (g_value_get_enum (&val), + gst_structure_new_from_string (fields[4])); + + g_value_unset (&val); + g_value_init (&val, G_TYPE_UINT64); + if (!gst_value_deserialize (&val, fields[1])) + goto fail; + ev->timestamp = g_value_get_uint64 (&val); + + g_value_unset (&val); + g_value_init (&val, G_TYPE_UINT); + if (!gst_value_deserialize (&val, fields[2])) + goto fail; + ev->seqnum = g_value_get_uint (&val); + + g_value_unset (&val); + g_value_init (&val, G_TYPE_INT64); + if (!gst_value_deserialize (&val, fields[3])) + goto fail; + gst_event_set_running_time_offset (ev, g_value_get_int64 (&val)); + + g_value_take_boxed (dest, g_steal_pointer (&ev)); + ret = TRUE; + +fail: + g_clear_pointer (&ev, gst_event_unref); + g_value_unset (&val); + +wrong_length: + g_strfreev (fields); + return ret; +} + +#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type) \ +G_STMT_START { \ + static GstValueTable gst_value = \ + { 0, NULL, \ + gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type }; \ + gst_value.type = _gtype; \ + gst_value_register (&gst_value); \ +} G_STMT_END + +void +gst_ipc_pipeline_comm_plugin_init (void) +{ + static volatile gsize once = 0; + + if (g_once_init_enter (&once)) { + GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0, + "ipc pipeline comm"); + QUARK_ID = g_quark_from_static_string ("ipcpipeline-id"); + REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event); + g_once_init_leave (&once, (gsize) 1); + } +} diff --git a/gst/ipcpipeline/gstipcpipelinecomm.h b/gst/ipcpipeline/gstipcpipelinecomm.h new file mode 100644 index 000000000..bd7335e9e --- /dev/null +++ b/gst/ipcpipeline/gstipcpipelinecomm.h @@ -0,0 +1,132 @@ +/* GStreamer + * Copyright (C) 2015-2017 YouView TV Ltd + * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk> + * + * gstipcpipelinecomm.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + + +#ifndef __GST_IPC_PIPELINE_COMM_H__ +#define __GST_IPC_PIPELINE_COMM_H__ + +#include <glib.h> +#include <gst/gst.h> +#include <gst/base/gstadapter.h> + +G_BEGIN_DECLS + +#define GST_FLOW_COMM_ERROR GST_FLOW_CUSTOM_ERROR_1 + +extern GQuark QUARK_ID; + +typedef enum { + GST_IPC_PIPELINE_COMM_STATE_TYPE = 0, + /* for the rest of the states we use directly the data type enums below */ +} GstIpcPipelineCommState; + +typedef enum { + /* reply types */ + GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK = 1, + GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT, + /* data send types */ + GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER, + GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT, + GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT, + GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY, + GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE, + GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST, + GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE, + GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE, +} GstIpcPipelineCommDataType; + +typedef struct +{ + GstElement *element; + + GMutex mutex; + int fdin; + int fdout; + GHashTable *waiting_ids; + + GThread *reader_thread; + gboolean reader_thread_stopping; + volatile gint thread_running; + int reader_thread_stopping_pipe[2]; + GstAdapter *adapter; + guint8 state; + guint32 send_id; + + guint32 payload_length; + guint32 id; + + guint read_chunk_size; + GstClockTime ack_time; + + void (*on_buffer) (guint32, GstBuffer *, gpointer); + void (*on_event) (guint32, GstEvent *, gboolean, gpointer); + void (*on_query) (guint32, GstQuery *, gboolean, gpointer); + void (*on_state_change) (guint32, GstStateChange, gpointer); + void (*on_state_lost) (gpointer); + void (*on_message) (guint32, GstMessage *, gpointer); + gpointer user_data; + +} GstIpcPipelineComm; + +void gst_ipc_pipeline_comm_plugin_init (void); + +void gst_ipc_pipeline_comm_init (GstIpcPipelineComm *comm, GstElement *e); +void gst_ipc_pipeline_comm_clear (GstIpcPipelineComm *comm); +void gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, + gboolean flushing); + +void gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm, + guint32 id, GstFlowReturn ret); +void gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm, + guint32 id, gboolean ret); +void gst_ipc_pipeline_comm_write_state_change_ack_to_fd ( + GstIpcPipelineComm * comm, guint32 id, GstStateChangeReturn ret); + +void gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm, + guint32 id, gboolean result, GstQuery *query); + +GstFlowReturn gst_ipc_pipeline_comm_write_buffer_to_fd ( + GstIpcPipelineComm * comm, GstBuffer * buffer); +gboolean gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm, + gboolean upstream, GstEvent * event); +gboolean gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm, + gboolean upstream, GstQuery * query); +GstStateChangeReturn gst_ipc_pipeline_comm_write_state_change_to_fd ( + GstIpcPipelineComm * comm, GstStateChange transition); +void gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm); +gboolean gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm, + GstMessage *message); + +gboolean gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, + void (*on_buffer) (guint32, GstBuffer *, gpointer), + void (*on_event) (guint32, GstEvent *, gboolean, gpointer), + void (*on_query) (guint32, GstQuery *, gboolean, gpointer), + void (*on_state_change) (guint32, GstStateChange, gpointer), + void (*on_state_lost) (gpointer), + void (*on_message) (guint32, GstMessage *, gpointer), + gpointer user_data); +void gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm); + +G_END_DECLS + +#endif + diff --git a/gst/ipcpipeline/gstipcpipelinesink.c b/gst/ipcpipeline/gstipcpipelinesink.c new file mode 100644 index 000000000..f1f02bfcb --- /dev/null +++ b/gst/ipcpipeline/gstipcpipelinesink.c @@ -0,0 +1,722 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> + * 2005 Wim Taymans <wim@fluendo.com> + * 2006 Thomas Vander Stichele <thomas at apestaart dot org> + * 2014 Tim-Philipp Müller <tim centricular com> + * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> + * + * gstipcpipelinesink.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +/** + * SECTION:element-ipcpipelinesink + * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline + * + * Communicates with an ipcpipelinesrc element in another process via a socket. + * + * This element, together with ipcpipelinesrc and ipcslavepipeline form a + * mechanism that allows splitting a single pipeline in different processes. + * The main use-case for it is a playback pipeline split in two parts, where the + * first part contains the networking, parsing and demuxing and the second part + * contains the decoding and display. The intention of this split is to improve + * security of an application, by letting the networking, parsing and demuxing + * parts run in a less privileged process than the process that accesses the + * decoder and display. + * + * Once the pipelines in those different processes have been created, the + * playback can be controlled entirely from the first pipeline, which is the + * one that contains ipcpipelinesink. We call this pipeline the “master”. + * All relevant events and queries sent from the application are sent to + * the master pipeline and messages to the application are sent from the master + * pipeline. The second pipeline, in the other process, is transparently slaved. + * + * ipcpipelinesink can work only in push mode and does not synchronize buffers + * to the clock. Synchronization is meant to happen either at the real sink at + * the end of the remote slave pipeline, or not to happen at all, if the + * pipeline is live. + * + * A master pipeline may contain more than one ipcpipelinesink elements, which + * can be connected either to the same slave pipeline or to different ones. + * + * Communication with ipcpipelinesrc on the slave happens via a socket, using a + * custom protocol. Each buffer, event, query, message or state change is + * serialized in a "packet" and sent over the socket. The sender then + * performs a blocking wait for a reply, if a return code is needed. + * + * All objects that contan a GstStructure (messages, queries, events) are + * serialized by serializing the GstStructure to a string + * (gst_structure_to_string). This implies some limitations, of course. + * All fields of this structures that are not serializable to strings (ex. + * object pointers) are ignored, except for some cases where custom + * serialization may occur (ex error/warning/info messages that contain a + * GError are serialized differently). + * + * Buffers are transported by writing their content directly on the socket. + * More efficient ways for memory sharing could be implemented in the future. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <unistd.h> +#include "gstipcpipelinesink.h" +#include <string.h> + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug); +#define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug + +enum +{ + SIGNAL_DISCONNECT, + /* FILL ME */ + LAST_SIGNAL +}; +static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 }; + +enum +{ + PROP_0, + PROP_FDIN, + PROP_FDOUT, + PROP_READ_CHUNK_SIZE, + PROP_ACK_TIME, +}; + + +#define DEFAULT_READ_CHUNK_SIZE 4096 +#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND) + +#define _do_init \ + GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element"); +#define gst_ipc_pipeline_sink_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink, + GST_TYPE_ELEMENT, _do_init); + +static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_ipc_pipeline_sink_dispose (GObject * obj); +static void gst_ipc_pipeline_sink_finalize (GObject * obj); +static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * + sink); +static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * + sink); + +static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement * + element, GstStateChange transition); + +static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad, + GstObject * parent, GstBuffer * buffer); +static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element, + GstQuery * query); +static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element, + GstEvent * event); +static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, + GstQuery * query); +static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active); + + +static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink); +static void pusher (gpointer data, gpointer user_data); + + +static void +gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = G_OBJECT_CLASS (klass); + gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->set_property = gst_ipc_pipeline_sink_set_property; + gobject_class->get_property = gst_ipc_pipeline_sink_get_property; + gobject_class->dispose = gst_ipc_pipeline_sink_dispose; + gobject_class->finalize = gst_ipc_pipeline_sink_finalize; + + g_object_class_install_property (gobject_class, PROP_FDIN, + g_param_spec_int ("fdin", "Input file descriptor", + "File descriptor to received data from", + -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_FDOUT, + g_param_spec_int ("fdout", "Output file descriptor", + "File descriptor to send data through", + -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE, + g_param_spec_uint ("read-chunk-size", "Read chunk size", + "Read chunk size", + 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_ACK_TIME, + g_param_spec_uint64 ("ack-time", "Ack time", + "Maximum time to wait for a response to a message", + 0, G_MAXUINT64, DEFAULT_ACK_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] = + g_signal_new ("disconnect", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect), + NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + gst_element_class_set_static_metadata (gstelement_class, + "Inter-process Pipeline Sink", + "Sink", + "Allows splitting and continuing a pipeline in another process", + "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>"); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&sinktemplate)); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state); + gstelement_class->query = + GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query); + gstelement_class->send_event = + GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event); + + klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect); +} + +static void +gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink) +{ + GstPadTemplate *pad_template; + + GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK); + + gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink)); + sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; + sink->comm.ack_time = DEFAULT_ACK_TIME; + sink->comm.fdin = -1; + sink->comm.fdout = -1; + sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL); + gst_ipc_pipeline_sink_start_reader_thread (sink); + + pad_template = + gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink"); + g_return_if_fail (pad_template != NULL); + + sink->sinkpad = gst_pad_new_from_template (pad_template, "sink"); + + gst_pad_set_activatemode_function (sink->sinkpad, + gst_ipc_pipeline_sink_pad_activate_mode); + gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query); + gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event); + gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain); + gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad); + +} + +static void +gst_ipc_pipeline_sink_dispose (GObject * obj) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj); + + gst_ipc_pipeline_sink_stop_reader_thread (sink); + gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE); + + G_OBJECT_CLASS (parent_class)->dispose (obj); +} + +static void +gst_ipc_pipeline_sink_finalize (GObject * obj) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj); + + gst_ipc_pipeline_comm_clear (&sink->comm); + g_thread_pool_free (sink->threads, TRUE, TRUE); + + G_OBJECT_CLASS (parent_class)->finalize (obj); +} + +static void +gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object); + + switch (prop_id) { + case PROP_FDIN: + sink->comm.fdin = g_value_get_int (value); + break; + case PROP_FDOUT: + sink->comm.fdout = g_value_get_int (value); + break; + case PROP_READ_CHUNK_SIZE: + sink->comm.read_chunk_size = g_value_get_uint (value); + break; + case PROP_ACK_TIME: + sink->comm.ack_time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object); + + switch (prop_id) { + case PROP_FDIN: + g_value_set_int (value, sink->comm.fdin); + break; + case PROP_FDOUT: + g_value_set_int (value, sink->comm.fdout); + break; + case PROP_READ_CHUNK_SIZE: + g_value_set_uint (value, sink->comm.read_chunk_size); + break; + case PROP_ACK_TIME: + g_value_set_uint64 (value, sink->comm.ack_time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent); + gboolean ret; + + GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)", + event, gst_event_type_get_name (event->type), event->type); + + ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event); + gst_event_unref (event); + return ret; +} + +static GstFlowReturn +gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent); + GstFlowReturn ret; + + GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer); + + ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer); + if (ret != GST_FLOW_OK) + GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret)); + + gst_buffer_unref (buffer); + return ret; +} + +static gboolean +gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent); + gboolean ret; + + GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT, + GST_QUERY_TYPE_NAME (query), query); + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_ALLOCATION: + GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query"); + return FALSE; + case GST_QUERY_CAPS: + { + /* caps queries occur even while linking the pipeline. + * It is possible that the ipcpipelinesrc may not be connected at this + * point, so let's avoid a couple of errors... */ + GstState state; + GST_OBJECT_LOCK (sink); + state = GST_STATE (sink); + GST_OBJECT_UNLOCK (sink); + if (state == GST_STATE_NULL) + return FALSE; + } + default: + break; + } + ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query); + + return ret; +} + +static gboolean +gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); + gboolean ret; + + GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT, + GST_QUERY_TYPE_NAME (query), query); + + ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query); + GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query); + return ret; +} + +static gboolean +gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); + gboolean ret; + + GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT, + GST_EVENT_TYPE_NAME (event), event); + + ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event); + GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event); + + gst_event_unref (event); + return ret; +} + + +static gboolean +gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active) +{ + if (mode == GST_PAD_MODE_PULL) + return FALSE; + return TRUE; +} + +static void +on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); + GST_ERROR_OBJECT (sink, + "Got buffer id %u! I never knew buffers could go upstream...", id); + gst_buffer_unref (buffer); +} + +static void +pusher (gpointer data, gpointer user_data) +{ + GstIpcPipelineSink *sink = user_data; + gboolean ret; + guint32 id; + + id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data), + QUARK_ID)); + + if (GST_IS_EVENT (data)) { + GstEvent *event = GST_EVENT (data); + GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event); + ret = gst_pad_push_event (sink->sinkpad, event); + GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret); + } else if (GST_IS_QUERY (data)) { + GstQuery *query = GST_QUERY (data); + GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query); + ret = gst_pad_peer_query (sink->sinkpad, query); + GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret); + gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret, + query); + gst_query_unref (query); + } else { + GST_ERROR_OBJECT (sink, "Unsupported object type"); + } + gst_object_unref (sink); +} + +static void +on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); + + if (!upstream) { + GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...", + id); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE); + gst_event_unref (event); + return; + } + + GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event); + gst_object_ref (sink); + g_thread_pool_push (sink->threads, event, NULL); +} + +static void +on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); + + if (!upstream) { + GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...", + id); + gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE, + query); + gst_query_unref (query); + return; + } + + GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query); + gst_object_ref (sink); + g_thread_pool_push (sink->threads, query, NULL); +} + +static void +on_state_change (guint32 id, GstStateChange transition, gpointer user_data) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); + GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id); +} + +static void +on_state_lost (gpointer user_data) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); + + GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state"); + + GST_OBJECT_LOCK (sink); + sink->pass_next_async_done = TRUE; + GST_OBJECT_UNLOCK (sink); + + gst_element_lost_state (GST_ELEMENT (sink)); +} + +static void +do_async_done (GstElement * element, gpointer user_data) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); + GstMessage *message = user_data; + + GST_STATE_LOCK (sink); + GST_OBJECT_LOCK (sink); + if (sink->pass_next_async_done) { + sink->pass_next_async_done = FALSE; + GST_OBJECT_UNLOCK (sink); + gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS); + GST_STATE_UNLOCK (sink); + gst_element_post_message (element, gst_message_ref (message)); + + } else { + GST_OBJECT_UNLOCK (sink); + GST_STATE_UNLOCK (sink); + } +} + +static void +on_message (guint32 id, GstMessage * message, gpointer user_data) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); + + GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message); + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ASYNC_DONE: + GST_OBJECT_LOCK (sink); + if (sink->pass_next_async_done) { + GST_OBJECT_UNLOCK (sink); + gst_element_call_async (GST_ELEMENT (sink), do_async_done, + message, (GDestroyNotify) gst_message_unref); + } else { + GST_OBJECT_UNLOCK (sink); + gst_message_unref (message); + } + return; + default: + break; + } + + gst_element_post_message (GST_ELEMENT (sink), message); +} + +static gboolean +gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink) +{ + if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer, + on_event, on_query, on_state_change, on_state_lost, on_message, + sink)) { + GST_ERROR_OBJECT (sink, "Failed to start reader thread"); + return FALSE; + } + return TRUE; +} + +static void +gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink) +{ + gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm); +} + + +static void +gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink) +{ + GST_DEBUG_OBJECT (sink, "Disconnecting"); + gst_ipc_pipeline_sink_stop_reader_thread (sink); + sink->comm.fdin = -1; + sink->comm.fdout = -1; + gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE); + gst_ipc_pipeline_sink_start_reader_thread (sink); +} + +static GstStateChangeReturn +gst_ipc_pipeline_sink_change_state (GstElement * element, + GstStateChange transition) +{ + GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); + GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS; + gboolean async = FALSE; + gboolean down = FALSE; + + GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s", + gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + if (sink->comm.fdin < 0) { + GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin); + return GST_STATE_CHANGE_FAILURE; + } + if (sink->comm.fdout < 0) { + GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout); + return GST_STATE_CHANGE_FAILURE; + } + if (!sink->comm.reader_thread) { + GST_ERROR_OBJECT (element, "Failed to start reader thread"); + return GST_STATE_CHANGE_FAILURE; + } + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + /* In these transitions, it is possible that the peer returns ASYNC. + * We don't know that in advance, but we post async-start anyway because + * it needs to be delivered *before* async-done, and async-done may + * arrive at any point in time after we've set the state of the peer. + * In case the peer doesn't return ASYNC, we just post async-done + * ourselves and the parent GstBin takes care of matching and deleting + * them, so the app never gets any of these. */ + async = TRUE; + break; + default: + break; + } + + /* downwards state change */ + down = (GST_STATE_TRANSITION_CURRENT (transition) >= + GST_STATE_TRANSITION_NEXT (transition)); + + if (async) { + GST_DEBUG_OBJECT (sink, + "Posting async-start for %s, will need state-change-done", + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + + gst_element_post_message (GST_ELEMENT (sink), + gst_message_new_async_start (GST_OBJECT (sink))); + + GST_OBJECT_LOCK (sink); + sink->pass_next_async_done = TRUE; + GST_OBJECT_UNLOCK (sink); + } + + /* change the state of the peer first */ + /* If the fd out is -1, we do not actually call the peer. This will happen + when we explicitely disconnected, and in that case we want to be able + to bring the element down to NULL, so it can be restarted with a new + slave pipeline. */ + if (sink->comm.fdout >= 0) { + GST_DEBUG_OBJECT (sink, "Calling peer with state change"); + peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm, + transition); + if (ret == GST_STATE_CHANGE_FAILURE && down) { + GST_WARNING_OBJECT (sink, "Peer returned state change failure, " + "but ignoring because we are going down"); + peer_ret = GST_STATE_CHANGE_SUCCESS; + } + } else { + if (down) { + GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)", + sink->comm.fdout); + peer_ret = GST_STATE_CHANGE_SUCCESS; + } else { + GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing", + sink->comm.fdout); + peer_ret = GST_STATE_CHANGE_FAILURE; + } + } + + /* chain up to the parent class to change our state, if the peer succeeded */ + if (peer_ret != GST_STATE_CHANGE_FAILURE) { + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) { + GST_WARNING_OBJECT (sink, "Parent returned state change failure, " + "but ignoring because we are going down"); + ret = GST_STATE_CHANGE_SUCCESS; + } + } + + GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s", + gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)), + gst_element_state_change_return_get_name (peer_ret), + gst_element_state_change_return_get_name (ret)); + + /* now interpret the return codes */ + if (async && peer_ret != GST_STATE_CHANGE_ASYNC) { + GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC", + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + + GST_OBJECT_LOCK (sink); + sink->pass_next_async_done = FALSE; + GST_OBJECT_UNLOCK (sink); + + gst_element_post_message (GST_ELEMENT (sink), + gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE)); + } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) { + GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC"); + peer_ret = GST_STATE_CHANGE_SUCCESS; + } + + if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) { + if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) { + /* only the parent's ret was FAILURE - revert remote changes */ + GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent " + "returned failure"); + gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm, + GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition), + GST_STATE_TRANSITION_CURRENT (transition))); + } + return GST_STATE_CHANGE_FAILURE; + } + + /* the parent's (GstElement) state change func won't return ASYNC or + * NO_PREROLL, so unless it has returned FAILURE, which we have catched above, + * we are not interested in its return code... just return the peer's */ + return peer_ret; +} diff --git a/gst/ipcpipeline/gstipcpipelinesink.h b/gst/ipcpipeline/gstipcpipelinesink.h new file mode 100644 index 000000000..18c880eee --- /dev/null +++ b/gst/ipcpipeline/gstipcpipelinesink.h @@ -0,0 +1,67 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> + * 2000 Wim Taymans <wtay@chello.be> + * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> + * + * gstipcpipelinesink.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + + +#ifndef __GST_IPC_PIPELINE_SINK_H__ +#define __GST_IPC_PIPELINE_SINK_H__ + +#include <gst/gst.h> +#include "gstipcpipelinecomm.h" + +G_BEGIN_DECLS + +#define GST_TYPE_IPC_PIPELINE_SINK \ + (gst_ipc_pipeline_sink_get_type()) +#define GST_IPC_PIPELINE_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSink)) +#define GST_IPC_PIPELINE_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSinkClass)) +#define GST_IS_IPC_PIPELINE_SINK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SINK)) +#define GST_IS_IPC_PIPELINE_SINK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SINK)) +#define GST_IPC_PIPELINE_SINK_CAST(obj) ((GstIpcPipelineSink *)obj) + +typedef struct _GstIpcPipelineSink GstIpcPipelineSink; +typedef struct _GstIpcPipelineSinkClass GstIpcPipelineSinkClass; + +struct _GstIpcPipelineSink { + GstElement element; + + GstIpcPipelineComm comm; + GThreadPool *threads; + gboolean pass_next_async_done; + GstPad *sinkpad; +}; + +struct _GstIpcPipelineSinkClass { + GstElementClass parent_class; + + void (*disconnect) (GstIpcPipelineSink * sink); +}; + +G_GNUC_INTERNAL GType gst_ipc_pipeline_sink_get_type (void); + +G_END_DECLS + +#endif /* __GST_IPC_PIPELINE_SINK_H__ */ diff --git a/gst/ipcpipeline/gstipcpipelinesrc.c b/gst/ipcpipeline/gstipcpipelinesrc.c new file mode 100644 index 000000000..656bd5a71 --- /dev/null +++ b/gst/ipcpipeline/gstipcpipelinesrc.c @@ -0,0 +1,954 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> + * 2000 Wim Taymans <wim@fluendo.com> + * 2006 Thomas Vander Stichele <thomas at apestaart dot org> + * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> + * + * gstipcpipelinesrc.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +/** + * SECTION:element-ipcpipelinesrc + * @see_also: #GstIpcPipelineSink, #GstIpcSlavePipeline + * + * Communicates with an ipcpipelinesink element in another process via a socket. + * + * The ipcpipelinesrc element allows 2-way communication with an ipcpipelinesink + * element on another process/pipeline. It is meant to run inside an + * interslavepipeline and when paired with an ipcpipelinesink, it will slave its + * whole parent pipeline to the "master" one, which contains the ipcpipelinesink. + * + * For more details about this mechanism and its uses, see the documentation + * of the ipcpipelinesink element. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <unistd.h> +#include <stdlib.h> +#include <string.h> + +#include "gstipcpipelinesrc.h" + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_src_debug); +#define GST_CAT_DEFAULT gst_ipc_pipeline_src_debug + +enum +{ + /* FILL ME */ + SIGNAL_FORWARD_MESSAGE, + SIGNAL_DISCONNECT, + LAST_SIGNAL +}; +static guint gst_ipc_pipeline_src_signals[LAST_SIGNAL] = { 0 }; + +enum +{ + PROP_0, + PROP_FDIN, + PROP_FDOUT, + PROP_READ_CHUNK_SIZE, + PROP_ACK_TIME, + PROP_LAST, +}; + +static GQuark QUARK_UPSTREAM; + +#define DEFAULT_READ_CHUNK_SIZE 65536 +#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND) + +#define _do_init \ + GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_src_debug, "ipcpipelinesrc", 0, "ipcpipelinesrc element"); +#define gst_ipc_pipeline_src_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSrc, gst_ipc_pipeline_src, + GST_TYPE_ELEMENT, _do_init); + +static void gst_ipc_pipeline_src_finalize (GObject * object); +static void gst_ipc_pipeline_src_dispose (GObject * object); +static void gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static void gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src); + +static gboolean gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * + src); +static void gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src); + +static gboolean gst_ipc_pipeline_src_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active); +static gboolean gst_ipc_pipeline_src_srcpad_event (GstPad * pad, + GstObject * parent, GstEvent * event); +static gboolean gst_ipc_pipeline_src_srcpad_query (GstPad * pad, + GstObject * parent, GstQuery * query); +static void gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src); + +static gboolean gst_ipc_pipeline_src_send_event (GstElement * element, + GstEvent * event); +static gboolean gst_ipc_pipeline_src_query (GstElement * element, + GstQuery * query); +static GstStateChangeReturn gst_ipc_pipeline_src_change_state (GstElement * + element, GstStateChange transition); + +static gboolean gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, + GstMessage * msg); +static void gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src); + +static void +gst_ipc_pipeline_src_class_init (GstIpcPipelineSrcClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + QUARK_UPSTREAM = g_quark_from_static_string ("ipcpipeline-upstream"); + + gobject_class->dispose = gst_ipc_pipeline_src_dispose; + gobject_class->finalize = gst_ipc_pipeline_src_finalize; + + gobject_class->set_property = gst_ipc_pipeline_src_set_property; + gobject_class->get_property = gst_ipc_pipeline_src_get_property; + + gstelement_class->send_event = + GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_send_event); + gstelement_class->query = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_query); + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_change_state); + + klass->forward_message = + GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_forward_message); + klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_disconnect); + + GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_activate_mode); + GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_event); + GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_query); + + g_object_class_install_property (gobject_class, PROP_FDIN, + g_param_spec_int ("fdin", "Input file descriptor", + "File descriptor to read data from", + -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_FDOUT, + g_param_spec_int ("fdout", "Output file descriptor", + "File descriptor to write data through", + -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE, + g_param_spec_uint ("read-chunk-size", "Read chunk size", + "Read chunk size", + 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_ACK_TIME, + g_param_spec_uint64 ("ack-time", "Ack time", + "Maximum time to wait for a response to a message", + 0, G_MAXUINT64, DEFAULT_ACK_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + gst_ipc_pipeline_src_signals[SIGNAL_FORWARD_MESSAGE] = + g_signal_new ("forward-message", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstIpcPipelineSrcClass, forward_message), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, GST_TYPE_MESSAGE); + + gst_ipc_pipeline_src_signals[SIGNAL_DISCONNECT] = + g_signal_new ("disconnect", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_STRUCT_OFFSET (GstIpcPipelineSrcClass, disconnect), + NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + + gst_element_class_set_static_metadata (gstelement_class, + "Inter-process Pipeline Source", + "Source", + "Continues a split pipeline from another process", + "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>"); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&srctemplate)); +} + +static void +gst_ipc_pipeline_src_init (GstIpcPipelineSrc * src) +{ + GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE); + + gst_ipc_pipeline_comm_init (&src->comm, GST_ELEMENT (src)); + src->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; + src->comm.ack_time = DEFAULT_ACK_TIME; + src->flushing = TRUE; + src->last_ret = GST_FLOW_FLUSHING; + src->queued = NULL; + g_cond_init (&src->create_cond); + + src->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); + gst_pad_set_activatemode_function (src->srcpad, + gst_ipc_pipeline_src_activate_mode); + gst_pad_set_event_function (src->srcpad, gst_ipc_pipeline_src_srcpad_event); + gst_pad_set_query_function (src->srcpad, gst_ipc_pipeline_src_srcpad_query); + gst_element_add_pad (GST_ELEMENT (src), src->srcpad); + + gst_ipc_pipeline_src_start_reader_thread (src); +} + +static void +gst_ipc_pipeline_src_dispose (GObject * object) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); + + gst_ipc_pipeline_src_stop_reader_thread (src); + gst_ipc_pipeline_src_cancel_queued (src); + gst_ipc_pipeline_comm_cancel (&src->comm, TRUE); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static void +gst_ipc_pipeline_src_finalize (GObject * object) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); + + gst_ipc_pipeline_comm_clear (&src->comm); + g_cond_clear (&src->create_cond); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); + + switch (prop_id) { + case PROP_FDIN: + src->comm.fdin = g_value_get_int (value); + break; + case PROP_FDOUT: + src->comm.fdout = g_value_get_int (value); + break; + case PROP_READ_CHUNK_SIZE: + src->comm.read_chunk_size = g_value_get_uint (value); + break; + case PROP_ACK_TIME: + src->comm.ack_time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); + + g_return_if_fail (GST_IS_IPC_PIPELINE_SRC (object)); + + switch (prop_id) { + case PROP_FDIN: + g_value_set_int (value, src->comm.fdin); + break; + case PROP_FDOUT: + g_value_set_int (value, src->comm.fdout); + break; + case PROP_READ_CHUNK_SIZE: + g_value_set_uint (value, src->comm.read_chunk_size); + break; + case PROP_ACK_TIME: + g_value_set_uint64 (value, src->comm.ack_time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_ipc_pipeline_src_log_queue (GstIpcPipelineSrc * src) +{ + GList *queued; + guint n; + + queued = src->queued; + n = 0; + GST_LOG_OBJECT (src, "There are %u objects in the queue", + g_list_length (queued)); + while (queued) { + void *object = queued->data; + if (GST_IS_EVENT (object)) { + GST_LOG_OBJECT (src, " #%u: %s event", n, GST_EVENT_TYPE_NAME (object)); + } else if (GST_IS_QUERY (object)) { + GST_LOG_OBJECT (src, " #%u: %s query", n, GST_QUERY_TYPE_NAME (object)); + } else if (GST_IS_BUFFER (object)) { + GST_LOG_OBJECT (src, " #%u: %zu bytes buffer", n, + (size_t) gst_buffer_get_size (object)); + } else { + GST_LOG_OBJECT (src, " #%u: unknown item in queue", n); + } + queued = queued->next; + ++n; + } +} + +static void +gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src) +{ + GList *queued; + guint32 id; + + g_mutex_lock (&src->comm.mutex); + queued = src->queued; + src->queued = NULL; + g_cond_broadcast (&src->create_cond); + g_mutex_unlock (&src->comm.mutex); + + while (queued) { + void *object = queued->data; + + id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object), + QUARK_ID)); + + queued = g_list_delete_link (queued, queued); + if (GST_IS_EVENT (object)) { + GstEvent *event = GST_EVENT (object); + GST_DEBUG_OBJECT (src, "Cancelling queued event: %" GST_PTR_FORMAT, + event); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE); + gst_event_unref (event); + } else if (GST_IS_BUFFER (object)) { + GstBuffer *buffer = GST_BUFFER (object); + GST_DEBUG_OBJECT (src, "Cancelling queued buffer: %" GST_PTR_FORMAT, + buffer); + gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, + GST_FLOW_FLUSHING); + gst_buffer_unref (buffer); + } else if (GST_IS_QUERY (object)) { + GstQuery *query = GST_QUERY (object); + GST_DEBUG_OBJECT (src, "Cancelling queued query: %" GST_PTR_FORMAT, + query); + gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, FALSE, + query); + gst_query_unref (query); + } + } + +} + +static void +gst_ipc_pipeline_src_start_loop (GstIpcPipelineSrc * src) +{ + g_mutex_lock (&src->comm.mutex); + src->flushing = FALSE; + src->last_ret = GST_FLOW_OK; + g_mutex_unlock (&src->comm.mutex); + + gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_ipc_pipeline_src_loop, + src, NULL); +} + +static void +gst_ipc_pipeline_src_stop_loop (GstIpcPipelineSrc * src, gboolean stop) +{ + g_mutex_lock (&src->comm.mutex); + src->flushing = TRUE; + g_cond_broadcast (&src->create_cond); + g_mutex_unlock (&src->comm.mutex); + + if (stop) + gst_pad_stop_task (src->srcpad); +} + +static gboolean +gst_ipc_pipeline_src_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent); + + switch (mode) { + case GST_PAD_MODE_PUSH: + GST_DEBUG_OBJECT (pad, "%s in push mode", active ? "activating" : + "deactivating"); + if (active) { + gst_ipc_pipeline_src_start_loop (src); + } else { + gst_ipc_pipeline_src_stop_loop (src, TRUE); + gst_ipc_pipeline_comm_cancel (&src->comm, FALSE); + } + return TRUE; + default: + GST_DEBUG_OBJECT (pad, "unsupported activation mode"); + return FALSE; + } +} + +static gboolean +gst_ipc_pipeline_src_srcpad_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent); + gboolean ret; + + GST_DEBUG_OBJECT (src, "Got upstream event %s", GST_EVENT_TYPE_NAME (event)); + + ret = gst_ipc_pipeline_comm_write_event_to_fd (&src->comm, TRUE, event); + gst_event_unref (event); + + GST_DEBUG_OBJECT (src, "Returning event result: %d", ret); + return ret; +} + +static gboolean +gst_ipc_pipeline_src_srcpad_query (GstPad * pad, GstObject * parent, + GstQuery * query) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent); + gboolean ret; + + /* answer some queries that do not make sense to be forwarded */ + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_LATENCY: + return TRUE; + case GST_QUERY_CONTEXT: + return FALSE; + case GST_QUERY_CAPS: + { + /* caps queries occur even while linking the pipeline. + * It is possible that the ipcpipelinesink may not be connected at this + * point, so let's avoid a couple of errors... */ + GstState state; + GST_OBJECT_LOCK (src); + state = GST_STATE (src); + GST_OBJECT_UNLOCK (src); + if (state == GST_STATE_NULL) + return FALSE; + } + default: + break; + } + + GST_DEBUG_OBJECT (src, "Got upstream query %s: %" GST_PTR_FORMAT, + GST_QUERY_TYPE_NAME (query), query); + + ret = gst_ipc_pipeline_comm_write_query_to_fd (&src->comm, TRUE, query); + + GST_DEBUG_OBJECT (src, "Returning query result: %d, %" GST_PTR_FORMAT, + ret, query); + return ret; +} + +static void +gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src) +{ + gpointer object; + guint32 id; + gboolean ok; + GstFlowReturn ret = GST_FLOW_OK; + + g_mutex_lock (&src->comm.mutex); + + while (!src->queued && !src->flushing) + g_cond_wait (&src->create_cond, &src->comm.mutex); + + if (src->flushing) + goto out; + + object = src->queued->data; + src->queued = g_list_delete_link (src->queued, src->queued); + g_mutex_unlock (&src->comm.mutex); + + id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object), + QUARK_ID)); + + if (GST_IS_BUFFER (object)) { + GstBuffer *buf = GST_BUFFER (object); + GST_DEBUG_OBJECT (src, "Pushing queued buffer: %" GST_PTR_FORMAT, buf); + ret = gst_pad_push (src->srcpad, buf); + GST_DEBUG_OBJECT (src, "pushed id %u, ret: %s", id, + gst_flow_get_name (ret)); + gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, ret); + } else if (GST_IS_EVENT (object)) { + GstEvent *event = GST_EVENT (object); + GST_DEBUG_OBJECT (src, "Pushing queued event: %" GST_PTR_FORMAT, event); + ok = gst_pad_push_event (src->srcpad, event); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok); + } else if (GST_IS_QUERY (object)) { + GstQuery *query = GST_QUERY (object); + GST_DEBUG_OBJECT (src, "Pushing queued query: %" GST_PTR_FORMAT, query); + ok = gst_pad_peer_query (src->srcpad, query); + gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ok, query); + gst_query_unref (query); + } else { + GST_WARNING_OBJECT (src, "Unknown data type queued"); + } + + g_mutex_lock (&src->comm.mutex); + if (!src->queued) + g_cond_broadcast (&src->create_cond); +out: + if (src->flushing) + ret = GST_FLOW_FLUSHING; + if (ret != GST_FLOW_OK) + src->last_ret = ret; + g_mutex_unlock (&src->comm.mutex); + + if (ret == GST_FLOW_FLUSHING) { + gst_ipc_pipeline_src_cancel_queued (src); + gst_pad_pause_task (src->srcpad); + } +} + +static gboolean +gst_ipc_pipeline_src_send_event (GstElement * element, GstEvent * event) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); + return gst_pad_push_event (src->srcpad, event); +} + +static gboolean +gst_ipc_pipeline_src_query (GstElement * element, GstQuery * query) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); + return gst_pad_query (src->srcpad, query); +} + +static GstElement * +find_pipeline (GstElement * element) +{ + GstElement *pipeline = element; + while (GST_ELEMENT_PARENT (pipeline)) { + pipeline = GST_ELEMENT_PARENT (pipeline); + if (GST_IS_PIPELINE (pipeline)) + break; + } + if (!pipeline || !GST_IS_PIPELINE (pipeline)) { + pipeline = NULL; + } + return pipeline; +} + +static gboolean +gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, GstMessage * msg) +{ + gboolean skip = FALSE; + + GST_DEBUG_OBJECT (src, "Message to forward: %" GST_PTR_FORMAT, msg); + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_STATE_CHANGED: + { + GstState old, new, pending; + GstElement *pipeline = find_pipeline (GST_ELEMENT (src)); + + gst_message_parse_state_changed (msg, &old, &new, &pending); + + if (GST_MESSAGE_SRC (msg) == GST_OBJECT (pipeline) && + old == new && new == pending) { + GST_DEBUG_OBJECT (src, "Detected lost state, notifying master"); + gst_ipc_pipeline_comm_write_state_lost_to_fd (&src->comm); + } + /* fall through & skip */ + } + case GST_MESSAGE_ASYNC_START: + case GST_MESSAGE_CLOCK_PROVIDE: + case GST_MESSAGE_CLOCK_LOST: + case GST_MESSAGE_NEW_CLOCK: + case GST_MESSAGE_STREAM_STATUS: + case GST_MESSAGE_NEED_CONTEXT: + case GST_MESSAGE_HAVE_CONTEXT: + case GST_MESSAGE_STRUCTURE_CHANGE: + skip = TRUE; + break; + case GST_MESSAGE_RESET_TIME: + { + GQuark ipcpipelinesrc_posted = g_quark_from_static_string + ("gstinterslavepipeline-message-already-posted"); + + skip = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (msg), + ipcpipelinesrc_posted)); + if (!skip) { + gst_mini_object_set_qdata (GST_MINI_OBJECT (msg), ipcpipelinesrc_posted, + GUINT_TO_POINTER (1), NULL); + } + break; + } + case GST_MESSAGE_ERROR: + { + GError *error = NULL; + + /* skip forwarding a RESOURCE/WRITE error message that originated from + * ipcpipelinesrc; we post this error when writing to the comm fd fails, + * so if we try to forward it here, we will likely get another one posted + * immediately and end up doing an endless loop */ + gst_message_parse_error (msg, &error, NULL); + skip = (GST_MESSAGE_SRC (msg) == GST_OBJECT_CAST (src) + && error->domain == gst_resource_error_quark () + && error->code == GST_RESOURCE_ERROR_WRITE); + g_error_free (error); + break; + } + default: + break; + } + + if (skip) { + GST_DEBUG_OBJECT (src, "message will not be forwarded"); + return TRUE; + } + + return gst_ipc_pipeline_comm_write_message_to_fd (&src->comm, msg); +} + +static void +on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); + GST_DEBUG_OBJECT (src, "Got buffer id %u, queueing: %" GST_PTR_FORMAT, id, + buffer); + g_mutex_lock (&src->comm.mutex); + if (!GST_PAD_IS_ACTIVE (src->srcpad) || src->flushing) { + g_mutex_unlock (&src->comm.mutex); + GST_INFO_OBJECT (src, "We're not started or flushing, buffer ignored"); + gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, + GST_FLOW_FLUSHING); + gst_buffer_unref (buffer); + return; + } + if (src->last_ret != GST_FLOW_OK) { + GstFlowReturn last_ret = src->last_ret; + g_mutex_unlock (&src->comm.mutex); + GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting buffer", + gst_flow_get_name (last_ret)); + gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, last_ret); + gst_buffer_unref (buffer); + return; + } + src->queued = g_list_append (src->queued, buffer); /* keep the ref */ + gst_ipc_pipeline_src_log_queue (src); + g_cond_broadcast (&src->create_cond); + g_mutex_unlock (&src->comm.mutex); +} + +static void +do_oob_event (GstElement * element, gpointer user_data) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); + GstEvent *event = user_data; + gboolean ret, upstream; + guint32 id; + + id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT + (event), QUARK_ID)); + upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT + (event), QUARK_UPSTREAM)); + + if (upstream) { + GstElement *pipeline; + gboolean ok = FALSE; + + if (!(pipeline = find_pipeline (element))) { + GST_ERROR_OBJECT (src, "No pipeline found"); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok); + } else { + GST_DEBUG_OBJECT (src, "Posting upstream event on pipeline: %" + GST_PTR_FORMAT, event); + ok = gst_element_send_event (pipeline, gst_event_ref (event)); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok); + } + } else { + GST_DEBUG_OBJECT (src, "Pushing event async: %" GST_PTR_FORMAT, event); + ret = gst_element_send_event (element, gst_event_ref (event)); + GST_DEBUG_OBJECT (src, "Event pushed, return %d", ret); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ret); + } +} + +static void +on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); + GstFlowReturn last_ret = GST_FLOW_OK; + + GST_DEBUG_OBJECT (src, "Got event id %u, queueing: %" GST_PTR_FORMAT, id, + event); + + gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_UPSTREAM, + GINT_TO_POINTER (upstream), NULL); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + gst_ipc_pipeline_src_stop_loop (src, FALSE); + break; + case GST_EVENT_FLUSH_STOP: + gst_ipc_pipeline_src_start_loop (src); + break; + default: + g_mutex_lock (&src->comm.mutex); + last_ret = src->last_ret; + g_mutex_unlock (&src->comm.mutex); + break; + } + + if (GST_EVENT_IS_SERIALIZED (event) && !upstream) { + if (last_ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event", + gst_flow_get_name (last_ret)); + gst_event_unref (event); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE); + } else { + GST_DEBUG_OBJECT (src, "This is a serialized event, adding to queue %p", + src->queued); + g_mutex_lock (&src->comm.mutex); + src->queued = g_list_append (src->queued, event); /* keep the ref */ + gst_ipc_pipeline_src_log_queue (src); + g_cond_broadcast (&src->create_cond); + g_mutex_unlock (&src->comm.mutex); + } + } else { + if (last_ret != GST_FLOW_OK && !upstream) { + GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event", + gst_flow_get_name (last_ret)); + gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE); + gst_event_unref (event); + } else { + GST_DEBUG_OBJECT (src, + "This is not a serialized event, pushing in a thread"); + gst_element_call_async (GST_ELEMENT (src), do_oob_event, event, + (GDestroyNotify) gst_event_unref); + } + } +} + +static void +do_oob_query (GstElement * element, gpointer user_data) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); + GstQuery *query = GST_QUERY (user_data); + guint32 id; + gboolean upstream; + gboolean ret; + + id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT + (query), QUARK_ID)); + upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT + (query), QUARK_UPSTREAM)); + + if (upstream) { + GstElement *pipeline; + + if (!(pipeline = find_pipeline (element))) { + GST_ERROR_OBJECT (src, "No pipeline found"); + ret = FALSE; + } else { + GST_DEBUG_OBJECT (src, "Posting query on pipeline: %" GST_PTR_FORMAT, + query); + ret = gst_element_query (pipeline, query); + } + } else { + GST_DEBUG_OBJECT (src, "Pushing query async: %" GST_PTR_FORMAT, query); + ret = gst_pad_peer_query (src->srcpad, query); + GST_DEBUG_OBJECT (src, "Query pushed, return %d", ret); + } + gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ret, query); +} + +static void +on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); + + GST_DEBUG_OBJECT (src, "Got query id %u, queueing: %" GST_PTR_FORMAT, id, + query); + + if (GST_QUERY_IS_SERIALIZED (query) && !upstream) { + g_mutex_lock (&src->comm.mutex); + src->queued = g_list_append (src->queued, query); /* keep the ref */ + gst_ipc_pipeline_src_log_queue (src); + g_cond_broadcast (&src->create_cond); + g_mutex_unlock (&src->comm.mutex); + } else { + gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_UPSTREAM, + GINT_TO_POINTER (upstream), NULL); + gst_element_call_async (GST_ELEMENT (src), do_oob_query, query, + (GDestroyNotify) gst_query_unref); + } +} + +struct StateChangeData +{ + guint32 id; + GstStateChange transition; +}; + +static void +do_state_change (GstElement * element, gpointer data) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); + GstElement *pipeline; + GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE; + GstState state, pending, effective; + struct StateChangeData *d = data; + guint32 id = d->id; + GstStateChange transition = d->transition; + gboolean down; + + GST_DEBUG_OBJECT (src, "Doing state change id %u, %s -> %s", id, + gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + + if (!(pipeline = find_pipeline (element))) { + GST_ERROR_OBJECT (src, "No pipeline found"); + ret = GST_STATE_CHANGE_FAILURE; + goto done_nolock; + } + + down = (GST_STATE_TRANSITION_CURRENT (transition) >= + GST_STATE_TRANSITION_NEXT (transition)); + + GST_STATE_LOCK (pipeline); + ret = gst_element_get_state (pipeline, &state, &pending, 0); + + /* if we are pending a state change, count the pending state as + * the current one */ + effective = pending == GST_STATE_VOID_PENDING ? state : pending; + + GST_DEBUG_OBJECT (src, "Current element state: ret:%s state:%s pending:%s " + "effective:%s", gst_element_state_change_return_get_name (ret), + gst_element_state_get_name (state), + gst_element_state_get_name (pending), + gst_element_state_get_name (effective)); + + if ((GST_STATE_TRANSITION_NEXT (transition) <= effective && !down) || + (GST_STATE_TRANSITION_NEXT (transition) > effective && down)) { + /* if the request was to transition to a state that we have already + * transitioned to in the same direction, then we just silently return */ + GST_DEBUG_OBJECT (src, "State transition to %s is unnecessary", + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + /* make sure we return SUCCESS if the transition is to NULL or READY, + * even if our current ret is ASYNC for example; also, make sure not + * to return FAILURE, since our state is already committed */ + if (GST_STATE_TRANSITION_NEXT (transition) <= GST_STATE_READY || + ret == GST_STATE_CHANGE_FAILURE) { + ret = GST_STATE_CHANGE_SUCCESS; + } + } else if (ret != GST_STATE_CHANGE_FAILURE || down) { + /* if the request was to transition to a state that we haven't already + * transitioned to in the same direction, then we need to request a state + * change in the pipeline, *unless* we are going upwards and the last ret + * was FAILURE, in which case we should just return FAILURE and stop. + * We don't stop a downwards state change though in case of FAILURE, since + * we need to be able to bring the pipeline down to NULL. Note that + * GST_MESSAGE_ERROR will cause ret to be GST_STATE_CHANGE_FAILURE */ + ret = gst_element_set_state (pipeline, + GST_STATE_TRANSITION_NEXT (transition)); + GST_DEBUG_OBJECT (src, "gst_element_set_state returned %s", + gst_element_state_change_return_get_name (ret)); + } + + GST_STATE_UNLOCK (pipeline); + +done_nolock: + GST_DEBUG_OBJECT (src, "sending state change ack, ret = %s", + gst_element_state_change_return_get_name (ret)); + gst_ipc_pipeline_comm_write_state_change_ack_to_fd (&src->comm, id, ret); +} + +static void +on_state_change (guint32 id, GstStateChange transition, gpointer user_data) +{ + struct StateChangeData *d; + GstElement *ipcpipelinesrc = GST_ELEMENT (user_data); + + GST_DEBUG_OBJECT (ipcpipelinesrc, "Got state change id %u, %s -> %s", id, + gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), + gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); + + d = g_new (struct StateChangeData, 1); + d->id = id; + d->transition = transition; + + gst_element_call_async (ipcpipelinesrc, do_state_change, d, g_free); +} + +static void +on_message (guint32 id, GstMessage * message, gpointer user_data) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); + + GST_ERROR_OBJECT (src, "Got message id %u, not supposed to: %" GST_PTR_FORMAT, + id, message); + gst_message_unref (message); +} + +static gboolean +gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * src) +{ + if (!gst_ipc_pipeline_comm_start_reader_thread (&src->comm, on_buffer, + on_event, on_query, on_state_change, NULL, on_message, src)) { + GST_ERROR_OBJECT (src, "Failed to start reader thread"); + return FALSE; + } + return TRUE; +} + +static void +gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src) +{ + gst_ipc_pipeline_comm_stop_reader_thread (&src->comm); +} + +static void +gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src) +{ + GST_DEBUG_OBJECT (src, "Disconnecting"); + gst_ipc_pipeline_src_stop_reader_thread (src); + src->comm.fdin = -1; + src->comm.fdout = -1; + gst_ipc_pipeline_comm_cancel (&src->comm, FALSE); + gst_ipc_pipeline_src_start_reader_thread (src); +} + +static GstStateChangeReturn +gst_ipc_pipeline_src_change_state (GstElement * element, + GstStateChange transition) +{ + GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + if (src->comm.fdin < 0) { + GST_ERROR_OBJECT (element, "Invalid fdin: %d", src->comm.fdin); + return GST_STATE_CHANGE_FAILURE; + } + if (src->comm.fdout < 0) { + GST_ERROR_OBJECT (element, "Invalid fdout: %d", src->comm.fdout); + return GST_STATE_CHANGE_FAILURE; + } + if (!src->comm.reader_thread) { + GST_ERROR_OBJECT (element, "Failed to start reader thread"); + return GST_STATE_CHANGE_FAILURE; + } + break; + default: + break; + } + return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); +} diff --git a/gst/ipcpipeline/gstipcpipelinesrc.h b/gst/ipcpipeline/gstipcpipelinesrc.h new file mode 100644 index 000000000..221fccdd1 --- /dev/null +++ b/gst/ipcpipeline/gstipcpipelinesrc.h @@ -0,0 +1,76 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> + * 2000 Wim Taymans <wtay@chello.be> + * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> + * + * gstipcpipelinesrc.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + + +#ifndef __GST_IPC_PIPELINE_SRC_H__ +#define __GST_IPC_PIPELINE_SRC_H__ + +#include <gst/gst.h> +#include "gstipcpipelinecomm.h" + +G_BEGIN_DECLS + +#define GST_TYPE_IPC_PIPELINE_SRC \ + (gst_ipc_pipeline_src_get_type()) +#define GST_IPC_PIPELINE_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrc)) +#define GST_IPC_PIPELINE_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrcClass)) +#define GST_IS_IPC_PIPELINE_SRC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SRC)) +#define GST_IS_IPC_PIPELINE_SRC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SRC)) + +typedef struct _GstIpcPipelineSrc GstIpcPipelineSrc; +typedef struct _GstIpcPipelineSrcClass GstIpcPipelineSrcClass; + +/** + * GstIpcPipelineSrc: + * + * Opaque #GstIpcPipelineSrc data structure. + */ +struct _GstIpcPipelineSrc { + GstElement element; + + GstIpcPipelineComm comm; + GstPad *srcpad; + + gboolean flushing; + GList *queued; + GstFlowReturn last_ret; + + GCond create_cond; +}; + +struct _GstIpcPipelineSrcClass { + GstElementClass parent_class; + + gboolean (*forward_message) (GstIpcPipelineSrc *, GstMessage *); + void (*disconnect) (GstIpcPipelineSrc * src); +}; + +G_GNUC_INTERNAL GType gst_ipc_pipeline_src_get_type (void); + +G_END_DECLS + +#endif /* __GST_IPC_PIPELINE_SRC_H__ */ diff --git a/gst/ipcpipeline/gstipcslavepipeline.c b/gst/ipcpipeline/gstipcslavepipeline.c new file mode 100644 index 000000000..2c9dd1516 --- /dev/null +++ b/gst/ipcpipeline/gstipcslavepipeline.c @@ -0,0 +1,122 @@ +/* GStreamer + * Copyright (C) 2015-2017 YouView TV Ltd + * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> + * + * gstipcslavepipeline.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +/** + * SECTION:element-ipcslavepipeline + * @see_also: #GstIpcPipelineSink, #GstIpcPipelineSrc + * + * This is a GstPipeline subclass meant to embed one ore more ipcpipelinesrc + * elements, and be slaved transparently to the master pipeline, using one ore + * more ipcpipelinesink elements on the master. + * + * The actual pipeline slaving logic happens in ipcpipelinesrc. The only thing + * that this class actually does is that it watches the pipeline bus for + * messages and forwards them to the master pipeline through the ipcpipelinesrc + * elements that it contains. + * + * For more details about this mechanism and its uses, see the documentation + * of the ipcpipelinesink element. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <string.h> + +#include "gstipcpipelinesrc.h" +#include "gstipcslavepipeline.h" + +GST_DEBUG_CATEGORY_STATIC (gst_ipcslavepipeline_debug); +#define GST_CAT_DEFAULT gst_ipcslavepipeline_debug + +#define _do_init \ + GST_DEBUG_CATEGORY_INIT (gst_ipcslavepipeline_debug, "ipcslavepipeline", 0, "ipcslavepipeline element"); +#define gst_ipc_slave_pipeline_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstIpcSlavePipeline, gst_ipc_slave_pipeline, + GST_TYPE_PIPELINE, _do_init); + +static gboolean gst_ipc_slave_pipeline_post_message (GstElement * element, + GstMessage * message); + +static void +gst_ipc_slave_pipeline_class_init (GstIpcSlavePipelineClass * klass) +{ + GstElementClass *element_class; + + element_class = GST_ELEMENT_CLASS (klass); + + element_class->post_message = gst_ipc_slave_pipeline_post_message; + + gst_element_class_set_static_metadata (element_class, + "Inter-process slave pipeline", + "Generic/Bin/Slave", + "Contains the slave part of an inter-process pipeline", + "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk"); +} + +static void +gst_ipc_slave_pipeline_init (GstIpcSlavePipeline * isp) +{ +} + +static gboolean +send_message_if_ipcpipelinesrc (const GValue * v, GValue * r, + gpointer user_data) +{ + GstElement *e; + GType et; + gboolean ret; + GstMessage *message = user_data; + + e = g_value_get_object (v); + et = gst_element_factory_get_element_type (gst_element_get_factory (e)); + if (et == GST_TYPE_IPC_PIPELINE_SRC) { + g_signal_emit_by_name (G_OBJECT (e), "forward-message", message, &ret); + + /* if we succesfully sent this to the master and it's not ASYNC_DONE or EOS, + * we can skip sending it again through the other ipcpipelinesrcs */ + if (ret && GST_MESSAGE_TYPE (message) != GST_MESSAGE_ASYNC_DONE && + GST_MESSAGE_TYPE (message) != GST_MESSAGE_EOS) + return FALSE; + } + return TRUE; +} + +static void +gst_ipc_slave_pipeline_forward_message (GstIpcSlavePipeline * pipeline, + GstMessage * message) +{ + GstIterator *it; + + it = gst_bin_iterate_sources (GST_BIN (pipeline)); + gst_iterator_fold (it, send_message_if_ipcpipelinesrc, NULL, message); + gst_iterator_free (it); +} + +static gboolean +gst_ipc_slave_pipeline_post_message (GstElement * element, GstMessage * message) +{ + gst_ipc_slave_pipeline_forward_message (GST_IPC_SLAVE_PIPELINE + (element), message); + + return GST_ELEMENT_CLASS (parent_class)->post_message (element, message); +} diff --git a/gst/ipcpipeline/gstipcslavepipeline.h b/gst/ipcpipeline/gstipcslavepipeline.h new file mode 100644 index 000000000..354dfacf0 --- /dev/null +++ b/gst/ipcpipeline/gstipcslavepipeline.h @@ -0,0 +1,59 @@ +/* GStreamer + * Copyright (C) 2015-2017 YouView TV Ltd + * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> + * + * gstipcslavepipeline.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifndef _GST_IPC_SLAVE_PIPELINE_H_ +#define _GST_IPC_SLAVE_PIPELINE_H_ + +#include <gst/gst.h> + +G_BEGIN_DECLS + +#define GST_TYPE_IPC_SLAVE_PIPELINE \ + (gst_ipc_slave_pipeline_get_type()) +#define GST_IPC_SLAVE_PIPELINE(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipeline)) +#define GST_IPC_SLAVE_PIPELINE_CAST(obj) \ + ((GstIpcSlavePipeline *) obj) +#define GST_IPC_SLAVE_PIPELINE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipelineClass)) +#define GST_IS_IPC_SLAVE_PIPELINE(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_SLAVE_PIPELINE)) +#define GST_IS_IPC_SLAVE_PIPELINE_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_SLAVE_PIPELINE)) + +typedef struct _GstIpcSlavePipeline GstIpcSlavePipeline; +typedef struct _GstIpcSlavePipelineClass GstIpcSlavePipelineClass; + +struct _GstIpcSlavePipeline +{ + GstPipeline pipeline; +}; + +struct _GstIpcSlavePipelineClass +{ + GstPipelineClass pipeline_class; +}; + +G_GNUC_INTERNAL GType gst_ipc_slave_pipeline_get_type (void); + +G_END_DECLS + +#endif diff --git a/gst/ipcpipeline/protocol.txt b/gst/ipcpipeline/protocol.txt new file mode 100644 index 000000000..77f6e568e --- /dev/null +++ b/gst/ipcpipeline/protocol.txt @@ -0,0 +1,92 @@ +This documents the protocol used to pass data over fds between ipcpipelinesrc +and ipcpipelinesink. + +The protocol is used in both directions. However, some combinations do +not make sense (eg, a buffer going from ipcpipelinesrc to ipcpipelinesink). + +The protocol consists of an arbitrary number of variable sized chunks +with a type. Each chunk has a request ID which can be used to match a +request with its reply (ack / query result). + +Each chunk consists of: + - a type (byte): + 1: ack + 2: query result + 3: buffer + 4: event + 5: sink message event + 6: query + 7: state change + 8: state lost + 9: message + 10: error/warning/info message + - a request ID, 4 bytes, little endian + - the payload size, 4 bytes, little endian + - N bytes payload + +Depending on the type, the payload can contain: + + - 1: ack + result: 4 bytes, little endian + interpreted as GstFlowReturn for buffers, boolean for events and + GstStateChangeReturn for state changes + - 2: query result + result boolean: 1 byte + query type: 4 bytes, little endian + returned query string representation, NUL terminated + - 3: buffer: + pts: 8 bytes, little endian + dts: 8 bytes, little endian + duration: 8 bytes, little endian + offset: 8 bytes, little endian + offset end: 8 bytes, little endian + flags: 8 bytes, little endian + buffer size: 4 bytes, little endian + data: contents of the buffer data, size specified in "buffer size" + number of GstMeta: 4 bytes, little endian + For each GstMeta: + bytes: 4 bytes, little endian + this is the number of bytes before the string representation + at the end of this block, including the 4 bytes of itself + flags: 4 bytes, little endian + length of the GstMetaInfo::api name: 4 bytes, little endian + GstMetaInfo::api name: string, NUL terminated + GstMetaInfo::size: 8 bytes, little endian + length of the string representation: 4 bytes, little endian + string representation, NUL terminated + - 4: event + event type: 4 bytes, little endian + sequence number: 4 bytes, little endian + direction: 1 byte + whether the event is going upstream (1) or downstream (0) + string representation, NUL terminated + - 5: sink message event + message type: 4 bytes, little endian + event sequence number: 4 bytes, little endian + message sequence number: 4 bytes, little endian + length: 4 bytes, little endian + event structure name: length bytes, NUL terminated + message structure string representation: remaining bytes, NUL terminated + - 6: query + query type: 4 bytes, little endian + direction: 1 byte + whether the query is going upstream (1) or downstream (0) + string representation, NUL terminated + - 7: state change + GstStateChange: 4 bytes, little endian + - 8: state lost + no payload + - 9: message + message type: 4 bytes, little endian + string representation, NUL terminated + - 10: error/warning/info message + message type (2 = error, 1 = warning, 0 = info): 1 byte + error domain string length: 4 bytes, little endian + string representation of the error domain, NUL terminated + error code: 4 bytes, little endian + length: 4 bytes, little endian + if zero: no error message + if non zero: As many bytes as this length: the error message, NUL terminated + length: 4 bytes, little endian + if zero: no extra message + if non zero: As many bytes as this length: the error extra debug message, NUL terminated |