summaryrefslogtreecommitdiff
path: root/gst/fsrawconference
diff options
context:
space:
mode:
authorOlivier CrĂȘte <olivier.crete@collabora.co.uk>2011-02-08 16:49:45 +0000
committerOlivier CrĂȘte <olivier.crete@collabora.co.uk>2011-03-09 17:53:13 -0500
commit1560b16f912f0ef775274a4962e3870b571bbe86 (patch)
tree6f028c78f58aa0a83bf01008ca3e298e05b4c3b7 /gst/fsrawconference
parent6aee195dba0efd3533aa9d3fedae32cf38caec42 (diff)
downloadfarstream-1560b16f912f0ef775274a4962e3870b571bbe86.tar.gz
raw: Move all Gst processing to the session
Diffstat (limited to 'gst/fsrawconference')
-rw-r--r--gst/fsrawconference/fs-raw-session.c474
-rw-r--r--gst/fsrawconference/fs-raw-stream.c295
-rw-r--r--gst/fsrawconference/fs-raw-stream.h1
3 files changed, 351 insertions, 419 deletions
diff --git a/gst/fsrawconference/fs-raw-session.c b/gst/fsrawconference/fs-raw-session.c
index faf4e1fd..8dc7e61c 100644
--- a/gst/fsrawconference/fs-raw-session.c
+++ b/gst/fsrawconference/fs-raw-session.c
@@ -100,6 +100,12 @@ struct _FsRawSessionPrivate
GstElement *send_valve;
+ GstElement *recv_capsfilter;
+ GstElement *recv_valve;
+ gulong transmitter_recv_probe_id;
+ GstPad *transmitter_src_pad;
+ GstPad *src_ghost_pad;
+
FsTransmitter *transmitter;
guint tos; /* Protected by conf lock */
@@ -113,31 +119,31 @@ struct _FsRawSessionPrivate
G_DEFINE_TYPE (FsRawSession, fs_raw_session, FS_TYPE_SESSION);
-#define FS_RAW_SESSION_GET_PRIVATE(o) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RAW_SESSION, FsRawSessionPrivate))
+#define FS_RAW_SESSION_GET_PRIVATE(o) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RAW_SESSION, FsRawSessionPrivate))
#ifdef DEBUG_MUTEXES
-#define FS_RAW_SESSION_LOCK(session) \
- do { \
- g_mutex_lock (FS_RAW_SESSION (session)->priv->mutex); \
- g_assert (FS_RAW_SESSION (session)->priv->count == 0); \
- FS_RAW_SESSION (session)->priv->count++; \
+#define FS_RAW_SESSION_LOCK(session) \
+ do { \
+ g_mutex_lock (FS_RAW_SESSION (session)->priv->mutex); \
+ g_assert (FS_RAW_SESSION (session)->priv->count == 0); \
+ FS_RAW_SESSION (session)->priv->count++; \
} while (0);
-#define FS_RAW_SESSION_UNLOCK(session) \
- do { \
- g_assert (FS_RAW_SESSION (session)->priv->count == 1); \
- FS_RAW_SESSION (session)->priv->count--; \
- g_mutex_unlock (FS_RAW_SESSION (session)->priv->mutex); \
+#define FS_RAW_SESSION_UNLOCK(session) \
+ do { \
+ g_assert (FS_RAW_SESSION (session)->priv->count == 1); \
+ FS_RAW_SESSION (session)->priv->count--; \
+ g_mutex_unlock (FS_RAW_SESSION (session)->priv->mutex); \
} while (0);
-#define FS_RAW_SESSION_GET_LOCK(session) \
+#define FS_RAW_SESSION_GET_LOCK(session) \
(FS_RAW_SESSION (session)->priv->mutex)
#else
-#define FS_RAW_SESSION_LOCK(session) \
+#define FS_RAW_SESSION_LOCK(session) \
g_mutex_lock ((session)->priv->mutex)
-#define FS_RAW_SESSION_UNLOCK(session) \
+#define FS_RAW_SESSION_UNLOCK(session) \
g_mutex_unlock ((session)->priv->mutex)
-#define FS_RAW_SESSION_GET_LOCK(session) \
+#define FS_RAW_SESSION_GET_LOCK(session) \
((session)->priv->mutex)
#endif
@@ -196,7 +202,7 @@ fs_raw_session_class_init (FsRawSessionClass *klass)
session_class->new_stream = fs_raw_session_new_stream;
session_class->list_transmitters = fs_raw_session_list_transmitters;
session_class->get_stream_transmitter_type =
- fs_raw_session_get_stream_transmitter_type;
+ fs_raw_session_get_stream_transmitter_type;
g_object_class_override_property (gobject_class,
PROP_MEDIA_TYPE, "media-type");
@@ -206,17 +212,17 @@ fs_raw_session_class_init (FsRawSessionClass *klass)
PROP_SINK_PAD, "sink-pad");
g_object_class_override_property (gobject_class,
- PROP_CODEC_PREFERENCES, "codec-preferences");
+ PROP_CODEC_PREFERENCES, "codec-preferences");
g_object_class_override_property (gobject_class,
- PROP_CODECS, "codecs");
+ PROP_CODECS, "codecs");
g_object_class_override_property (gobject_class,
- PROP_CODECS_WITHOUT_CONFIG, "codecs-without-config");
+ PROP_CODECS_WITHOUT_CONFIG, "codecs-without-config");
g_object_class_override_property (gobject_class,
- PROP_CURRENT_SEND_CODEC, "current-send-codec");
+ PROP_CURRENT_SEND_CODEC, "current-send-codec");
g_object_class_override_property (gobject_class,
- PROP_CODECS_READY, "codecs-ready");
+ PROP_CODECS_READY, "codecs-ready");
g_object_class_override_property (gobject_class,
- PROP_TOS, "tos");
+ PROP_TOS, "tos");
g_object_class_install_property (gobject_class,
PROP_CONFERENCE,
@@ -396,7 +402,7 @@ fs_raw_session_dispose (GObject *object)
gst_object_unref (conference);
- out:
+out:
G_OBJECT_CLASS (fs_raw_session_parent_class)->dispose (object);
}
@@ -419,9 +425,9 @@ fs_raw_session_finalize (GObject *object)
static void
fs_raw_session_get_property (GObject *object,
- guint prop_id,
- GValue *value,
- GParamSpec *pspec)
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
{
FsRawSession *self = FS_RAW_SESSION (object);
FsRawConference *conference = fs_raw_session_get_conference (self, NULL);
@@ -472,9 +478,9 @@ fs_raw_session_get_property (GObject *object,
static void
fs_raw_session_set_property (GObject *object,
- guint prop_id,
- const GValue *value,
- GParamSpec *pspec)
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
{
FsRawSession *self = FS_RAW_SESSION (object);
FsRawConference *conference = fs_raw_session_get_conference (self, NULL);
@@ -523,7 +529,7 @@ fs_raw_session_constructed (GObject *object)
if (self->id == 0)
{
g_error ("You can not instantiate this element directly, you MUST"
- " call fs_raw_session_new ()");
+ " call fs_raw_session_new ()");
return;
}
@@ -809,7 +815,7 @@ _stream_remote_codecs_changed (FsRawStream *stream, GParamSpec *pspec,
if (g_list_length (codecs) == 2)
codec = codecs->next->data;
- else if (codecs && codecs->data)
+ else
codec = codecs->data;
GST_OBJECT_LOCK (conference);
@@ -871,6 +877,21 @@ _stream_remote_codecs_changed (FsRawStream *stream, GParamSpec *pspec,
self->priv->send_codec = fs_codec_copy (codec);
}
+ codec = codecs->data;
+
+ if (self->priv->recv_capsfilter)
+ {
+ GstElement *capsfilter = gst_object_ref (self->priv->recv_capsfilter);
+ GstCaps *recv_caps;
+
+ recv_caps = fs_raw_codec_to_gst_caps (codec);
+ GST_OBJECT_UNLOCK (conference);
+ g_object_set (capsfilter, "caps", recv_caps, NULL);
+ gst_object_unref (capsfilter);
+ GST_OBJECT_LOCK (conference);
+ gst_caps_unref (recv_caps);
+ }
+
GST_OBJECT_UNLOCK (conference);
fs_raw_session_update_direction (self, direction);
@@ -926,16 +947,28 @@ fs_raw_session_remove_stream (FsRawSession *self,
if (self->priv->stream == (FsRawStream *) stream)
{
self->priv->stream = NULL;
- transmitter = self->priv->transmitter;
- self->priv->transmitter = NULL;
}
+ transmitter = self->priv->transmitter;
+ self->priv->transmitter = NULL;
GST_OBJECT_UNLOCK (conference);
+ if (!transmitter)
+ return;
g_object_get (transmitter,
"gst-src", &src,
"gst-sink", &sink,
NULL);
+
+ if (self->priv->transmitter_recv_probe_id)
+ {
+ if (self->priv->transmitter_src_pad)
+ gst_pad_remove_data_probe (self->priv->transmitter_src_pad,
+ self->priv->transmitter_recv_probe_id);
+ self->priv->transmitter_recv_probe_id = 0;
+ }
+
+
gst_element_set_locked_state (src, TRUE);
gst_element_set_state (src, GST_STATE_NULL);
gst_bin_remove (GST_BIN (conference), src);
@@ -947,6 +980,40 @@ fs_raw_session_remove_stream (FsRawSession *self,
gst_bin_remove (GST_BIN (conference), sink);
}
+ if (self->priv->transmitter_src_pad)
+ {
+ gst_object_unref (self->priv->transmitter_src_pad);
+ self->priv->transmitter_src_pad = NULL;
+ }
+
+ if (self->priv->recv_valve)
+ {
+ gst_element_set_locked_state (self->priv->recv_valve, TRUE);
+ gst_bin_remove (GST_BIN (conference), self->priv->recv_valve);
+ gst_element_set_state (self->priv->recv_valve, GST_STATE_NULL);
+ gst_object_unref (self->priv->recv_valve);
+ self->priv->recv_valve = NULL;
+ }
+
+ if (self->priv->recv_capsfilter)
+ {
+ gst_element_set_locked_state (self->priv->recv_capsfilter, TRUE);
+ gst_bin_remove (GST_BIN (conference), self->priv->recv_capsfilter);
+ gst_element_set_state (self->priv->recv_capsfilter, GST_STATE_NULL);
+ gst_object_unref (self->priv->recv_capsfilter);
+ self->priv->recv_capsfilter = NULL;
+ }
+
+ if (self->priv->src_ghost_pad)
+ {
+ gst_element_remove_pad (GST_ELEMENT (conference),
+ self->priv->src_ghost_pad);
+ gst_pad_set_active (self->priv->src_ghost_pad, FALSE);
+ gst_object_unref (self->priv->src_ghost_pad);
+ self->priv->src_ghost_pad = NULL;
+ }
+
+
gst_object_unref (src);
gst_object_unref (sink);
g_object_unref (transmitter);
@@ -955,8 +1022,8 @@ fs_raw_session_remove_stream (FsRawSession *self,
static gboolean
_add_transmitter_sink (FsRawSession *self,
- GstElement *transmitter_sink,
- GError **error)
+ GstElement *transmitter_sink,
+ GError **error)
{
if (!transmitter_sink)
{
@@ -1042,6 +1109,18 @@ fs_raw_session_update_direction (FsRawSession *self,
GST_OBJECT_LOCK (conference);
self->priv->transmitter_linked = TRUE;
}
+
+ if (self->priv->recv_valve)
+ {
+ GstElement *valve = g_object_ref (self->priv->recv_valve);
+
+ GST_OBJECT_UNLOCK (conference);
+ g_object_set (valve,
+ "drop", ! (direction & FS_DIRECTION_RECV), NULL);
+ g_object_unref (valve);
+ GST_OBJECT_LOCK (conference);
+ }
+
GST_OBJECT_UNLOCK (conference);
if (direction & FS_DIRECTION_SEND)
@@ -1069,22 +1148,17 @@ out:
*/
static FsStream *
fs_raw_session_new_stream (FsSession *session,
- FsParticipant *participant,
- FsStreamDirection direction,
- const gchar *transmitter,
- guint n_parameters,
- GParameter *parameters,
- GError **error)
+ FsParticipant *participant,
+ FsStreamDirection direction,
+ const gchar *transmitter,
+ guint n_parameters,
+ GParameter *parameters,
+ GError **error)
{
FsRawSession *self = FS_RAW_SESSION (session);
- FsRawParticipant *rawparticipant = NULL;
FsStream *new_stream = NULL;
FsRawConference *conference;
- FsTransmitter *fstransmitter = NULL;
FsStreamTransmitter *stream_transmitter = NULL;
- GstElement *transmitter_sink = NULL;
- GstElement *transmitter_src = NULL;
- GstPad *transmitter_pad;
if (!FS_IS_RAW_PARTICIPANT (participant))
{
@@ -1102,74 +1176,21 @@ fs_raw_session_new_stream (FsSession *session,
goto already_have_stream;
GST_OBJECT_UNLOCK (conference);
- fstransmitter = fs_transmitter_new (transmitter, 1, 0, error);
-
- if (!fstransmitter)
- {
- goto error;
- }
-
- stream_transmitter = fs_transmitter_new_stream_transmitter (fstransmitter,
- participant, n_parameters, parameters, error);
-
+ stream_transmitter = _stream_get_stream_transmitter (NULL,
+ transmitter, participant, parameters, n_parameters, error, self);
if (!stream_transmitter)
- {
- goto error;
- }
-
- g_object_get (fstransmitter, "gst-src", &transmitter_src, NULL);
-
- if (!transmitter_src)
- {
- g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
- "Unable to get the source element from the FsTransmitter");
- goto error;
- }
-
- if (!gst_bin_add (GST_BIN (self->priv->conference), transmitter_src))
- {
- g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
- "Could not add the transmitter's source element"
- " for session %d to the conference bin", self->id);
- gst_object_unref (transmitter_src);
- transmitter_src = NULL;
goto error;
- }
-
- transmitter_pad = gst_element_get_static_pad (transmitter_src, "src1");
-
- if (!transmitter_pad)
- {
- g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
- "Unable to get the srcpad from the FsTransmitter's gst-src");
- goto error;
- }
- rawparticipant = FS_RAW_PARTICIPANT (participant);
-
- new_stream = FS_STREAM_CAST (fs_raw_stream_new (self, rawparticipant,
- direction, conference, stream_transmitter, transmitter_pad,
+ new_stream = FS_STREAM_CAST (fs_raw_stream_new (self,
+ FS_RAW_PARTICIPANT (participant),
+ direction, conference, stream_transmitter,
_stream_get_stream_transmitter, self, error));
- /* stream_new takes the reference to this */
- stream_transmitter = NULL;
-
- /* stream_new doesn't take the reference to this. Perhaps it should */
- g_object_unref (transmitter_pad);
-
if (new_stream)
{
g_signal_connect_object (new_stream, "notify::remote-codecs",
G_CALLBACK (_stream_remote_codecs_changed), self, 0);
- if (!gst_element_sync_state_with_parent (transmitter_src))
- {
- g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
- "Could not sync the transmitter's source element"
- " with its parent for session %d", self->id);
- goto error;
- }
-
GST_OBJECT_LOCK (conference);
if (self->priv->stream)
{
@@ -1177,32 +1198,9 @@ fs_raw_session_new_stream (FsSession *session,
}
self->priv->stream = (FsRawStream *) new_stream;
- if (self->priv->tos)
- g_object_set (fstransmitter, "tos", self->priv->tos, NULL);
-
- self->priv->transmitter = fstransmitter;
-
GST_OBJECT_UNLOCK (conference);
-
- goto done;
}
-error:
- if (transmitter_src)
- gst_bin_remove (GST_BIN (conference), transmitter_src);
-
- if (transmitter_sink)
- gst_bin_remove (GST_BIN (conference), transmitter_sink);
-
- if (stream_transmitter)
- {
- fs_stream_transmitter_stop (stream_transmitter);
- g_object_unref (stream_transmitter);
- }
-
- if (fstransmitter)
- g_object_unref (fstransmitter);
-
done:
gst_object_unref (conference);
return new_stream;
@@ -1212,6 +1210,16 @@ already_have_stream:
g_set_error (error, FS_ERROR, FS_ERROR_ALREADY_EXISTS,
"There already is a stream in this session");
goto error;
+
+error:
+ fs_raw_session_remove_stream (self, NULL);
+
+ if (stream_transmitter)
+ {
+ fs_stream_transmitter_stop (stream_transmitter);
+ g_object_unref (stream_transmitter);
+ }
+ goto done;
}
FsRawSession *
@@ -1265,6 +1273,94 @@ fs_raw_session_get_stream_transmitter_type (FsSession *session,
return transmitter_type;
}
+static gboolean
+_transmitter_pad_have_data_callback (GstPad *pad, GstBuffer *buffer,
+ gpointer user_data)
+{
+ FsRawSession *self = FS_RAW_SESSION (user_data);
+ FsRawConference *conference = fs_raw_session_get_conference (self, NULL);
+ FsRawStream *stream;
+ GstElement *recv_capsfilter = NULL;
+ GstPad *ghostpad;
+ GstPad *srcpad;
+ gchar *padname;
+ FsCodec *codec;
+
+ if (!conference)
+ return FALSE;
+
+ GST_OBJECT_LOCK (conference);
+ if (!self->priv->codecs ||
+ !self->priv->recv_capsfilter ||
+ !self->priv->transmitter_recv_probe_id)
+ {
+ GST_OBJECT_UNLOCK (conference);
+ gst_object_unref (conference);
+ return FALSE;
+ }
+
+ recv_capsfilter = gst_object_ref (self->priv->recv_capsfilter);
+ gst_pad_remove_data_probe (pad, self->priv->transmitter_recv_probe_id);
+ self->priv->transmitter_recv_probe_id = 0;
+ codec = fs_codec_copy (self->priv->codecs->data);
+ GST_OBJECT_UNLOCK (conference);
+
+ srcpad = gst_element_get_static_pad (recv_capsfilter, "src");
+
+ if (!srcpad)
+ {
+ GST_WARNING ("Unable to get recv_capsfilter (%p) srcpad", recv_capsfilter);
+ goto error;
+ }
+
+ padname = g_strdup_printf ("src_%d", self->id);
+ ghostpad = gst_ghost_pad_new_from_template (padname, srcpad,
+ gst_element_class_get_pad_template (
+ GST_ELEMENT_GET_CLASS (self->priv->conference),
+ "src_%d"));
+ g_free (padname);
+ gst_object_unref (srcpad);
+
+ gst_object_ref (ghostpad);
+
+ if (!gst_pad_set_active (ghostpad, TRUE))
+ GST_WARNING ("Unable to set ghost pad active");
+
+
+ if (!gst_element_add_pad (GST_ELEMENT (self->priv->conference), ghostpad))
+ {
+ GST_WARNING ("Unable to add ghost pad to conference");
+
+ gst_object_unref (ghostpad);
+ gst_object_unref (ghostpad);
+ goto error;
+ }
+
+ GST_OBJECT_LOCK (conference);
+ self->priv->src_ghost_pad = ghostpad;
+ stream = g_object_ref (self->priv->stream);
+ GST_OBJECT_UNLOCK (conference);
+
+ fs_stream_emit_src_pad_added (FS_STREAM (stream), ghostpad, codec);
+
+ fs_codec_destroy (codec);
+ g_object_unref (stream);
+ gst_object_unref (conference);
+ gst_object_unref (recv_capsfilter);
+
+ return TRUE;
+
+error:
+ fs_codec_destroy (codec);
+ gst_object_unref (conference);
+ gst_object_unref (recv_capsfilter);
+
+ return FALSE;
+}
+
+
+
+
static FsStreamTransmitter *_stream_get_stream_transmitter (FsRawStream *stream,
const gchar *transmitter_name,
FsParticipant *participant,
@@ -1276,10 +1372,12 @@ static FsStreamTransmitter *_stream_get_stream_transmitter (FsRawStream *stream,
FsRawSession *self = user_data;
FsTransmitter *fstransmitter = NULL;
FsStreamTransmitter *stream_transmitter = NULL;
- GstElement *transmitter_sink = NULL;
GstElement *transmitter_src = NULL;
- GstPad *transmitter_pad;
FsRawConference *conference;
+ gchar *tmp;
+ GstElement *capsfilter;
+ GstElement *valve;
+ GstPad *transmitter_src_pad;
conference = fs_raw_session_get_conference (self, error);
if (!conference)
@@ -1301,7 +1399,7 @@ static FsStreamTransmitter *_stream_get_stream_transmitter (FsRawStream *stream,
g_object_get (fstransmitter, "gst-src", &transmitter_src, NULL);
g_assert (transmitter_src);
- if (!gst_bin_add (GST_BIN (self->priv->conference), transmitter_src))
+ if (!gst_bin_add (GST_BIN (conference), transmitter_src))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the transmitter's source element"
@@ -1311,35 +1409,137 @@ static FsStreamTransmitter *_stream_get_stream_transmitter (FsRawStream *stream,
goto error;
}
- transmitter_pad = gst_element_get_static_pad (transmitter_src, "src1");
+ tmp = g_strdup_printf ("recv_capsfilter_%d", self->id);
+ capsfilter = gst_element_factory_make ("capsfilter", tmp);
+ g_free (tmp);
+
+ if (!capsfilter)
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not create a capsfilter element for session %d", self->id);
+ g_object_unref (capsfilter);
+ goto error;
+ }
+
+ gst_object_ref (capsfilter);
+
+ if (!gst_bin_add (GST_BIN (conference), capsfilter))
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not add the capsfilter element for session %d", self->id);
+ gst_object_unref (capsfilter);
+ gst_object_unref (capsfilter);
+ goto error;
+ }
+ self->priv->recv_capsfilter = capsfilter;
+
+ if (gst_element_set_state (self->priv->recv_capsfilter, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_FAILURE)
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not set the capsfilter element for session %d", self->id);
+ goto error;
+ }
+
+ tmp = g_strdup_printf ("recv_valve_%d", self->id);
+ valve = gst_element_factory_make ("valve", tmp);
+ g_free (tmp);
+
+ if (!valve) {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not create a valve element for session %d", self->id);
+ goto error;
+ }
+
+ gst_object_ref (valve);
- if (!transmitter_pad)
+ if (!gst_bin_add (GST_BIN (conference), valve))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
- "Unable to get the srcpad from the FsTransmitter's gst-src");
+ "Could not add the valve element for session %d"
+ " to the conference bin", self->id);
+ gst_object_unref (valve);
+ goto error;
+ }
+
+ g_object_set (valve, "drop", TRUE, NULL);
+
+ self->priv->recv_valve = valve;
+
+ if (gst_element_set_state (self->priv->recv_valve, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not set the valve element for session %d to the playing state",
+ self->id);
goto error;
}
+ if (!gst_element_link (self->priv->recv_valve, self->priv->recv_capsfilter))
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not link the recv valve and the capsfilter");
+ goto error;
+ }
+
+ if (!gst_element_link_pads (transmitter_src, "src1",
+ valve, "sink"))
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not link the recv_valve to the codec bin");
+ goto error;
+ }
+
+ transmitter_src_pad = gst_element_get_static_pad (transmitter_src, "src1");
+
GST_OBJECT_LOCK (conference);
self->priv->transmitter = fstransmitter;
+ self->priv->transmitter_src_pad = transmitter_src_pad;
GST_OBJECT_UNLOCK (conference);
+ self->priv->transmitter_recv_probe_id = gst_pad_add_data_probe (
+ self->priv->transmitter_src_pad,
+ G_CALLBACK (_transmitter_pad_have_data_callback), self);
+
+ if (!gst_element_sync_state_with_parent (transmitter_src))
+ {
+ g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
+ "Could not sync the transmitter's source element"
+ " with its parent for session %d", self->id);
+ goto error;
+ }
+
+ gst_object_unref (transmitter_src);
gst_object_unref (conference);
+
return stream_transmitter;
error:
+ if (self->priv->recv_valve)
+ {
+ gst_bin_remove (GST_BIN (conference), self->priv->recv_valve);
+ self->priv->recv_valve = NULL;
+ }
+
+ if (self->priv->recv_capsfilter)
+ {
+ gst_bin_remove (GST_BIN (conference), self->priv->recv_capsfilter);
+ self->priv->recv_capsfilter = NULL;
+ }
+
if (transmitter_src)
gst_bin_remove (GST_BIN (conference), transmitter_src);
- if (transmitter_sink)
- gst_bin_remove (GST_BIN (conference), transmitter_sink);
-
if (stream_transmitter)
{
fs_stream_transmitter_stop (stream_transmitter);
g_object_unref (stream_transmitter);
}
+ GST_OBJECT_LOCK (conference);
+ fstransmitter = self->priv->transmitter;
+ self->priv->transmitter = NULL;
+ GST_OBJECT_UNLOCK (conference);
+
if (fstransmitter)
g_object_unref (fstransmitter);
diff --git a/gst/fsrawconference/fs-raw-stream.c b/gst/fsrawconference/fs-raw-stream.c
index 3e49d362..3f8b8751 100644
--- a/gst/fsrawconference/fs-raw-stream.c
+++ b/gst/fsrawconference/fs-raw-stream.c
@@ -76,7 +76,6 @@ enum
PROP_CONFERENCE,
PROP_STREAM_TRANSMITTER,
PROP_REMOTE_CODECS,
- PROP_TRANSMITTER_PAD,
};
@@ -88,15 +87,9 @@ struct _FsRawStreamPrivate
FsRawParticipant *participant;
FsStreamDirection direction;
FsStreamTransmitter *stream_transmitter;
- GstElement *recv_capsfilter;
- GstElement *recv_valve;
- GstPad *transmitter_pad;
- GstPad *src_pad;
GList *remote_codecs;
- gulong blocking_id;
-
GError *construction_error;
gulong local_candidates_prepared_handler_id;
@@ -233,14 +226,6 @@ fs_raw_stream_class_init (FsRawStreamClass *klass)
FS_TYPE_RAW_CONFERENCE,
G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class,
- PROP_TRANSMITTER_PAD,
- g_param_spec_object ("transmitter-pad",
- "The GstPad this stream is linked to",
- "This is the pad on which this stream will attach itself",
- GST_TYPE_PAD,
- G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
-
/**
* FsRawStream:stream-transmitter:
*
@@ -310,46 +295,6 @@ fs_raw_stream_real_dispose (FsRawStream *self)
if (!conference)
return;
- if (self->priv->src_pad)
- {
- gst_element_remove_pad (GST_ELEMENT (conference), self->priv->src_pad);
- gst_pad_set_active (self->priv->src_pad, FALSE);
- gst_object_unref (self->priv->src_pad);
- self->priv->src_pad = NULL;
- }
-
-
- if (self->priv->recv_valve)
- {
- gst_element_set_locked_state (self->priv->recv_valve, TRUE);
- gst_bin_remove (GST_BIN (conference), self->priv->recv_valve);
- gst_element_set_state (self->priv->recv_valve, GST_STATE_NULL);
- gst_object_unref (self->priv->recv_valve);
- self->priv->recv_valve = NULL;
- }
-
- if (self->priv->recv_capsfilter)
- {
- gst_element_set_locked_state (self->priv->recv_capsfilter, TRUE);
- gst_bin_remove (GST_BIN (conference), self->priv->recv_capsfilter);
- gst_element_set_state (self->priv->recv_capsfilter, GST_STATE_NULL);
- gst_object_unref (self->priv->recv_capsfilter);
- self->priv->recv_capsfilter = NULL;
- }
-
- if (self->priv->blocking_id)
- {
- if (self->priv->transmitter_pad)
- gst_pad_remove_data_probe (self->priv->transmitter_pad,
- self->priv->blocking_id);
- self->priv->blocking_id = 0;
- }
-
- if (self->priv->transmitter_pad)
- {
- gst_object_unref (self->priv->transmitter_pad);
- self->priv->transmitter_pad = NULL;
- }
st = self->priv->stream_transmitter;
self->priv->stream_transmitter = NULL;
@@ -511,40 +456,25 @@ fs_raw_stream_set_property (GObject *object,
case PROP_DIRECTION:
if (g_value_get_flags (value) != self->priv->direction)
{
- GstElement *recv_valve = NULL;
- FsStreamTransmitter *st = NULL;
-
- if (!conference ||
- !self->priv->recv_valve ||
- !self->priv->session)
- {
- self->priv->direction = g_value_get_flags (value);
- break;
- }
+ FsStreamDirection direction = g_value_get_flags (value);
+ FsStreamTransmitter *st;
- if (self->priv->recv_valve)
- recv_valve = gst_object_ref (self->priv->recv_valve);
- if (self->priv->stream_transmitter)
- st = g_object_ref (self->priv->stream_transmitter);
-
- self->priv->direction = g_value_get_flags (value);
-
- GST_OBJECT_UNLOCK (conference);
- fs_raw_session_update_direction (self->priv->session,
- self->priv->direction);
-
- if (recv_valve)
- g_object_set (recv_valve, "drop",
- (self->priv->direction & FS_DIRECTION_RECV) ? FALSE : TRUE, NULL);
+ self->priv->direction = direction;
+ st = self->priv->stream_transmitter;
if (st)
- g_object_set (st, "sending",
- (self->priv->direction & FS_DIRECTION_SEND) ? TRUE : FALSE, NULL);
- GST_OBJECT_LOCK (conference);
+ g_object_ref (st);
- if (recv_valve)
- gst_object_unref (recv_valve);
+ if (conference)
+ GST_OBJECT_UNLOCK (conference);
if (st)
+ {
+ g_object_set (st, "sending",
+ (direction & FS_DIRECTION_SEND) ? TRUE : FALSE, NULL);
g_object_unref (st);
+ }
+ fs_raw_session_update_direction (self->priv->session, direction);
+ if (conference)
+ GST_OBJECT_LOCK (conference);
}
break;
case PROP_CONFERENCE:
@@ -553,9 +483,6 @@ fs_raw_stream_set_property (GObject *object,
case PROP_STREAM_TRANSMITTER:
self->priv->stream_transmitter = g_value_get_object (value);
break;
- case PROP_TRANSMITTER_PAD:
- self->priv->transmitter_pad = GST_PAD (g_value_dup_object (value));
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -568,95 +495,10 @@ fs_raw_stream_set_property (GObject *object,
}
}
-static gboolean
-_transmitter_pad_have_data_callback (GstPad *pad, GstMiniObject *miniobj,
- gpointer user_data)
-{
- FsRawStream *self = FS_RAW_STREAM_CAST (user_data);
- FsRawConference *conference = fs_raw_stream_get_conference (self, NULL);
- GstElement *recv_capsfilter = NULL;
- GstPad *ghostpad;
- GstPad *srcpad;
- gchar *padname;
- FsCodec *codec;
-
- if (!conference)
- return FALSE;
-
- GST_OBJECT_LOCK (conference);
- if (!self->priv->remote_codecs ||
- !self->priv->recv_capsfilter ||
- !self->priv->blocking_id)
- {
- GST_OBJECT_UNLOCK (conference);
- gst_object_unref (conference);
- return FALSE;
- }
-
- recv_capsfilter = gst_object_ref (self->priv->recv_capsfilter);
- gst_pad_remove_data_probe (pad, self->priv->blocking_id);
- self->priv->blocking_id = 0;
- codec = fs_codec_copy (self->priv->remote_codecs->data);
- GST_OBJECT_UNLOCK (conference);
-
- srcpad = gst_element_get_static_pad (recv_capsfilter, "src");
-
- if (!srcpad)
- {
- GST_WARNING ("Unable to get recv_capsfilter (%p) srcpad", recv_capsfilter);
- goto error;
- }
-
- padname = g_strdup_printf ("src_%d", self->priv->session->id);
- ghostpad = gst_ghost_pad_new_from_template (padname, srcpad,
- gst_element_class_get_pad_template (
- GST_ELEMENT_GET_CLASS (self->priv->conference),
- "src_%d"));
- g_free (padname);
- gst_object_unref (srcpad);
-
- gst_object_ref (ghostpad);
-
- if (!gst_pad_set_active (ghostpad, TRUE))
- GST_WARNING ("Unable to set ghost pad active");
-
-
- if (!gst_element_add_pad (GST_ELEMENT (self->priv->conference), ghostpad))
- {
- GST_WARNING ("Unable to add ghost pad to conference");
-
- gst_object_unref (ghostpad);
- gst_object_unref (ghostpad);
- goto error;
- }
-
- GST_OBJECT_LOCK (conference);
- self->priv->src_pad = ghostpad;
- GST_OBJECT_UNLOCK (conference);
-
- fs_stream_emit_src_pad_added (FS_STREAM (self), ghostpad, codec);
-
- fs_codec_destroy (codec);
- gst_object_unref (conference);
- gst_object_unref (recv_capsfilter);
-
- return TRUE;
-
-error:
- fs_codec_destroy (codec);
- gst_object_unref (conference);
- gst_object_unref (recv_capsfilter);
-
- return FALSE;
-}
-
static void
fs_raw_stream_constructed (GObject *object)
{
FsRawStream *self = FS_RAW_STREAM_CAST (object);
- GstPad *valve_sink_pad = NULL;
- GstPadLinkReturn linkret;
- gchar *tmp;
if (!self->priv->conference) {
self->priv->construction_error = g_error_new (FS_ERROR,
@@ -664,100 +506,6 @@ fs_raw_stream_constructed (GObject *object)
return;
}
- tmp = g_strdup_printf ("recv_capsfilter_%d", self->priv->session->id);
- self->priv->recv_capsfilter = gst_element_factory_make ("capsfilter", tmp);
- g_free (tmp);
-
- if (!self->priv->recv_capsfilter) {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not create a capsfilter element for"
- " session %d", self->priv->session->id);
- return;
- }
-
- gst_object_ref_sink (self->priv->recv_capsfilter);
-
- if (!gst_bin_add (GST_BIN (self->priv->conference),
- self->priv->recv_capsfilter)) {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not add the capsfilter element for"
- " session %d", self->priv->session->id);
- gst_object_unref (self->priv->recv_capsfilter);
- return;
- }
-
- if (gst_element_set_state (self->priv->recv_capsfilter, GST_STATE_PLAYING) ==
- GST_STATE_CHANGE_FAILURE) {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not set the capsfilter element for"
- " session %d", self->priv->session->id);
- return;
- }
-
-
- tmp = g_strdup_printf ("recv_valve_%d", self->priv->session->id);
- self->priv->recv_valve = gst_element_factory_make ("valve", tmp);
- g_free (tmp);
-
- if (!self->priv->recv_valve) {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not create a valve element for"
- " session %d", self->priv->session->id);
- return;
- }
-
- gst_object_ref_sink (self->priv->recv_valve);
-
- if (!gst_bin_add (GST_BIN (self->priv->conference), self->priv->recv_valve))
- {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not add the valve element for session"
- " %d to the conference bin", self->priv->session->id);
- gst_object_unref (self->priv->recv_valve);
- return;
- }
-
- if (gst_element_set_state (self->priv->recv_valve, GST_STATE_PLAYING) ==
- GST_STATE_CHANGE_FAILURE) {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not set the valve element for session"
- " %d to the playing state", self->priv->session->id);
- return;
- }
-
- if (!gst_element_link (self->priv->recv_valve, self->priv->recv_capsfilter))
- {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION, "Could not link the recv valve"
- " and the capsfilter");
- return;
- }
-
- valve_sink_pad = gst_element_get_static_pad (self->priv->recv_valve, "sink");
- if (!valve_sink_pad)
- {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION,
- "Could not get the valve's sink pad");
- return;
- }
-
- linkret = gst_pad_link (self->priv->transmitter_pad, valve_sink_pad);
-
- gst_object_unref (valve_sink_pad);
-
- if (GST_PAD_LINK_FAILED (linkret))
- {
- self->priv->construction_error = g_error_new (FS_ERROR,
- FS_ERROR_CONSTRUCTION,
- "Could not link the recv_valve to the codec bin (%d)", linkret);
- return;
- }
-
- self->priv->blocking_id = gst_pad_add_data_probe (
- self->priv->transmitter_pad,
- G_CALLBACK (_transmitter_pad_have_data_callback), self);
-
if (!self->priv->stream_transmitter) {
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION, "The Stream Transmitter has not been set");
@@ -804,10 +552,6 @@ fs_raw_stream_constructed (GObject *object)
return;
}
- if (self->priv->recv_valve)
- g_object_set (self->priv->recv_valve, "drop",
- (self->priv->direction & FS_DIRECTION_RECV) ? FALSE : TRUE, NULL);
-
if (G_OBJECT_CLASS (fs_raw_stream_parent_class)->constructed)
G_OBJECT_CLASS (fs_raw_stream_parent_class)->constructed (object);
}
@@ -1043,16 +787,7 @@ fs_raw_stream_set_remote_codecs (FsStream *stream,
GST_OBJECT_UNLOCK (conf);
if (is_new)
- {
- FsCodec *codec = remote_codecs->data;
- GstCaps *caps;
-
- caps = fs_raw_codec_to_gst_caps (codec);
- g_object_set (self->priv->recv_capsfilter, "caps", caps, NULL);
- gst_caps_unref (caps);
-
g_object_notify (G_OBJECT (stream), "remote-codecs");
- }
g_object_unref (session);
g_object_unref (conf);
@@ -1084,7 +819,6 @@ fs_raw_stream_new (FsRawSession *session,
FsStreamDirection direction,
FsRawConference *conference,
FsStreamTransmitter *stream_transmitter,
- GstPad *transmitter_pad,
stream_get_new_stream_transmitter_cb get_new_stream_transmitter_cb,
gpointer user_data,
GError **error)
@@ -1101,7 +835,6 @@ fs_raw_stream_new (FsRawSession *session,
"direction", direction,
"conference", conference,
"stream-transmitter", stream_transmitter,
- "transmitter-pad", transmitter_pad,
NULL);
if (!self)
diff --git a/gst/fsrawconference/fs-raw-stream.h b/gst/fsrawconference/fs-raw-stream.h
index 4ddbf629..c94ff8d3 100644
--- a/gst/fsrawconference/fs-raw-stream.h
+++ b/gst/fsrawconference/fs-raw-stream.h
@@ -85,7 +85,6 @@ FsRawStream *fs_raw_stream_new (FsRawSession *session,
FsStreamDirection direction,
FsRawConference *conference,
FsStreamTransmitter *stream_transmitter,
- GstPad *transmitter_pad,
stream_get_new_stream_transmitter_cb get_new_stream_transmitter_cb,
gpointer user_data,
GError **error);