/* * Farstream - Farstream Shared Memory Stream Transmitter * * Copyright 2009 Collabora Ltd. * @author: Olivier Crete * Copyright 2009 Nokia Corp. * * fs-shm-stream-transmitter.c - A Farstream Shared memory stream transmitter * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ /** * SECTION:fs-shm-stream-transmitter * @short_description: A stream transmitter object for Shared Memory * * The name of this transmitter is "shm". * * This transmitter is meant to send and received the data from another process * on the same system while minimizing the memory pressure associated with the * use of sockets. * * Two sockets are used to control the shared memory areas. One is used to * send data and one to receive data. The receiver always connects to the * sender. The sender socket must exist before the receiver connects to it. * * Negotiating the paths of the sockets can happen in two ways. If the * create-local-candidates is True then the transmitter will generate the * path of the local candidate and us it as the ip filed in #FsCandidate. The * transmitter will expect the path of the applications sender socket to be in * the "ip" field of the remote candidates #FsCandidate as well. * * Or alternatively, if create-local-candidates is false then * the sender socket can be created by giving the transmitter a candidate * with the path of the socket in the "ip" field of the #FsCandidate. This * #FsCandidate can be given to the #FsStreamTransmitter in two ways, either * by setting the #FsStreamTransmitter:preferred-local-candidates property * or by calling the fs_stream_transmitter_force_remote_candidates() function. * There can be only one single send socket per stream. When the send socket * is ready to be connected to, #FsStreamTransmitter::new-local-candidate signal * will be emitted. * * To connect the receive side to the other application, one must create a * #FsCandidate with the path of the sender's socket in the "username" field. * If the receiver can not connect to the sender, * the fs_stream_transmitter_force_remote_candidates() call will fail. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "fs-shm-stream-transmitter.h" #include "fs-shm-transmitter.h" #include #include #include #include #include #include #ifdef HAVE_UNISTD_H # include #endif #include GST_DEBUG_CATEGORY_EXTERN (fs_shm_transmitter_debug); #define GST_CAT_DEFAULT fs_shm_transmitter_debug /* Signals */ enum { LAST_SIGNAL }; /* props */ enum { PROP_0, PROP_SENDING, PROP_PREFERRED_LOCAL_CANDIDATES, PROP_CREATE_LOCAL_CANDIDATES, }; struct _FsShmStreamTransmitterPrivate { /* We don't actually hold a ref to this, * But since our parent FsStream can not exist without its parent * FsSession, we should be safe */ FsShmTransmitter *transmitter; GList *preferred_local_candidates; GMutex mutex; /* Protected by the mutex */ gboolean sending; /* Protected by the mutex */ FsCandidate **candidates; /* Whether we create the local candidate ourselves or rely on the remote end * to pass them to us as part of the candidate */ gboolean create_local_candidates; /* temporary socket directy in case we made one */ gchar *socket_dir; ShmSrc **shm_src; ShmSink **shm_sink; }; #define FS_SHM_STREAM_TRANSMITTER_GET_PRIVATE(o) \ (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_SHM_STREAM_TRANSMITTER, \ FsShmStreamTransmitterPrivate)) #define FS_SHM_STREAM_TRANSMITTER_LOCK(s) \ g_mutex_lock (&(s)->priv->mutex) #define FS_SHM_STREAM_TRANSMITTER_UNLOCK(s) \ g_mutex_unlock (&(s)->priv->mutex) static void fs_shm_stream_transmitter_class_init (FsShmStreamTransmitterClass *klass); static void fs_shm_stream_transmitter_init (FsShmStreamTransmitter *self); static void fs_shm_stream_transmitter_dispose (GObject *object); static void fs_shm_stream_transmitter_finalize (GObject *object); static void fs_shm_stream_transmitter_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void fs_shm_stream_transmitter_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static gboolean fs_shm_stream_transmitter_force_remote_candidates ( FsStreamTransmitter *streamtransmitter, GList *candidates, GError **error); static gboolean fs_shm_stream_transmitter_gather_local_candidates ( FsStreamTransmitter *streamtransmitter, GError **error); static gboolean fs_shm_stream_transmitter_add_sink (FsShmStreamTransmitter *self, FsCandidate *candidate, GError **error); static GObjectClass *parent_class = NULL; // static guint signals[LAST_SIGNAL] = { 0 }; static GType type = 0; GType fs_shm_stream_transmitter_get_type (void) { return type; } GType fs_shm_stream_transmitter_register_type (FsPlugin *module G_GNUC_UNUSED) { static const GTypeInfo info = { sizeof (FsShmStreamTransmitterClass), NULL, NULL, (GClassInitFunc) fs_shm_stream_transmitter_class_init, NULL, NULL, sizeof (FsShmStreamTransmitter), 0, (GInstanceInitFunc) fs_shm_stream_transmitter_init }; type = g_type_register_static (FS_TYPE_STREAM_TRANSMITTER, "FsShmStreamTransmitter", &info, 0); return type; } static void fs_shm_stream_transmitter_class_init (FsShmStreamTransmitterClass *klass) { GObjectClass *gobject_class = (GObjectClass *) klass; FsStreamTransmitterClass *streamtransmitterclass = FS_STREAM_TRANSMITTER_CLASS (klass); GParamSpec *pspec; parent_class = g_type_class_peek_parent (klass); gobject_class->set_property = fs_shm_stream_transmitter_set_property; gobject_class->get_property = fs_shm_stream_transmitter_get_property; streamtransmitterclass->force_remote_candidates = fs_shm_stream_transmitter_force_remote_candidates; streamtransmitterclass->gather_local_candidates = fs_shm_stream_transmitter_gather_local_candidates; g_object_class_override_property (gobject_class, PROP_SENDING, "sending"); g_object_class_override_property (gobject_class, PROP_PREFERRED_LOCAL_CANDIDATES, "preferred-local-candidates"); pspec = g_param_spec_boolean ("create-local-candidates", "CreateLocalCandidates", "Whether the transmitter should automatically create local candidates", FALSE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); g_object_class_install_property (gobject_class, PROP_CREATE_LOCAL_CANDIDATES, pspec); gobject_class->dispose = fs_shm_stream_transmitter_dispose; gobject_class->finalize = fs_shm_stream_transmitter_finalize; g_type_class_add_private (klass, sizeof (FsShmStreamTransmitterPrivate)); } static void fs_shm_stream_transmitter_init (FsShmStreamTransmitter *self) { /* member init */ self->priv = FS_SHM_STREAM_TRANSMITTER_GET_PRIVATE (self); self->priv->sending = TRUE; g_mutex_init (&self->priv->mutex); } static void fs_shm_stream_transmitter_dispose (GObject *object) { FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object); gint c; /* component_id */ for (c = 1; c <= self->priv->transmitter->components; c++) { if (self->priv->shm_src[c]) { fs_shm_transmitter_check_shm_src (self->priv->transmitter, self->priv->shm_src[c], NULL); } self->priv->shm_src[c] = NULL; if (self->priv->shm_sink[c]) { fs_shm_transmitter_check_shm_sink (self->priv->transmitter, self->priv->shm_sink[c], NULL); } self->priv->shm_sink[c] = NULL; } if (self->priv->socket_dir != NULL) g_rmdir (self->priv->socket_dir); g_free (self->priv->socket_dir); self->priv->socket_dir = NULL; parent_class->dispose (object); } static void fs_shm_stream_transmitter_finalize (GObject *object) { FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object); fs_candidate_list_destroy (self->priv->preferred_local_candidates); g_free (self->priv->shm_src); g_free (self->priv->shm_sink); g_mutex_clear (&self->priv->mutex); parent_class->finalize (object); } static void fs_shm_stream_transmitter_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object); switch (prop_id) { case PROP_SENDING: FS_SHM_STREAM_TRANSMITTER_LOCK (self); g_value_set_boolean (value, self->priv->sending); FS_SHM_STREAM_TRANSMITTER_UNLOCK (self); break; case PROP_PREFERRED_LOCAL_CANDIDATES: g_value_set_boxed (value, self->priv->preferred_local_candidates); break; case PROP_CREATE_LOCAL_CANDIDATES: g_value_set_boolean (value, self->priv->create_local_candidates); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static void fs_shm_stream_transmitter_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object); switch (prop_id) { case PROP_SENDING: FS_SHM_STREAM_TRANSMITTER_LOCK (self); self->priv->sending = g_value_get_boolean (value); if (self->priv->shm_sink[1]) fs_shm_transmitter_sink_set_sending (self->priv->transmitter, self->priv->shm_sink[1], self->priv->sending); FS_SHM_STREAM_TRANSMITTER_UNLOCK (self); break; case PROP_PREFERRED_LOCAL_CANDIDATES: self->priv->preferred_local_candidates = g_value_dup_boxed (value); break; case PROP_CREATE_LOCAL_CANDIDATES: self->priv->create_local_candidates = g_value_get_boolean (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static gboolean fs_shm_stream_transmitter_build (FsShmStreamTransmitter *self, GError **error) { self->priv->shm_src = g_new0 (ShmSrc *, self->priv->transmitter->components + 1); self->priv->shm_sink = g_new0 (ShmSink *, self->priv->transmitter->components + 1); return TRUE; } static void got_buffer_func (GstBuffer *buffer, guint component, gpointer data) { FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER_CAST (data); g_signal_emit_by_name (self, "known-source-packet-received", component, buffer); } static void ready_cb (guint component, gchar *path, gpointer data) { FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER_CAST (data); FsCandidate *candidate = fs_candidate_new (NULL, component, FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, path, 0); GST_DEBUG ("Emitting new local candidate with path %s", path); g_signal_emit_by_name (self, "new-local-candidate", candidate); g_signal_emit_by_name (self, "local-candidates-prepared"); fs_candidate_destroy (candidate); } static void connected_cb (guint component, gint id, gpointer data) { FsShmStreamTransmitter *self = data; g_signal_emit_by_name (self, "state-changed", component, FS_STREAM_STATE_READY); } static gboolean fs_shm_stream_transmitter_add_sink (FsShmStreamTransmitter *self, FsCandidate *candidate, GError **error) { if (self->priv->create_local_candidates) return TRUE; if (!candidate->ip || !candidate->ip[0]) return TRUE; if (self->priv->shm_sink[candidate->component_id]) { if (fs_shm_transmitter_check_shm_sink (self->priv->transmitter, self->priv->shm_sink[candidate->component_id], candidate->ip)) return TRUE; self->priv->shm_sink[candidate->component_id] = NULL; } self->priv->shm_sink[candidate->component_id] = fs_shm_transmitter_get_shm_sink (self->priv->transmitter, candidate->component_id, candidate->ip, ready_cb, connected_cb, self, error); if (self->priv->shm_sink[candidate->component_id] == NULL) return FALSE; if (candidate->component_id == 1) fs_shm_transmitter_sink_set_sending (self->priv->transmitter, self->priv->shm_sink[candidate->component_id], self->priv->sending); return TRUE; } static void disconnected_cb (guint component, gint id, gpointer data) { FsShmStreamTransmitter *self = data; g_signal_emit_by_name (self, "state-changed", component, FS_STREAM_STATE_FAILED); } static gboolean fs_shm_stream_transmitter_force_remote_candidate ( FsShmStreamTransmitter *self, FsCandidate *candidate, GError **error) { const gchar *path; if (!fs_shm_stream_transmitter_add_sink (self, candidate, error)) return FALSE; if (self->priv->create_local_candidates) path = candidate->ip; else path = candidate->username; if (path && path[0]) { if (self->priv->shm_src[candidate->component_id]) { if (fs_shm_transmitter_check_shm_src (self->priv->transmitter, self->priv->shm_src[candidate->component_id], path)) return TRUE; self->priv->shm_src[candidate->component_id] = NULL; } self->priv->shm_src[candidate->component_id] = fs_shm_transmitter_get_shm_src (self->priv->transmitter, candidate->component_id, path, got_buffer_func, disconnected_cb, self, error); if (self->priv->shm_src[candidate->component_id] == NULL) return FALSE; } return TRUE; } /** * fs_shm_stream_transmitter_force_remote_candidates */ static gboolean fs_shm_stream_transmitter_force_remote_candidates ( FsStreamTransmitter *streamtransmitter, GList *candidates, GError **error) { GList *item = NULL; FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (streamtransmitter); for (item = candidates; item; item = g_list_next (item)) { FsCandidate *candidate = item->data; if (candidate->component_id == 0 || candidate->component_id > self->priv->transmitter->components) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The candidate passed has an invalid component id %u (not in [1,%u])", candidate->component_id, self->priv->transmitter->components); return FALSE; } if ((!candidate->ip || !candidate->ip[0]) && (!candidate->username || !candidate->username[0])) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The candidate does not have a SINK shm segment in its ip" " or a SRC shm segment in its username"); return FALSE; } } for (item = candidates; item; item = g_list_next (item)) if (!fs_shm_stream_transmitter_force_remote_candidate (self, item->data, error)) return FALSE; return TRUE; } FsShmStreamTransmitter * fs_shm_stream_transmitter_newv (FsShmTransmitter *transmitter, guint n_parameters, GParameter *parameters, GError **error) { FsShmStreamTransmitter *streamtransmitter = NULL; streamtransmitter = g_object_newv (FS_TYPE_SHM_STREAM_TRANSMITTER, n_parameters, parameters); if (!streamtransmitter) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not build the stream transmitter"); return NULL; } streamtransmitter->priv->transmitter = transmitter; if (!fs_shm_stream_transmitter_build (streamtransmitter, error)) { g_object_unref (streamtransmitter); return NULL; } return streamtransmitter; } static gboolean fs_shm_stream_transmitter_gather_local_candidates ( FsStreamTransmitter *streamtransmitter, GError **error) { FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (streamtransmitter); GList *item; if (self->priv->create_local_candidates) { guint c; gchar *socket_dir; socket_dir = g_build_filename (g_get_tmp_dir (), "farstream-shm-XXXXXX", NULL); if (g_mkdtemp (socket_dir) == NULL) return FALSE; self->priv->socket_dir = socket_dir; for (c = 1; c <= self->priv->transmitter->components; c++) { gchar *path = g_strdup_printf ("%s/shm-sink-socket-%d", socket_dir, c); self->priv->shm_sink[c] = fs_shm_transmitter_get_shm_sink (self->priv->transmitter, c, path, ready_cb, connected_cb, self, error); g_free (path); if (self->priv->shm_sink[c] == NULL) return FALSE; if (c == 1) fs_shm_transmitter_sink_set_sending (self->priv->transmitter, self->priv->shm_sink[c], self->priv->sending); } return TRUE; } for (item = self->priv->preferred_local_candidates; item; item = g_list_next (item)) { FsCandidate *candidate = item->data; if (candidate->ip && candidate->ip[0]) if (!fs_shm_stream_transmitter_add_sink (self, candidate, error)) return FALSE; } return TRUE; }