diff options
-rw-r--r-- | gst/shm/gstshm.h | 64 | ||||
-rw-r--r-- | gst/shm/gstshmsink.c | 542 | ||||
-rw-r--r-- | gst/shm/gstshmsink.h | 22 |
3 files changed, 367 insertions, 261 deletions
diff --git a/gst/shm/gstshm.h b/gst/shm/gstshm.h deleted file mode 100644 index 601cfde0a..000000000 --- a/gst/shm/gstshm.h +++ /dev/null @@ -1,64 +0,0 @@ - -/* GStreamer - * Copyright (C) <2009> Collabora Ltd - * @author: Olivier Crete <olivier.crete@collabora.co.uk - * Copyright (C) <2009> Nokia Inc - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#ifndef __GST_SHM_H__ -#define __GST_SHM_H__ - -#include <gst/gst.h> -#include <semaphore.h> - -G_BEGIN_DECLS -#define GST_SHM_MAX_CAPS_LENGTH (1024) -#define SHM_LOCK(shm) sem_wait (&(shm)->mutex); -#define SHM_UNLOCK(shm) sem_post (&(shm)->mutex); -#define GST_SHM_CAPS_BUFFER(shm) ((shm)->data) -#define GST_SHM_BUFFER(shm) ((shm)->data+(shm)->caps_size) - struct GstShmHeader -{ - sem_t notification; - sem_t mutex; - - guint caps_gen; - guint buffer_gen; - - gint caps_size; - gint buffer_size; - - guint flags; - - GstClockTime timestamp; - GstClockTime duration; - - guint64 offset; - guint64 offset_end; - - gboolean eos; - - gchar data[0]; - /* - * gchar caps_buffer[caps_size]; - * gchar buffer[buffer_size]; - */ -}; - -G_END_DECLS -#endif /* __GST_SHM_H__ */ diff --git a/gst/shm/gstshmsink.c b/gst/shm/gstshmsink.c index 48332e60d..9a0233b25 100644 --- a/gst/shm/gstshmsink.c +++ b/gst/shm/gstshmsink.c @@ -25,21 +25,16 @@ #endif #include "gstshmsink.h" -#include "gstshm.h" #include <gst/gst.h> -#include <sys/mman.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <errno.h> #include <string.h> -#include <unistd.h> -#include <semaphore.h> /* signals */ enum { + SIGNAL_CLIENT_CONNECTED, + SIGNAL_CLIENT_DISCONNECTED, LAST_SIGNAL }; @@ -47,12 +42,25 @@ enum enum { PROP_0, - PROP_SHM_NAME, - PROP_PERMS + PROP_SOCKET_PATH, + PROP_PERMS, + PROP_SHM_SIZE, + PROP_WAIT_FOR_CONNECTION }; +struct GstShmClient +{ + ShmClient *client; + GstPollFD pollfd; +}; + +#define DEFAULT_SIZE ( 256 * 1024 ) +#define DEFAULT_WAIT_FOR_CONNECTION (TRUE) +#define DEFAULT_PERMS (S_IRWXU | S_IRWXG) + GST_DEBUG_CATEGORY_STATIC (shmsink_debug); +#define GST_CAT_DEFAULT shmsink_debug static const GstElementDetails gst_shm_sink_details = GST_ELEMENT_DETAILS ("Shared Memory Sink", @@ -67,21 +75,25 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_BOILERPLATE (GstShmSink, gst_shm_sink, GstBaseSink, GST_TYPE_BASE_SINK); +static void gst_shm_sink_finalize (GObject * object); static void gst_shm_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_shm_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstStateChangeReturn gst_shm_sink_change_state (GstElement * element, - GstStateChange transition); - static gboolean gst_shm_sink_start (GstBaseSink * bsink); static gboolean gst_shm_sink_stop (GstBaseSink * bsink); -static gboolean gst_shm_sink_set_caps (GstBaseSink * bsink, GstCaps * caps); static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf); +static GstFlowReturn gst_shm_sink_buffer_alloc (GstBaseSink * sink, + guint64 offset, guint size, GstCaps * caps, GstBuffer ** out_buf); + static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event); +static gboolean gst_shm_sink_unlock (GstBaseSink * bsink); +static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink); -// static guint gst_shm_sink_signals[LAST_SIGNAL] = { 0 }; +static gpointer pollthread_func (gpointer data); + +static guint signals[LAST_SIGNAL] = { 0 }; static void gst_shm_sink_base_init (gpointer g_class) @@ -98,8 +110,10 @@ gst_shm_sink_base_init (gpointer g_class) static void gst_shm_sink_init (GstShmSink * self, GstShmSinkClass * g_class) { - self->fd = -1; - self->shm_area = MAP_FAILED; + self->cond = g_cond_new (); + self->size = DEFAULT_SIZE; + self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION; + self->perms = DEFAULT_PERMS; } static void @@ -113,33 +127,64 @@ gst_shm_sink_class_init (GstShmSinkClass * klass) gstelement_class = (GstElementClass *) klass; gstbasesink_class = (GstBaseSinkClass *) klass; + gobject_class->finalize = gst_shm_sink_finalize; gobject_class->set_property = gst_shm_sink_set_property; gobject_class->get_property = gst_shm_sink_get_property; - gstelement_class->change_state = gst_shm_sink_change_state; - - gstbasesink_class->start = gst_shm_sink_start; - gstbasesink_class->stop = gst_shm_sink_stop; - gstbasesink_class->set_caps = gst_shm_sink_set_caps; - gstbasesink_class->render = gst_shm_sink_render; - gstbasesink_class->event = gst_shm_sink_event; - - g_object_class_install_property (gobject_class, PROP_SHM_NAME, - g_param_spec_string ("shm-name", - "Name of the shared memory area", - "The name of the shared memory area that the source can read from", - NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_shm_sink_start); + gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_shm_sink_stop); + gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_shm_sink_render); + gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event); + gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock); + gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop); + gstbasesink_class->buffer_alloc = + GST_DEBUG_FUNCPTR (gst_shm_sink_buffer_alloc); + + g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, + g_param_spec_string ("socket-path", + "Path to the control socket", + "The path to the control socket used to control the shared memory" + " transport", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PERMS, g_param_spec_uint ("perms", "Permissions on the shm area", "Permissions to set on the shm area", - 0, 07777, S_IRWXU | S_IRWXG, - G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + 0, 07777, DEFAULT_PERMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_SHM_SIZE, + g_param_spec_uint ("shm-size", + "Size of the shm area", + "Size of the shared memory area", + 0, G_MAXUINT, DEFAULT_SIZE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION, + g_param_spec_boolean ("wait-for-connection", + "Wait for a connection until rendering", + "Block the stream until the shm pipe is connected", + DEFAULT_WAIT_FOR_CONNECTION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + signals[SIGNAL_CLIENT_CONNECTED] = g_signal_new ("client-connected", + GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, + g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + + signals[SIGNAL_CLIENT_DISCONNECTED] = g_signal_new ("client-disconnected", + GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, + g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); GST_DEBUG_CATEGORY_INIT (shmsink_debug, "shmsink", 0, "Shared Memory Sink"); } +static void +gst_shm_sink_finalize (GObject * object) +{ + GstShmSink *self = GST_SHM_SINK (object); + + g_cond_free (self->cond); +} + /* * Set the value of a property for the server sink. */ @@ -150,19 +195,34 @@ gst_shm_sink_set_property (GObject * object, guint prop_id, GstShmSink *self = GST_SHM_SINK (object); switch (prop_id) { - case PROP_SHM_NAME: + case PROP_SOCKET_PATH: GST_OBJECT_LOCK (object); - g_free (self->shm_name); - self->shm_name = g_value_dup_string (value); + g_free (self->socket_path); + self->socket_path = g_value_dup_string (value); GST_OBJECT_UNLOCK (object); break; case PROP_PERMS: + GST_OBJECT_LOCK (object); self->perms = g_value_get_uint (value); - if (self->fd >= 0) - if (fchmod (self->fd, g_value_get_uint (value))) - GST_WARNING_OBJECT (self, - "Could not set permissions %o on shm area: %s", - g_value_get_uint (value), strerror (errno)); + sp_writer_setperms_shm (self->pipe, self->perms); + GST_OBJECT_UNLOCK (object); + break; + case PROP_SHM_SIZE: + GST_OBJECT_LOCK (object); + if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) + GST_DEBUG_OBJECT (self, + "Resize shared memory area from %u to %u bytes"); + else + GST_WARNING_OBJECT (self, + "Could not resize shared memory area from %u to %u bytes"); + self->size = g_value_get_uint (value); + GST_OBJECT_UNLOCK (object); + break; + case PROP_WAIT_FOR_CONNECTION: + GST_OBJECT_LOCK (object); + self->wait_for_connection = g_value_get_boolean (value); + GST_OBJECT_UNLOCK (object); + g_cond_broadcast (self->cond); break; default: break; @@ -175,22 +235,29 @@ gst_shm_sink_get_property (GObject * object, guint prop_id, { GstShmSink *self = GST_SHM_SINK (object); + GST_OBJECT_LOCK (object); + switch (prop_id) { - case PROP_SHM_NAME: - GST_OBJECT_LOCK (object); - g_value_set_string (value, self->shm_name); - GST_OBJECT_UNLOCK (object); + case PROP_SOCKET_PATH: + g_value_set_string (value, self->socket_path); break; case PROP_PERMS: self->perms = g_value_get_uint (value); - if (self->fd >= 0) - fchmod (self->fd, g_value_get_uint (value)); + if (self->pipe) + sp_writer_setperms_shm (self->pipe, self->perms); + break; + case PROP_SHM_SIZE: + g_value_set_uint (value, self->size); + break; + case PROP_WAIT_FOR_CONNECTION: + g_value_set_boolean (value, self->wait_for_connection); break; - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + GST_OBJECT_UNLOCK (object); } @@ -199,62 +266,44 @@ static gboolean gst_shm_sink_start (GstBaseSink * bsink) { GstShmSink *self = GST_SHM_SINK (bsink); - g_return_val_if_fail (self->fd == -1, FALSE); - GST_OBJECT_LOCK (self); - if (!self->shm_name) { - GST_OBJECT_UNLOCK (self); - GST_ERROR_OBJECT (self, "Must set the name of the shm area first"); + self->stop = FALSE; + + if (!self->socket_path) { GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, - ("One must specify the name of the shm area"), - ("shm-name property not set")); + ("Could not open socket."), (NULL)); return FALSE; } - self->fd = shm_open (self->shm_name, O_RDWR | O_CREAT | O_TRUNC, self->perms); + self->pipe = sp_writer_create (self->socket_path, self->size, self->perms); - if (self->fd < 0) { - GST_OBJECT_UNLOCK (self); - GST_ERROR_OBJECT (self, "Could not open shm area: %s", strerror (errno)); + if (!self->pipe) { GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, - ("Could not open the shm area"), - ("shm_open failed (%d): %s", errno, strerror (errno))); + ("Could not open socket."), (NULL)); return FALSE; } - self->opened_name = g_strdup (self->shm_name); - GST_OBJECT_UNLOCK (self); + self->poll = gst_poll_new (TRUE); + gst_poll_fd_init (&self->serverpollfd); + self->serverpollfd.fd = sp_get_fd (self->pipe); + gst_poll_add_fd (self->poll, &self->serverpollfd); + gst_poll_fd_ctl_read (self->poll, &self->serverpollfd, TRUE); - self->shm_area_len = sizeof (struct GstShmHeader); + self->pollthread = g_thread_create (pollthread_func, self, TRUE, NULL); - if (ftruncate (self->fd, self->shm_area_len)) { - GST_ERROR_OBJECT (self, - "Could not make shm area large enough for header: %s", - strerror (errno)); - gst_shm_sink_stop (bsink); - GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, - ("Could not resize memory area"), - ("ftruncate failed (%d): %s", errno, strerror (errno))); - return FALSE; - } + if (!self->pollthread) + goto thread_error; - self->shm_area = mmap (NULL, self->shm_area_len, PROT_READ | PROT_WRITE, - MAP_SHARED, self->fd, 0); + return TRUE; - if (self->shm_area == MAP_FAILED) { - GST_ERROR_OBJECT (self, "Could not map shm area"); - GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, - ("Could not map memory area"), - ("mmap failed (%d): %s", errno, strerror (errno))); - gst_shm_sink_stop (bsink); - return FALSE; - } +thread_error: - memset (self->shm_area, 0, self->shm_area_len); - g_assert (sem_init (&self->shm_area->notification, 1, 0) == 0); - g_assert (sem_init (&self->shm_area->mutex, 1, 1) == 0); + sp_close (self->pipe); + self->pipe = NULL; + gst_poll_free (self->poll); - return TRUE; + GST_ELEMENT_ERROR (self, CORE, THREAD, ("Could not srart thread"), (NULL)); + return FALSE; } @@ -263,124 +312,226 @@ gst_shm_sink_stop (GstBaseSink * bsink) { GstShmSink *self = GST_SHM_SINK (bsink); - if (self->fd >= 0) - close (self->fd); - self->fd = -1; + self->stop = TRUE; + gst_poll_set_flushing (self->poll, TRUE); - if (self->opened_name) { - shm_unlink (self->opened_name); - g_free (self->opened_name); - self->opened_name = NULL; + g_thread_join (self->pollthread); + self->pollthread = NULL; + + GST_DEBUG_OBJECT (self, "Stopping"); + + while (self->clients) { + struct GstShmClient *client = self->clients->data; + self->clients = g_list_remove (self->clients, client); + sp_writer_close_client (self->pipe, client->client); + g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0, + client->pollfd.fd); + g_slice_free (struct GstShmClient, client); } - if (self->shm_area != MAP_FAILED) - munmap (self->shm_area, self->shm_area_len); - self->shm_area_len = 0; - self->shm_area = MAP_FAILED; + gst_poll_free (self->poll); + self->poll = NULL; + + sp_close (self->pipe); + self->pipe = NULL; return TRUE; } -static gboolean -gst_shm_sink_set_caps (GstBaseSink * bsink, GstCaps * caps) +static GstFlowReturn +gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) { GstShmSink *self = GST_SHM_SINK (bsink); + int rv; - self->caps_gen++; + GST_OBJECT_LOCK (self); + while (self->wait_for_connection && !self->clients) { + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + GST_OBJECT_UNLOCK (self); + return GST_FLOW_WRONG_STATE; + } + } - return TRUE; -} + rv = sp_writer_send_buf (self->pipe, (char *) GST_BUFFER_DATA (buf), + GST_BUFFER_SIZE (buf)); -static gboolean -resize_area (GstShmSink * self, size_t desired_length) -{ - if (desired_length <= self->shm_area_len) - return TRUE; + if (rv == -1) { + ShmBlock *block = NULL; + gchar *shmbuf = NULL; + while ((block = sp_writer_alloc_block (self->pipe, + GST_BUFFER_SIZE (buf))) == NULL) { + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + GST_OBJECT_UNLOCK (self); + return GST_FLOW_WRONG_STATE; + } + } + while (self->wait_for_connection && !self->clients) { + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + sp_writer_free_block (block); + GST_OBJECT_UNLOCK (self); + return GST_FLOW_WRONG_STATE; + } + } - SHM_UNLOCK (self->shm_area); - if (munmap (self->shm_area, self->shm_area_len)) { - GST_ERROR_OBJECT (self, "Could not unmap shared area"); - GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, - ("Could not unmap memory area"), - ("munmap failed (%d): %s", errno, strerror (errno))); - return FALSE; - } - if (ftruncate (self->fd, desired_length)) { - GST_ERROR_OBJECT (self, "Could not resize shared area"); - GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, - ("Could not resize memory area"), - ("ftruncate failed (%d): %s", errno, strerror (errno))); - return FALSE; + shmbuf = sp_writer_block_get_buf (block); + memcpy (shmbuf, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)); + sp_writer_send_buf (self->pipe, shmbuf, GST_BUFFER_SIZE (buf)); + sp_writer_free_block (block); } - self->shm_area = mmap (NULL, desired_length, PROT_READ | PROT_WRITE, - MAP_SHARED, self->fd, 0); - self->shm_area_len = desired_length; - if (self->shm_area == MAP_FAILED) { - self->shm_area = NULL; - GST_ERROR_OBJECT (self, "Could not remap shared area"); - GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, - ("Could not map memory area"), - ("mmap failed (%d): %s", errno, strerror (errno))); - return FALSE; - } + GST_OBJECT_UNLOCK (self); - SHM_LOCK (self->shm_area); + return GST_FLOW_OK; +} - return TRUE; +void +gst_shm_sink_free_buffer (gpointer data) +{ + ShmBlock *block = data; + sp_writer_free_block (block); } static GstFlowReturn -gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) +gst_shm_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, + GstCaps * caps, GstBuffer ** out_buf) { - GstShmSink *self = GST_SHM_SINK (bsink); + GstShmSink *self = GST_SHM_SINK (sink); + GstBuffer *buffer; + ShmBlock *block = NULL; + gpointer buf = NULL; - g_return_val_if_fail (self->shm_area != MAP_FAILED, GST_FLOW_ERROR); + GST_OBJECT_LOCK (self); + block = sp_writer_alloc_block (self->pipe, size); + if (block) + buf = sp_writer_block_get_buf (block); + GST_OBJECT_UNLOCK (self); - SHM_LOCK (self->shm_area); + if (block) { + buffer = gst_buffer_new (); + GST_BUFFER_DATA (buffer) = buf; + GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) block; + GST_BUFFER_FREE_FUNC (buffer) = + GST_DEBUG_FUNCPTR (gst_shm_sink_free_buffer); + GST_BUFFER_SIZE (buffer) = size; + GST_LOG_OBJECT (self, + "Allocated buffer of %lu bytes from shared memory at %p", size, buf); + } else { + buffer = gst_buffer_new_and_alloc (size); + GST_LOG_OBJECT (self, "Not enough shared memory for buffer of %lu bytes, " + "allocating using standard allocator", size); + } - if (self->caps_gen != self->shm_area->caps_gen) { - gchar *caps_str = - gst_caps_to_string (GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink))); - guint caps_size = strlen (caps_str) + 1; + GST_BUFFER_OFFSET (buffer) = offset; + gst_buffer_set_caps (buffer, caps); - if (!resize_area (self, sizeof (struct GstShmHeader) + - caps_size + GST_BUFFER_SIZE (buf))) { - g_free (caps_str); - return GST_FLOW_ERROR; - } + *out_buf = buffer; - self->shm_area->caps_size = caps_size; + return GST_FLOW_OK; +} - memcpy (GST_SHM_CAPS_BUFFER (self->shm_area), caps_str, caps_size); - g_free (caps_str); +static gpointer +pollthread_func (gpointer data) +{ + GstShmSink *self = GST_SHM_SINK (data); + GList *item; - self->shm_area->caps_gen = self->caps_gen; - } else { - if (!resize_area (self, sizeof (struct GstShmHeader) + - self->shm_area->caps_size + GST_BUFFER_SIZE (buf))) - return GST_FLOW_ERROR; - } + while (!self->stop) { - memcpy (GST_SHM_BUFFER (self->shm_area), GST_BUFFER_DATA (buf), - GST_BUFFER_SIZE (buf)); + if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) + return NULL; - self->shm_area->buffer_size = GST_BUFFER_SIZE (buf); - self->shm_area->buffer_gen++; + if (self->stop) + return NULL; - self->shm_area->timestamp = GST_BUFFER_TIMESTAMP (buf); - self->shm_area->duration = GST_BUFFER_DURATION (buf); - self->shm_area->offset = GST_BUFFER_OFFSET (buf); - self->shm_area->offset_end = GST_BUFFER_OFFSET_END (buf); - self->shm_area->flags = GST_BUFFER_FLAGS (buf) & (GST_BUFFER_FLAG_DISCONT | - GST_BUFFER_FLAG_GAP | GST_BUFFER_FLAG_DELTA_UNIT); + if (gst_poll_fd_has_closed (self->poll, &self->serverpollfd)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed read from shmsink"), + ("Control socket has closed")); + return NULL; + } - sem_post (&self->shm_area->notification); + if (gst_poll_fd_has_error (self->poll, &self->serverpollfd)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsink"), + ("Control socket has error")); + return NULL; + } - SHM_UNLOCK (self->shm_area); + if (gst_poll_fd_can_read (self->poll, &self->serverpollfd)) { + ShmClient *client; + struct GstShmClient *gclient; + + GST_OBJECT_LOCK (self); + client = sp_writer_accept_client (self->pipe); + GST_OBJECT_UNLOCK (self); + + if (!client) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, + ("Failed to read from shmsink"), + ("Control socket returns wrong data")); + return NULL; + } + + gclient = g_slice_new (struct GstShmClient); + gclient->client = client; + gst_poll_fd_init (&gclient->pollfd); + gclient->pollfd.fd = sp_writer_get_client_fd (client); + gst_poll_add_fd (self->poll, &gclient->pollfd); + gst_poll_fd_ctl_read (self->poll, &gclient->pollfd, TRUE); + self->clients = g_list_prepend (self->clients, gclient); + g_signal_emit (self, signals[SIGNAL_CLIENT_CONNECTED], 0, + gclient->pollfd.fd); + } - return GST_FLOW_OK; + again: + for (item = self->clients; item; item = item->next) { + struct GstShmClient *gclient = item->data; + + if (gst_poll_fd_has_closed (self->poll, &gclient->pollfd)) { + GST_WARNING_OBJECT (self, "One client is gone, closing"); + goto close_client; + } + + if (gst_poll_fd_has_error (self->poll, &gclient->pollfd)) { + GST_WARNING_OBJECT (self, "One client fd has error, closing"); + goto close_client; + } + + if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) { + int rv; + + GST_OBJECT_LOCK (self); + rv = sp_writer_recv (self->pipe, gclient->client); + GST_OBJECT_UNLOCK (self); + + if (rv < 0) { + GST_WARNING_OBJECT (self, "One client has read error," + " closing (retval: %d errno: %d)", rv, errno); + goto close_client; + } + } + continue; + close_client: + GST_OBJECT_LOCK (self); + sp_writer_close_client (self->pipe, gclient->client); + GST_OBJECT_UNLOCK (self); + + gst_poll_remove_fd (self->poll, &gclient->pollfd); + self->clients = g_list_remove (self->clients, gclient); + + g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0, + gclient->pollfd.fd); + g_slice_free (struct GstShmClient, gclient); + + goto again; + } + + g_cond_broadcast (self->cond); + } + + return NULL; } static gboolean @@ -388,33 +539,42 @@ gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event) { GstShmSink *self = GST_SHM_SINK (bsink); - g_return_val_if_fail (self->shm_area != MAP_FAILED, FALSE); - - if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { - SHM_LOCK (self->shm_area); - self->shm_area->eos = TRUE; - SHM_UNLOCK (self->shm_area); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_OBJECT_LOCK (self); + while (sp_writer_pending_writes (self->pipe) && !self->unlock) + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + GST_OBJECT_UNLOCK (self); + break; + default: + break; } return TRUE; } -static GstStateChangeReturn -gst_shm_sink_change_state (GstElement * element, GstStateChange transition) + +static gboolean +gst_shm_sink_unlock (GstBaseSink * bsink) { - GstShmSink *self = GST_SHM_SINK (element); - - switch (transition) { - case GST_STATE_CHANGE_READY_TO_PAUSED: - g_return_val_if_fail (self->shm_area != MAP_FAILED, - GST_STATE_CHANGE_FAILURE); - SHM_LOCK (self->shm_area); - self->shm_area->eos = FALSE; - SHM_UNLOCK (self->shm_area); - break; - default: - break; - } + GstShmSink *self = GST_SHM_SINK (bsink); - return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + GST_OBJECT_LOCK (self); + self->unlock = TRUE; + GST_OBJECT_UNLOCK (self); + + g_cond_broadcast (self->cond); + return TRUE; +} + +static gboolean +gst_shm_sink_unlock_stop (GstBaseSink * bsink) +{ + GstShmSink *self = GST_SHM_SINK (bsink); + + GST_OBJECT_LOCK (self); + self->unlock = FALSE; + GST_OBJECT_UNLOCK (self); + + return TRUE; } diff --git a/gst/shm/gstshmsink.h b/gst/shm/gstshmsink.h index 0475e1daa..bc6da74bc 100644 --- a/gst/shm/gstshmsink.h +++ b/gst/shm/gstshmsink.h @@ -25,6 +25,8 @@ #include <gst/gst.h> #include <gst/base/gstbasesink.h> +#include "shmpipe.h" + G_BEGIN_DECLS #define GST_TYPE_SHM_SINK \ (gst_shm_sink_get_type()) @@ -43,16 +45,24 @@ struct _GstShmSink { GstBaseSink element; - gchar *shm_name; + gchar *socket_path; - int fd; - struct GstShmHeader *shm_area; - size_t shm_area_len; - gchar *opened_name; + ShmPipe *pipe; guint perms; + guint size; + + GList *clients; + + GThread *pollthread; + GstPoll *poll; + GstPollFD serverpollfd; + + gboolean wait_for_connection; + gboolean stop; + gboolean unlock; - guint caps_gen; + GCond *cond; }; struct _GstShmSinkClass |