/* * Farstream - Farstream RTP Sub Stream * * Copyright 2007-2009 Collabora Ltd. * @author: Olivier Crete * Copyright 2007-2009 Nokia Corp. * * fs-rtp-substream.c - A Farstream RTP Substream gobject * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "fs-rtp-substream.h" #include #include #include "fs-rtp-stream.h" #define GST_CAT_DEFAULT fsrtpconference_debug /* * SECTION:fs-rtp-sub-stream * @short_description: The receive codec bin for a ssrc and a pt * * This object controls a part of the receive pipeline, with the following shape * * rtpbin_pad -> input_valve -> capsfilter -> codecbin -> output_valve -> output_ghostad * */ /* signals */ enum { NO_RTCP_TIMEDOUT, SRC_PAD_ADDED, CODEC_CHANGED, ERROR_SIGNAL, GET_CODEC_BIN, UNLINKED, LAST_SIGNAL }; /* props */ enum { PROP_0, PROP_CONFERENCE, PROP_SESSION, PROP_STREAM, PROP_RTPBIN_PAD, PROP_SSRC, PROP_PT, PROP_CODEC, PROP_RECEIVING, PROP_OUTPUT_GHOSTPAD, PROP_NO_RTCP_TIMEOUT }; #define DEFAULT_NO_RTCP_TIMEOUT (7000) struct _FsRtpSubStreamPrivate { /* These are only pointers, we don't own references */ FsRtpConference *conference; FsRtpSession *session; FsRtpStream *stream; /* only set once, protected by session lock */ GstPad *rtpbin_pad; gulong rtpbin_unlinked_sig; GstElement *input_valve; GstElement *output_valve; GstElement *capsfilter; /* This only exists if the codec is valid, * otherwise the rtpbin_pad is blocked */ /* Protected by the session mutex */ GstElement *codecbin; guint builder_hash; /* This is only created when the substream is associated with a FsRtpStream */ GstPad *output_ghostpad; /* Set to TRUE if the ghostpad is already being added */ /* Proteced by the session mutex */ gboolean adding_output_ghostpad; /* The id of the pad probe used to block the stream while the recv codec * is changed * Protected by the session mutex */ gulong blocking_id; gulong check_caps_id; /* This is protected by the session lock... the caller takes the lock * before updating the property.. yea nasty I know */ gboolean receiving; /* Protected by the this mutex */ GMutex mutex; GstClockID no_rtcp_timeout_id; GstClockTime next_no_rtcp_timeout; GThread *no_rtcp_timeout_thread; /* Can only be used while using the lock */ GRWLock stopped_lock; gboolean stopped; GError *construction_error; }; static GObjectClass *parent_class = NULL; static guint signals[LAST_SIGNAL] = { 0 }; G_DEFINE_TYPE(FsRtpSubStream, fs_rtp_sub_stream, G_TYPE_OBJECT); #define FS_RTP_SUB_STREAM_GET_PRIVATE(o) \ (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RTP_SUB_STREAM, \ FsRtpSubStreamPrivate)) #define FS_RTP_SUB_STREAM_LOCK(substream) \ g_mutex_lock (&substream->priv->mutex) #define FS_RTP_SUB_STREAM_UNLOCK(substream) \ g_mutex_unlock (&substream->priv->mutex) static void fs_rtp_sub_stream_dispose (GObject *object); static void fs_rtp_sub_stream_finalize (GObject *object); static void fs_rtp_sub_stream_constructed (GObject *object); static void fs_rtp_sub_stream_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void fs_rtp_sub_stream_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void fs_rtp_sub_stream_emit_error (FsRtpSubStream *substream, gint error_no, gchar *error_msg, gchar *debug_msg); static gboolean fs_rtp_sub_stream_has_stopped_enter (FsRtpSubStream *self) { g_rw_lock_reader_lock (&self->priv->stopped_lock); if (self->priv->stopped) { g_rw_lock_reader_unlock (&self->priv->stopped_lock); return TRUE; } return FALSE; } static void fs_rtp_sub_stream_has_stopped_exit (FsRtpSubStream *self) { g_rw_lock_reader_unlock (&self->priv->stopped_lock); } static void fs_rtp_sub_stream_class_init (FsRtpSubStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); parent_class = fs_rtp_sub_stream_parent_class; gobject_class->constructed = fs_rtp_sub_stream_constructed; gobject_class->dispose = fs_rtp_sub_stream_dispose; gobject_class->finalize = fs_rtp_sub_stream_finalize; gobject_class->set_property = fs_rtp_sub_stream_set_property; gobject_class->get_property = fs_rtp_sub_stream_get_property; g_object_class_install_property (gobject_class, PROP_CONFERENCE, g_param_spec_object ("conference", "The FsRtpConference this substream stream refers to", "This is a convience pointer for the Conference", FS_TYPE_RTP_CONFERENCE, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SESSION, g_param_spec_object ("session", "The FsRtpSession this substream stream refers to", "This is a convience pointer for the parent FsRtpSession", FS_TYPE_RTP_SESSION, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_STREAM, g_param_spec_object ("stream", "The FsRtpStream this substream stream refers to", "This is a convience pointer for the parent FsRtpStream", FS_TYPE_RTP_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RTPBIN_PAD, g_param_spec_object ("rtpbin-pad", "The GstPad this substrea is linked to", "This is the pad on which this substream will attach itself", GST_TYPE_PAD, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SSRC, g_param_spec_uint ("ssrc", "The ssrc this stream is used for", "This is the SSRC from the pad", 0, G_MAXUINT32, 0, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PT, g_param_spec_uint ("pt", "The payload type this stream is used for", "This is the payload type from the pad", 0, 128, 0, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_CODEC, g_param_spec_boxed ("codec", "The FsCodec this substream is received", "The FsCodec currently received from this substream", FS_TYPE_CODEC, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RECEIVING, g_param_spec_boolean ("receiving", "Whether this substream will receive any data", "A toggle that prevents the substream from outputting any data", TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_OUTPUT_GHOSTPAD, g_param_spec_object ("output-ghostpad", "The output ghostpad for this substream", "The GstPad which is on the outside of the fsrtpconference element" " for this substream", GST_TYPE_PAD, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_NO_RTCP_TIMEOUT, g_param_spec_int ("no-rtcp-timeout", "The timeout (in ms) before no RTCP is assumed", "This is the time (in ms) after which data received without RTCP" " is attached the FsStream, this only works if there is only one" " FsStream. <=0 will do nothing", -1, G_MAXINT, DEFAULT_NO_RTCP_TIMEOUT, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * FsRtpSubStream::no-rtcp-timedout: * @self: #FsSubStream that emitted the signal * * This signal is emitted after the timeout specified by * #FsRtpSubStream:no-rtcp-timeout if this sub-stream has not been attached * to a stream. * */ signals[NO_RTCP_TIMEDOUT] = g_signal_new ("no-rtcp-timedout", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); /** * FsRtpSubStream::src-pad-added: * @self: #FsRtpSubStream that emitted the signal * @pad: #GstPad of the new source pad * @codec: #FsCodec of the codec being received on the new source pad * * This signal is emitted when a new gst source pad has been created for a * specific codec being received. There will be a different source pad for * each codec that is received. The user must ref the #GstPad if he wants to * keep it. The user should not modify the #FsCodec and must copy it if he * wants to use it outside the callback scope. * * This signal is not emitted on the main thread, but on GStreamer's streaming * thread! * * This is re-emited by the FsStream * */ signals[SRC_PAD_ADDED] = g_signal_new ("src-pad-added", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 2, GST_TYPE_PAD, FS_TYPE_CODEC); /** * FsRtpSubStream::error: * @self: #FsRtpSubStream that emitted the signal * @errorno: The number of the error * @error_msg: Error message to be displayed to user * @debug_msg: Debugging error message * * This signal is emitted in any error condition * */ signals[ERROR_SIGNAL] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 3, G_TYPE_INT, G_TYPE_STRING, G_TYPE_STRING); /** * FsRtpSubStream:codec-changed * @self: #FsRtpSubStream that emitted the signal * * This signal is emitted when the code for this substream has * changed. It can be fetched from the #FsRtpSubStream:codec property * This is useful for displaying the current active reception codecs. */ signals[CODEC_CHANGED] = g_signal_new ("codec-changed", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); /** * FsRtpSubStream:get-codec-bin-locked * @self: #FsRtpSubStream that emitted the signal * @stream: the #FsRtpStream this substream is attached to if any (or %NULL) * @current_codec: The current codec * @new_codec: A pointer to a location where the codec can be stored * @current_builder_hash: The hash of the current codecbin builder * @new_builder_hash: A location to store the hash of the new codecbin * builder * @error: The location of a GError where an error can be stored * * This emitted when the substream want to get a codecbin or replace * the current one. * * Returns: The Codec Bin */ signals[GET_CODEC_BIN] = g_signal_new ("get-codec-bin", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_POINTER, 5, G_TYPE_POINTER, G_TYPE_POINTER, G_TYPE_UINT, G_TYPE_POINTER, G_TYPE_POINTER); /** * FsRtpSubStream:unlinked * @self: #FsRtpSubStream that emitted the signal * * This signal is emitted when the rtpbin pad that this substream decodes * from is unlinked. */ signals[UNLINKED] = g_signal_new ("unlinked", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); g_type_class_add_private (klass, sizeof (FsRtpSubStreamPrivate)); } static void fs_rtp_sub_stream_init (FsRtpSubStream *self) { self->priv = FS_RTP_SUB_STREAM_GET_PRIVATE (self); self->priv->receiving = TRUE; g_mutex_init (&self->priv->mutex); g_rw_lock_init (&self->priv->stopped_lock); } static gpointer no_rtcp_timeout_func (gpointer user_data) { FsRtpSubStream *self = FS_RTP_SUB_STREAM (user_data); GstClock *sysclock = NULL; GstClockID id; gboolean emit = TRUE; sysclock = gst_system_clock_obtain (); if (sysclock == NULL) goto no_sysclock; FS_RTP_SUB_STREAM_LOCK(self); id = self->priv->no_rtcp_timeout_id = gst_clock_new_single_shot_id (sysclock, self->priv->next_no_rtcp_timeout); FS_RTP_SUB_STREAM_UNLOCK(self); gst_clock_id_wait (id, NULL); FS_RTP_SUB_STREAM_LOCK(self); gst_clock_id_unref (id); self->priv->no_rtcp_timeout_id = NULL; if (self->priv->next_no_rtcp_timeout == 0) emit = FALSE; FS_RTP_SUB_STREAM_UNLOCK(self); gst_object_unref (sysclock); if (emit) g_signal_emit (self, signals[NO_RTCP_TIMEDOUT], 0); return NULL; no_sysclock: { fs_rtp_sub_stream_emit_error (self, FS_ERROR_INTERNAL, "Could not get system clock", "Could not get system clock"); return NULL; } } static gboolean fs_rtp_sub_stream_start_no_rtcp_timeout_thread (FsRtpSubStream *self, GError **error) { gboolean res = TRUE; GstClock *sysclock = NULL; sysclock = gst_system_clock_obtain (); if (sysclock == NULL) { g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL, "Could not obtain gst system clock"); return FALSE; } FS_RTP_SESSION_LOCK (self->priv->session); FS_RTP_SUB_STREAM_LOCK(self); self->priv->next_no_rtcp_timeout = gst_clock_get_time (sysclock) + (self->no_rtcp_timeout * GST_MSECOND); gst_object_unref (sysclock); if (self->priv->no_rtcp_timeout_thread == NULL) { /* only create a new thread if the old one was stopped. Otherwise we can * just reuse the currently running one. */ self->priv->no_rtcp_timeout_thread = g_thread_try_new ("no rtcp timeout", no_rtcp_timeout_func, self, error); } res = (self->priv->no_rtcp_timeout_thread != NULL); if (res == FALSE && error && *error == NULL) g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL, "Unknown error creating" " thread"); FS_RTP_SUB_STREAM_UNLOCK(self); FS_RTP_SESSION_UNLOCK (self->priv->session); return res; } static void fs_rtp_sub_stream_stop_no_rtcp_timeout_thread (FsRtpSubStream *self) { FS_RTP_SUB_STREAM_LOCK(self); self->priv->next_no_rtcp_timeout = 0; if (self->priv->no_rtcp_timeout_id) gst_clock_id_unschedule (self->priv->no_rtcp_timeout_id); if (self->priv->no_rtcp_timeout_thread == NULL) { FS_RTP_SUB_STREAM_UNLOCK(self); return; } else { FS_RTP_SUB_STREAM_UNLOCK(self); } g_thread_join (self->priv->no_rtcp_timeout_thread); FS_RTP_SUB_STREAM_LOCK(self); self->priv->no_rtcp_timeout_thread = NULL; FS_RTP_SUB_STREAM_UNLOCK(self); } static void rtpbin_pad_unlinked (GstPad *pad, GstPad *peer, gpointer user_data) { FsRtpSubStream *self = user_data; g_signal_emit (self, signals[UNLINKED], 0); } static void fs_rtp_sub_stream_constructed (GObject *object) { FsRtpSubStream *self = FS_RTP_SUB_STREAM (object); GstPad *valve_sink_pad = NULL; GstPadLinkReturn linkret; gchar *tmp; GST_DEBUG ("New substream in session %u for ssrc %x and pt %u", self->priv->session->id, self->ssrc, self->pt); if (!self->priv->conference) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "A Substream needs a conference object"); return; } self->priv->rtpbin_unlinked_sig = g_signal_connect_object ( self->priv->rtpbin_pad, "unlinked", G_CALLBACK (rtpbin_pad_unlinked), self, 0); tmp = g_strdup_printf ("output_recv_valve_%u_%u_%u", self->priv->session->id, self->ssrc, self->pt); self->priv->output_valve = gst_element_factory_make ("valve", tmp); g_free (tmp); if (!self->priv->output_valve) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not create a valve element for" " session substream with ssrc: %u and pt:%d", self->ssrc, self->pt); return; } if (!gst_bin_add (GST_BIN (self->priv->conference), self->priv->output_valve)) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not add the valve element for session" " substream with ssrc: %u and pt:%d to the conference bin", self->ssrc, self->pt); return; } /* We set the valve to dropping, the stream will unblock it when its linked */ g_object_set (self->priv->output_valve, "drop", TRUE, NULL); if (gst_element_set_state (self->priv->output_valve, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not set the valve element for session" " substream with ssrc: %u and pt:%d to the playing state", self->ssrc, self->pt); return; } tmp = g_strdup_printf ("recv_capsfilter_%u_%u_%u", self->priv->session->id, self->ssrc, self->pt); self->priv->capsfilter = gst_element_factory_make ("capsfilter", tmp); g_free (tmp); if (!self->priv->capsfilter) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not create a capsfilter element for" " session substream with ssrc: %u and pt:%d", self->ssrc, self->pt); return; } if (!gst_bin_add (GST_BIN (self->priv->conference), self->priv->capsfilter)) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not add the capsfilter element for session" " substream with ssrc: %u and pt:%d to the conference bin", self->ssrc, self->pt); return; } if (gst_element_set_state (self->priv->capsfilter, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not set the capsfilter element for session" " substream with ssrc: %u and pt:%d to the playing state", self->ssrc, self->pt); return; } tmp = g_strdup_printf ("input_recv_valve_%u_%u_%u", self->priv->session->id, self->ssrc, self->pt); self->priv->input_valve = gst_element_factory_make ("valve", tmp); g_free (tmp); if (!self->priv->input_valve) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not create a valve element for" " session substream with ssrc: %u and pt:%d", self->ssrc, self->pt); return; } if (!gst_bin_add (GST_BIN (self->priv->conference), self->priv->input_valve)) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not add the valve element for session" " substream with ssrc: %u and pt:%d to the conference bin", self->ssrc, self->pt); return; } if (gst_element_set_state (self->priv->input_valve, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not set the valve element for session" " substream with ssrc: %u and pt:%d to the playing state", self->ssrc, self->pt); return; } if (!gst_element_link (self->priv->input_valve, self->priv->capsfilter)) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link the input valve" " and the capsfilter"); return; } valve_sink_pad = gst_element_get_static_pad (self->priv->input_valve, "sink"); if (!valve_sink_pad) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not get the valve's sink pad"); return; } linkret = gst_pad_link (self->priv->rtpbin_pad, valve_sink_pad); gst_object_unref (valve_sink_pad); if (GST_PAD_LINK_FAILED (linkret)) { self->priv->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link the rtpbin to the codec bin (%d)", linkret); return; } if (self->no_rtcp_timeout > 0) if (!fs_rtp_sub_stream_start_no_rtcp_timeout_thread (self, &self->priv->construction_error)) return; GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object)); } static void fs_rtp_sub_stream_dispose (GObject *object) { FsRtpSubStream *self = FS_RTP_SUB_STREAM (object); fs_rtp_sub_stream_stop_no_rtcp_timeout_thread (self); if (self->priv->output_ghostpad) { gst_element_remove_pad (GST_ELEMENT (self->priv->conference), self->priv->output_ghostpad); self->priv->output_ghostpad = NULL; } if (self->priv->output_valve) { gst_element_set_locked_state (self->priv->output_valve, TRUE); gst_element_set_state (self->priv->output_valve, GST_STATE_NULL); gst_bin_remove (GST_BIN (self->priv->conference), self->priv->output_valve); self->priv->output_valve = NULL; } if (self->priv->codecbin) { gst_element_set_locked_state (self->priv->codecbin, TRUE); gst_element_set_state (self->priv->codecbin, GST_STATE_NULL); gst_bin_remove (GST_BIN (self->priv->conference), self->priv->codecbin); self->priv->codecbin = NULL; } if (self->priv->capsfilter) { gst_element_set_locked_state (self->priv->capsfilter, TRUE); gst_element_set_state (self->priv->capsfilter, GST_STATE_NULL); gst_bin_remove (GST_BIN (self->priv->conference), self->priv->capsfilter); self->priv->capsfilter = NULL; } if (self->priv->input_valve) { gst_element_set_locked_state (self->priv->input_valve, TRUE); gst_element_set_state (self->priv->input_valve, GST_STATE_NULL); gst_bin_remove (GST_BIN (self->priv->conference), self->priv->input_valve); self->priv->input_valve = NULL; } if (self->priv->rtpbin_pad) { gst_object_unref (self->priv->rtpbin_pad); self->priv->rtpbin_pad = NULL; } G_OBJECT_CLASS (fs_rtp_sub_stream_parent_class)->dispose (object); } static void fs_rtp_sub_stream_finalize (GObject *object) { FsRtpSubStream *self = FS_RTP_SUB_STREAM (object); fs_codec_destroy (self->codec); g_mutex_clear (&self->priv->mutex); g_rw_lock_clear (&self->priv->stopped_lock); G_OBJECT_CLASS (fs_rtp_sub_stream_parent_class)->finalize (object); } /* * These properties can only be accessed while holding the session lock * */ static void fs_rtp_sub_stream_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { FsRtpSubStream *self = FS_RTP_SUB_STREAM (object); switch (prop_id) { case PROP_CONFERENCE: self->priv->conference = g_value_get_object (value); break; case PROP_SESSION: self->priv->session = g_value_get_object (value); break; case PROP_STREAM: if (self->priv->stream) GST_WARNING ("Stream already set, not re-setting"); else self->priv->stream = g_value_get_object (value); break; case PROP_RTPBIN_PAD: self->priv->rtpbin_pad = GST_PAD (g_value_dup_object (value)); break; case PROP_SSRC: self->ssrc = g_value_get_uint (value); break; case PROP_PT: self->pt = g_value_get_uint (value); break; case PROP_RECEIVING: self->priv->receiving = g_value_get_boolean (value); if (self->priv->input_valve) g_object_set (G_OBJECT (self->priv->input_valve), "drop", !self->priv->receiving, NULL); break; case PROP_NO_RTCP_TIMEOUT: self->no_rtcp_timeout = g_value_get_int (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } /* * These properties can only be accessed while holding the session lock * */ static void fs_rtp_sub_stream_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { FsRtpSubStream *self = FS_RTP_SUB_STREAM (object); switch (prop_id) { case PROP_CONFERENCE: g_value_set_object (value, self->priv->conference); break; case PROP_SESSION: g_value_set_object (value, self->priv->session); break; case PROP_STREAM: g_value_set_object (value, self->priv->stream); break; case PROP_RTPBIN_PAD: g_value_set_object (value, self->priv->rtpbin_pad); break; case PROP_SSRC: g_value_set_uint (value, self->ssrc); break; case PROP_PT: g_value_set_uint (value, self->pt); break; case PROP_CODEC: g_value_set_boxed (value, self->codec); break; case PROP_RECEIVING: g_value_set_boolean (value, self->priv->receiving); break; case PROP_OUTPUT_GHOSTPAD: g_value_set_object (value, self->priv->output_ghostpad); break; case PROP_NO_RTCP_TIMEOUT: g_value_set_int (value, self->no_rtcp_timeout); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } /** * fs_rtp_sub_stream_set_codecbin: * * Add and links the rtpbin for a given substream. * Removes any codecbin that was previously there. * * This function will swallow one ref to the codecbin and the codec. * * Returns: TRUE on success */ static gboolean fs_rtp_sub_stream_set_codecbin (FsRtpSubStream *substream, GstElement *codecbin, guint builder_hash, GError **error) { gboolean ret = FALSE; GstPad *pad; if (substream->priv->codecbin) { gst_element_set_locked_state (substream->priv->codecbin, TRUE); if (gst_element_set_state (substream->priv->codecbin, GST_STATE_NULL) != GST_STATE_CHANGE_SUCCESS) { gst_element_set_locked_state (substream->priv->codecbin, FALSE); g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL, "Could not set the codec bin for ssrc %u" " and payload type %d to the state NULL", substream->ssrc, substream->pt); gst_object_unref (codecbin); return FALSE; } gst_bin_remove (GST_BIN (substream->priv->conference), substream->priv->codecbin); FS_RTP_SESSION_LOCK (substream->priv->session); substream->priv->codecbin = NULL; substream->priv->builder_hash = 0; FS_RTP_SESSION_UNLOCK (substream->priv->session); } if (!gst_bin_add (GST_BIN (substream->priv->conference), codecbin)) { gst_object_unref (codecbin); g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not add the codec bin to the conference"); return FALSE; } if (gst_element_set_state (codecbin, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not set the codec bin to the playing state"); goto error; } if (!gst_element_link_pads (codecbin, "src", substream->priv->output_valve, "sink")) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link the codec bin to the output_valve"); goto error; } if (!gst_element_link_pads (substream->priv->capsfilter, "src", codecbin, "sink")) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link the receive capsfilter and the codecbin for pt %d", substream->pt); goto error; } pad = gst_element_get_static_pad (codecbin, "sink"); if (!pad) { g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL, "Could not get sink pad" " from codecbin"); goto error; } gst_object_unref (pad); FS_RTP_SESSION_LOCK (substream->priv->session); substream->priv->codecbin = codecbin; substream->priv->builder_hash = builder_hash; if (substream->priv->stream && !substream->priv->output_ghostpad) { if (!fs_rtp_sub_stream_add_output_ghostpad_unlock (substream, error)) goto error; } else { FS_RTP_SESSION_UNLOCK (substream->priv->session); g_signal_emit (substream, signals[CODEC_CHANGED], 0); } return TRUE; error: gst_element_set_locked_state (codecbin, TRUE); gst_element_set_state (codecbin, GST_STATE_NULL); gst_bin_remove (GST_BIN (substream->priv->conference), codecbin); return ret; } FsRtpSubStream * fs_rtp_sub_stream_new (FsRtpConference *conference, FsRtpSession *session, GstPad *rtpbin_pad, guint32 ssrc, guint pt, gint no_rtcp_timeout, GError **error) { FsRtpSubStream *substream = g_object_new (FS_TYPE_RTP_SUB_STREAM, "conference", conference, "session", session, "rtpbin-pad", rtpbin_pad, "ssrc", ssrc, "pt", pt, "no-rtcp-timeout", no_rtcp_timeout, NULL); if (substream->priv->construction_error) { g_propagate_error (error, substream->priv->construction_error); g_object_unref (substream); return NULL; } return substream; } /** * fs_rtp_sub_stream_stop: * * Stops all of the elements on a #FsRtpSubstream */ void fs_rtp_sub_stream_stop (FsRtpSubStream *substream) { substream->priv->stopped = TRUE; g_rw_lock_writer_lock (&substream->priv->stopped_lock); substream->priv->stopped = TRUE; g_rw_lock_writer_unlock (&substream->priv->stopped_lock); if (substream->priv->rtpbin_unlinked_sig) { g_signal_handler_disconnect (substream->priv->rtpbin_pad, substream->priv->rtpbin_unlinked_sig); substream->priv->rtpbin_unlinked_sig = 0; } FS_RTP_SESSION_LOCK (substream->priv->session); if (substream->priv->blocking_id != 0) { gst_pad_remove_probe (substream->priv->rtpbin_pad, substream->priv->blocking_id); substream->priv->blocking_id = 0; } FS_RTP_SESSION_UNLOCK (substream->priv->session); if (substream->priv->check_caps_id != 0) { gst_pad_remove_probe (substream->priv->rtpbin_pad, substream->priv->check_caps_id); substream->priv->check_caps_id = 0; } if (substream->priv->output_ghostpad) gst_pad_set_active (substream->priv->output_ghostpad, FALSE); if (substream->priv->output_valve) { gst_element_set_locked_state (substream->priv->output_valve, TRUE); gst_element_set_state (substream->priv->output_valve, GST_STATE_NULL); } if (substream->priv->codecbin) { gst_element_set_locked_state (substream->priv->codecbin, TRUE); gst_element_set_state (substream->priv->codecbin, GST_STATE_NULL); } if (substream->priv->capsfilter) { gst_element_set_locked_state (substream->priv->capsfilter, TRUE); gst_element_set_state (substream->priv->capsfilter, GST_STATE_NULL); } if (substream->priv->input_valve) { gst_element_set_locked_state (substream->priv->input_valve, TRUE); gst_element_set_state (substream->priv->input_valve, GST_STATE_NULL); } } /** * fs_rtp_sub_stream_add_output_ghostpad_unlock: * * Creates and adds an output ghostpad for this substreams * * The caller MUST hold the session lock * * Returns: TRUE on Success, FALSE on error */ gboolean fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream, GError **error) { GstPad *valve_srcpad; gchar *padname = NULL; GstPad *ghostpad = NULL; FsCodec *codec = NULL; if (fs_rtp_sub_stream_has_stopped_enter (substream)) { FS_RTP_SESSION_UNLOCK (substream->priv->session); return TRUE; } if (substream->priv->adding_output_ghostpad) { FS_RTP_SESSION_UNLOCK (substream->priv->session); goto out; } g_assert (substream->priv->output_ghostpad == NULL); substream->priv->adding_output_ghostpad = TRUE; padname = g_strdup_printf ("src_%u_%u_%u", substream->priv->session->id, substream->ssrc, substream->pt); FS_RTP_SESSION_UNLOCK (substream->priv->session); valve_srcpad = gst_element_get_static_pad (substream->priv->output_valve, "src"); g_assert (valve_srcpad); ghostpad = gst_ghost_pad_new_from_template (padname, valve_srcpad, gst_element_class_get_pad_template ( GST_ELEMENT_GET_CLASS (substream->priv->conference), "src_%u_%u_%u")); gst_object_unref (valve_srcpad); g_free (padname); if (!ghostpad) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not build ghostpad src_%u_%u_%u", substream->priv->session->id, substream->ssrc, substream->pt); goto error; } if (!gst_pad_set_active (ghostpad, TRUE)) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not activate the src_%u_%u_%u", substream->priv->session->id, substream->ssrc, substream->pt); gst_object_unref (ghostpad); goto error; } if (!gst_element_add_pad (GST_ELEMENT (substream->priv->conference), ghostpad)) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not add ghostpad src_%u_%u_%u to the conference", substream->priv->session->id, substream->ssrc, substream->pt); gst_object_unref (ghostpad); goto error; } FS_RTP_SESSION_LOCK (substream->priv->session); substream->priv->output_ghostpad = ghostpad; GST_DEBUG ("Src pad added on substream for ssrc:%X pt:%u " FS_CODEC_FORMAT, substream->ssrc, substream->pt, FS_CODEC_ARGS (substream->codec)); codec = fs_codec_copy (substream->codec); FS_RTP_SESSION_UNLOCK (substream->priv->session); g_signal_emit (substream, signals[SRC_PAD_ADDED], 0, ghostpad, codec); g_signal_emit (substream, signals[CODEC_CHANGED], 0); fs_codec_destroy (codec); g_object_set (substream->priv->output_valve, "drop", FALSE, NULL); out: fs_rtp_sub_stream_has_stopped_exit (substream); return TRUE; error: substream->priv->adding_output_ghostpad = FALSE; fs_rtp_sub_stream_has_stopped_exit (substream); return FALSE; } static GstPadProbeReturn _probe_check_caps (GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { FsRtpSubStream *self = FS_RTP_SUB_STREAM (user_data); GstEvent *event; GstPadProbeReturn ret = GST_PAD_PROBE_DROP; /* drop buffers before we have the right caps */ if (!(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM)) return GST_PAD_PROBE_DROP; event = GST_PAD_PROBE_INFO_EVENT (info); /* let other events like segments through before the caps we like */ if (GST_EVENT_TYPE (event) != GST_EVENT_CAPS) return GST_PAD_PROBE_PASS; if (fs_rtp_session_has_disposed_enter (self->priv->session, NULL)) return GST_PAD_PROBE_REMOVE; if (fs_rtp_sub_stream_has_stopped_enter (self)) { fs_rtp_session_has_disposed_exit (self->priv->session); return GST_PAD_PROBE_REMOVE; } FS_RTP_SESSION_LOCK (self->priv->session); if (self->priv->codecbin && self->codec) { GstCaps *caps; gst_event_parse_caps (event, &caps); if (gst_pad_set_caps (pad, caps)) ret = GST_PAD_PROBE_REMOVE; } FS_RTP_SESSION_UNLOCK (self->priv->session); fs_rtp_sub_stream_has_stopped_exit (self); fs_rtp_session_has_disposed_exit (self->priv->session); return ret; } static GstPadProbeReturn _rtpbin_pad_blocked_callback (GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { FsRtpSubStream *substream = user_data; GError *error = NULL; GstElement *codecbin = NULL; guint new_builder_hash = 0; FsCodec *codec = NULL; FsRtpSession *session; GstCaps *caps = NULL; if (GST_PAD_PROBE_INFO_TYPE (info) == GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM && !GST_EVENT_IS_SERIALIZED (GST_PAD_PROBE_INFO_EVENT (info))) return GST_PAD_PROBE_PASS; FS_RTP_SESSION_LOCK (substream->priv->session); substream->priv->blocking_id = 0; FS_RTP_SESSION_UNLOCK (substream->priv->session); if (fs_rtp_session_has_disposed_enter (substream->priv->session, NULL)) { return GST_PAD_PROBE_REMOVE; } if (fs_rtp_sub_stream_has_stopped_enter (substream)) { fs_rtp_session_has_disposed_exit (substream->priv->session); return GST_PAD_PROBE_REMOVE; } g_object_ref (substream); session = g_object_ref (substream->priv->session); GST_DEBUG ("Substream blocked for codec change (session:%d SSRC:%x pt:%d)", substream->priv->session->id, substream->ssrc, substream->pt); g_signal_emit (substream, signals[GET_CODEC_BIN], 0, substream->priv->stream, &codec, substream->priv->builder_hash, &new_builder_hash, &error, &codecbin); if (error) goto error; FS_RTP_SESSION_LOCK (substream->priv->session); if (codec && (!substream->codec || !fs_codec_are_equal (codec, substream->codec))) { gchar *tmp; if (substream->codec) fs_codec_destroy (substream->codec); substream->codec = codec; caps = fs_codec_to_gst_caps (codec); tmp = gst_caps_to_string (caps); GST_DEBUG ("Setting caps %s on recv substream", tmp); g_free (tmp); g_object_set (substream->priv->capsfilter, "caps", caps, NULL); } else if (codec) { fs_codec_destroy (codec); } FS_RTP_SESSION_UNLOCK (substream->priv->session); if (codecbin) { if (!fs_rtp_sub_stream_set_codecbin (substream, codecbin, new_builder_hash, &error)) goto error; } if (caps) { if (!gst_pad_set_caps (substream->priv->rtpbin_pad, caps)) { if (substream->priv->check_caps_id == 0) substream->priv->check_caps_id = gst_pad_add_probe (substream->priv->rtpbin_pad, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, _probe_check_caps, g_object_ref (substream), g_object_unref); } gst_caps_unref (caps); } out: g_clear_error (&error); fs_rtp_sub_stream_has_stopped_exit (substream); fs_rtp_session_has_disposed_exit (substream->priv->session); g_object_unref (substream); g_object_unref (session); return GST_PAD_PROBE_REMOVE; error: g_prefix_error (&error, "Could not add the new recv codec bin for" " ssrc %u and payload type %d to the state NULL: ", substream->ssrc, substream->pt); if (substream->priv->stream) fs_stream_emit_error (FS_STREAM (substream->priv->stream), FS_ERROR_CONSTRUCTION, error->message); else fs_session_emit_error (FS_SESSION (substream->priv->session), FS_ERROR_CONSTRUCTION, error->message); if (caps) gst_caps_unref (caps); goto out; } /** * fs_rtp_sub_stream_verify_codec_locked: * @substream: A #FsRtpSubStream * * This function will start the process that invalidates the codec * for this rtpbin. * * You must hold the session lock to call it. */ void fs_rtp_sub_stream_verify_codec_locked (FsRtpSubStream *substream) { if (fs_rtp_sub_stream_has_stopped_enter (substream)) return; GST_LOG ("Starting codec verification process for substream with" " SSRC:%x pt:%d", substream->ssrc, substream->pt); if (!substream->priv->blocking_id) substream->priv->blocking_id = gst_pad_add_probe ( substream->priv->rtpbin_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, _rtpbin_pad_blocked_callback, g_object_ref (substream), g_object_unref); fs_rtp_sub_stream_has_stopped_exit (substream); } static void fs_rtp_sub_stream_emit_error (FsRtpSubStream *substream, gint error_no, gchar *error_msg, gchar *debug_msg) { g_signal_emit (substream, signals[ERROR_SIGNAL], 0, error_no, error_msg, debug_msg); }