/* * Farstream - Farstream RTP Stream * * Copyright 2007 Collabora Ltd. * @author: Olivier Crete * Copyright 2007 Nokia Corp. * * fs-rtp-stream.c - A Farstream RTP Stream gobject * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ /** * SECTION:fs-rtp-stream * @short_description: A RTP stream in a #FsRtpSession in a #FsRtpConference * * This is the conjunction of a #FsRtpParticipant and a #FsRtpSession, * it is created by calling fs_session_new_stream() on a * #FsRtpSession. * * SRTP authentication & decryption * * * To tell #FsRtpStream to authenticate and decrypt the media it is * receiving using SRTP, one must set the parameters using a * #GstStructure named "FarstreamSRTP" and pass it to * fs_stream_set_decryption_parameters(). * * The cipher, auth, and key must be specified, refer to the FsRtpSession * documentation for details. * * */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "fs-rtp-stream.h" #include #include /* Signals */ enum { LAST_SIGNAL }; /* props */ enum { PROP_0, #if 0 /* TODO Do we really need this? */ PROP_SOURCE_PADS, #endif PROP_REMOTE_CODECS, PROP_NEGOTIATED_CODECS, PROP_CURRENT_RECV_CODECS, PROP_DIRECTION, PROP_PARTICIPANT, PROP_SESSION, PROP_RTP_HEADER_EXTENSIONS, PROP_DECRYPTION_PARAMETERS, PROP_SEND_RTCP_MUX, PROP_REQUIRE_ENCRYPTION }; struct _FsRtpStreamPrivate { FsRtpSession *session; FsStreamTransmitter *stream_transmitter; FsStreamDirection direction; gboolean send_rtcp_mux; stream_new_remote_codecs_cb new_remote_codecs_cb; stream_known_source_packet_receive_cb known_source_packet_received_cb; stream_sending_changed_locked_cb sending_changed_locked_cb; stream_ssrc_added_cb ssrc_added_cb; stream_get_new_stream_transmitter_cb get_new_stream_transmitter_cb; stream_decrypt_clear_locked_cb decrypt_clear_locked_cb; gpointer user_data_for_cb; /* protected by session lock */ GstStructure *decryption_parameters; gboolean encrypted; gulong local_candidates_prepared_handler_id; gulong new_active_candidate_pair_handler_id; gulong new_local_candidate_handler_id; gulong error_handler_id; gulong known_source_packet_received_handler_id; gulong state_changed_handler_id; GMutex mutex; }; G_DEFINE_TYPE(FsRtpStream, fs_rtp_stream, FS_TYPE_STREAM); #define FS_RTP_STREAM_GET_PRIVATE(o) \ (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RTP_STREAM, FsRtpStreamPrivate)) static void fs_rtp_stream_dispose (GObject *object); static void fs_rtp_stream_finalize (GObject *object); static void fs_rtp_stream_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void fs_rtp_stream_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static gboolean fs_rtp_stream_add_remote_candidates (FsStream *stream, GList *candidates, GError **error); static gboolean fs_rtp_stream_force_remote_candidates (FsStream *stream, GList *remote_candidates, GError **error); static gboolean fs_rtp_stream_set_remote_codecs (FsStream *stream, GList *remote_codecs, GError **error); static gboolean fs_rtp_stream_set_transmitter (FsStream *stream, const gchar *transmitter, GParameter *stream_transmitter_parameters, guint stream_transmitter_n_parameters, GError **error); static void fs_rtp_stream_add_id (FsStream *stream, guint id); static gboolean fs_rtp_stream_set_decryption_parameters (FsStream *stream, GstStructure *parameters, GError **error); static void _local_candidates_prepared ( FsStreamTransmitter *stream_transmitter, gpointer user_data); static void _new_active_candidate_pair ( FsStreamTransmitter *stream_transmitter, FsCandidate *candidate1, FsCandidate *candidate2, gpointer user_data); static void _new_local_candidate ( FsStreamTransmitter *stream_transmitter, FsCandidate *candidate, gpointer user_data); static void _known_source_packet_received (FsStreamTransmitter *st, guint component, GstBuffer *buffer, FsRtpStream *self); static void _transmitter_error ( FsStreamTransmitter *stream_transmitter, gint errorno, gchar *error_msg, gpointer user_data); static void _substream_codec_changed (FsRtpSubStream *substream, FsRtpStream *stream); static void _state_changed (FsStreamTransmitter *stream_transmitter, guint component, FsStreamState state, gpointer user_data); // static guint signals[LAST_SIGNAL] = { 0 }; static void fs_rtp_stream_class_init (FsRtpStreamClass *klass) { GObjectClass *gobject_class; FsStreamClass *stream_class = FS_STREAM_CLASS (klass); gobject_class = (GObjectClass *) klass; gobject_class->set_property = fs_rtp_stream_set_property; gobject_class->get_property = fs_rtp_stream_get_property; gobject_class->dispose = fs_rtp_stream_dispose; gobject_class->finalize = fs_rtp_stream_finalize; stream_class->add_remote_candidates = fs_rtp_stream_add_remote_candidates; stream_class->set_remote_codecs = fs_rtp_stream_set_remote_codecs; stream_class->force_remote_candidates = fs_rtp_stream_force_remote_candidates; stream_class->add_id = fs_rtp_stream_add_id; stream_class->set_transmitter = fs_rtp_stream_set_transmitter; stream_class->set_decryption_parameters = fs_rtp_stream_set_decryption_parameters; g_type_class_add_private (klass, sizeof (FsRtpStreamPrivate)); g_object_class_override_property (gobject_class, PROP_REMOTE_CODECS, "remote-codecs"); g_object_class_override_property (gobject_class, PROP_NEGOTIATED_CODECS, "negotiated-codecs"); g_object_class_override_property (gobject_class, PROP_CURRENT_RECV_CODECS, "current-recv-codecs"); g_object_class_override_property (gobject_class, PROP_DIRECTION, "direction"); g_object_class_override_property (gobject_class, PROP_PARTICIPANT, "participant"); g_object_class_override_property (gobject_class, PROP_SESSION, "session"); g_object_class_override_property (gobject_class, PROP_DECRYPTION_PARAMETERS, "decryption-parameters"); g_object_class_override_property (gobject_class, PROP_REQUIRE_ENCRYPTION, "require-encryption"); g_object_class_install_property (gobject_class, PROP_RTP_HEADER_EXTENSIONS, g_param_spec_boxed ("rtp-header-extensions", "RTP Header extension desired by participant in this stream", "GList of RTP Header extensions that the participant for this stream" " would like to use", FS_TYPE_RTP_HEADER_EXTENSION_LIST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SEND_RTCP_MUX, g_param_spec_boolean ("send-rtcp-mux", "Send RTCP muxed with on the same RTP connection", "Send RTCP muxed with on the same RTP connection", FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void fs_rtp_stream_init (FsRtpStream *self) { /* member init */ self->priv = FS_RTP_STREAM_GET_PRIVATE (self); self->priv->session = NULL; self->participant = NULL; self->priv->stream_transmitter = NULL; g_mutex_init (&self->priv->mutex); self->priv->direction = FS_DIRECTION_NONE; } static FsRtpSession * fs_rtp_stream_get_session (FsRtpStream *self, GError **error) { FsRtpSession *session; g_mutex_lock (&self->priv->mutex); session = self->priv->session; if (session) g_object_ref (session); g_mutex_unlock (&self->priv->mutex); if (!session) g_set_error (error, FS_ERROR, FS_ERROR_DISPOSED, "Called function after stream has been disposed"); return session; } static FsStreamTransmitter * fs_rtp_stream_get_stream_transmitter (FsRtpStream *self, GError **error) { FsRtpSession *session = fs_rtp_stream_get_session (self, error); FsStreamTransmitter *st = NULL; if (!session) return NULL; FS_RTP_SESSION_LOCK (session); st = self->priv->stream_transmitter; if (st) g_object_ref (st); FS_RTP_SESSION_UNLOCK (session); if (!st) g_set_error (error, FS_ERROR, FS_ERROR_DISPOSED, "Stream transmitter not set (or stream has been disposed)"); g_object_unref (session); return st; } static void fs_rtp_stream_dispose (GObject *object) { FsRtpStream *self = FS_RTP_STREAM (object); FsStreamTransmitter *st; FsRtpParticipant *participant; FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); if (!session) return; g_mutex_lock (&self->priv->mutex); self->priv->session = NULL; g_mutex_unlock (&self->priv->mutex); FS_RTP_SESSION_LOCK (session); if (self->priv->sending_changed_locked_cb && self->priv->direction & FS_DIRECTION_SEND) self->priv->sending_changed_locked_cb (self, FALSE, self->priv->user_data_for_cb); participant = self->participant; self->participant = NULL; st = self->priv->stream_transmitter; self->priv->stream_transmitter = NULL; if (st) { g_signal_handler_disconnect (st, self->priv->local_candidates_prepared_handler_id); g_signal_handler_disconnect (st, self->priv->new_active_candidate_pair_handler_id); g_signal_handler_disconnect (st, self->priv->new_local_candidate_handler_id); g_signal_handler_disconnect (st, self->priv->error_handler_id); g_signal_handler_disconnect (st, self->priv->known_source_packet_received_handler_id); g_signal_handler_disconnect (st, self->priv->state_changed_handler_id); FS_RTP_SESSION_UNLOCK (session); fs_stream_transmitter_stop (st); g_object_unref (st); FS_RTP_SESSION_LOCK (session); } while (self->substreams) { FsRtpSubStream *substream = self->substreams->data; self->substreams = g_list_remove (self->substreams, substream); FS_RTP_SESSION_UNLOCK (session); g_object_unref (substream); FS_RTP_SESSION_LOCK (session); } FS_RTP_SESSION_UNLOCK (session); g_object_unref (participant); g_object_unref (session); g_object_unref (session); G_OBJECT_CLASS (fs_rtp_stream_parent_class)->dispose (object); } static void fs_rtp_stream_finalize (GObject *object) { FsRtpStream *self = FS_RTP_STREAM (object); fs_codec_list_destroy (self->remote_codecs); fs_codec_list_destroy (self->negotiated_codecs); if (self->priv->decryption_parameters) gst_structure_free (self->priv->decryption_parameters); g_mutex_clear (&self->priv->mutex); G_OBJECT_CLASS (fs_rtp_stream_parent_class)->finalize (object); } static gboolean _codec_list_has_codec (GList *list, FsCodec *codec) { for (; list; list = g_list_next (list)) { FsCodec *listcodec = list->data; if (fs_codec_are_equal (codec, listcodec)) return TRUE; } return FALSE; } static void fs_rtp_stream_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { FsRtpStream *self = FS_RTP_STREAM (object); FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); if (!session) return; switch (prop_id) { case PROP_REMOTE_CODECS: FS_RTP_SESSION_LOCK (session); g_value_set_boxed (value, self->remote_codecs); FS_RTP_SESSION_UNLOCK (session); break; case PROP_NEGOTIATED_CODECS: FS_RTP_SESSION_LOCK (session); g_value_set_boxed (value, self->negotiated_codecs); FS_RTP_SESSION_UNLOCK (session); break; case PROP_SESSION: g_value_set_object (value, session); break; case PROP_PARTICIPANT: FS_RTP_SESSION_LOCK (session); g_value_set_object (value, self->participant); FS_RTP_SESSION_UNLOCK (session); break; case PROP_DIRECTION: g_value_set_flags (value, self->priv->direction); break; case PROP_CURRENT_RECV_CODECS: { GList *codeclist = NULL; GList *substream_item; FS_RTP_SESSION_LOCK (session); for (substream_item = g_list_first (self->substreams); substream_item; substream_item = g_list_next (substream_item)) { FsRtpSubStream *substream = substream_item->data; if (substream->codec) { if (!_codec_list_has_codec (codeclist, substream->codec)) codeclist = g_list_append (codeclist, fs_codec_copy (substream->codec)); } } g_value_take_boxed (value, codeclist); FS_RTP_SESSION_UNLOCK (session); } break; case PROP_RTP_HEADER_EXTENSIONS: FS_RTP_SESSION_LOCK (session); g_value_set_boxed (value, self->hdrext); FS_RTP_SESSION_UNLOCK (session); break; case PROP_DECRYPTION_PARAMETERS: FS_RTP_SESSION_LOCK (session); g_value_set_boxed (value, self->priv->decryption_parameters); FS_RTP_SESSION_UNLOCK (session); break; case PROP_SEND_RTCP_MUX: FS_RTP_SESSION_LOCK (session); if (self->priv->stream_transmitter == NULL || g_object_class_find_property ( G_OBJECT_GET_CLASS (self->priv->stream_transmitter), "send-component-mux") != NULL) g_value_set_boolean (value, self->priv->send_rtcp_mux); else g_value_set_boolean (value, FALSE); FS_RTP_SESSION_UNLOCK (session); break; case PROP_REQUIRE_ENCRYPTION: FS_RTP_SESSION_LOCK (session); g_value_set_boolean (value, fs_rtp_stream_requires_crypto_locked (self)); FS_RTP_SESSION_UNLOCK (session); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } g_object_unref (session); } static void fs_rtp_stream_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { FsRtpStream *self = FS_RTP_STREAM (object); GList *item; switch (prop_id) { case PROP_SESSION: self->priv->session = FS_RTP_SESSION (g_value_dup_object (value)); break; case PROP_PARTICIPANT: self->participant = FS_RTP_PARTICIPANT (g_value_dup_object (value)); break; case PROP_DIRECTION: { FsStreamTransmitter *st = NULL; GList *copy = NULL; FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); FsStreamDirection dir; if (!session) { self->priv->direction = g_value_get_flags (value); return; } FS_RTP_SESSION_LOCK (session); if (self->priv->sending_changed_locked_cb && (self->priv->direction & FS_DIRECTION_SEND) != (g_value_get_flags (value) & FS_DIRECTION_SEND)) self->priv->sending_changed_locked_cb (self, g_value_get_flags (value) & FS_DIRECTION_SEND, self->priv->user_data_for_cb); dir = self->priv->direction = g_value_get_flags (value); FS_RTP_SESSION_UNLOCK (session); st = fs_rtp_stream_get_stream_transmitter (self, NULL); if (st) { g_object_set (self->priv->stream_transmitter, "sending", dir & FS_DIRECTION_SEND, NULL); g_object_unref (st); } FS_RTP_SESSION_LOCK (session); copy = g_list_copy (g_list_first (self->substreams)); g_list_foreach (copy, (GFunc) g_object_ref, NULL); FS_RTP_SESSION_UNLOCK (session); for (item = copy; item; item = g_list_next (item)) g_object_set (G_OBJECT (item->data), "receiving", ((dir & FS_DIRECTION_RECV) != 0), NULL); g_list_foreach (copy, (GFunc) g_object_unref, NULL); g_list_free (copy); g_object_unref (session); } break; case PROP_RTP_HEADER_EXTENSIONS: { FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); if (session) { FS_RTP_SESSION_LOCK (session); fs_rtp_header_extension_list_destroy (self->hdrext); self->hdrext = g_value_dup_boxed (value); FS_RTP_SESSION_UNLOCK (session); /* The callbadck can not fail because it does not change * the codecs */ self->priv->new_remote_codecs_cb (NULL, NULL, NULL, self->priv->user_data_for_cb); g_object_unref (session); } } break; case PROP_SEND_RTCP_MUX: { FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); if (session) { FS_RTP_SESSION_LOCK (session); self->priv->send_rtcp_mux = g_value_get_boolean (value); if (self->priv->stream_transmitter != NULL && g_object_class_find_property ( G_OBJECT_GET_CLASS (self->priv->stream_transmitter), "send-component-mux") != NULL) g_object_set (self->priv->stream_transmitter, "send-component-mux", self->priv->send_rtcp_mux, NULL); FS_RTP_SESSION_UNLOCK (session); } } break; case PROP_REQUIRE_ENCRYPTION: { FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); if (session) { FS_RTP_SESSION_LOCK (session); if (self->priv->encrypted != g_value_get_boolean (value)) { self->priv->encrypted = g_value_get_boolean (value); if (!self->priv->decrypt_clear_locked_cb (self, self->priv->user_data_for_cb)) { g_warning ("Can't set encryption because srtpdec is not" " installed"); self->priv->encrypted = FALSE; } } FS_RTP_SESSION_UNLOCK (session); } } break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } /** * fs_rtp_stream_add_remote_candidate: */ static gboolean fs_rtp_stream_add_remote_candidates (FsStream *stream, GList *candidates, GError **error) { FsRtpStream *self = FS_RTP_STREAM (stream); FsStreamTransmitter *st = fs_rtp_stream_get_stream_transmitter (self, error); gboolean ret = FALSE; if (!st) return FALSE; ret = fs_stream_transmitter_add_remote_candidates (st, candidates, error); g_object_unref (st); return ret; } /** * fs_rtp_stream_force_remote_candidates * * Implement FsStream -> force_remote_candidates * by calling the same function in the stream transmittrer */ static gboolean fs_rtp_stream_force_remote_candidates (FsStream *stream, GList *remote_candidates, GError **error) { FsRtpStream *self = FS_RTP_STREAM (stream); FsStreamTransmitter *st = fs_rtp_stream_get_stream_transmitter (self, error); gboolean ret = FALSE; if (!st) return FALSE; ret = fs_stream_transmitter_force_remote_candidates ( self->priv->stream_transmitter, remote_candidates, error); g_object_unref (st); return ret; } /** * fs_rtp_stream_set_remote_codecs: * @stream: an #FsStream * @remote_codecs: a #GList of #FsCodec representing the remote codecs * @error: location of a #GError, or NULL if no error occured * * This function will set the list of remote codecs for this stream. If * the given remote codecs couldn't be negotiated with the list of local * codecs or already negotiated codecs for the corresponding #FsSession, @error * will be set and %FALSE will be returned. The @remote_codecs list will be * copied so it must be free'd using fs_codec_list_destroy() when done. * * Returns: %FALSE if the remote codecs couldn't be set. */ static gboolean fs_rtp_stream_set_remote_codecs (FsStream *stream, GList *remote_codecs, GError **error) { FsRtpStream *self = FS_RTP_STREAM (stream); GList *item = NULL; FsMediaType media_type; FsRtpSession *session = fs_rtp_stream_get_session (self, error); if (!session) return FALSE; if (remote_codecs == NULL) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "You can not set NULL remote codecs"); goto error; } g_object_get (session, "media-type", &media_type, NULL); for (item = g_list_first (remote_codecs); item; item = g_list_next (item)) { FsCodec *codec = item->data; if (!codec->encoding_name) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The codec must have an encoding name"); goto error; } if (codec->id < 0 || codec->id > 128) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The codec id must be between 0 ans 128 for %s", codec->encoding_name); goto error; } if (codec->media_type != media_type) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The media type for codec %s is not %s", codec->encoding_name, fs_media_type_to_string (media_type)); goto error; } } if (self->priv->new_remote_codecs_cb (self, remote_codecs, error, self->priv->user_data_for_cb)) { gboolean is_new = TRUE; FS_RTP_SESSION_LOCK (session); if (self->remote_codecs) { is_new = !fs_codec_list_are_equal (self->remote_codecs, remote_codecs); fs_codec_list_destroy (self->remote_codecs); } self->remote_codecs = fs_codec_list_copy (remote_codecs); FS_RTP_SESSION_UNLOCK (session); if (is_new) g_object_notify (G_OBJECT (stream), "remote-codecs"); } else { goto error; } g_object_unref (session); return TRUE; error: g_object_unref (session); return FALSE; } /** * fs_rtp_stream_new: * @session: The #FsRtpSession this stream is a child of * @participant: The #FsRtpParticipant this stream is for * @direction: the initial #FsDirection for this stream * @new_remote_codecs_cb: Callback called when the remote codecs change * (ie when fs_rtp_stream_set_remote_codecs() is called). One must hold * the session lock across calls. * @known_source_packet_received: Callback called when a packet from a * known source is receive. * @sending_changed_locked_cb: Callback called when the sending status of * this stream changes * @user_data: User data for the callbacks. * This function create a new stream * * Returns: the newly created string or NULL on error */ FsRtpStream * fs_rtp_stream_new (FsRtpSession *session, FsRtpParticipant *participant, FsStreamDirection direction, stream_new_remote_codecs_cb new_remote_codecs_cb, stream_known_source_packet_receive_cb known_source_packet_received_cb, stream_sending_changed_locked_cb sending_changed_locked_cb, stream_ssrc_added_cb ssrc_added_cb, stream_get_new_stream_transmitter_cb get_new_stream_transmitter_cb, stream_decrypt_clear_locked_cb decrypt_clear_locked_cb, gpointer user_data_for_cb) { FsRtpStream *self; g_return_val_if_fail (session, NULL); g_return_val_if_fail (participant, NULL); g_return_val_if_fail (new_remote_codecs_cb, NULL); g_return_val_if_fail (known_source_packet_received_cb, NULL); self = g_object_new (FS_TYPE_RTP_STREAM, "session", session, "participant", participant, "direction", direction, NULL); self->priv->new_remote_codecs_cb = new_remote_codecs_cb; self->priv->known_source_packet_received_cb = known_source_packet_received_cb; self->priv->sending_changed_locked_cb = sending_changed_locked_cb; self->priv->ssrc_added_cb = ssrc_added_cb; self->priv->get_new_stream_transmitter_cb = get_new_stream_transmitter_cb; self->priv->decrypt_clear_locked_cb = decrypt_clear_locked_cb; self->priv->user_data_for_cb = user_data_for_cb; return self; } static void _local_candidates_prepared (FsStreamTransmitter *stream_transmitter, gpointer user_data) { FsRtpStream *self = FS_RTP_STREAM (user_data); GstElement *conf = NULL; FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); if (!session) return; g_object_get (session, "conference", &conf, NULL); gst_element_post_message (conf, gst_message_new_element (GST_OBJECT (conf), gst_structure_new ("farstream-local-candidates-prepared", "stream", FS_TYPE_STREAM, self, NULL))); gst_object_unref (conf); g_object_unref (session); } static void _new_active_candidate_pair ( FsStreamTransmitter *stream_transmitter, FsCandidate *local_candidate, FsCandidate *remote_candidate, gpointer user_data) { FsRtpStream *self = FS_RTP_STREAM (user_data); FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); GstElement *conf = NULL; if (!session) return; g_object_get (session, "conference", &conf, NULL); gst_element_post_message (conf, gst_message_new_element (GST_OBJECT (conf), gst_structure_new ("farstream-new-active-candidate-pair", "stream", FS_TYPE_STREAM, self, "local-candidate", FS_TYPE_CANDIDATE, local_candidate, "remote-candidate", FS_TYPE_CANDIDATE, remote_candidate, NULL))); gst_object_unref (conf); g_object_unref (session); } static void _new_local_candidate ( FsStreamTransmitter *stream_transmitter, FsCandidate *candidate, gpointer user_data) { FsRtpStream *self = FS_RTP_STREAM (user_data); FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); GstElement *conf = NULL; if (!session) return; g_object_get (session, "conference", &conf, NULL); gst_element_post_message (conf, gst_message_new_element (GST_OBJECT (conf), gst_structure_new ("farstream-new-local-candidate", "stream", FS_TYPE_STREAM, self, "candidate", FS_TYPE_CANDIDATE, candidate, NULL))); gst_object_unref (conf); g_object_unref (session); } static void _transmitter_error ( FsStreamTransmitter *stream_transmitter, gint errorno, gchar *error_msg, gpointer user_data) { FsStream *stream = FS_STREAM (user_data); fs_stream_emit_error (stream, errorno, error_msg); } static void _known_source_packet_received (FsStreamTransmitter *st, guint component, GstBuffer *buffer, FsRtpStream *self) { self->priv->known_source_packet_received_cb (self, component, buffer, self->priv->user_data_for_cb); } static void _state_changed (FsStreamTransmitter *stream_transmitter, guint component, FsStreamState state, gpointer user_data) { FsRtpStream *self = FS_RTP_STREAM (user_data); FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); GstElement *conf = NULL; if (!session) return; g_object_get (session, "conference", &conf, NULL); gst_element_post_message (conf, gst_message_new_element (GST_OBJECT (conf), gst_structure_new ("farstream-component-state-changed", "stream", FS_TYPE_STREAM, self, "component", G_TYPE_UINT, component, "state", FS_TYPE_STREAM_STATE, state, NULL))); gst_object_unref (conf); g_object_unref (session); if (component == 1 && state == FS_STREAM_STATE_FAILED) fs_stream_emit_error (FS_STREAM (self), FS_ERROR_CONNECTION_FAILED, "Could not establish connection on the RTP component"); } static void _substream_src_pad_added (FsRtpSubStream *substream, GstPad *pad, FsCodec *codec, gpointer user_data) { FsStream *stream = FS_STREAM (user_data); fs_stream_emit_src_pad_added (stream, pad, codec); } static void _substream_error (FsRtpSubStream *substream, gint errorno, gchar *error_msg, gchar *debug_msg, gpointer user_data) { FsStream *stream = FS_STREAM (user_data); fs_stream_emit_error (stream, errorno, error_msg); } static void _substream_unlinked (FsRtpSubStream *substream, gpointer user_data) { FsRtpStream *stream = FS_RTP_STREAM (user_data); FsRtpSession *session = fs_rtp_stream_get_session (stream, NULL); if (!session) return; FS_RTP_SESSION_LOCK (session); stream->substreams = g_list_remove (stream->substreams, substream); FS_RTP_SESSION_UNLOCK (session); fs_rtp_sub_stream_stop (substream); g_object_unref (substream); g_object_unref (session); } /** * fs_rtp_stream_add_substream_unlock: * @stream: a #FsRtpStream * @substream: the #FsRtpSubStream to associate with this stream * * This functions associates a substream with this stream * * You must enter this function with the session lock held and it will release * it. * * Returns: TRUE on success, FALSE on failure */ gboolean fs_rtp_stream_add_substream_unlock (FsRtpStream *stream, FsRtpSubStream *substream, GError **error) { gboolean ret = TRUE; FsRtpSession *session = fs_rtp_stream_get_session (stream, error); if (!session) return FALSE; stream->substreams = g_list_prepend (stream->substreams, substream); g_object_set (substream, "stream", stream, "receiving", ((stream->priv->direction & FS_DIRECTION_RECV) != 0), NULL); g_signal_connect_object (substream, "unlinked", G_CALLBACK (_substream_unlinked), stream, 0); g_signal_connect_object (substream, "src-pad-added", G_CALLBACK (_substream_src_pad_added), stream, 0); g_signal_connect_object (substream, "codec-changed", G_CALLBACK (_substream_codec_changed), stream, 0); g_signal_connect_object (substream, "error", G_CALLBACK (_substream_error), stream, 0); fs_rtp_sub_stream_verify_codec_locked (substream); /* Only announce a pad if it has a codec attached to it */ if (substream->codec) ret = fs_rtp_sub_stream_add_output_ghostpad_unlock (substream, error); else FS_RTP_SESSION_UNLOCK (session); g_object_unref (session); return ret; } /** * _substream_codec_changed * @substream: The #FsRtpSubStream that may have a new receive codec * @stream: a #FsRtpStream * * This function checks if the specified substream introduces a new codec * not present in another substream and if it does, it emits a GstMessage * and the notify signal */ static void _substream_codec_changed (FsRtpSubStream *substream, FsRtpStream *stream) { GList *substream_item = NULL; GList *codeclist = NULL; FsRtpSession *session = fs_rtp_stream_get_session (stream, NULL); if (!session) return; FS_RTP_SESSION_LOCK (session); if (!substream->codec) { FS_RTP_SESSION_UNLOCK (session); g_object_unref (session); return; } codeclist = g_list_prepend (NULL, fs_codec_copy (substream->codec)); for (substream_item = stream->substreams; substream_item; substream_item = g_list_next (substream_item)) { FsRtpSubStream *othersubstream = substream_item->data; if (othersubstream != substream) { if (othersubstream->codec) { if (fs_codec_are_equal (substream->codec, othersubstream->codec)) break; if (!_codec_list_has_codec (codeclist, othersubstream->codec)) codeclist = g_list_append (codeclist, fs_codec_copy (othersubstream->codec)); } } } FS_RTP_SESSION_UNLOCK (session); if (substream_item == NULL) { GstElement *conf = NULL; g_object_notify (G_OBJECT (stream), "current-recv-codecs"); g_object_get (session, "conference", &conf, NULL); gst_element_post_message (conf, gst_message_new_element (GST_OBJECT (conf), gst_structure_new ("farstream-recv-codecs-changed", "stream", FS_TYPE_STREAM, stream, "codecs", FS_TYPE_CODEC_LIST, codeclist, NULL))); gst_object_unref (conf); } fs_codec_list_destroy (codeclist); g_object_unref (session); } /** * fs_rtp_stream_set_negotiated_codecs_unlock * @stream: a #FsRtpStream * @codecs: The #GList of #FsCodec to set for the negotiated-codecs property * * This function sets the value of the FsStream:negotiated-codecs property. * Unlike most other functions in this element, it TAKES the reference to the * codecs, so you have to give it its own copy. * * You must enter this function with the session lock held and it will release * it. */ void fs_rtp_stream_set_negotiated_codecs_unlock (FsRtpStream *stream, GList *codecs) { FsRtpSession *session = fs_rtp_stream_get_session (stream, NULL); if (!session) return; if (fs_codec_list_are_equal (stream->negotiated_codecs, codecs)) { fs_codec_list_destroy (codecs); FS_RTP_SESSION_UNLOCK (session); g_object_unref (session); return; } if (stream->negotiated_codecs) fs_codec_list_destroy (stream->negotiated_codecs); stream->negotiated_codecs = codecs; FS_RTP_SESSION_UNLOCK (session); g_object_notify (G_OBJECT (stream), "negotiated-codecs"); g_object_unref (session); } static void fs_rtp_stream_add_id (FsStream *stream, guint id) { FsRtpStream *self = FS_RTP_STREAM (stream); FsRtpSession *session = fs_rtp_stream_get_session (self, NULL); if (!session) return; if (self->priv->ssrc_added_cb) self->priv->ssrc_added_cb (self, id, self->priv->user_data_for_cb); g_object_unref (session); } static gboolean fs_rtp_stream_set_transmitter (FsStream *stream, const gchar *transmitter, GParameter *stream_transmitter_parameters, guint stream_transmitter_n_parameters, GError **error) { FsStreamTransmitter *st = NULL; FsRtpStream *self = FS_RTP_STREAM (stream); FsRtpSession *session = fs_rtp_stream_get_session (self, error); if (!session) return FALSE; FS_RTP_SESSION_LOCK (session); if (self->priv->stream_transmitter) { FS_RTP_SESSION_UNLOCK (session); g_object_unref (session); return FALSE; } FS_RTP_SESSION_UNLOCK (session); st = self->priv->get_new_stream_transmitter_cb (self, FS_PARTICIPANT (self->participant), transmitter, stream_transmitter_parameters, stream_transmitter_n_parameters, error, self->priv->user_data_for_cb); if (!st) { g_object_unref (session); return FALSE; } g_object_set (st, "sending", self->priv->direction & FS_DIRECTION_SEND, NULL); self->priv->local_candidates_prepared_handler_id = g_signal_connect_object (st, "local-candidates-prepared", G_CALLBACK (_local_candidates_prepared), self, 0); self->priv->new_active_candidate_pair_handler_id = g_signal_connect_object (st, "new-active-candidate-pair", G_CALLBACK (_new_active_candidate_pair), self, 0); self->priv->new_local_candidate_handler_id = g_signal_connect_object (st, "new-local-candidate", G_CALLBACK (_new_local_candidate), self, 0); self->priv->error_handler_id = g_signal_connect_object (st, "error", G_CALLBACK (_transmitter_error), self, 0); self->priv->known_source_packet_received_handler_id = g_signal_connect_object (st, "known-source-packet-received", G_CALLBACK (_known_source_packet_received), self, 0); self->priv->state_changed_handler_id = g_signal_connect_object (st, "state-changed", G_CALLBACK (_state_changed), self, 0); FS_RTP_SESSION_LOCK (session); self->priv->stream_transmitter = st; if (self->priv->direction & FS_DIRECTION_SEND) self->priv->sending_changed_locked_cb (self, self->priv->direction & FS_DIRECTION_SEND, self->priv->user_data_for_cb); if (g_object_class_find_property (G_OBJECT_GET_CLASS (st), "send-component-mux") != NULL) g_object_set (st, "send-component-mux", self->priv->send_rtcp_mux, NULL); FS_RTP_SESSION_UNLOCK (session); if (!fs_stream_transmitter_gather_local_candidates (st, error)) { FS_RTP_SESSION_LOCK (session); self->priv->stream_transmitter = NULL; FS_RTP_SESSION_UNLOCK (session); g_object_unref (st); g_object_unref (session); return FALSE; } g_object_unref (session); return TRUE; } static gint parse_enum (const gchar *name, const gchar *value, GError **error) { GstElementFactory *factory; GstPluginFeature *loaded_feature; GType srtpenc_type; GObjectClass *srtpenc_class; GParamSpec *spec; GParamSpecEnum *enumspec; GEnumValue *enumvalue; if (value == NULL) goto error; factory = gst_element_factory_find ("srtpenc"); if (!factory) goto error_not_installed; loaded_feature = gst_plugin_feature_load (GST_PLUGIN_FEATURE (factory)); gst_object_unref (factory); factory = GST_ELEMENT_FACTORY (loaded_feature); srtpenc_type = gst_element_factory_get_element_type (factory); gst_object_unref (factory); if (srtpenc_type == 0) goto error_not_installed; srtpenc_class = g_type_class_ref (srtpenc_type); if (!srtpenc_class) goto error_not_installed; spec = g_object_class_find_property (srtpenc_class, name); g_type_class_unref (srtpenc_class); if (!spec) goto error_internal; if (!G_IS_PARAM_SPEC_ENUM (spec)) goto error_internal; enumspec = G_PARAM_SPEC_ENUM (spec); enumvalue = g_enum_get_value_by_nick (enumspec->enum_class, value); if (enumvalue) return enumvalue->value; enumvalue = g_enum_get_value_by_name (enumspec->enum_class, value); if (enumvalue) return enumvalue->value; error: g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "Invalid %s value: %s", name, value); return -1; error_not_installed: g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Can't find srtpenc, no encryption possible"); return -1; error_internal: g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL, "Can't find srtpenc %s property or is not a GEnum type!", name); return -1; } gboolean validate_srtp_parameters (GstStructure *parameters, gint *srtp_cipher, gint *srtcp_cipher, gint *srtp_auth, gint *srtcp_auth, GstBuffer **key, guint *replay_window, GError **error) { gint cipher = 0; /* 0 is null cipher, no encryption */ gint auth = -1; *key = NULL; *srtp_cipher = -1; *srtcp_cipher = -1; *srtp_auth = -1; *srtcp_auth = -1; *replay_window = 128; if (parameters) { const GValue *v = NULL; const gchar *tmp; if (!gst_structure_has_name (parameters, "FarstreamSRTP")) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The only structure accepted is FarstreamSRTP"); return FALSE; } if ((tmp = gst_structure_get_string (parameters, "cipher"))) { cipher = parse_enum ("rtp-cipher", tmp, error); if (cipher == -1) return FALSE; } if ((tmp = gst_structure_get_string (parameters, "rtp-cipher"))) { *srtp_cipher = parse_enum ("rtp-cipher", tmp, error); if (*srtp_cipher == -1) return FALSE; } if ((tmp = gst_structure_get_string (parameters, "rtcp-cipher"))) { *srtcp_cipher = parse_enum ("rtcp-cipher", tmp, error); if (*srtcp_cipher == -1) return FALSE; } if ((tmp = gst_structure_get_string (parameters, "auth"))) { auth = parse_enum ("rtp-auth", tmp, error); if (auth == -1) return FALSE; } if ((tmp = gst_structure_get_string (parameters, "rtp-auth"))) { *srtp_auth = parse_enum ("rtp-auth", tmp, error); if (*srtp_auth == -1) return FALSE; } if ((tmp = gst_structure_get_string (parameters, "rtcp-auth"))) { *srtcp_auth = parse_enum ("rtcp-auth", tmp, error); if (*srtcp_auth == -1) return FALSE; } if (*srtp_cipher == -1) *srtp_cipher = cipher; if (*srtcp_cipher == -1) *srtcp_cipher = cipher; if (*srtp_auth == -1) *srtp_auth = auth; if (*srtcp_auth == -1) *srtcp_auth = auth; if (*srtp_auth == -1 || *srtcp_auth == -1) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "At least the authentication MUST be set, \"auth\" or \"rtp-auth\"" " and \"rtcp-auth\" are required."); return FALSE; } v = gst_structure_get_value (parameters, "key"); if (!v) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The argument \"key\" is required."); return FALSE; } if (!GST_VALUE_HOLDS_BUFFER (v) || gst_value_get_buffer (v) == NULL) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "The argument \"key\" MUST hold a GstBuffer."); return FALSE; } *key = gst_value_get_buffer (v); if (gst_structure_get_uint (parameters, "replay-window-size", replay_window)) { if (*replay_window < 64 || *replay_window >= 32768) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "Reply window size must be between 64 and 32768"); return FALSE; } } } else { *srtp_cipher = *srtcp_cipher = *srtcp_auth = *srtp_auth = 0; /* 0 is NULL */ } return TRUE; } static gboolean fs_rtp_stream_set_decryption_parameters (FsStream *stream, GstStructure *parameters, GError **error) { FsRtpStream *self = FS_RTP_STREAM (stream); GstBuffer *key; gint rtp_cipher; gint rtcp_cipher; gint rtp_auth; gint rtcp_auth; guint replay_window_size; FsRtpSession *session; gboolean ret = FALSE; g_return_val_if_fail (FS_IS_RTP_STREAM (stream), FALSE); g_return_val_if_fail (parameters == NULL || GST_IS_STRUCTURE (parameters), FALSE); if (!validate_srtp_parameters (parameters, &rtp_cipher, &rtcp_cipher, &rtp_auth, &rtcp_auth, &key, &replay_window_size, error)) return FALSE; session = fs_rtp_stream_get_session (self, error); if (!session) return FALSE; FS_RTP_SESSION_LOCK (session); if (self->priv->decryption_parameters != parameters && (!parameters || !self->priv->decryption_parameters || !gst_structure_is_equal (self->priv->decryption_parameters, parameters))) { if (!self->priv->decrypt_clear_locked_cb (self, self->priv->user_data_for_cb)) { g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS, "Can't set encryption because srtpdec is not installed"); goto done; } if (self->priv->decryption_parameters) gst_structure_free (self->priv->decryption_parameters); if (parameters) self->priv->decryption_parameters = gst_structure_copy (parameters); else self->priv->decryption_parameters = NULL; } ret = TRUE; done: FS_RTP_SESSION_UNLOCK (session); g_object_unref (session); return ret; } GstCaps * fs_rtp_stream_get_srtp_caps_locked (FsRtpStream *self) { const gchar *srtp_cipher; const gchar *srtcp_cipher; const gchar *srtp_auth; const gchar *srtcp_auth; const GValue *v; GstBuffer *key; /* This is always TRUE for now, but when we expand to DTLS-SRTP, it may * not be. */ if (!self->priv->decryption_parameters || !gst_structure_has_name (self->priv->decryption_parameters, "FarstreamSRTP")) { /* Return NULL (drop packets) if encrypted, otherwise return * the NULL codec. */ if (self->priv->encrypted) return NULL; else return gst_caps_new_simple ("application/x-srtp", "srtp-cipher", G_TYPE_STRING, "null", "srtcp-cipher", G_TYPE_STRING, "null", "srtp-auth", G_TYPE_STRING, "null", "srtcp-auth", G_TYPE_STRING, "null", NULL); } srtp_cipher = gst_structure_get_string (self->priv->decryption_parameters, "rtp-cipher"); if (!srtp_cipher) srtp_cipher = gst_structure_get_string (self->priv->decryption_parameters, "cipher"); if (!srtp_cipher) srtp_cipher = "null"; srtcp_cipher = gst_structure_get_string (self->priv->decryption_parameters, "rtcp-cipher"); if (!srtcp_cipher) srtcp_cipher = gst_structure_get_string (self->priv->decryption_parameters, "cipher"); if (!srtcp_cipher) srtcp_cipher = "null"; srtp_auth = gst_structure_get_string (self->priv->decryption_parameters, "rtp-auth"); if (!srtp_auth) srtp_auth = gst_structure_get_string (self->priv->decryption_parameters, "auth"); if (!srtp_auth) srtp_auth = "null"; srtcp_auth = gst_structure_get_string (self->priv->decryption_parameters, "rtcp-auth"); if (!srtcp_auth) srtcp_auth = gst_structure_get_string (self->priv->decryption_parameters, "auth"); if (!srtcp_auth) srtcp_auth = "null"; v = gst_structure_get_value (self->priv->decryption_parameters, "key"); key = gst_value_get_buffer (v); return gst_caps_new_simple ("application/x-srtp", "srtp-key", GST_TYPE_BUFFER, key, "srtp-cipher", G_TYPE_STRING, srtp_cipher, "srtcp-cipher", G_TYPE_STRING, srtcp_cipher, "srtp-auth", G_TYPE_STRING, srtp_auth, "srtcp-auth", G_TYPE_STRING, srtcp_auth, NULL); } gboolean fs_rtp_stream_requires_crypto_locked (FsRtpStream *self) { return self->priv->encrypted; }