diff options
author | George Kiagiadakis <george.kiagiadakis@collabora.com> | 2017-08-01 17:07:59 +0300 |
---|---|---|
committer | George Kiagiadakis <george.kiagiadakis@collabora.com> | 2017-08-02 10:40:24 +0300 |
commit | 30f5abc32c7c8d8517f5089a54756efa4ab63428 (patch) | |
tree | f43544b9c7d88cebb82ce24e8e3447d1eba7d47a /gst | |
parent | a2053380821bbadbd44768cd231b4b68fa94d0a0 (diff) | |
download | gstreamer-plugins-bad-30f5abc32c7c8d8517f5089a54756efa4ab63428.tar.gz |
ipcpipeline: move to sys/ and make it dependent on platform support for unix sockets
Diffstat (limited to 'gst')
-rw-r--r-- | gst/ipcpipeline/Makefile.am | 28 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipeline.c | 48 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinecomm.c | 2341 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinecomm.h | 132 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesink.c | 722 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesink.h | 67 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesrc.c | 954 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcpipelinesrc.h | 76 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcslavepipeline.c | 122 | ||||
-rw-r--r-- | gst/ipcpipeline/gstipcslavepipeline.h | 59 | ||||
-rw-r--r-- | gst/ipcpipeline/meson.build | 16 | ||||
-rw-r--r-- | gst/ipcpipeline/protocol.txt | 92 | ||||
-rw-r--r-- | gst/meson.build | 1 |
13 files changed, 0 insertions, 4658 deletions
diff --git a/gst/ipcpipeline/Makefile.am b/gst/ipcpipeline/Makefile.am deleted file mode 100644 index 12bbed14b..000000000 --- a/gst/ipcpipeline/Makefile.am +++ /dev/null @@ -1,28 +0,0 @@ -plugin_LTLIBRARIES = libgstipcpipeline.la - -libgstipcpipeline_la_SOURCES = \ - gstipcpipeline.c \ - gstipcpipelinecomm.c \ - gstipcpipelinesink.c \ - gstipcpipelinesrc.c \ - gstipcslavepipeline.c - -noinst_HEADERS = \ - gstipcpipelinecomm.h \ - gstipcpipelinesink.h \ - gstipcpipelinesrc.h \ - gstipcslavepipeline.h - -libgstipcpipeline_la_CFLAGS = \ - $(GST_PLUGINS_BAD_CFLAGS) \ - $(GST_PLUGINS_BASE_CFLAGS) \ - $(GST_BASE_CFLAGS) \ - $(GST_CFLAGS) - -libgstipcpipeline_la_LIBADD = \ - $(GST_PLUGINS_BASE_LIBS) \ - $(GST_BASE_LIBS) \ - $(GST_LIBS) \ - $(LIBM) - -libgstipcpipeline_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) diff --git a/gst/ipcpipeline/gstipcpipeline.c b/gst/ipcpipeline/gstipcpipeline.c deleted file mode 100644 index 4d647d49d..000000000 --- a/gst/ipcpipeline/gstipcpipeline.c +++ /dev/null @@ -1,48 +0,0 @@ -/* GStreamer - * Copyright (C) 2017 YouView TV Ltd - * Author: George Kiagiadakis <george.Kiagiadakis@collabora.com> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, - * Boston, MA 02110-1335, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "gstipcpipelinecomm.h" -#include "gstipcpipelinesink.h" -#include "gstipcpipelinesrc.h" -#include "gstipcslavepipeline.h" - -static gboolean -plugin_init (GstPlugin * plugin) -{ - gst_ipc_pipeline_comm_plugin_init (); - gst_element_register (plugin, "ipcpipelinesrc", GST_RANK_NONE, - GST_TYPE_IPC_PIPELINE_SRC); - gst_element_register (plugin, "ipcpipelinesink", GST_RANK_NONE, - GST_TYPE_IPC_PIPELINE_SINK); - gst_element_register (plugin, "ipcslavepipeline", GST_RANK_NONE, - GST_TYPE_IPC_SLAVE_PIPELINE); - - return TRUE; -} - -GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, - GST_VERSION_MINOR, - ipcpipeline, - "plugin for inter-process pipeline communication", - plugin_init, VERSION, "LGPL", PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/gst/ipcpipeline/gstipcpipelinecomm.c b/gst/ipcpipeline/gstipcpipelinecomm.c deleted file mode 100644 index eee8e528f..000000000 --- a/gst/ipcpipeline/gstipcpipelinecomm.c +++ /dev/null @@ -1,2341 +0,0 @@ -/* GStreamer - * Copyright (C) 2015-2017 YouView TV Ltd - * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk> - * - * gstipcpipelinecomm.c: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include <unistd.h> -#include <errno.h> -#include <string.h> -#include <stdlib.h> -#include <gst/base/gstbytewriter.h> -#include <gst/gstprotection.h> -#include "gstipcpipelinecomm.h" - -GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug); -#define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug - -#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND) - -GQuark QUARK_ID; - -typedef enum -{ - ACK_TYPE_NONE, - ACK_TYPE_TIMED, - ACK_TYPE_BLOCKING -} AckType; - -typedef enum -{ - COMM_REQUEST_TYPE_BUFFER, - COMM_REQUEST_TYPE_EVENT, - COMM_REQUEST_TYPE_QUERY, - COMM_REQUEST_TYPE_STATE_CHANGE, - COMM_REQUEST_TYPE_MESSAGE, -} CommRequestType; - -typedef struct -{ - guint32 id; - gboolean replied; - gboolean comm_error; - guint32 ret; - GstQuery *query; - CommRequestType type; - GCond cond; -} CommRequest; - -static const gchar *comm_request_ret_get_name (CommRequestType type, - guint32 ret); -static guint32 comm_request_ret_get_failure_value (CommRequestType type); - -static CommRequest * -comm_request_new (guint32 id, CommRequestType type, GstQuery * query) -{ - CommRequest *req; - - req = g_malloc (sizeof (CommRequest)); - req->id = id; - g_cond_init (&req->cond); - req->replied = FALSE; - req->comm_error = FALSE; - req->query = query; - req->ret = comm_request_ret_get_failure_value (type); - req->type = type; - - return req; -} - -static guint32 -comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req, - AckType ack_type) -{ - guint32 ret = comm_request_ret_get_failure_value (req->type); - guint64 end_time; - - if (ack_type == ACK_TYPE_TIMED) - end_time = g_get_monotonic_time () + comm->ack_time; - else - end_time = G_MAXUINT64; - - GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u", - req->id); - while (!req->replied) { - if (ack_type == ACK_TYPE_TIMED) { - g_cond_wait_until (&req->cond, &comm->mutex, end_time); - if (g_get_monotonic_time () >= end_time) - break; - } else - g_cond_wait (&req->cond, &comm->mutex); - } - - if (req->replied) { - ret = req->ret; - GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)", - req->id, ret, comm_request_ret_get_name (req->type, ret)); - } else { - req->comm_error = TRUE; - GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u", - req->id); - } - - return ret; -} - -static void -comm_request_free (CommRequest * req) -{ - g_cond_clear (&req->cond); - g_free (req); -} - -static const gchar * -comm_request_ret_get_name (CommRequestType type, guint32 ret) -{ - switch (type) { - case COMM_REQUEST_TYPE_BUFFER: - return gst_flow_get_name (ret); - case COMM_REQUEST_TYPE_EVENT: - case COMM_REQUEST_TYPE_QUERY: - case COMM_REQUEST_TYPE_MESSAGE: - return ret ? "TRUE" : "FALSE"; - case COMM_REQUEST_TYPE_STATE_CHANGE: - return gst_element_state_change_return_get_name (ret); - default: - g_assert_not_reached (); - } -} - -static guint32 -comm_request_ret_get_failure_value (CommRequestType type) -{ - switch (type) { - case COMM_REQUEST_TYPE_BUFFER: - return GST_FLOW_COMM_ERROR; - case COMM_REQUEST_TYPE_EVENT: - case COMM_REQUEST_TYPE_MESSAGE: - case COMM_REQUEST_TYPE_QUERY: - return FALSE; - case COMM_REQUEST_TYPE_STATE_CHANGE: - return GST_STATE_CHANGE_FAILURE; - default: - g_assert_not_reached (); - } -} - -static const gchar * -gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type) -{ - switch (type) { - case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: - return "ACK"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: - return "QUERY_RESULT"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: - return "BUFFER"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: - return "EVENT"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: - return "SINK_MESSAGE_EVENT"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: - return "QUERY"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: - return "STATE_CHANGE"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: - return "STATE_LOST"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: - return "MESSAGE"; - case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: - return "GERROR_MESSAGE"; - default: - return "UNKNOWN"; - } -} - -static gboolean -gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id, - GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type) -{ - CommRequest *req; - gboolean comm_error; - GHashTable *waiting_ids; - - if (ack_type == ACK_TYPE_NONE) - return TRUE; - - req = comm_request_new (id, type, query); - waiting_ids = g_hash_table_ref (comm->waiting_ids); - g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req); - *ret = comm_request_wait (comm, req, ack_type); - comm_error = req->comm_error; - g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id)); - g_hash_table_unref (waiting_ids); - return !comm_error; -} - -static gboolean -write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size) -{ - size_t offset; - gboolean ret = TRUE; - - offset = 0; - GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size); - while (size) { - ssize_t written = - write (comm->fdout, (const unsigned char *) data + offset, size); - if (written < 0) { - if (errno == EAGAIN || errno == EINTR) - continue; - GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s", - strerror (errno)); - ret = FALSE; - goto done; - } - size -= written; - offset += written; - } - -done: - return ret; -} - -static gboolean -write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw) -{ - guint8 *data; - gboolean ret; - guint size; - - size = gst_byte_writer_get_size (bw); - data = gst_byte_writer_reset_and_get_data (bw); - if (!data) - return FALSE; - ret = write_to_fd_raw (comm, data, size); - g_free (data); - return ret; -} - -static void -gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id, - guint32 ret, CommRequestType type) -{ - const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK; - guint32 size; - GstByteWriter bw; - - g_mutex_lock (&comm->mutex); - - GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id, - comm_request_ret_get_name (type, ret), ret); - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, id)) - goto write_failed; - size = sizeof (ret); - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, ret)) - goto write_failed; - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - -done: - g_mutex_unlock (&comm->mutex); - gst_byte_writer_reset (&bw); - return; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - goto done; -} - -void -gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm, - guint32 id, GstFlowReturn ret) -{ - gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, - COMM_REQUEST_TYPE_BUFFER); -} - -void -gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm, - guint32 id, gboolean ret) -{ - gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, - COMM_REQUEST_TYPE_EVENT); -} - -void -gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm, - guint32 id, GstStateChangeReturn ret) -{ - gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, - COMM_REQUEST_TYPE_STATE_CHANGE); -} - -void -gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm, - guint32 id, gboolean result, GstQuery * query) -{ - const unsigned char payload_type = - GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT; - guint8 result8 = result; - guint32 size; - size_t len; - char *str = NULL; - guint32 type; - const GstStructure *structure; - GstByteWriter bw; - - g_mutex_lock (&comm->mutex); - - GST_TRACE_OBJECT (comm->element, - "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query); - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, id)) - goto write_failed; - structure = gst_query_get_structure (query); - if (structure) { - str = gst_structure_to_string (structure); - len = strlen (str); - } else { - str = NULL; - len = 0; - } - size = 1 + sizeof (guint32) + len + 1; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (!gst_byte_writer_put_uint8 (&bw, result8)) - goto write_failed; - type = GST_QUERY_TYPE (query); - if (!gst_byte_writer_put_uint32_le (&bw, type)) - goto write_failed; - if (str) { - if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1)) - goto write_failed; - } else { - if (!gst_byte_writer_put_uint8 (&bw, 0)) - goto write_failed; - } - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - -done: - g_mutex_unlock (&comm->mutex); - gst_byte_writer_reset (&bw); - g_free (str); - return; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - goto done; -} - -static gboolean -gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm, - guint32 size, GstQuery ** query) -{ - gchar *end = NULL; - GstStructure *structure; - guint8 result; - guint32 type; - const guint8 *payload = NULL; - guint32 mapped_size = size; - - /* this should not be called if we don't have enough yet */ - *query = NULL; - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE); - g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE); - - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) - return FALSE; - result = *payload++; - memcpy (&type, payload, sizeof (type)); - payload += sizeof (type); - - size -= 1 + sizeof (guint32); - if (size == 0) - goto done; - - if (payload[size - 1]) { - result = FALSE; - goto done; - } - if (*payload) { - structure = gst_structure_from_string ((const char *) payload, &end); - } else { - structure = NULL; - } - if (!structure) { - result = FALSE; - goto done; - } - - *query = gst_query_new_custom (type, structure); - -done: - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - return result; -} - -typedef struct -{ - guint32 bytes; - - guint64 size; - guint32 flags; - guint64 api; - char *str; -} MetaBuildInfo; - -typedef struct -{ - GstIpcPipelineComm *comm; - guint32 n_meta; - guint32 total_bytes; - MetaBuildInfo *info; -} MetaListRepresentation; - -static gboolean -build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data) -{ - MetaListRepresentation *repr = user_data; - - repr->n_meta++; - repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo)); - repr->info[repr->n_meta - 1].bytes = - /* 4 byte bytes */ - 4 - /* 4 byte GstMetaFlags */ - + 4 - /* GstMetaInfo::api */ - + 4 + strlen (g_type_name ((*meta)->info->api)) + 1 - /* GstMetaInfo::size */ - + 8 - /* str length */ - + 4; - - repr->info[repr->n_meta - 1].flags = (*meta)->flags; - repr->info[repr->n_meta - 1].api = (*meta)->info->api; - repr->info[repr->n_meta - 1].size = (*meta)->info->size; - repr->info[repr->n_meta - 1].str = NULL; - - /* GstMeta is a base class, and actual useful classes are all different... - So we list a few of them we know we want and ignore the open ended rest */ - if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) { - GstProtectionMeta *m = (GstProtectionMeta *) * meta; - repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info); - repr->info[repr->n_meta - 1].bytes += - strlen (repr->info[repr->n_meta - 1].str) + 1; - GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s", - g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str); - } else { - GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s", - g_type_name ((*meta)->info->api)); - } - repr->total_bytes += repr->info[repr->n_meta - 1].bytes; - return TRUE; -} - -typedef struct -{ - guint64 pts; - guint64 dts; - guint64 duration; - guint64 offset; - guint64 offset_end; - guint64 flags; -} CommBufferMetadata; - -GstFlowReturn -gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm, - GstBuffer * buffer) -{ - const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER; - GstMapInfo map; - guint32 ret32 = GST_FLOW_OK; - guint32 size, n; - CommBufferMetadata meta; - GstFlowReturn ret; - MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts a 4 for n_meta */ - GstByteWriter bw; - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT, - comm->send_id, buffer); - - gst_byte_writer_init (&bw); - - meta.pts = GST_BUFFER_PTS (buffer); - meta.dts = GST_BUFFER_DTS (buffer); - meta.duration = GST_BUFFER_DURATION (buffer); - meta.offset = GST_BUFFER_OFFSET (buffer); - meta.offset_end = GST_BUFFER_OFFSET_END (buffer); - meta.flags = GST_BUFFER_FLAGS (buffer); - - /* work out meta size */ - gst_buffer_foreach_meta (buffer, build_meta, &repr); - - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - size = - gst_buffer_get_size (buffer) + sizeof (guint32) + - sizeof (CommBufferMetadata) + repr.total_bytes; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta))) - goto write_failed; - size = gst_buffer_get_size (buffer); - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) - goto map_failed; - ret = write_to_fd_raw (comm, map.data, map.size); - gst_buffer_unmap (buffer, &map); - if (!ret) - goto write_failed; - - /* meta */ - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta)) - goto write_failed; - for (n = 0; n < repr.n_meta; ++n) { - const MetaBuildInfo *info = repr.info + n; - guint32 len; - const char *s; - - if (!gst_byte_writer_put_uint32_le (&bw, info->bytes)) - goto write_failed; - - if (!gst_byte_writer_put_uint32_le (&bw, info->flags)) - goto write_failed; - - s = g_type_name (info->api); - len = strlen (s) + 1; - if (!gst_byte_writer_put_uint32_le (&bw, len)) - goto write_failed; - if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len)) - goto write_failed; - - if (!gst_byte_writer_put_uint64_le (&bw, info->size)) - goto write_failed; - - s = info->str; - len = s ? (strlen (s) + 1) : 0; - if (!gst_byte_writer_put_uint32_le (&bw, len)) - goto write_failed; - if (len) - if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len)) - goto write_failed; - } - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, - ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER)) - goto wait_failed; - ret = ret32; - -done: - g_mutex_unlock (&comm->mutex); - gst_byte_writer_reset (&bw); - for (n = 0; n < repr.n_meta; ++n) - g_free (repr.info[n].str); - g_free (repr.info); - return ret; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - ret = GST_FLOW_COMM_ERROR; - goto done; - -wait_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to wait for reply on socket")); - ret = GST_FLOW_COMM_ERROR; - goto done; - -map_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), - ("Failed to map buffer")); - ret = GST_FLOW_ERROR; - goto done; -} - -static GstBuffer * -gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size) -{ - GstBuffer *buffer; - CommBufferMetadata meta; - guint32 n_meta, n; - const guint8 *payload = NULL; - guint32 mapped_size, buffer_data_size; - - /* this should not be called if we don't have enough yet */ - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); - g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL); - - mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size); - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) - return NULL; - memcpy (&meta, payload, sizeof (CommBufferMetadata)); - payload += sizeof (CommBufferMetadata); - memcpy (&buffer_data_size, payload, sizeof (buffer_data_size)); - size -= mapped_size; - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - - if (buffer_data_size == 0) { - buffer = gst_buffer_new (); - } else { - buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size); - gst_adapter_flush (comm->adapter, buffer_data_size); - } - size -= buffer_data_size; - - GST_BUFFER_PTS (buffer) = meta.pts; - GST_BUFFER_DTS (buffer) = meta.dts; - GST_BUFFER_DURATION (buffer) = meta.duration; - GST_BUFFER_OFFSET (buffer) = meta.offset; - GST_BUFFER_OFFSET_END (buffer) = meta.offset_end; - GST_BUFFER_FLAGS (buffer) = meta.flags; - - /* If you don't call that, the GType isn't yet known at the - g_type_from_name below */ - gst_protection_meta_get_info (); - - mapped_size = size; - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) { - gst_buffer_unref (buffer); - return NULL; - } - memcpy (&n_meta, payload, sizeof (n_meta)); - payload += sizeof (n_meta); - - for (n = 0; n < n_meta; ++n) { - guint32 flags, len, bytes; - guint64 msize; - GType api; - GstMeta *meta; - GstStructure *structure = NULL; - - memcpy (&bytes, payload, sizeof (bytes)); - payload += sizeof (bytes); - -#define READ_FIELD(f) do { \ - memcpy (&f, payload, sizeof (f)); \ - payload += sizeof(f); \ - } while(0) - - READ_FIELD (flags); - READ_FIELD (len); - api = g_type_from_name ((const char *) payload); - payload = (const guint8 *) strchr ((const char *) payload, 0) + 1; - READ_FIELD (msize); - READ_FIELD (len); - if (len) { - structure = gst_structure_new_from_string ((const char *) payload); - payload += len + 1; - } - - /* Seems we can add a meta from the api nor type ? */ - if (api == GST_PROTECTION_META_API_TYPE) { - meta = - gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL); - ((GstProtectionMeta *) meta)->info = structure; - } else { - GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s", - g_type_name (api)); - } - -#undef READ_FIELD - - } - - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - - return buffer; -} - -static gboolean -gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm, - GstEvent * event) -{ - const unsigned char payload_type = - GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT; - gboolean ret; - guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen; - char *str = NULL; - const GstStructure *structure; - GstMessage *message = NULL; - const char *name; - GstByteWriter bw; - - g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE, - FALSE); - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - GST_TRACE_OBJECT (comm->element, - "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event); - - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - name = gst_structure_get_name (gst_event_get_structure (event)); - slen = strlen (name) + 1; - gst_event_parse_sink_message (event, &message); - structure = gst_message_get_structure (message); - if (structure) { - str = gst_structure_to_string (structure); - structure_slen = strlen (str); - } else { - str = NULL; - structure_slen = 0; - } - size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) + - strlen (name) + 1 + structure_slen + 1; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - - type = GST_MESSAGE_TYPE (message); - if (!gst_byte_writer_put_uint32_le (&bw, type)) - goto write_failed; - size -= sizeof (type); - - eseqnum = GST_EVENT_SEQNUM (event); - if (!gst_byte_writer_put_uint32_le (&bw, eseqnum)) - goto write_failed; - size -= sizeof (eseqnum); - - mseqnum = GST_MESSAGE_SEQNUM (message); - if (!gst_byte_writer_put_uint32_le (&bw, mseqnum)) - goto write_failed; - size -= sizeof (mseqnum); - - if (!gst_byte_writer_put_uint32_le (&bw, slen)) - goto write_failed; - size -= sizeof (slen); - - if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen)) - goto write_failed; - size -= slen; - - if (str) { - if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size)) - goto write_failed; - } else { - if (!gst_byte_writer_put_uint8 (&bw, 0)) - goto write_failed; - } - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, - GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED, - COMM_REQUEST_TYPE_EVENT)) - goto write_failed; - - ret = ret32; - -done: - g_mutex_unlock (&comm->mutex); - gst_byte_writer_reset (&bw); - g_free (str); - if (message) - gst_message_unref (message); - return ret; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - ret = FALSE; - goto done; -} - -static GstEvent * -gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm, - guint32 size) -{ - GstMessage *message; - GstEvent *event = NULL; - gchar *end = NULL; - GstStructure *structure; - guint32 type, eseqnum, mseqnum, slen; - const char *name; - guint32 mapped_size = size; - const guint8 *payload; - - /* this should not be called if we don't have enough yet */ - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); - g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL); - - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) - return NULL; - memcpy (&type, payload, sizeof (type)); - payload += sizeof (type); - size -= sizeof (type); - if (size == 0) - goto done; - - memcpy (&eseqnum, payload, sizeof (eseqnum)); - payload += sizeof (eseqnum); - size -= sizeof (eseqnum); - if (size == 0) - goto done; - - memcpy (&mseqnum, payload, sizeof (mseqnum)); - payload += sizeof (mseqnum); - size -= sizeof (mseqnum); - if (size == 0) - goto done; - - memcpy (&slen, payload, sizeof (slen)); - payload += sizeof (slen); - size -= sizeof (slen); - if (size == 0) - goto done; - - if (payload[slen - 1]) - goto done; - name = (const char *) payload; - payload += slen; - size -= slen; - - if ((payload)[size - 1]) { - goto done; - } - if (*payload) { - structure = gst_structure_from_string ((const char *) payload, &end); - } else { - structure = NULL; - } - - message = - gst_message_new_custom (type, GST_OBJECT (comm->element), structure); - gst_message_set_seqnum (message, mseqnum); - event = gst_event_new_sink_message (name, message); - gst_event_set_seqnum (event, eseqnum); - gst_message_unref (message); - -done: - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - return event; -} - -gboolean -gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm, - gboolean upstream, GstEvent * event) -{ - const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT; - gboolean ret; - guint32 type, size, ret32 = TRUE, seqnum, slen; - char *str = NULL; - const GstStructure *structure; - GstByteWriter bw; - - /* we special case sink-message event as gst can't serialize/de-serialize it */ - if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE) - return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event); - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT, - comm->send_id, event); - - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - structure = gst_event_get_structure (event); - if (structure) { - - if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { - GstStructure *s = gst_structure_copy (structure); - gst_structure_remove_field (s, "stream"); - str = gst_structure_to_string (s); - gst_structure_free (s); - } else { - str = gst_structure_to_string (structure); - } - - slen = strlen (str); - } else { - str = NULL; - slen = 0; - } - size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - - type = GST_EVENT_TYPE (event); - if (!gst_byte_writer_put_uint32_le (&bw, type)) - goto write_failed; - - seqnum = GST_EVENT_SEQNUM (event); - if (!gst_byte_writer_put_uint32_le (&bw, seqnum)) - goto write_failed; - - if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0)) - goto write_failed; - - if (str) { - if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1)) - goto write_failed; - } else { - if (!gst_byte_writer_put_uint8 (&bw, 0)) - goto write_failed; - } - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - /* Upstream events get serialized, this is required to send seeks only - * one at a time. */ - if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, - (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ? - ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT)) - goto write_failed; - ret = ret32; - -done: - g_mutex_unlock (&comm->mutex); - g_free (str); - gst_byte_writer_reset (&bw); - return ret; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - ret = FALSE; - goto done; -} - -static GstEvent * -gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size, - gboolean * upstream) -{ - GstEvent *event = NULL; - gchar *end = NULL; - GstStructure *structure; - guint32 type, seqnum; - guint32 mapped_size = size; - const guint8 *payload; - - /* this should not be called if we don't have enough yet */ - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); - g_return_val_if_fail (size >= sizeof (type), NULL); - - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) - return NULL; - - memcpy (&type, payload, sizeof (type)); - payload += sizeof (type); - size -= sizeof (type); - if (size == 0) - goto done; - - memcpy (&seqnum, payload, sizeof (seqnum)); - payload += sizeof (seqnum); - size -= sizeof (seqnum); - if (size == 0) - goto done; - - *upstream = (*payload) ? TRUE : FALSE; - payload += 1; - size -= 1; - if (size == 0) - goto done; - - if (payload[size - 1]) - goto done; - if (*payload) { - structure = gst_structure_from_string ((const char *) payload, &end); - } else { - structure = NULL; - } - - event = gst_event_new_custom (type, structure); - gst_event_set_seqnum (event, seqnum); - -done: - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - return event; -} - -gboolean -gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm, - gboolean upstream, GstQuery * query) -{ - const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY; - gboolean ret; - guint32 type, size, ret32 = TRUE, slen; - char *str = NULL; - const GstStructure *structure; - GstByteWriter bw; - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT, - comm->send_id, query); - - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - structure = gst_query_get_structure (query); - if (structure) { - str = gst_structure_to_string (structure); - slen = strlen (str); - } else { - str = NULL; - slen = 0; - } - size = sizeof (type) + 1 + slen + 1; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - - type = GST_QUERY_TYPE (query); - if (!gst_byte_writer_put_uint32_le (&bw, type)) - goto write_failed; - - if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0)) - goto write_failed; - - if (str) { - if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1)) - goto write_failed; - } else { - if (!gst_byte_writer_put_uint8 (&bw, 0)) - goto write_failed; - } - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32, - GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED, - COMM_REQUEST_TYPE_QUERY)) - goto write_failed; - - ret = ret32; - -done: - g_mutex_unlock (&comm->mutex); - g_free (str); - gst_byte_writer_reset (&bw); - return ret; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - ret = FALSE; - goto done; -} - -static GstQuery * -gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size, - gboolean * upstream) -{ - GstQuery *query = NULL; - gchar *end = NULL; - GstStructure *structure; - guint32 type; - guint32 mapped_size = size; - const guint8 *payload; - - /* this should not be called if we don't have enough yet */ - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); - g_return_val_if_fail (size >= sizeof (type), NULL); - - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) - return NULL; - - memcpy (&type, payload, sizeof (type)); - payload += sizeof (type); - size -= sizeof (type); - if (size == 0) - goto done; - - *upstream = (*payload) ? TRUE : FALSE; - payload += 1; - size -= 1; - if (size == 0) - goto done; - - if (payload[size - 1]) - goto done; - if (*payload) { - structure = gst_structure_from_string ((const char *) payload, &end); - } else { - structure = NULL; - } - - query = gst_query_new_custom (type, structure); - - /* CAPS queries contain a filter field, of GstCaps type, which can be NULL. - This does not play well with the serialization/deserialization system, - which will give us a non-NULL GstCaps which has a value of NULL. This - in turn wreaks havoc with any code that tests whether filter is NULL - (which basically means, am I being given an optional GstCaps ?). - So we look for non-NULL GstCaps which have NULL contents, and replace - them with NULL instead. */ - if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) { - GstCaps *filter; - gst_query_parse_caps (query, &filter); - if (filter - && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)), - "NULL")) { - gst_query_unref (query); - query = gst_query_new_caps (NULL); - } - } - -done: - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - return query; -} - -GstStateChangeReturn -gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm, - GstStateChange transition) -{ - const unsigned char payload_type = - GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE; - GstStateChangeReturn ret; - guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS; - GstByteWriter bw; - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s", - comm->send_id, - gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); - - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - size = sizeof (transition); - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, transition)) - goto write_failed; - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, - ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE)) - goto write_failed; - ret = ret32; - -done: - g_mutex_unlock (&comm->mutex); - gst_byte_writer_reset (&bw); - return ret; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - ret = GST_STATE_CHANGE_FAILURE; - goto done; -} - -static gboolean -is_valid_state_change (GstStateChange transition) -{ - if (transition == GST_STATE_CHANGE_NULL_TO_READY) - return TRUE; - if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) - return TRUE; - if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING) - return TRUE; - if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED) - return TRUE; - if (transition == GST_STATE_CHANGE_PAUSED_TO_READY) - return TRUE; - if (transition == GST_STATE_CHANGE_READY_TO_NULL) - return TRUE; - if (GST_STATE_TRANSITION_CURRENT (transition) == - GST_STATE_TRANSITION_NEXT (transition)) - return TRUE; - return FALSE; -} - -static gboolean -gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm, - guint32 size, guint32 * transition) -{ - guint32 mapped_size = size; - const guint8 *payload; - - /* this should not be called if we don't have enough yet */ - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE); - g_return_val_if_fail (size >= sizeof (*transition), FALSE); - - payload = gst_adapter_map (comm->adapter, size); - if (!payload) - return FALSE; - memcpy (transition, payload, sizeof (*transition)); - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - return is_valid_state_change (*transition); -} - -void -gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm) -{ - const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST; - guint32 size; - GstByteWriter bw; - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id); - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - size = 0; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - -done: - g_mutex_unlock (&comm->mutex); - gst_byte_writer_reset (&bw); - return; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - goto done; -} - -static gboolean -gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size) -{ - /* no payload */ - return TRUE; -} - -static gboolean -gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm, - GstMessage * message) -{ - const unsigned char payload_type = - GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE; - gboolean ret; - guint32 code, size, ret32 = TRUE; - char *str = NULL; - GError *error; - char *extra_message; - const char *domain_string; - unsigned char msgtype; - GstByteWriter bw; - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) { - gst_message_parse_error (message, &error, &extra_message); - msgtype = 2; - } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) { - gst_message_parse_warning (message, &error, &extra_message); - msgtype = 1; - } else { - gst_message_parse_info (message, &error, &extra_message); - msgtype = 0; - } - code = error->code; - domain_string = g_quark_to_string (error->domain); - GST_TRACE_OBJECT (comm->element, - "Writing error %u: domain %s, code %u, message %s, extra message %s", - comm->send_id, domain_string, error->code, error->message, extra_message); - - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - - size = sizeof (size); - size += 1; - size += strlen (domain_string) + 1; - size += sizeof (code); - size += sizeof (size); - size += error->message ? strlen (error->message) + 1 : 0; - size += sizeof (size); - size += extra_message ? strlen (extra_message) + 1 : 0; - - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - - if (!gst_byte_writer_put_uint8 (&bw, msgtype)) - goto write_failed; - size = strlen (domain_string) + 1; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, code)) - goto write_failed; - size = error->message ? strlen (error->message) + 1 : 0; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (error->message) { - if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size)) - goto write_failed; - } - size = extra_message ? strlen (extra_message) + 1 : 0; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - if (extra_message) { - if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size)) - goto write_failed; - } - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, - ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) - goto write_failed; - - ret = ret32; - -done: - g_mutex_unlock (&comm->mutex); - g_free (str); - g_error_free (error); - g_free (extra_message); - gst_byte_writer_reset (&bw); - return ret; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - ret = FALSE; - goto done; -} - -static GstMessage * -gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm, - guint32 size) -{ - GstMessage *message = NULL; - guint32 code; - GQuark domain; - const char *msg, *extra_message; - GError *error; - unsigned char msgtype; - guint32 mapped_size = size; - const guint8 *payload; - - /* this should not be called if we don't have enough yet */ - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); - g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1, - NULL); - - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) - return NULL; - msgtype = *payload++; - memcpy (&size, payload, sizeof (size)); - payload += sizeof (size); - if (payload[size - 1]) - goto done; - domain = g_quark_from_string ((const char *) payload); - payload += size; - - memcpy (&code, payload, sizeof (code)); - payload += sizeof (code); - - memcpy (&size, payload, sizeof (size)); - payload += sizeof (size); - if (size) { - if (payload[size - 1]) - goto done; - msg = (const char *) payload; - } else { - msg = NULL; - } - payload += size; - - memcpy (&size, payload, sizeof (size)); - payload += sizeof (size); - if (size) { - if (payload[size - 1]) - goto done; - extra_message = (const char *) payload; - } else { - extra_message = NULL; - } - payload += size; - - error = g_error_new (domain, code, "%s", msg); - if (msgtype == 2) - message = - gst_message_new_error (GST_OBJECT (comm->element), error, - extra_message); - else if (msgtype == 1) - message = - gst_message_new_warning (GST_OBJECT (comm->element), error, - extra_message); - else - message = - gst_message_new_info (GST_OBJECT (comm->element), error, extra_message); - g_error_free (error); - -done: - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - - return message; -} - -gboolean -gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm, - GstMessage * message) -{ - const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE; - gboolean ret; - guint32 type, size, ret32 = TRUE, slen; - char *str = NULL; - const GstStructure *structure; - GstByteWriter bw; - - /* we special case error as gst can't serialize/de-serialize it */ - if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR - || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING - || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO) - return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message); - - g_mutex_lock (&comm->mutex); - ++comm->send_id; - - GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT, - comm->send_id, message); - - gst_byte_writer_init (&bw); - if (!gst_byte_writer_put_uint8 (&bw, payload_type)) - goto write_failed; - if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) - goto write_failed; - structure = gst_message_get_structure (message); - if (structure) { - str = gst_structure_to_string (structure); - slen = strlen (str); - } else { - str = NULL; - slen = 0; - } - size = sizeof (type) + slen + 1; - if (!gst_byte_writer_put_uint32_le (&bw, size)) - goto write_failed; - - type = GST_MESSAGE_TYPE (message); - if (!gst_byte_writer_put_uint32_le (&bw, type)) - goto write_failed; - size -= sizeof (type); - if (str) { - if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size)) - goto write_failed; - } else { - if (!gst_byte_writer_put_uint8 (&bw, 0)) - goto write_failed; - } - - if (!write_byte_writer_to_fd (comm, &bw)) - goto write_failed; - - if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, - ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) - goto write_failed; - - ret = ret32; - -done: - g_mutex_unlock (&comm->mutex); - g_free (str); - gst_byte_writer_reset (&bw); - return ret; - -write_failed: - GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), - ("Failed to write to socket")); - ret = FALSE; - goto done; -} - -static GstMessage * -gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size) -{ - GstMessage *message = NULL; - gchar *end = NULL; - GstStructure *structure; - guint32 type; - guint32 mapped_size = size; - const guint8 *payload; - - /* this should not be called if we don't have enough yet */ - g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); - g_return_val_if_fail (size >= sizeof (type), NULL); - - payload = gst_adapter_map (comm->adapter, mapped_size); - if (!payload) - return NULL; - memcpy (&type, payload, sizeof (type)); - payload += sizeof (type); - size -= sizeof (type); - if (size == 0) - goto done; - - if (payload[size - 1]) - goto done; - if (*payload) { - structure = gst_structure_from_string ((const char *) payload, &end); - } else { - structure = NULL; - } - - message = - gst_message_new_custom (type, GST_OBJECT (comm->element), structure); - -done: - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - - return message; -} - -void -gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element) -{ - g_mutex_init (&comm->mutex); - comm->element = element; - comm->fdin = comm->fdout = -1; - comm->ack_time = DEFAULT_ACK_TIME; - comm->waiting_ids = - g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, - (GDestroyNotify) comm_request_free); - comm->adapter = gst_adapter_new (); - g_atomic_int_set (&comm->thread_running, 0); -} - -void -gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm) -{ - g_assert (!g_atomic_int_get (&comm->thread_running)); - g_hash_table_destroy (comm->waiting_ids); - gst_object_unref (comm->adapter); - g_mutex_clear (&comm->mutex); -} - -static void -cancel_request (gpointer key, gpointer value, gpointer user_data, - GstFlowReturn fret) -{ - GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data; - guint32 id = GPOINTER_TO_INT (key); - CommRequest *req = (CommRequest *) value; - - GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id, - req->type); - req->ret = fret; - req->replied = TRUE; - g_cond_signal (&req->cond); -} - -static void -cancel_request_error (gpointer key, gpointer value, gpointer user_data) -{ - CommRequest *req = (CommRequest *) value; - GstFlowReturn fret = comm_request_ret_get_failure_value (req->type); - - cancel_request (key, value, user_data, fret); -} - -void -gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup) -{ - g_mutex_lock (&comm->mutex); - g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm); - if (cleanup) { - g_hash_table_unref (comm->waiting_ids); - comm->waiting_ids = - g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, - (GDestroyNotify) comm_request_free); - } - g_mutex_unlock (&comm->mutex); -} - -static gboolean -set_field (GQuark field_id, const GValue * value, gpointer user_data) -{ - GstStructure *structure = user_data; - - gst_structure_id_set_value (structure, field_id, value); - - return TRUE; -} - -static gboolean -gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id, - GstFlowReturn ret, GstQuery * query) -{ - CommRequest *req; - - req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id)); - if (!req) { - GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id); - return FALSE; - } - - GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret, - comm_request_ret_get_name (req->type, ret), req->id); - req->replied = TRUE; - req->ret = ret; - if (query) { - if (req->query) { - /* We need to update the original query in place, as the caller - will expect the object to be the same */ - GstStructure *structure = gst_query_writable_structure (req->query); - gst_structure_remove_all_fields (structure); - gst_structure_foreach (gst_query_get_structure (query), set_field, - structure); - } else { - GST_WARNING_OBJECT (comm->element, - "Got query reply, but no query was in the request"); - } - } - g_cond_signal (&req->cond); - return TRUE; -} - -static gboolean -update_adapter (GstIpcPipelineComm * comm) -{ - GstMemory *mem = NULL; - - for (;;) { - fd_set set; - struct timeval tv; - int sret; - ssize_t sz; - GstBuffer *buf; - GstMapInfo map; - int fdin = comm->fdin; - int fdclose = comm->reader_thread_stopping_pipe[0]; - int fdmax; - - FD_ZERO (&set); - FD_SET (fdclose, &set); - fdmax = fdclose; - if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) { - FD_SET (fdin, &set); - if (fdin > fdmax) - fdmax = fdin; - } - tv.tv_sec = 0; - tv.tv_usec = 100000; - sret = select (fdmax + 1, &set, NULL, NULL, &tv); - if (sret < 0) { - if (errno == EAGAIN) - continue; - if (errno == EINTR) - break; - if (mem) - gst_memory_unref (mem); - return FALSE; - } - if (FD_ISSET (fdclose, &set)) { - GST_INFO_OBJECT (comm->element, "data received on close notify pipe"); - comm->reader_thread_stopping = TRUE; - break; - } - if (fdin < 0) - break; - if (!FD_ISSET (fdin, &set)) - break; - if (mem == NULL) - mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL); - gst_memory_map (mem, &map, GST_MAP_WRITE); - sz = read (fdin, map.data, map.size); - gst_memory_unmap (mem, &map); - if (sz < 0) { - if (errno == EAGAIN) - continue; - mem = NULL; - if (errno == EINTR) - break; - gst_memory_unref (mem); - return FALSE; - } - if (sz == 0) { - GST_INFO_OBJECT (comm->element, "fd closed"); - comm->reader_thread_stopping = TRUE; - break; - } - gst_memory_resize (mem, 0, sz); - buf = gst_buffer_new (); - gst_buffer_append_memory (buf, mem); - mem = NULL; - GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz); - gst_adapter_push (comm->adapter, buf); - - /* If we have more data, we loop, otherwise we break */ - FD_ZERO (&set); - if (fdin >= 0) - FD_SET (comm->fdin, &set); - tv.tv_sec = 0; - tv.tv_usec = 0; - sret = select (fdin + 1, &set, NULL, NULL, &tv); - if (sret < 0 || !FD_ISSET (fdin, &set)) - break; - } - if (mem) - gst_memory_unref (mem); - - return TRUE; -} - -static gboolean -read_many (GstIpcPipelineComm * comm) -{ - gboolean ret = TRUE; - gsize available; - const guint8 *payload; - - while (1) - switch (comm->state) { - case GST_IPC_PIPELINE_COMM_STATE_TYPE: - { - guint8 type; - guint32 mapped_size; - - available = gst_adapter_available (comm->adapter); - mapped_size = 1 + sizeof (gint32) * 2; - if (available < mapped_size) - goto done; - - payload = gst_adapter_map (comm->adapter, mapped_size); - type = *payload++; - g_mutex_lock (&comm->mutex); - memcpy (&comm->id, payload, sizeof (guint32)); - memcpy (&comm->payload_length, payload + 4, sizeof (guint32)); - g_mutex_unlock (&comm->mutex); - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, mapped_size); - GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u", - comm->id, type, comm->payload_length); - switch (type) { - case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: - case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: - GST_TRACE_OBJECT (comm->element, "switching to state %s", - gst_ipc_pipeline_comm_data_type_get_name (type)); - comm->state = type; - break; - default: - goto out_of_sync; - } - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: - { - const guint8 *rets; - guint32 ret32; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - if (available < sizeof (guint32)) - goto ack_failed; - - rets = gst_adapter_map (comm->adapter, sizeof (guint32)); - memcpy (&ret32, rets, sizeof (ret32)); - gst_adapter_unmap (comm->adapter); - gst_adapter_flush (comm->adapter, sizeof (guint32)); - GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u", - gst_flow_get_name (ret32), comm->id); - - g_mutex_lock (&comm->mutex); - gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL); - g_mutex_unlock (&comm->mutex); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: - { - GstQuery *query = NULL; - gboolean qret; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - qret = - gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length, - &query); - - GST_TRACE_OBJECT (comm->element, - "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret, - query); - - g_mutex_lock (&comm->mutex); - gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query); - g_mutex_unlock (&comm->mutex); - - gst_query_unref (query); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: - { - GstBuffer *buf; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length); - if (!buf) - goto buffer_failed; - - /* set caps and push */ - GST_TRACE_OBJECT (comm->element, - "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT - ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT - ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT - ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf), - GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf), - GST_BUFFER_FLAGS (buf)); - - gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID, - GINT_TO_POINTER (comm->id), NULL); - - if (comm->on_buffer) - (*comm->on_buffer) (comm->id, buf, comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: - { - GstEvent *event; - gboolean upstream; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length, - &upstream); - if (!event) - goto event_failed; - - GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s", - event, gst_event_type_get_name (event->type)); - - gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, - GINT_TO_POINTER (comm->id), NULL); - - if (comm->on_event) - (*comm->on_event) (comm->id, event, upstream, comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: - { - GstEvent *event; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - event = gst_ipc_pipeline_comm_read_sink_message_event (comm, - comm->payload_length); - if (!event) - goto event_failed; - - GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p", - event); - - gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, - GINT_TO_POINTER (comm->id), NULL); - - if (comm->on_event) - (*comm->on_event) (comm->id, event, FALSE, comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: - { - GstQuery *query; - gboolean upstream; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length, - &upstream); - if (!query) - goto query_failed; - - GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s", - query, gst_query_type_get_name (query->type)); - - gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID, - GINT_TO_POINTER (comm->id), NULL); - - if (comm->on_query) - (*comm->on_query) (comm->id, query, upstream, comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: - { - guint32 transition; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - if (!gst_ipc_pipeline_comm_read_state_change (comm, - comm->payload_length, &transition)) - goto state_change_failed; - - GST_TRACE_OBJECT (comm->element, - "deserialized state change request: %s -> %s", - gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT - (transition)), - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT - (transition))); - - if (comm->on_state_change) - (*comm->on_state_change) (comm->id, transition, comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: - { - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length)) - goto event_failed; - - GST_TRACE_OBJECT (comm->element, "deserialized state-lost"); - - if (comm->on_state_lost) - (*comm->on_state_lost) (comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: - { - GstMessage *message; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - message = gst_ipc_pipeline_comm_read_message (comm, - comm->payload_length); - if (!message) - goto message_failed; - - GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s", - message, gst_message_type_get_name (message->type)); - - if (comm->on_message) - (*comm->on_message) (comm->id, message, comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: - { - GstMessage *message; - - available = gst_adapter_available (comm->adapter); - if (available < comm->payload_length) - goto done; - - message = gst_ipc_pipeline_comm_read_gerror_message (comm, - comm->payload_length); - if (!message) - goto message_failed; - - GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s", - message, gst_message_type_get_name (message->type)); - - if (comm->on_message) - (*comm->on_message) (comm->id, message, comm->user_data); - - GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - break; - } - } - -done: - return ret; - - /* ERRORS */ -out_of_sync: - { - GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), - ("Socket out of sync")); - ret = FALSE; - goto done; - } -state_change_failed: - { - GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), - ("could not read state change from fd")); - ret = FALSE; - goto done; - } -ack_failed: - { - GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), - ("could not read ack from fd")); - ret = FALSE; - goto done; - } -buffer_failed: - { - GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), - ("could not read buffer from fd")); - ret = FALSE; - goto done; - } -event_failed: - { - GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), - ("could not read event from fd")); - ret = FALSE; - goto done; - } -message_failed: - { - GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), - ("could not read message from fd")); - ret = FALSE; - goto done; - } -query_failed: - { - GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), - ("could not read query from fd")); - ret = FALSE; - goto done; - } -} - -static gpointer -reader_thread (gpointer data) -{ - GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data; - - g_atomic_int_set (&comm->thread_running, 1); - while (!comm->reader_thread_stopping) { - if (!update_adapter (comm)) { - if (comm->reader_thread_stopping) { - GST_INFO_OBJECT (comm->element, "We're stopping, all good"); - break; - } - GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), - ("Failed to read from socket")); - break; - } - read_many (comm); - } - - GST_INFO_OBJECT (comm->element, "Reader thread ending"); - g_atomic_int_set (&comm->thread_running, 0); - - return NULL; -} - -gboolean -gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, - void (*on_buffer) (guint32, GstBuffer *, gpointer), - void (*on_event) (guint32, GstEvent *, gboolean, gpointer), - void (*on_query) (guint32, GstQuery *, gboolean, gpointer), - void (*on_state_change) (guint32, GstStateChange, gpointer), - void (*on_state_lost) (gpointer), - void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data) -{ - if (comm->reader_thread) - return FALSE; - - comm->reader_thread_stopping = FALSE; - comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; - comm->on_buffer = on_buffer; - comm->on_event = on_event; - comm->on_query = on_query; - comm->on_state_change = on_state_change; - comm->on_state_lost = on_state_lost; - comm->on_message = on_message; - comm->user_data = user_data; - if (pipe (comm->reader_thread_stopping_pipe) < 0) { - GST_WARNING_OBJECT (comm->element, "Failed to create pipes"); - return FALSE; - } - comm->reader_thread = - g_thread_new ("reader", (GThreadFunc) reader_thread, comm); - return TRUE; -} - -void -gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm) -{ - char dummy = 0; - - if (!comm->reader_thread) - return; - while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0 - && errno == EINTR); - g_thread_join (comm->reader_thread); - close (comm->reader_thread_stopping_pipe[0]); - close (comm->reader_thread_stopping_pipe[1]); - comm->reader_thread = NULL; -} - -static gchar * -gst_value_serialize_event (const GValue * value) -{ - const GstStructure *structure; - GstEvent *ev; - gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s; - GValue val = G_VALUE_INIT; - - ev = g_value_get_boxed (value); - - g_value_init (&val, gst_event_type_get_type ()); - g_value_set_enum (&val, ev->type); - type = gst_value_serialize (&val); - g_value_unset (&val); - - g_value_init (&val, G_TYPE_UINT64); - g_value_set_uint64 (&val, ev->timestamp); - ts = gst_value_serialize (&val); - g_value_unset (&val); - - g_value_init (&val, G_TYPE_UINT); - g_value_set_uint (&val, ev->seqnum); - seqnum = gst_value_serialize (&val); - g_value_unset (&val); - - g_value_init (&val, G_TYPE_INT64); - g_value_set_int64 (&val, gst_event_get_running_time_offset (ev)); - rt_offset = gst_value_serialize (&val); - g_value_unset (&val); - - structure = gst_event_get_structure (ev); - str = gst_structure_to_string (structure); - str64 = g_base64_encode ((guchar *) str, strlen (str) + 1); - g_strdelimit (str64, "=", '_'); - g_free (str); - - s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64, - NULL); - - g_free (type); - g_free (ts); - g_free (seqnum); - g_free (rt_offset); - g_free (str64); - - return s; -} - -static gboolean -gst_value_deserialize_event (GValue * dest, const gchar * s) -{ - GstEvent *ev = NULL; - GValue val = G_VALUE_INIT; - gboolean ret = FALSE; - gchar **fields; - gsize len; - - fields = g_strsplit (s, ":", -1); - if (g_strv_length (fields) != 5) - goto wrong_length; - - g_strdelimit (fields[4], "_", '='); - g_base64_decode_inplace (fields[4], &len); - - g_value_init (&val, gst_event_type_get_type ()); - if (!gst_value_deserialize (&val, fields[0])) - goto fail; - ev = gst_event_new_custom (g_value_get_enum (&val), - gst_structure_new_from_string (fields[4])); - - g_value_unset (&val); - g_value_init (&val, G_TYPE_UINT64); - if (!gst_value_deserialize (&val, fields[1])) - goto fail; - ev->timestamp = g_value_get_uint64 (&val); - - g_value_unset (&val); - g_value_init (&val, G_TYPE_UINT); - if (!gst_value_deserialize (&val, fields[2])) - goto fail; - ev->seqnum = g_value_get_uint (&val); - - g_value_unset (&val); - g_value_init (&val, G_TYPE_INT64); - if (!gst_value_deserialize (&val, fields[3])) - goto fail; - gst_event_set_running_time_offset (ev, g_value_get_int64 (&val)); - - g_value_take_boxed (dest, g_steal_pointer (&ev)); - ret = TRUE; - -fail: - g_clear_pointer (&ev, gst_event_unref); - g_value_unset (&val); - -wrong_length: - g_strfreev (fields); - return ret; -} - -#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type) \ -G_STMT_START { \ - static GstValueTable gst_value = \ - { 0, NULL, \ - gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type }; \ - gst_value.type = _gtype; \ - gst_value_register (&gst_value); \ -} G_STMT_END - -void -gst_ipc_pipeline_comm_plugin_init (void) -{ - static volatile gsize once = 0; - - if (g_once_init_enter (&once)) { - GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0, - "ipc pipeline comm"); - QUARK_ID = g_quark_from_static_string ("ipcpipeline-id"); - REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event); - g_once_init_leave (&once, (gsize) 1); - } -} diff --git a/gst/ipcpipeline/gstipcpipelinecomm.h b/gst/ipcpipeline/gstipcpipelinecomm.h deleted file mode 100644 index bd7335e9e..000000000 --- a/gst/ipcpipeline/gstipcpipelinecomm.h +++ /dev/null @@ -1,132 +0,0 @@ -/* GStreamer - * Copyright (C) 2015-2017 YouView TV Ltd - * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk> - * - * gstipcpipelinecomm.h: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - - -#ifndef __GST_IPC_PIPELINE_COMM_H__ -#define __GST_IPC_PIPELINE_COMM_H__ - -#include <glib.h> -#include <gst/gst.h> -#include <gst/base/gstadapter.h> - -G_BEGIN_DECLS - -#define GST_FLOW_COMM_ERROR GST_FLOW_CUSTOM_ERROR_1 - -extern GQuark QUARK_ID; - -typedef enum { - GST_IPC_PIPELINE_COMM_STATE_TYPE = 0, - /* for the rest of the states we use directly the data type enums below */ -} GstIpcPipelineCommState; - -typedef enum { - /* reply types */ - GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK = 1, - GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT, - /* data send types */ - GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER, - GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT, - GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT, - GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY, - GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE, - GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST, - GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE, - GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE, -} GstIpcPipelineCommDataType; - -typedef struct -{ - GstElement *element; - - GMutex mutex; - int fdin; - int fdout; - GHashTable *waiting_ids; - - GThread *reader_thread; - gboolean reader_thread_stopping; - volatile gint thread_running; - int reader_thread_stopping_pipe[2]; - GstAdapter *adapter; - guint8 state; - guint32 send_id; - - guint32 payload_length; - guint32 id; - - guint read_chunk_size; - GstClockTime ack_time; - - void (*on_buffer) (guint32, GstBuffer *, gpointer); - void (*on_event) (guint32, GstEvent *, gboolean, gpointer); - void (*on_query) (guint32, GstQuery *, gboolean, gpointer); - void (*on_state_change) (guint32, GstStateChange, gpointer); - void (*on_state_lost) (gpointer); - void (*on_message) (guint32, GstMessage *, gpointer); - gpointer user_data; - -} GstIpcPipelineComm; - -void gst_ipc_pipeline_comm_plugin_init (void); - -void gst_ipc_pipeline_comm_init (GstIpcPipelineComm *comm, GstElement *e); -void gst_ipc_pipeline_comm_clear (GstIpcPipelineComm *comm); -void gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, - gboolean flushing); - -void gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm, - guint32 id, GstFlowReturn ret); -void gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm, - guint32 id, gboolean ret); -void gst_ipc_pipeline_comm_write_state_change_ack_to_fd ( - GstIpcPipelineComm * comm, guint32 id, GstStateChangeReturn ret); - -void gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm, - guint32 id, gboolean result, GstQuery *query); - -GstFlowReturn gst_ipc_pipeline_comm_write_buffer_to_fd ( - GstIpcPipelineComm * comm, GstBuffer * buffer); -gboolean gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm, - gboolean upstream, GstEvent * event); -gboolean gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm, - gboolean upstream, GstQuery * query); -GstStateChangeReturn gst_ipc_pipeline_comm_write_state_change_to_fd ( - GstIpcPipelineComm * comm, GstStateChange transition); -void gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm); -gboolean gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm, - GstMessage *message); - -gboolean gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, - void (*on_buffer) (guint32, GstBuffer *, gpointer), - void (*on_event) (guint32, GstEvent *, gboolean, gpointer), - void (*on_query) (guint32, GstQuery *, gboolean, gpointer), - void (*on_state_change) (guint32, GstStateChange, gpointer), - void (*on_state_lost) (gpointer), - void (*on_message) (guint32, GstMessage *, gpointer), - gpointer user_data); -void gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm); - -G_END_DECLS - -#endif - diff --git a/gst/ipcpipeline/gstipcpipelinesink.c b/gst/ipcpipeline/gstipcpipelinesink.c deleted file mode 100644 index f1f02bfcb..000000000 --- a/gst/ipcpipeline/gstipcpipelinesink.c +++ /dev/null @@ -1,722 +0,0 @@ -/* GStreamer - * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> - * 2005 Wim Taymans <wim@fluendo.com> - * 2006 Thomas Vander Stichele <thomas at apestaart dot org> - * 2014 Tim-Philipp Müller <tim centricular com> - * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> - * - * gstipcpipelinesink.c: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ -/** - * SECTION:element-ipcpipelinesink - * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline - * - * Communicates with an ipcpipelinesrc element in another process via a socket. - * - * This element, together with ipcpipelinesrc and ipcslavepipeline form a - * mechanism that allows splitting a single pipeline in different processes. - * The main use-case for it is a playback pipeline split in two parts, where the - * first part contains the networking, parsing and demuxing and the second part - * contains the decoding and display. The intention of this split is to improve - * security of an application, by letting the networking, parsing and demuxing - * parts run in a less privileged process than the process that accesses the - * decoder and display. - * - * Once the pipelines in those different processes have been created, the - * playback can be controlled entirely from the first pipeline, which is the - * one that contains ipcpipelinesink. We call this pipeline the “master”. - * All relevant events and queries sent from the application are sent to - * the master pipeline and messages to the application are sent from the master - * pipeline. The second pipeline, in the other process, is transparently slaved. - * - * ipcpipelinesink can work only in push mode and does not synchronize buffers - * to the clock. Synchronization is meant to happen either at the real sink at - * the end of the remote slave pipeline, or not to happen at all, if the - * pipeline is live. - * - * A master pipeline may contain more than one ipcpipelinesink elements, which - * can be connected either to the same slave pipeline or to different ones. - * - * Communication with ipcpipelinesrc on the slave happens via a socket, using a - * custom protocol. Each buffer, event, query, message or state change is - * serialized in a "packet" and sent over the socket. The sender then - * performs a blocking wait for a reply, if a return code is needed. - * - * All objects that contan a GstStructure (messages, queries, events) are - * serialized by serializing the GstStructure to a string - * (gst_structure_to_string). This implies some limitations, of course. - * All fields of this structures that are not serializable to strings (ex. - * object pointers) are ignored, except for some cases where custom - * serialization may occur (ex error/warning/info messages that contain a - * GError are serialized differently). - * - * Buffers are transported by writing their content directly on the socket. - * More efficient ways for memory sharing could be implemented in the future. - */ - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include <unistd.h> -#include "gstipcpipelinesink.h" -#include <string.h> - -static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - -GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug); -#define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug - -enum -{ - SIGNAL_DISCONNECT, - /* FILL ME */ - LAST_SIGNAL -}; -static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 }; - -enum -{ - PROP_0, - PROP_FDIN, - PROP_FDOUT, - PROP_READ_CHUNK_SIZE, - PROP_ACK_TIME, -}; - - -#define DEFAULT_READ_CHUNK_SIZE 4096 -#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND) - -#define _do_init \ - GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element"); -#define gst_ipc_pipeline_sink_parent_class parent_class -G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink, - GST_TYPE_ELEMENT, _do_init); - -static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); -static void gst_ipc_pipeline_sink_dispose (GObject * obj); -static void gst_ipc_pipeline_sink_finalize (GObject * obj); -static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * - sink); -static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * - sink); - -static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement * - element, GstStateChange transition); - -static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad, - GstObject * parent, GstBuffer * buffer); -static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, - GstEvent * event); -static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element, - GstQuery * query); -static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element, - GstEvent * event); -static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, - GstQuery * query); -static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad, - GstObject * parent, GstPadMode mode, gboolean active); - - -static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink); -static void pusher (gpointer data, gpointer user_data); - - -static void -gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass) -{ - GObjectClass *gobject_class; - GstElementClass *gstelement_class; - - gobject_class = G_OBJECT_CLASS (klass); - gstelement_class = GST_ELEMENT_CLASS (klass); - - gobject_class->set_property = gst_ipc_pipeline_sink_set_property; - gobject_class->get_property = gst_ipc_pipeline_sink_get_property; - gobject_class->dispose = gst_ipc_pipeline_sink_dispose; - gobject_class->finalize = gst_ipc_pipeline_sink_finalize; - - g_object_class_install_property (gobject_class, PROP_FDIN, - g_param_spec_int ("fdin", "Input file descriptor", - "File descriptor to received data from", - -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_FDOUT, - g_param_spec_int ("fdout", "Output file descriptor", - "File descriptor to send data through", - -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE, - g_param_spec_uint ("read-chunk-size", "Read chunk size", - "Read chunk size", - 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_ACK_TIME, - g_param_spec_uint64 ("ack-time", "Ack time", - "Maximum time to wait for a response to a message", - 0, G_MAXUINT64, DEFAULT_ACK_TIME, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] = - g_signal_new ("disconnect", - G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, - G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect), - NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - - gst_element_class_set_static_metadata (gstelement_class, - "Inter-process Pipeline Sink", - "Sink", - "Allows splitting and continuing a pipeline in another process", - "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>"); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&sinktemplate)); - - gstelement_class->change_state = - GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state); - gstelement_class->query = - GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query); - gstelement_class->send_event = - GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event); - - klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect); -} - -static void -gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink) -{ - GstPadTemplate *pad_template; - - GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK); - - gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink)); - sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; - sink->comm.ack_time = DEFAULT_ACK_TIME; - sink->comm.fdin = -1; - sink->comm.fdout = -1; - sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL); - gst_ipc_pipeline_sink_start_reader_thread (sink); - - pad_template = - gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink"); - g_return_if_fail (pad_template != NULL); - - sink->sinkpad = gst_pad_new_from_template (pad_template, "sink"); - - gst_pad_set_activatemode_function (sink->sinkpad, - gst_ipc_pipeline_sink_pad_activate_mode); - gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query); - gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event); - gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain); - gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad); - -} - -static void -gst_ipc_pipeline_sink_dispose (GObject * obj) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj); - - gst_ipc_pipeline_sink_stop_reader_thread (sink); - gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE); - - G_OBJECT_CLASS (parent_class)->dispose (obj); -} - -static void -gst_ipc_pipeline_sink_finalize (GObject * obj) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj); - - gst_ipc_pipeline_comm_clear (&sink->comm); - g_thread_pool_free (sink->threads, TRUE, TRUE); - - G_OBJECT_CLASS (parent_class)->finalize (obj); -} - -static void -gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object); - - switch (prop_id) { - case PROP_FDIN: - sink->comm.fdin = g_value_get_int (value); - break; - case PROP_FDOUT: - sink->comm.fdout = g_value_get_int (value); - break; - case PROP_READ_CHUNK_SIZE: - sink->comm.read_chunk_size = g_value_get_uint (value); - break; - case PROP_ACK_TIME: - sink->comm.ack_time = g_value_get_uint64 (value); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object); - - switch (prop_id) { - case PROP_FDIN: - g_value_set_int (value, sink->comm.fdin); - break; - case PROP_FDOUT: - g_value_set_int (value, sink->comm.fdout); - break; - case PROP_READ_CHUNK_SIZE: - g_value_set_uint (value, sink->comm.read_chunk_size); - break; - case PROP_ACK_TIME: - g_value_set_uint64 (value, sink->comm.ack_time); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static gboolean -gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent); - gboolean ret; - - GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)", - event, gst_event_type_get_name (event->type), event->type); - - ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event); - gst_event_unref (event); - return ret; -} - -static GstFlowReturn -gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent, - GstBuffer * buffer) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent); - GstFlowReturn ret; - - GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer); - - ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer); - if (ret != GST_FLOW_OK) - GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret)); - - gst_buffer_unref (buffer); - return ret; -} - -static gboolean -gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent); - gboolean ret; - - GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT, - GST_QUERY_TYPE_NAME (query), query); - - switch (GST_QUERY_TYPE (query)) { - case GST_QUERY_ALLOCATION: - GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query"); - return FALSE; - case GST_QUERY_CAPS: - { - /* caps queries occur even while linking the pipeline. - * It is possible that the ipcpipelinesrc may not be connected at this - * point, so let's avoid a couple of errors... */ - GstState state; - GST_OBJECT_LOCK (sink); - state = GST_STATE (sink); - GST_OBJECT_UNLOCK (sink); - if (state == GST_STATE_NULL) - return FALSE; - } - default: - break; - } - ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query); - - return ret; -} - -static gboolean -gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); - gboolean ret; - - GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT, - GST_QUERY_TYPE_NAME (query), query); - - ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query); - GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query); - return ret; -} - -static gboolean -gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); - gboolean ret; - - GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT, - GST_EVENT_TYPE_NAME (event), event); - - ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event); - GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event); - - gst_event_unref (event); - return ret; -} - - -static gboolean -gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad, - GstObject * parent, GstPadMode mode, gboolean active) -{ - if (mode == GST_PAD_MODE_PULL) - return FALSE; - return TRUE; -} - -static void -on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); - GST_ERROR_OBJECT (sink, - "Got buffer id %u! I never knew buffers could go upstream...", id); - gst_buffer_unref (buffer); -} - -static void -pusher (gpointer data, gpointer user_data) -{ - GstIpcPipelineSink *sink = user_data; - gboolean ret; - guint32 id; - - id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data), - QUARK_ID)); - - if (GST_IS_EVENT (data)) { - GstEvent *event = GST_EVENT (data); - GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event); - ret = gst_pad_push_event (sink->sinkpad, event); - GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret); - } else if (GST_IS_QUERY (data)) { - GstQuery *query = GST_QUERY (data); - GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query); - ret = gst_pad_peer_query (sink->sinkpad, query); - GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret); - gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret, - query); - gst_query_unref (query); - } else { - GST_ERROR_OBJECT (sink, "Unsupported object type"); - } - gst_object_unref (sink); -} - -static void -on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); - - if (!upstream) { - GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...", - id); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE); - gst_event_unref (event); - return; - } - - GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event); - gst_object_ref (sink); - g_thread_pool_push (sink->threads, event, NULL); -} - -static void -on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); - - if (!upstream) { - GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...", - id); - gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE, - query); - gst_query_unref (query); - return; - } - - GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query); - gst_object_ref (sink); - g_thread_pool_push (sink->threads, query, NULL); -} - -static void -on_state_change (guint32 id, GstStateChange transition, gpointer user_data) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); - GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id); -} - -static void -on_state_lost (gpointer user_data) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); - - GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state"); - - GST_OBJECT_LOCK (sink); - sink->pass_next_async_done = TRUE; - GST_OBJECT_UNLOCK (sink); - - gst_element_lost_state (GST_ELEMENT (sink)); -} - -static void -do_async_done (GstElement * element, gpointer user_data) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); - GstMessage *message = user_data; - - GST_STATE_LOCK (sink); - GST_OBJECT_LOCK (sink); - if (sink->pass_next_async_done) { - sink->pass_next_async_done = FALSE; - GST_OBJECT_UNLOCK (sink); - gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS); - GST_STATE_UNLOCK (sink); - gst_element_post_message (element, gst_message_ref (message)); - - } else { - GST_OBJECT_UNLOCK (sink); - GST_STATE_UNLOCK (sink); - } -} - -static void -on_message (guint32 id, GstMessage * message, gpointer user_data) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data); - - GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message); - - switch (GST_MESSAGE_TYPE (message)) { - case GST_MESSAGE_ASYNC_DONE: - GST_OBJECT_LOCK (sink); - if (sink->pass_next_async_done) { - GST_OBJECT_UNLOCK (sink); - gst_element_call_async (GST_ELEMENT (sink), do_async_done, - message, (GDestroyNotify) gst_message_unref); - } else { - GST_OBJECT_UNLOCK (sink); - gst_message_unref (message); - } - return; - default: - break; - } - - gst_element_post_message (GST_ELEMENT (sink), message); -} - -static gboolean -gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink) -{ - if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer, - on_event, on_query, on_state_change, on_state_lost, on_message, - sink)) { - GST_ERROR_OBJECT (sink, "Failed to start reader thread"); - return FALSE; - } - return TRUE; -} - -static void -gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink) -{ - gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm); -} - - -static void -gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink) -{ - GST_DEBUG_OBJECT (sink, "Disconnecting"); - gst_ipc_pipeline_sink_stop_reader_thread (sink); - sink->comm.fdin = -1; - sink->comm.fdout = -1; - gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE); - gst_ipc_pipeline_sink_start_reader_thread (sink); -} - -static GstStateChangeReturn -gst_ipc_pipeline_sink_change_state (GstElement * element, - GstStateChange transition) -{ - GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element); - GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; - GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS; - gboolean async = FALSE; - gboolean down = FALSE; - - GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s", - gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); - - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - if (sink->comm.fdin < 0) { - GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin); - return GST_STATE_CHANGE_FAILURE; - } - if (sink->comm.fdout < 0) { - GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout); - return GST_STATE_CHANGE_FAILURE; - } - if (!sink->comm.reader_thread) { - GST_ERROR_OBJECT (element, "Failed to start reader thread"); - return GST_STATE_CHANGE_FAILURE; - } - break; - case GST_STATE_CHANGE_READY_TO_PAUSED: - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - /* In these transitions, it is possible that the peer returns ASYNC. - * We don't know that in advance, but we post async-start anyway because - * it needs to be delivered *before* async-done, and async-done may - * arrive at any point in time after we've set the state of the peer. - * In case the peer doesn't return ASYNC, we just post async-done - * ourselves and the parent GstBin takes care of matching and deleting - * them, so the app never gets any of these. */ - async = TRUE; - break; - default: - break; - } - - /* downwards state change */ - down = (GST_STATE_TRANSITION_CURRENT (transition) >= - GST_STATE_TRANSITION_NEXT (transition)); - - if (async) { - GST_DEBUG_OBJECT (sink, - "Posting async-start for %s, will need state-change-done", - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); - - gst_element_post_message (GST_ELEMENT (sink), - gst_message_new_async_start (GST_OBJECT (sink))); - - GST_OBJECT_LOCK (sink); - sink->pass_next_async_done = TRUE; - GST_OBJECT_UNLOCK (sink); - } - - /* change the state of the peer first */ - /* If the fd out is -1, we do not actually call the peer. This will happen - when we explicitely disconnected, and in that case we want to be able - to bring the element down to NULL, so it can be restarted with a new - slave pipeline. */ - if (sink->comm.fdout >= 0) { - GST_DEBUG_OBJECT (sink, "Calling peer with state change"); - peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm, - transition); - if (ret == GST_STATE_CHANGE_FAILURE && down) { - GST_WARNING_OBJECT (sink, "Peer returned state change failure, " - "but ignoring because we are going down"); - peer_ret = GST_STATE_CHANGE_SUCCESS; - } - } else { - if (down) { - GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)", - sink->comm.fdout); - peer_ret = GST_STATE_CHANGE_SUCCESS; - } else { - GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing", - sink->comm.fdout); - peer_ret = GST_STATE_CHANGE_FAILURE; - } - } - - /* chain up to the parent class to change our state, if the peer succeeded */ - if (peer_ret != GST_STATE_CHANGE_FAILURE) { - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) { - GST_WARNING_OBJECT (sink, "Parent returned state change failure, " - "but ignoring because we are going down"); - ret = GST_STATE_CHANGE_SUCCESS; - } - } - - GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s", - gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)), - gst_element_state_change_return_get_name (peer_ret), - gst_element_state_change_return_get_name (ret)); - - /* now interpret the return codes */ - if (async && peer_ret != GST_STATE_CHANGE_ASYNC) { - GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC", - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); - - GST_OBJECT_LOCK (sink); - sink->pass_next_async_done = FALSE; - GST_OBJECT_UNLOCK (sink); - - gst_element_post_message (GST_ELEMENT (sink), - gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE)); - } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) { - GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC"); - peer_ret = GST_STATE_CHANGE_SUCCESS; - } - - if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) { - if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) { - /* only the parent's ret was FAILURE - revert remote changes */ - GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent " - "returned failure"); - gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm, - GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition), - GST_STATE_TRANSITION_CURRENT (transition))); - } - return GST_STATE_CHANGE_FAILURE; - } - - /* the parent's (GstElement) state change func won't return ASYNC or - * NO_PREROLL, so unless it has returned FAILURE, which we have catched above, - * we are not interested in its return code... just return the peer's */ - return peer_ret; -} diff --git a/gst/ipcpipeline/gstipcpipelinesink.h b/gst/ipcpipeline/gstipcpipelinesink.h deleted file mode 100644 index 18c880eee..000000000 --- a/gst/ipcpipeline/gstipcpipelinesink.h +++ /dev/null @@ -1,67 +0,0 @@ -/* GStreamer - * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> - * 2000 Wim Taymans <wtay@chello.be> - * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> - * - * gstipcpipelinesink.h: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - - -#ifndef __GST_IPC_PIPELINE_SINK_H__ -#define __GST_IPC_PIPELINE_SINK_H__ - -#include <gst/gst.h> -#include "gstipcpipelinecomm.h" - -G_BEGIN_DECLS - -#define GST_TYPE_IPC_PIPELINE_SINK \ - (gst_ipc_pipeline_sink_get_type()) -#define GST_IPC_PIPELINE_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSink)) -#define GST_IPC_PIPELINE_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSinkClass)) -#define GST_IS_IPC_PIPELINE_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SINK)) -#define GST_IS_IPC_PIPELINE_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SINK)) -#define GST_IPC_PIPELINE_SINK_CAST(obj) ((GstIpcPipelineSink *)obj) - -typedef struct _GstIpcPipelineSink GstIpcPipelineSink; -typedef struct _GstIpcPipelineSinkClass GstIpcPipelineSinkClass; - -struct _GstIpcPipelineSink { - GstElement element; - - GstIpcPipelineComm comm; - GThreadPool *threads; - gboolean pass_next_async_done; - GstPad *sinkpad; -}; - -struct _GstIpcPipelineSinkClass { - GstElementClass parent_class; - - void (*disconnect) (GstIpcPipelineSink * sink); -}; - -G_GNUC_INTERNAL GType gst_ipc_pipeline_sink_get_type (void); - -G_END_DECLS - -#endif /* __GST_IPC_PIPELINE_SINK_H__ */ diff --git a/gst/ipcpipeline/gstipcpipelinesrc.c b/gst/ipcpipeline/gstipcpipelinesrc.c deleted file mode 100644 index 656bd5a71..000000000 --- a/gst/ipcpipeline/gstipcpipelinesrc.c +++ /dev/null @@ -1,954 +0,0 @@ -/* GStreamer - * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> - * 2000 Wim Taymans <wim@fluendo.com> - * 2006 Thomas Vander Stichele <thomas at apestaart dot org> - * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> - * - * gstipcpipelinesrc.c: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ -/** - * SECTION:element-ipcpipelinesrc - * @see_also: #GstIpcPipelineSink, #GstIpcSlavePipeline - * - * Communicates with an ipcpipelinesink element in another process via a socket. - * - * The ipcpipelinesrc element allows 2-way communication with an ipcpipelinesink - * element on another process/pipeline. It is meant to run inside an - * interslavepipeline and when paired with an ipcpipelinesink, it will slave its - * whole parent pipeline to the "master" one, which contains the ipcpipelinesink. - * - * For more details about this mechanism and its uses, see the documentation - * of the ipcpipelinesink element. - */ - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include <unistd.h> -#include <stdlib.h> -#include <string.h> - -#include "gstipcpipelinesrc.h" - -static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", - GST_PAD_SRC, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - -GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_src_debug); -#define GST_CAT_DEFAULT gst_ipc_pipeline_src_debug - -enum -{ - /* FILL ME */ - SIGNAL_FORWARD_MESSAGE, - SIGNAL_DISCONNECT, - LAST_SIGNAL -}; -static guint gst_ipc_pipeline_src_signals[LAST_SIGNAL] = { 0 }; - -enum -{ - PROP_0, - PROP_FDIN, - PROP_FDOUT, - PROP_READ_CHUNK_SIZE, - PROP_ACK_TIME, - PROP_LAST, -}; - -static GQuark QUARK_UPSTREAM; - -#define DEFAULT_READ_CHUNK_SIZE 65536 -#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND) - -#define _do_init \ - GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_src_debug, "ipcpipelinesrc", 0, "ipcpipelinesrc element"); -#define gst_ipc_pipeline_src_parent_class parent_class -G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSrc, gst_ipc_pipeline_src, - GST_TYPE_ELEMENT, _do_init); - -static void gst_ipc_pipeline_src_finalize (GObject * object); -static void gst_ipc_pipeline_src_dispose (GObject * object); -static void gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); - -static void gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src); - -static gboolean gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * - src); -static void gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src); - -static gboolean gst_ipc_pipeline_src_activate_mode (GstPad * pad, - GstObject * parent, GstPadMode mode, gboolean active); -static gboolean gst_ipc_pipeline_src_srcpad_event (GstPad * pad, - GstObject * parent, GstEvent * event); -static gboolean gst_ipc_pipeline_src_srcpad_query (GstPad * pad, - GstObject * parent, GstQuery * query); -static void gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src); - -static gboolean gst_ipc_pipeline_src_send_event (GstElement * element, - GstEvent * event); -static gboolean gst_ipc_pipeline_src_query (GstElement * element, - GstQuery * query); -static GstStateChangeReturn gst_ipc_pipeline_src_change_state (GstElement * - element, GstStateChange transition); - -static gboolean gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, - GstMessage * msg); -static void gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src); - -static void -gst_ipc_pipeline_src_class_init (GstIpcPipelineSrcClass * klass) -{ - GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); - - QUARK_UPSTREAM = g_quark_from_static_string ("ipcpipeline-upstream"); - - gobject_class->dispose = gst_ipc_pipeline_src_dispose; - gobject_class->finalize = gst_ipc_pipeline_src_finalize; - - gobject_class->set_property = gst_ipc_pipeline_src_set_property; - gobject_class->get_property = gst_ipc_pipeline_src_get_property; - - gstelement_class->send_event = - GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_send_event); - gstelement_class->query = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_query); - gstelement_class->change_state = - GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_change_state); - - klass->forward_message = - GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_forward_message); - klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_disconnect); - - GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_activate_mode); - GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_event); - GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_query); - - g_object_class_install_property (gobject_class, PROP_FDIN, - g_param_spec_int ("fdin", "Input file descriptor", - "File descriptor to read data from", - -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_FDOUT, - g_param_spec_int ("fdout", "Output file descriptor", - "File descriptor to write data through", - -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE, - g_param_spec_uint ("read-chunk-size", "Read chunk size", - "Read chunk size", - 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_ACK_TIME, - g_param_spec_uint64 ("ack-time", "Ack time", - "Maximum time to wait for a response to a message", - 0, G_MAXUINT64, DEFAULT_ACK_TIME, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - gst_ipc_pipeline_src_signals[SIGNAL_FORWARD_MESSAGE] = - g_signal_new ("forward-message", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, - G_STRUCT_OFFSET (GstIpcPipelineSrcClass, forward_message), NULL, NULL, - g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, GST_TYPE_MESSAGE); - - gst_ipc_pipeline_src_signals[SIGNAL_DISCONNECT] = - g_signal_new ("disconnect", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, - G_STRUCT_OFFSET (GstIpcPipelineSrcClass, disconnect), - NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - - gst_element_class_set_static_metadata (gstelement_class, - "Inter-process Pipeline Source", - "Source", - "Continues a split pipeline from another process", - "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>"); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&srctemplate)); -} - -static void -gst_ipc_pipeline_src_init (GstIpcPipelineSrc * src) -{ - GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE); - - gst_ipc_pipeline_comm_init (&src->comm, GST_ELEMENT (src)); - src->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; - src->comm.ack_time = DEFAULT_ACK_TIME; - src->flushing = TRUE; - src->last_ret = GST_FLOW_FLUSHING; - src->queued = NULL; - g_cond_init (&src->create_cond); - - src->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); - gst_pad_set_activatemode_function (src->srcpad, - gst_ipc_pipeline_src_activate_mode); - gst_pad_set_event_function (src->srcpad, gst_ipc_pipeline_src_srcpad_event); - gst_pad_set_query_function (src->srcpad, gst_ipc_pipeline_src_srcpad_query); - gst_element_add_pad (GST_ELEMENT (src), src->srcpad); - - gst_ipc_pipeline_src_start_reader_thread (src); -} - -static void -gst_ipc_pipeline_src_dispose (GObject * object) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); - - gst_ipc_pipeline_src_stop_reader_thread (src); - gst_ipc_pipeline_src_cancel_queued (src); - gst_ipc_pipeline_comm_cancel (&src->comm, TRUE); - - G_OBJECT_CLASS (parent_class)->dispose (object); -} - -static void -gst_ipc_pipeline_src_finalize (GObject * object) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); - - gst_ipc_pipeline_comm_clear (&src->comm); - g_cond_clear (&src->create_cond); - - G_OBJECT_CLASS (parent_class)->finalize (object); -} - -static void -gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); - - switch (prop_id) { - case PROP_FDIN: - src->comm.fdin = g_value_get_int (value); - break; - case PROP_FDOUT: - src->comm.fdout = g_value_get_int (value); - break; - case PROP_READ_CHUNK_SIZE: - src->comm.read_chunk_size = g_value_get_uint (value); - break; - case PROP_ACK_TIME: - src->comm.ack_time = g_value_get_uint64 (value); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object); - - g_return_if_fail (GST_IS_IPC_PIPELINE_SRC (object)); - - switch (prop_id) { - case PROP_FDIN: - g_value_set_int (value, src->comm.fdin); - break; - case PROP_FDOUT: - g_value_set_int (value, src->comm.fdout); - break; - case PROP_READ_CHUNK_SIZE: - g_value_set_uint (value, src->comm.read_chunk_size); - break; - case PROP_ACK_TIME: - g_value_set_uint64 (value, src->comm.ack_time); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_ipc_pipeline_src_log_queue (GstIpcPipelineSrc * src) -{ - GList *queued; - guint n; - - queued = src->queued; - n = 0; - GST_LOG_OBJECT (src, "There are %u objects in the queue", - g_list_length (queued)); - while (queued) { - void *object = queued->data; - if (GST_IS_EVENT (object)) { - GST_LOG_OBJECT (src, " #%u: %s event", n, GST_EVENT_TYPE_NAME (object)); - } else if (GST_IS_QUERY (object)) { - GST_LOG_OBJECT (src, " #%u: %s query", n, GST_QUERY_TYPE_NAME (object)); - } else if (GST_IS_BUFFER (object)) { - GST_LOG_OBJECT (src, " #%u: %zu bytes buffer", n, - (size_t) gst_buffer_get_size (object)); - } else { - GST_LOG_OBJECT (src, " #%u: unknown item in queue", n); - } - queued = queued->next; - ++n; - } -} - -static void -gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src) -{ - GList *queued; - guint32 id; - - g_mutex_lock (&src->comm.mutex); - queued = src->queued; - src->queued = NULL; - g_cond_broadcast (&src->create_cond); - g_mutex_unlock (&src->comm.mutex); - - while (queued) { - void *object = queued->data; - - id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object), - QUARK_ID)); - - queued = g_list_delete_link (queued, queued); - if (GST_IS_EVENT (object)) { - GstEvent *event = GST_EVENT (object); - GST_DEBUG_OBJECT (src, "Cancelling queued event: %" GST_PTR_FORMAT, - event); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE); - gst_event_unref (event); - } else if (GST_IS_BUFFER (object)) { - GstBuffer *buffer = GST_BUFFER (object); - GST_DEBUG_OBJECT (src, "Cancelling queued buffer: %" GST_PTR_FORMAT, - buffer); - gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, - GST_FLOW_FLUSHING); - gst_buffer_unref (buffer); - } else if (GST_IS_QUERY (object)) { - GstQuery *query = GST_QUERY (object); - GST_DEBUG_OBJECT (src, "Cancelling queued query: %" GST_PTR_FORMAT, - query); - gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, FALSE, - query); - gst_query_unref (query); - } - } - -} - -static void -gst_ipc_pipeline_src_start_loop (GstIpcPipelineSrc * src) -{ - g_mutex_lock (&src->comm.mutex); - src->flushing = FALSE; - src->last_ret = GST_FLOW_OK; - g_mutex_unlock (&src->comm.mutex); - - gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_ipc_pipeline_src_loop, - src, NULL); -} - -static void -gst_ipc_pipeline_src_stop_loop (GstIpcPipelineSrc * src, gboolean stop) -{ - g_mutex_lock (&src->comm.mutex); - src->flushing = TRUE; - g_cond_broadcast (&src->create_cond); - g_mutex_unlock (&src->comm.mutex); - - if (stop) - gst_pad_stop_task (src->srcpad); -} - -static gboolean -gst_ipc_pipeline_src_activate_mode (GstPad * pad, GstObject * parent, - GstPadMode mode, gboolean active) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent); - - switch (mode) { - case GST_PAD_MODE_PUSH: - GST_DEBUG_OBJECT (pad, "%s in push mode", active ? "activating" : - "deactivating"); - if (active) { - gst_ipc_pipeline_src_start_loop (src); - } else { - gst_ipc_pipeline_src_stop_loop (src, TRUE); - gst_ipc_pipeline_comm_cancel (&src->comm, FALSE); - } - return TRUE; - default: - GST_DEBUG_OBJECT (pad, "unsupported activation mode"); - return FALSE; - } -} - -static gboolean -gst_ipc_pipeline_src_srcpad_event (GstPad * pad, GstObject * parent, - GstEvent * event) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent); - gboolean ret; - - GST_DEBUG_OBJECT (src, "Got upstream event %s", GST_EVENT_TYPE_NAME (event)); - - ret = gst_ipc_pipeline_comm_write_event_to_fd (&src->comm, TRUE, event); - gst_event_unref (event); - - GST_DEBUG_OBJECT (src, "Returning event result: %d", ret); - return ret; -} - -static gboolean -gst_ipc_pipeline_src_srcpad_query (GstPad * pad, GstObject * parent, - GstQuery * query) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent); - gboolean ret; - - /* answer some queries that do not make sense to be forwarded */ - switch (GST_QUERY_TYPE (query)) { - case GST_QUERY_LATENCY: - return TRUE; - case GST_QUERY_CONTEXT: - return FALSE; - case GST_QUERY_CAPS: - { - /* caps queries occur even while linking the pipeline. - * It is possible that the ipcpipelinesink may not be connected at this - * point, so let's avoid a couple of errors... */ - GstState state; - GST_OBJECT_LOCK (src); - state = GST_STATE (src); - GST_OBJECT_UNLOCK (src); - if (state == GST_STATE_NULL) - return FALSE; - } - default: - break; - } - - GST_DEBUG_OBJECT (src, "Got upstream query %s: %" GST_PTR_FORMAT, - GST_QUERY_TYPE_NAME (query), query); - - ret = gst_ipc_pipeline_comm_write_query_to_fd (&src->comm, TRUE, query); - - GST_DEBUG_OBJECT (src, "Returning query result: %d, %" GST_PTR_FORMAT, - ret, query); - return ret; -} - -static void -gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src) -{ - gpointer object; - guint32 id; - gboolean ok; - GstFlowReturn ret = GST_FLOW_OK; - - g_mutex_lock (&src->comm.mutex); - - while (!src->queued && !src->flushing) - g_cond_wait (&src->create_cond, &src->comm.mutex); - - if (src->flushing) - goto out; - - object = src->queued->data; - src->queued = g_list_delete_link (src->queued, src->queued); - g_mutex_unlock (&src->comm.mutex); - - id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object), - QUARK_ID)); - - if (GST_IS_BUFFER (object)) { - GstBuffer *buf = GST_BUFFER (object); - GST_DEBUG_OBJECT (src, "Pushing queued buffer: %" GST_PTR_FORMAT, buf); - ret = gst_pad_push (src->srcpad, buf); - GST_DEBUG_OBJECT (src, "pushed id %u, ret: %s", id, - gst_flow_get_name (ret)); - gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, ret); - } else if (GST_IS_EVENT (object)) { - GstEvent *event = GST_EVENT (object); - GST_DEBUG_OBJECT (src, "Pushing queued event: %" GST_PTR_FORMAT, event); - ok = gst_pad_push_event (src->srcpad, event); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok); - } else if (GST_IS_QUERY (object)) { - GstQuery *query = GST_QUERY (object); - GST_DEBUG_OBJECT (src, "Pushing queued query: %" GST_PTR_FORMAT, query); - ok = gst_pad_peer_query (src->srcpad, query); - gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ok, query); - gst_query_unref (query); - } else { - GST_WARNING_OBJECT (src, "Unknown data type queued"); - } - - g_mutex_lock (&src->comm.mutex); - if (!src->queued) - g_cond_broadcast (&src->create_cond); -out: - if (src->flushing) - ret = GST_FLOW_FLUSHING; - if (ret != GST_FLOW_OK) - src->last_ret = ret; - g_mutex_unlock (&src->comm.mutex); - - if (ret == GST_FLOW_FLUSHING) { - gst_ipc_pipeline_src_cancel_queued (src); - gst_pad_pause_task (src->srcpad); - } -} - -static gboolean -gst_ipc_pipeline_src_send_event (GstElement * element, GstEvent * event) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); - return gst_pad_push_event (src->srcpad, event); -} - -static gboolean -gst_ipc_pipeline_src_query (GstElement * element, GstQuery * query) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); - return gst_pad_query (src->srcpad, query); -} - -static GstElement * -find_pipeline (GstElement * element) -{ - GstElement *pipeline = element; - while (GST_ELEMENT_PARENT (pipeline)) { - pipeline = GST_ELEMENT_PARENT (pipeline); - if (GST_IS_PIPELINE (pipeline)) - break; - } - if (!pipeline || !GST_IS_PIPELINE (pipeline)) { - pipeline = NULL; - } - return pipeline; -} - -static gboolean -gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, GstMessage * msg) -{ - gboolean skip = FALSE; - - GST_DEBUG_OBJECT (src, "Message to forward: %" GST_PTR_FORMAT, msg); - - switch (GST_MESSAGE_TYPE (msg)) { - case GST_MESSAGE_STATE_CHANGED: - { - GstState old, new, pending; - GstElement *pipeline = find_pipeline (GST_ELEMENT (src)); - - gst_message_parse_state_changed (msg, &old, &new, &pending); - - if (GST_MESSAGE_SRC (msg) == GST_OBJECT (pipeline) && - old == new && new == pending) { - GST_DEBUG_OBJECT (src, "Detected lost state, notifying master"); - gst_ipc_pipeline_comm_write_state_lost_to_fd (&src->comm); - } - /* fall through & skip */ - } - case GST_MESSAGE_ASYNC_START: - case GST_MESSAGE_CLOCK_PROVIDE: - case GST_MESSAGE_CLOCK_LOST: - case GST_MESSAGE_NEW_CLOCK: - case GST_MESSAGE_STREAM_STATUS: - case GST_MESSAGE_NEED_CONTEXT: - case GST_MESSAGE_HAVE_CONTEXT: - case GST_MESSAGE_STRUCTURE_CHANGE: - skip = TRUE; - break; - case GST_MESSAGE_RESET_TIME: - { - GQuark ipcpipelinesrc_posted = g_quark_from_static_string - ("gstinterslavepipeline-message-already-posted"); - - skip = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (msg), - ipcpipelinesrc_posted)); - if (!skip) { - gst_mini_object_set_qdata (GST_MINI_OBJECT (msg), ipcpipelinesrc_posted, - GUINT_TO_POINTER (1), NULL); - } - break; - } - case GST_MESSAGE_ERROR: - { - GError *error = NULL; - - /* skip forwarding a RESOURCE/WRITE error message that originated from - * ipcpipelinesrc; we post this error when writing to the comm fd fails, - * so if we try to forward it here, we will likely get another one posted - * immediately and end up doing an endless loop */ - gst_message_parse_error (msg, &error, NULL); - skip = (GST_MESSAGE_SRC (msg) == GST_OBJECT_CAST (src) - && error->domain == gst_resource_error_quark () - && error->code == GST_RESOURCE_ERROR_WRITE); - g_error_free (error); - break; - } - default: - break; - } - - if (skip) { - GST_DEBUG_OBJECT (src, "message will not be forwarded"); - return TRUE; - } - - return gst_ipc_pipeline_comm_write_message_to_fd (&src->comm, msg); -} - -static void -on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); - GST_DEBUG_OBJECT (src, "Got buffer id %u, queueing: %" GST_PTR_FORMAT, id, - buffer); - g_mutex_lock (&src->comm.mutex); - if (!GST_PAD_IS_ACTIVE (src->srcpad) || src->flushing) { - g_mutex_unlock (&src->comm.mutex); - GST_INFO_OBJECT (src, "We're not started or flushing, buffer ignored"); - gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, - GST_FLOW_FLUSHING); - gst_buffer_unref (buffer); - return; - } - if (src->last_ret != GST_FLOW_OK) { - GstFlowReturn last_ret = src->last_ret; - g_mutex_unlock (&src->comm.mutex); - GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting buffer", - gst_flow_get_name (last_ret)); - gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, last_ret); - gst_buffer_unref (buffer); - return; - } - src->queued = g_list_append (src->queued, buffer); /* keep the ref */ - gst_ipc_pipeline_src_log_queue (src); - g_cond_broadcast (&src->create_cond); - g_mutex_unlock (&src->comm.mutex); -} - -static void -do_oob_event (GstElement * element, gpointer user_data) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); - GstEvent *event = user_data; - gboolean ret, upstream; - guint32 id; - - id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT - (event), QUARK_ID)); - upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT - (event), QUARK_UPSTREAM)); - - if (upstream) { - GstElement *pipeline; - gboolean ok = FALSE; - - if (!(pipeline = find_pipeline (element))) { - GST_ERROR_OBJECT (src, "No pipeline found"); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok); - } else { - GST_DEBUG_OBJECT (src, "Posting upstream event on pipeline: %" - GST_PTR_FORMAT, event); - ok = gst_element_send_event (pipeline, gst_event_ref (event)); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok); - } - } else { - GST_DEBUG_OBJECT (src, "Pushing event async: %" GST_PTR_FORMAT, event); - ret = gst_element_send_event (element, gst_event_ref (event)); - GST_DEBUG_OBJECT (src, "Event pushed, return %d", ret); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ret); - } -} - -static void -on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); - GstFlowReturn last_ret = GST_FLOW_OK; - - GST_DEBUG_OBJECT (src, "Got event id %u, queueing: %" GST_PTR_FORMAT, id, - event); - - gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_UPSTREAM, - GINT_TO_POINTER (upstream), NULL); - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_FLUSH_START: - gst_ipc_pipeline_src_stop_loop (src, FALSE); - break; - case GST_EVENT_FLUSH_STOP: - gst_ipc_pipeline_src_start_loop (src); - break; - default: - g_mutex_lock (&src->comm.mutex); - last_ret = src->last_ret; - g_mutex_unlock (&src->comm.mutex); - break; - } - - if (GST_EVENT_IS_SERIALIZED (event) && !upstream) { - if (last_ret != GST_FLOW_OK) { - GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event", - gst_flow_get_name (last_ret)); - gst_event_unref (event); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE); - } else { - GST_DEBUG_OBJECT (src, "This is a serialized event, adding to queue %p", - src->queued); - g_mutex_lock (&src->comm.mutex); - src->queued = g_list_append (src->queued, event); /* keep the ref */ - gst_ipc_pipeline_src_log_queue (src); - g_cond_broadcast (&src->create_cond); - g_mutex_unlock (&src->comm.mutex); - } - } else { - if (last_ret != GST_FLOW_OK && !upstream) { - GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event", - gst_flow_get_name (last_ret)); - gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE); - gst_event_unref (event); - } else { - GST_DEBUG_OBJECT (src, - "This is not a serialized event, pushing in a thread"); - gst_element_call_async (GST_ELEMENT (src), do_oob_event, event, - (GDestroyNotify) gst_event_unref); - } - } -} - -static void -do_oob_query (GstElement * element, gpointer user_data) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); - GstQuery *query = GST_QUERY (user_data); - guint32 id; - gboolean upstream; - gboolean ret; - - id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT - (query), QUARK_ID)); - upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT - (query), QUARK_UPSTREAM)); - - if (upstream) { - GstElement *pipeline; - - if (!(pipeline = find_pipeline (element))) { - GST_ERROR_OBJECT (src, "No pipeline found"); - ret = FALSE; - } else { - GST_DEBUG_OBJECT (src, "Posting query on pipeline: %" GST_PTR_FORMAT, - query); - ret = gst_element_query (pipeline, query); - } - } else { - GST_DEBUG_OBJECT (src, "Pushing query async: %" GST_PTR_FORMAT, query); - ret = gst_pad_peer_query (src->srcpad, query); - GST_DEBUG_OBJECT (src, "Query pushed, return %d", ret); - } - gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ret, query); -} - -static void -on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); - - GST_DEBUG_OBJECT (src, "Got query id %u, queueing: %" GST_PTR_FORMAT, id, - query); - - if (GST_QUERY_IS_SERIALIZED (query) && !upstream) { - g_mutex_lock (&src->comm.mutex); - src->queued = g_list_append (src->queued, query); /* keep the ref */ - gst_ipc_pipeline_src_log_queue (src); - g_cond_broadcast (&src->create_cond); - g_mutex_unlock (&src->comm.mutex); - } else { - gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_UPSTREAM, - GINT_TO_POINTER (upstream), NULL); - gst_element_call_async (GST_ELEMENT (src), do_oob_query, query, - (GDestroyNotify) gst_query_unref); - } -} - -struct StateChangeData -{ - guint32 id; - GstStateChange transition; -}; - -static void -do_state_change (GstElement * element, gpointer data) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); - GstElement *pipeline; - GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE; - GstState state, pending, effective; - struct StateChangeData *d = data; - guint32 id = d->id; - GstStateChange transition = d->transition; - gboolean down; - - GST_DEBUG_OBJECT (src, "Doing state change id %u, %s -> %s", id, - gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); - - if (!(pipeline = find_pipeline (element))) { - GST_ERROR_OBJECT (src, "No pipeline found"); - ret = GST_STATE_CHANGE_FAILURE; - goto done_nolock; - } - - down = (GST_STATE_TRANSITION_CURRENT (transition) >= - GST_STATE_TRANSITION_NEXT (transition)); - - GST_STATE_LOCK (pipeline); - ret = gst_element_get_state (pipeline, &state, &pending, 0); - - /* if we are pending a state change, count the pending state as - * the current one */ - effective = pending == GST_STATE_VOID_PENDING ? state : pending; - - GST_DEBUG_OBJECT (src, "Current element state: ret:%s state:%s pending:%s " - "effective:%s", gst_element_state_change_return_get_name (ret), - gst_element_state_get_name (state), - gst_element_state_get_name (pending), - gst_element_state_get_name (effective)); - - if ((GST_STATE_TRANSITION_NEXT (transition) <= effective && !down) || - (GST_STATE_TRANSITION_NEXT (transition) > effective && down)) { - /* if the request was to transition to a state that we have already - * transitioned to in the same direction, then we just silently return */ - GST_DEBUG_OBJECT (src, "State transition to %s is unnecessary", - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); - /* make sure we return SUCCESS if the transition is to NULL or READY, - * even if our current ret is ASYNC for example; also, make sure not - * to return FAILURE, since our state is already committed */ - if (GST_STATE_TRANSITION_NEXT (transition) <= GST_STATE_READY || - ret == GST_STATE_CHANGE_FAILURE) { - ret = GST_STATE_CHANGE_SUCCESS; - } - } else if (ret != GST_STATE_CHANGE_FAILURE || down) { - /* if the request was to transition to a state that we haven't already - * transitioned to in the same direction, then we need to request a state - * change in the pipeline, *unless* we are going upwards and the last ret - * was FAILURE, in which case we should just return FAILURE and stop. - * We don't stop a downwards state change though in case of FAILURE, since - * we need to be able to bring the pipeline down to NULL. Note that - * GST_MESSAGE_ERROR will cause ret to be GST_STATE_CHANGE_FAILURE */ - ret = gst_element_set_state (pipeline, - GST_STATE_TRANSITION_NEXT (transition)); - GST_DEBUG_OBJECT (src, "gst_element_set_state returned %s", - gst_element_state_change_return_get_name (ret)); - } - - GST_STATE_UNLOCK (pipeline); - -done_nolock: - GST_DEBUG_OBJECT (src, "sending state change ack, ret = %s", - gst_element_state_change_return_get_name (ret)); - gst_ipc_pipeline_comm_write_state_change_ack_to_fd (&src->comm, id, ret); -} - -static void -on_state_change (guint32 id, GstStateChange transition, gpointer user_data) -{ - struct StateChangeData *d; - GstElement *ipcpipelinesrc = GST_ELEMENT (user_data); - - GST_DEBUG_OBJECT (ipcpipelinesrc, "Got state change id %u, %s -> %s", id, - gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), - gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); - - d = g_new (struct StateChangeData, 1); - d->id = id; - d->transition = transition; - - gst_element_call_async (ipcpipelinesrc, do_state_change, d, g_free); -} - -static void -on_message (guint32 id, GstMessage * message, gpointer user_data) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data); - - GST_ERROR_OBJECT (src, "Got message id %u, not supposed to: %" GST_PTR_FORMAT, - id, message); - gst_message_unref (message); -} - -static gboolean -gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * src) -{ - if (!gst_ipc_pipeline_comm_start_reader_thread (&src->comm, on_buffer, - on_event, on_query, on_state_change, NULL, on_message, src)) { - GST_ERROR_OBJECT (src, "Failed to start reader thread"); - return FALSE; - } - return TRUE; -} - -static void -gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src) -{ - gst_ipc_pipeline_comm_stop_reader_thread (&src->comm); -} - -static void -gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src) -{ - GST_DEBUG_OBJECT (src, "Disconnecting"); - gst_ipc_pipeline_src_stop_reader_thread (src); - src->comm.fdin = -1; - src->comm.fdout = -1; - gst_ipc_pipeline_comm_cancel (&src->comm, FALSE); - gst_ipc_pipeline_src_start_reader_thread (src); -} - -static GstStateChangeReturn -gst_ipc_pipeline_src_change_state (GstElement * element, - GstStateChange transition) -{ - GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element); - - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - if (src->comm.fdin < 0) { - GST_ERROR_OBJECT (element, "Invalid fdin: %d", src->comm.fdin); - return GST_STATE_CHANGE_FAILURE; - } - if (src->comm.fdout < 0) { - GST_ERROR_OBJECT (element, "Invalid fdout: %d", src->comm.fdout); - return GST_STATE_CHANGE_FAILURE; - } - if (!src->comm.reader_thread) { - GST_ERROR_OBJECT (element, "Failed to start reader thread"); - return GST_STATE_CHANGE_FAILURE; - } - break; - default: - break; - } - return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); -} diff --git a/gst/ipcpipeline/gstipcpipelinesrc.h b/gst/ipcpipeline/gstipcpipelinesrc.h deleted file mode 100644 index 221fccdd1..000000000 --- a/gst/ipcpipeline/gstipcpipelinesrc.h +++ /dev/null @@ -1,76 +0,0 @@ -/* GStreamer - * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> - * 2000 Wim Taymans <wtay@chello.be> - * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> - * - * gstipcpipelinesrc.h: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - - -#ifndef __GST_IPC_PIPELINE_SRC_H__ -#define __GST_IPC_PIPELINE_SRC_H__ - -#include <gst/gst.h> -#include "gstipcpipelinecomm.h" - -G_BEGIN_DECLS - -#define GST_TYPE_IPC_PIPELINE_SRC \ - (gst_ipc_pipeline_src_get_type()) -#define GST_IPC_PIPELINE_SRC(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrc)) -#define GST_IPC_PIPELINE_SRC_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrcClass)) -#define GST_IS_IPC_PIPELINE_SRC(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SRC)) -#define GST_IS_IPC_PIPELINE_SRC_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SRC)) - -typedef struct _GstIpcPipelineSrc GstIpcPipelineSrc; -typedef struct _GstIpcPipelineSrcClass GstIpcPipelineSrcClass; - -/** - * GstIpcPipelineSrc: - * - * Opaque #GstIpcPipelineSrc data structure. - */ -struct _GstIpcPipelineSrc { - GstElement element; - - GstIpcPipelineComm comm; - GstPad *srcpad; - - gboolean flushing; - GList *queued; - GstFlowReturn last_ret; - - GCond create_cond; -}; - -struct _GstIpcPipelineSrcClass { - GstElementClass parent_class; - - gboolean (*forward_message) (GstIpcPipelineSrc *, GstMessage *); - void (*disconnect) (GstIpcPipelineSrc * src); -}; - -G_GNUC_INTERNAL GType gst_ipc_pipeline_src_get_type (void); - -G_END_DECLS - -#endif /* __GST_IPC_PIPELINE_SRC_H__ */ diff --git a/gst/ipcpipeline/gstipcslavepipeline.c b/gst/ipcpipeline/gstipcslavepipeline.c deleted file mode 100644 index 2c9dd1516..000000000 --- a/gst/ipcpipeline/gstipcslavepipeline.c +++ /dev/null @@ -1,122 +0,0 @@ -/* GStreamer - * Copyright (C) 2015-2017 YouView TV Ltd - * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> - * - * gstipcslavepipeline.c: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ -/** - * SECTION:element-ipcslavepipeline - * @see_also: #GstIpcPipelineSink, #GstIpcPipelineSrc - * - * This is a GstPipeline subclass meant to embed one ore more ipcpipelinesrc - * elements, and be slaved transparently to the master pipeline, using one ore - * more ipcpipelinesink elements on the master. - * - * The actual pipeline slaving logic happens in ipcpipelinesrc. The only thing - * that this class actually does is that it watches the pipeline bus for - * messages and forwards them to the master pipeline through the ipcpipelinesrc - * elements that it contains. - * - * For more details about this mechanism and its uses, see the documentation - * of the ipcpipelinesink element. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <string.h> - -#include "gstipcpipelinesrc.h" -#include "gstipcslavepipeline.h" - -GST_DEBUG_CATEGORY_STATIC (gst_ipcslavepipeline_debug); -#define GST_CAT_DEFAULT gst_ipcslavepipeline_debug - -#define _do_init \ - GST_DEBUG_CATEGORY_INIT (gst_ipcslavepipeline_debug, "ipcslavepipeline", 0, "ipcslavepipeline element"); -#define gst_ipc_slave_pipeline_parent_class parent_class -G_DEFINE_TYPE_WITH_CODE (GstIpcSlavePipeline, gst_ipc_slave_pipeline, - GST_TYPE_PIPELINE, _do_init); - -static gboolean gst_ipc_slave_pipeline_post_message (GstElement * element, - GstMessage * message); - -static void -gst_ipc_slave_pipeline_class_init (GstIpcSlavePipelineClass * klass) -{ - GstElementClass *element_class; - - element_class = GST_ELEMENT_CLASS (klass); - - element_class->post_message = gst_ipc_slave_pipeline_post_message; - - gst_element_class_set_static_metadata (element_class, - "Inter-process slave pipeline", - "Generic/Bin/Slave", - "Contains the slave part of an inter-process pipeline", - "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk"); -} - -static void -gst_ipc_slave_pipeline_init (GstIpcSlavePipeline * isp) -{ -} - -static gboolean -send_message_if_ipcpipelinesrc (const GValue * v, GValue * r, - gpointer user_data) -{ - GstElement *e; - GType et; - gboolean ret; - GstMessage *message = user_data; - - e = g_value_get_object (v); - et = gst_element_factory_get_element_type (gst_element_get_factory (e)); - if (et == GST_TYPE_IPC_PIPELINE_SRC) { - g_signal_emit_by_name (G_OBJECT (e), "forward-message", message, &ret); - - /* if we succesfully sent this to the master and it's not ASYNC_DONE or EOS, - * we can skip sending it again through the other ipcpipelinesrcs */ - if (ret && GST_MESSAGE_TYPE (message) != GST_MESSAGE_ASYNC_DONE && - GST_MESSAGE_TYPE (message) != GST_MESSAGE_EOS) - return FALSE; - } - return TRUE; -} - -static void -gst_ipc_slave_pipeline_forward_message (GstIpcSlavePipeline * pipeline, - GstMessage * message) -{ - GstIterator *it; - - it = gst_bin_iterate_sources (GST_BIN (pipeline)); - gst_iterator_fold (it, send_message_if_ipcpipelinesrc, NULL, message); - gst_iterator_free (it); -} - -static gboolean -gst_ipc_slave_pipeline_post_message (GstElement * element, GstMessage * message) -{ - gst_ipc_slave_pipeline_forward_message (GST_IPC_SLAVE_PIPELINE - (element), message); - - return GST_ELEMENT_CLASS (parent_class)->post_message (element, message); -} diff --git a/gst/ipcpipeline/gstipcslavepipeline.h b/gst/ipcpipeline/gstipcslavepipeline.h deleted file mode 100644 index 354dfacf0..000000000 --- a/gst/ipcpipeline/gstipcslavepipeline.h +++ /dev/null @@ -1,59 +0,0 @@ -/* GStreamer - * Copyright (C) 2015-2017 YouView TV Ltd - * Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk> - * - * gstipcslavepipeline.h: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ -#ifndef _GST_IPC_SLAVE_PIPELINE_H_ -#define _GST_IPC_SLAVE_PIPELINE_H_ - -#include <gst/gst.h> - -G_BEGIN_DECLS - -#define GST_TYPE_IPC_SLAVE_PIPELINE \ - (gst_ipc_slave_pipeline_get_type()) -#define GST_IPC_SLAVE_PIPELINE(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipeline)) -#define GST_IPC_SLAVE_PIPELINE_CAST(obj) \ - ((GstIpcSlavePipeline *) obj) -#define GST_IPC_SLAVE_PIPELINE_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipelineClass)) -#define GST_IS_IPC_SLAVE_PIPELINE(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_SLAVE_PIPELINE)) -#define GST_IS_IPC_SLAVE_PIPELINE_CLASS(obj) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_SLAVE_PIPELINE)) - -typedef struct _GstIpcSlavePipeline GstIpcSlavePipeline; -typedef struct _GstIpcSlavePipelineClass GstIpcSlavePipelineClass; - -struct _GstIpcSlavePipeline -{ - GstPipeline pipeline; -}; - -struct _GstIpcSlavePipelineClass -{ - GstPipelineClass pipeline_class; -}; - -G_GNUC_INTERNAL GType gst_ipc_slave_pipeline_get_type (void); - -G_END_DECLS - -#endif diff --git a/gst/ipcpipeline/meson.build b/gst/ipcpipeline/meson.build deleted file mode 100644 index 7ce3809f5..000000000 --- a/gst/ipcpipeline/meson.build +++ /dev/null @@ -1,16 +0,0 @@ -ipcpipeline_sources = [ - 'gstipcpipeline.c', - 'gstipcpipelinecomm.c', - 'gstipcpipelinesink.c', - 'gstipcpipelinesrc.c', - 'gstipcslavepipeline.c' -] - -gstipcpipeline = library('gstipcpipeline', - ipcpipeline_sources, - c_args : gst_plugins_bad_args, - include_directories : [configinc], - dependencies : [gstbase_dep], - install : true, - install_dir : plugins_install_dir, -) diff --git a/gst/ipcpipeline/protocol.txt b/gst/ipcpipeline/protocol.txt deleted file mode 100644 index 77f6e568e..000000000 --- a/gst/ipcpipeline/protocol.txt +++ /dev/null @@ -1,92 +0,0 @@ -This documents the protocol used to pass data over fds between ipcpipelinesrc -and ipcpipelinesink. - -The protocol is used in both directions. However, some combinations do -not make sense (eg, a buffer going from ipcpipelinesrc to ipcpipelinesink). - -The protocol consists of an arbitrary number of variable sized chunks -with a type. Each chunk has a request ID which can be used to match a -request with its reply (ack / query result). - -Each chunk consists of: - - a type (byte): - 1: ack - 2: query result - 3: buffer - 4: event - 5: sink message event - 6: query - 7: state change - 8: state lost - 9: message - 10: error/warning/info message - - a request ID, 4 bytes, little endian - - the payload size, 4 bytes, little endian - - N bytes payload - -Depending on the type, the payload can contain: - - - 1: ack - result: 4 bytes, little endian - interpreted as GstFlowReturn for buffers, boolean for events and - GstStateChangeReturn for state changes - - 2: query result - result boolean: 1 byte - query type: 4 bytes, little endian - returned query string representation, NUL terminated - - 3: buffer: - pts: 8 bytes, little endian - dts: 8 bytes, little endian - duration: 8 bytes, little endian - offset: 8 bytes, little endian - offset end: 8 bytes, little endian - flags: 8 bytes, little endian - buffer size: 4 bytes, little endian - data: contents of the buffer data, size specified in "buffer size" - number of GstMeta: 4 bytes, little endian - For each GstMeta: - bytes: 4 bytes, little endian - this is the number of bytes before the string representation - at the end of this block, including the 4 bytes of itself - flags: 4 bytes, little endian - length of the GstMetaInfo::api name: 4 bytes, little endian - GstMetaInfo::api name: string, NUL terminated - GstMetaInfo::size: 8 bytes, little endian - length of the string representation: 4 bytes, little endian - string representation, NUL terminated - - 4: event - event type: 4 bytes, little endian - sequence number: 4 bytes, little endian - direction: 1 byte - whether the event is going upstream (1) or downstream (0) - string representation, NUL terminated - - 5: sink message event - message type: 4 bytes, little endian - event sequence number: 4 bytes, little endian - message sequence number: 4 bytes, little endian - length: 4 bytes, little endian - event structure name: length bytes, NUL terminated - message structure string representation: remaining bytes, NUL terminated - - 6: query - query type: 4 bytes, little endian - direction: 1 byte - whether the query is going upstream (1) or downstream (0) - string representation, NUL terminated - - 7: state change - GstStateChange: 4 bytes, little endian - - 8: state lost - no payload - - 9: message - message type: 4 bytes, little endian - string representation, NUL terminated - - 10: error/warning/info message - message type (2 = error, 1 = warning, 0 = info): 1 byte - error domain string length: 4 bytes, little endian - string representation of the error domain, NUL terminated - error code: 4 bytes, little endian - length: 4 bytes, little endian - if zero: no error message - if non zero: As many bytes as this length: the error message, NUL terminated - length: 4 bytes, little endian - if zero: no extra message - if non zero: As many bytes as this length: the error extra debug message, NUL terminated diff --git a/gst/meson.build b/gst/meson.build index 29872f224..1017adf30 100644 --- a/gst/meson.build +++ b/gst/meson.build @@ -27,7 +27,6 @@ subdir('geometrictransform') subdir('id3tag') subdir('inter') subdir('interlace') -subdir('ipcpipeline') subdir('ivfparse') subdir('ivtc') subdir('jp2kdecimator') |