diff options
Diffstat (limited to 'transmitters')
-rw-r--r-- | transmitters/multicast/Makefile.am | 6 | ||||
-rw-r--r-- | transmitters/multicast/fs-multicast-transmitter.c | 36 | ||||
-rw-r--r-- | transmitters/nice/fs-nice-stream-transmitter.c | 11 | ||||
-rw-r--r-- | transmitters/nice/fs-nice-transmitter.c | 33 | ||||
-rw-r--r-- | transmitters/nice/fs-nice-transmitter.h | 2 | ||||
-rw-r--r-- | transmitters/rawudp/Makefile.am | 9 | ||||
-rw-r--r-- | transmitters/rawudp/fs-rawudp-component.c | 102 | ||||
-rw-r--r-- | transmitters/rawudp/fs-rawudp-transmitter.c | 105 | ||||
-rw-r--r-- | transmitters/rawudp/fs-rawudp-transmitter.h | 14 | ||||
-rw-r--r-- | transmitters/shm/fs-shm-transmitter.c | 30 |
10 files changed, 215 insertions, 133 deletions
diff --git a/transmitters/multicast/Makefile.am b/transmitters/multicast/Makefile.am index be533540..3baa27bc 100644 --- a/transmitters/multicast/Makefile.am +++ b/transmitters/multicast/Makefile.am @@ -13,13 +13,15 @@ libmulticast_transmitter_la_CFLAGS = \ $(FS_INTERNAL_CFLAGS) \ $(FS_CFLAGS) \ $(GST_PLUGINS_BASE_CFLAGS) \ - $(GST_CFLAGS) + $(GST_CFLAGS) \ + $(GIO_CFLAGS) libmulticast_transmitter_la_LDFLAGS = $(FS_PLUGIN_LDFLAGS) libmulticast_transmitter_la_LIBADD = \ $(top_builddir)/farstream/libfarstream-@FS_MAJORMINOR@.la \ $(FS_LIBS) \ $(GST_BASE_LIBS) \ - $(GST_LIBS) + $(GST_LIBS) \ + $(GIO_LIBS) noinst_HEADERS = \ fs-multicast-transmitter.h \ diff --git a/transmitters/multicast/fs-multicast-transmitter.c b/transmitters/multicast/fs-multicast-transmitter.c index a02046cb..bce74a9c 100644 --- a/transmitters/multicast/fs-multicast-transmitter.c +++ b/transmitters/multicast/fs-multicast-transmitter.c @@ -43,6 +43,8 @@ #include <string.h> #include <sys/types.h> +#include <gio/gio.h> + #ifdef HAVE_UNISTD_H # include <unistd.h> #endif @@ -274,12 +276,12 @@ fs_multicast_transmitter_constructed (GObject *object) /* Lets create the RTP source funnel */ - self->priv->udpsrc_funnels[c] = gst_element_factory_make ("fsfunnel", NULL); + self->priv->udpsrc_funnels[c] = gst_element_factory_make ("funnel", NULL); if (!self->priv->udpsrc_funnels[c]) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not make the fsfunnel element"); + "Could not make the funnel element"); return; } @@ -287,11 +289,11 @@ fs_multicast_transmitter_constructed (GObject *object) self->priv->udpsrc_funnels[c])) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not add the fsfunnel element to the transmitter src bin"); + "Could not add the funnel element to the transmitter src bin"); } pad = gst_element_get_static_pad (self->priv->udpsrc_funnels[c], "src"); - padname = g_strdup_printf ("src%d", c); + padname = g_strdup_printf ("src_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -319,7 +321,7 @@ fs_multicast_transmitter_constructed (GObject *object) } pad = gst_element_get_static_pad (self->priv->udpsink_tees[c], "sink"); - padname = g_strdup_printf ("sink%d", c); + padname = g_strdup_printf ("sink_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -350,7 +352,7 @@ fs_multicast_transmitter_constructed (GObject *object) "sync" , FALSE, NULL); - pad = gst_element_get_request_pad (self->priv->udpsink_tees[c], "src%d"); + pad = gst_element_get_request_pad (self->priv->udpsink_tees[c], "src_%u"); pad2 = gst_element_get_static_pad (fakesink, "sink"); ret = gst_pad_link (pad, pad2); @@ -525,6 +527,7 @@ struct _UdpSock { guint8 current_ttl; gint fd; + GSocket *socket; /* Protected by the transmitter mutex */ GByteArray *ttls; @@ -698,7 +701,7 @@ _bind_port ( static GstElement * _create_sinksource (gchar *elementname, GstBin *bin, - GstElement *teefunnel, GstElement *filter, gint fd, + GstElement *teefunnel, GstElement *filter, GSocket *socket, GstPadDirection direction, GstPad **requested_pad, GError **error) { GstElement *elem; @@ -716,8 +719,8 @@ _create_sinksource (gchar *elementname, GstBin *bin, } g_object_set (elem, - "closefd", FALSE, - "sockfd", fd, + "close-socket", FALSE, + "socket", socket, "auto-multicast", FALSE, NULL); @@ -730,9 +733,9 @@ _create_sinksource (gchar *elementname, GstBin *bin, } if (direction == GST_PAD_SINK) - *requested_pad = gst_element_get_request_pad (teefunnel, "src%d"); + *requested_pad = gst_element_get_request_pad (teefunnel, "src_%u"); else - *requested_pad = gst_element_get_request_pad (teefunnel, "sink%d"); + *requested_pad = gst_element_get_request_pad (teefunnel, "sink_%u"); if (!*requested_pad) { g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, @@ -946,13 +949,17 @@ fs_multicast_transmitter_get_udpsock (FsMulticastTransmitter *trans, if (udpsock->fd < 0) goto error; + udpsock->socket = g_socket_new_from_fd (udpsock->fd, error); + if (!udpsock->socket) + goto error; + /* Now lets create the elements */ udpsock->tee = trans->priv->udpsink_tees[component_id]; udpsock->funnel = trans->priv->udpsrc_funnels[component_id]; udpsock->udpsrc = _create_sinksource ("udpsrc", - GST_BIN (trans->priv->gst_src), udpsock->funnel, NULL, udpsock->fd, + GST_BIN (trans->priv->gst_src), udpsock->funnel, NULL, udpsock->socket, GST_PAD_SRC, &udpsock->udpsrc_requested_pad, error); if (!udpsock->udpsrc) goto error; @@ -963,7 +970,7 @@ fs_multicast_transmitter_get_udpsock (FsMulticastTransmitter *trans, udpsock->udpsink = _create_sinksource ("multiudpsink", GST_BIN (trans->priv->gst_sink), udpsock->tee, udpsock->udpsink_recvonly_filter, - udpsock->fd, GST_PAD_SINK, &udpsock->udpsink_requested_pad, error); + udpsock->socket, GST_PAD_SINK, &udpsock->udpsink_requested_pad, error); if (!udpsock->udpsink) goto error; @@ -1120,6 +1127,9 @@ fs_multicast_transmitter_put_udpsock (FsMulticastTransmitter *trans, GST_ERROR ("Could not remove sink filter element from transmitter sink"); } + if (udpsock->socket) + g_object_unref (udpsock->socket); + if (udpsock->fd >= 0) close (udpsock->fd); diff --git a/transmitters/nice/fs-nice-stream-transmitter.c b/transmitters/nice/fs-nice-stream-transmitter.c index 1f0dbefc..697eb568 100644 --- a/transmitters/nice/fs-nice-stream-transmitter.c +++ b/transmitters/nice/fs-nice-stream-transmitter.c @@ -172,8 +172,8 @@ static void agent_new_candidate (NiceAgent *agent, const gchar *foundation, gpointer user_data); -static gboolean known_buffer_have_buffer_handler (GstPad *pad, - GstBuffer *buffer, +static GstPadProbeReturn known_buffer_have_buffer_handler (GstPad *pad, + GstPadProbeInfo *info, gpointer user_data); @@ -1398,7 +1398,7 @@ fs_nice_stream_transmitter_build (FsNiceStreamTransmitter *self, self->priv->transmitter, self->priv->agent->agent, self->priv->stream_id, - G_CALLBACK (known_buffer_have_buffer_handler), self, + known_buffer_have_buffer_handler, self, error); if (self->priv->gststream == NULL) return FALSE; @@ -1833,12 +1833,13 @@ fs_nice_stream_transmitter_newv (FsNiceTransmitter *transmitter, } -static gboolean -known_buffer_have_buffer_handler (GstPad *pad, GstBuffer *buffer, +static GstPadProbeReturn +known_buffer_have_buffer_handler (GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { FsNiceStreamTransmitter *self = FS_NICE_STREAM_TRANSMITTER (user_data); guint component_id; + GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info); if (!g_atomic_int_get (&self->priv->associate_on_source)) return TRUE; diff --git a/transmitters/nice/fs-nice-transmitter.c b/transmitters/nice/fs-nice-transmitter.c index a99b7478..0798dd8e 100644 --- a/transmitters/nice/fs-nice-transmitter.c +++ b/transmitters/nice/fs-nice-transmitter.c @@ -255,13 +255,13 @@ fs_nice_transmitter_constructed (GObject *object) /* Lets create the RTP source funnel */ - self->priv->src_funnels[c] = gst_element_factory_make ("fsfunnel", NULL); + self->priv->src_funnels[c] = gst_element_factory_make ("funnel", NULL); if (!self->priv->src_funnels[c]) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not make the fsfunnel element"); + "Could not make the funnel element"); return; } @@ -270,11 +270,11 @@ fs_nice_transmitter_constructed (GObject *object) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not add the fsfunnel element to the transmitter src bin"); + "Could not add the funnel element to the transmitter src bin"); } pad = gst_element_get_static_pad (self->priv->src_funnels[c], "src"); - padname = g_strdup_printf ("src%d", c); + padname = g_strdup_printf ("src_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -304,7 +304,7 @@ fs_nice_transmitter_constructed (GObject *object) } pad = gst_element_get_static_pad (self->priv->sink_tees[c], "sink"); - padname = g_strdup_printf ("sink%d", c); + padname = g_strdup_printf ("sink_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -336,7 +336,7 @@ fs_nice_transmitter_constructed (GObject *object) return; } - pad = gst_element_get_request_pad (self->priv->sink_tees[c], "src%d"); + pad = gst_element_get_request_pad (self->priv->sink_tees[c], "src_%u"); pad2 = gst_element_get_static_pad (fakesink, "sink"); ret = gst_pad_link (pad, pad2); @@ -495,7 +495,7 @@ _create_sinksource ( guint component_id, GstPadDirection direction, gboolean do_timestamp, - GCallback have_buffer_callback, + GstPadProbeCallback have_buffer_callback, gpointer have_buffer_user_data, gulong *buffer_probe_id, GstPad **requested_pad, @@ -545,9 +545,9 @@ _create_sinksource ( gst_object_ref (elem); if (direction == GST_PAD_SINK) - *requested_pad = gst_element_get_request_pad (teefunnel, "src%d"); + *requested_pad = gst_element_get_request_pad (teefunnel, "src_%u"); else - *requested_pad = gst_element_get_request_pad (teefunnel, "sink%d"); + *requested_pad = gst_element_get_request_pad (teefunnel, "sink_%u"); if (!*requested_pad) { @@ -626,17 +626,18 @@ _create_sinksource ( { g_object_set_data (G_OBJECT (*requested_pad), "component-id", GUINT_TO_POINTER (component_id)); - *buffer_probe_id = gst_pad_add_buffer_probe (*requested_pad, + *buffer_probe_id = gst_pad_add_probe (*requested_pad, + GST_PAD_PROBE_TYPE_BUFFER, have_buffer_callback, - have_buffer_user_data); + have_buffer_user_data, NULL); } else { g_object_set_data (G_OBJECT (elempad), "component-id", GUINT_TO_POINTER (component_id)); - *buffer_probe_id = gst_pad_add_buffer_probe (elempad, - have_buffer_callback, - have_buffer_user_data); + *buffer_probe_id = gst_pad_add_probe (elempad, + GST_PAD_PROBE_TYPE_BUFFER, + have_buffer_callback, have_buffer_user_data, NULL); } if (*buffer_probe_id == 0) @@ -709,7 +710,7 @@ NiceGstStream * fs_nice_transmitter_add_gst_stream (FsNiceTransmitter *self, NiceAgent *agent, guint stream_id, - GCallback have_buffer_callback, + GstPadProbeCallback have_buffer_callback, gpointer have_buffer_user_data, GError **error) { @@ -920,7 +921,7 @@ fs_nice_transmitter_set_sending (FsNiceTransmitter *self, ns->requested_tee_pads[c] = - gst_element_get_request_pad (self->priv->sink_tees[c], "src%d"); + gst_element_get_request_pad (self->priv->sink_tees[c], "src_%u"); g_assert (ns->requested_tee_pads[c]); diff --git a/transmitters/nice/fs-nice-transmitter.h b/transmitters/nice/fs-nice-transmitter.h index e4cbe898..a203ce36 100644 --- a/transmitters/nice/fs-nice-transmitter.h +++ b/transmitters/nice/fs-nice-transmitter.h @@ -93,7 +93,7 @@ typedef struct _NiceGstStream NiceGstStream; NiceGstStream *fs_nice_transmitter_add_gst_stream (FsNiceTransmitter *self, NiceAgent *agent, guint stream_id, - GCallback have_buffer_callback, + GstPadProbeCallback have_buffer_callback, gpointer have_buffer_user_data, GError **error); diff --git a/transmitters/rawudp/Makefile.am b/transmitters/rawudp/Makefile.am index eed9960b..41051de8 100644 --- a/transmitters/rawudp/Makefile.am +++ b/transmitters/rawudp/Makefile.am @@ -18,19 +18,20 @@ nodist_librawudp_transmitter_la_SOURCES = \ librawudp_transmitter_la_CFLAGS = \ $(FS_INTERNAL_CFLAGS) \ $(FS_CFLAGS) \ - $(GST_PLUGINS_BASE_CFLAGS) \ $(GST_CFLAGS) \ $(NICE_CFLAGS) \ - $(GUPNP_CFLAGS) + $(GUPNP_CFLAGS) \ + $(GIO_CFLAGS) + librawudp_transmitter_la_LDFLAGS = $(FS_PLUGIN_LDFLAGS) librawudp_transmitter_la_LIBADD = \ $(top_builddir)/farstream/libfarstream-@FS_MAJORMINOR@.la \ $(FS_LIBS) \ - $(GST_PLUGINS_BASE_LIBS) \ $(GST_LIBS) \ $(NICE_LIBS) \ $(GUPNP_LIBS) \ - -lgstnetbuffer-@GST_MAJORMINOR@ + $(GIO_LIBS) \ + -lgstnet-@GST_MAJORMINOR@ noinst_HEADERS = \ fs-rawudp-transmitter.h \ diff --git a/transmitters/rawudp/fs-rawudp-component.c b/transmitters/rawudp/fs-rawudp-component.c index 27f6c8a3..4713b9b3 100644 --- a/transmitters/rawudp/fs-rawudp-component.c +++ b/transmitters/rawudp/fs-rawudp-component.c @@ -39,7 +39,7 @@ #include <farstream/fs-conference.h> -#include <gst/netbuffer/gstnetbuffer.h> +#include <gst/net/gstnetaddressmeta.h> #ifdef HAVE_GUPNP #include <libgupnp-igd/gupnp-simple-igd-thread.h> @@ -143,7 +143,7 @@ struct _FsRawUdpComponentPrivate UdpPort *udpport; FsCandidate *remote_candidate; - GstNetAddress remote_address; + GSocketAddress *remote_address; FsCandidate *local_active_candidate; FsCandidate *local_forced_candidate; @@ -216,16 +216,15 @@ static void fs_rawudp_component_emit_candidate (FsRawUdpComponent *self, FsCandidate *candidate); -static gboolean -stun_recv_cb (GstPad *pad, GstBuffer *buffer, - gpointer user_data); +static GstPadProbeReturn +stun_recv_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data); static gpointer stun_timeout_func (gpointer user_data); -static gboolean -buffer_recv_cb (GstPad *pad, GstBuffer *buffer, gpointer user_data); +static GstPadProbeReturn +buffer_recv_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data); static void -remote_is_unique_cb (gboolean unique, const GstNetAddress *address, +remote_is_unique_cb (gboolean unique, GSocketAddress *address, gpointer user_data); static gboolean @@ -261,8 +260,6 @@ fs_rawudp_component_register_type (FsPlugin *module) /* Required because the GST type registration is not thread safe */ - g_type_class_ref (GST_TYPE_NETBUFFER); - type = g_type_module_register_type (G_TYPE_MODULE (module), G_TYPE_OBJECT, "FsRawUdpComponent", &info, 0); @@ -572,8 +569,7 @@ fs_rawudp_constructed (GObject *object) if (self->priv->associate_on_source) self->priv->buffer_recv_id = fs_rawudp_transmitter_udpport_connect_recv ( - self->priv->udpport, - G_CALLBACK (buffer_recv_cb), self); + self->priv->udpport, buffer_recv_cb, self); GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object)); } @@ -611,6 +607,8 @@ fs_rawudp_component_dispose (GObject *object) self->priv->transmitter = NULL; FS_RAWUDP_COMPONENT_UNLOCK (self); + g_clear_object (&self->priv->remote_address); + g_object_unref (ts); parent_class->dispose (object); @@ -670,7 +668,7 @@ fs_rawudp_component_stop (FsRawUdpComponent *self) fs_rawudp_transmitter_udpport_remove_known_address (udpport, - &self->priv->remote_address, remote_is_unique_cb, self); + self->priv->remote_address, remote_is_unique_cb, self); } FS_RAWUDP_COMPONENT_UNLOCK (self); @@ -912,14 +910,14 @@ fs_rawudp_component_new ( } static void -remote_is_unique_cb (gboolean unique, const GstNetAddress *address, +remote_is_unique_cb (gboolean unique, GSocketAddress *address, gpointer user_data) { FsRawUdpComponent *self = FS_RAWUDP_COMPONENT (user_data); FS_RAWUDP_COMPONENT_LOCK (self); - if (!gst_netaddress_equal (address, &self->priv->remote_address)) + if (!fs_g_inet_socket_address_equal (address, self->priv->remote_address)) { GST_ERROR ("Got callback for an address that is not ours"); goto out; @@ -942,6 +940,7 @@ fs_rawudp_component_set_remote_candidate (FsRawUdpComponent *self, struct addrinfo hints = {0}; struct addrinfo *res = NULL; int rv; + GInetAddress *addr; if (candidate->component_id != self->priv->component) { @@ -973,30 +972,36 @@ fs_rawudp_component_set_remote_candidate (FsRawUdpComponent *self, if (self->priv->remote_candidate) fs_rawudp_transmitter_udpport_remove_known_address (self->priv->udpport, - &self->priv->remote_address, remote_is_unique_cb, self); + self->priv->remote_address, remote_is_unique_cb, self); old_candidate = self->priv->remote_candidate; self->priv->remote_candidate = fs_candidate_copy (candidate); sending = self->priv->sending; + g_clear_object (&self->priv->remote_address); + switch (res->ai_family) { case AF_INET: - gst_netaddress_set_ip4_address (&self->priv->remote_address, - ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr, - g_htons(candidate->port)); + addr = g_inet_address_new_from_bytes ( + (guint8*) &(((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr), + G_SOCKET_FAMILY_IPV4); break; case AF_INET6: - gst_netaddress_set_ip6_address (&self->priv->remote_address, - ((struct sockaddr_in6 *)res->ai_addr)->sin6_addr.s6_addr, - g_htons(candidate->port)); + addr = g_inet_address_new_from_bytes ( + (guint8*) &(((struct sockaddr_in6 *)res->ai_addr)->sin6_addr.s6_addr), + G_SOCKET_FAMILY_IPV6); break; } + self->priv->remote_address = g_inet_socket_address_new (addr, + candidate->port); + g_object_unref (addr); + self->priv->remote_is_unique = fs_rawudp_transmitter_udpport_add_known_address (self->priv->udpport, - &self->priv->remote_address, remote_is_unique_cb, self); + self->priv->remote_address, remote_is_unique_cb, self); FS_RAWUDP_COMPONENT_UNLOCK (self); @@ -1308,8 +1313,7 @@ fs_rawudp_component_start_stun (FsRawUdpComponent *self, GError **error) FS_RAWUDP_COMPONENT_LOCK (self); self->priv->stun_recv_id = fs_rawudp_transmitter_udpport_connect_recv ( - self->priv->udpport, - G_CALLBACK (stun_recv_cb), self); + self->priv->udpport, stun_recv_cb, self); nice_address_init (&niceaddr); @@ -1376,11 +1380,11 @@ fs_rawudp_component_stop_stun_locked (FsRawUdpComponent *self) -static gboolean -stun_recv_cb (GstPad *pad, GstBuffer *buffer, - gpointer user_data) +static GstPadProbeReturn +stun_recv_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { FsRawUdpComponent *self = FS_RAWUDP_COMPONENT (user_data); + GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info); FsCandidate *candidate = NULL; StunMessage msg; StunValidationStatus stunv; @@ -1391,26 +1395,30 @@ stun_recv_cb (GstPad *pad, GstBuffer *buffer, socklen_t alt_addr_len = sizeof(alt_addr); gchar addr_str[NI_MAXHOST]; NiceAddress niceaddr; + GstMapInfo map; + + gst_buffer_map (buffer, &map, GST_MAP_READ); - if (GST_BUFFER_SIZE (buffer) < 4) + if (gst_buffer_get_size (buffer) < 4) /* Packet is too small to be STUN */ - return TRUE; + goto passthrough; - if (GST_BUFFER_DATA (buffer)[0] >> 6) + if (map.data[0] >> 6) /* Non stun packet */ - return TRUE; + goto passthrough; g_assert (fs_rawudp_transmitter_udpport_is_pad (self->priv->udpport, pad)); FS_RAWUDP_COMPONENT_LOCK(self); stunv = stun_agent_validate (&self->priv->stun_agent, &msg, - GST_BUFFER_DATA (buffer), GST_BUFFER_SIZE (buffer), NULL, NULL); + map.data, map.size, NULL, NULL); FS_RAWUDP_COMPONENT_UNLOCK(self); /* not a valid stun message */ if (stunv != STUN_VALIDATION_SUCCESS) - return TRUE; + goto passthrough; + stunr = stun_usage_bind_process (&msg, (struct sockaddr *) &addr, &addr_len, @@ -1483,7 +1491,14 @@ stun_recv_cb (GstPad *pad, GstBuffer *buffer, fs_candidate_destroy (candidate); - return FALSE; + gst_buffer_unmap (buffer, &map); + + return GST_PAD_PROBE_DROP; + +passthrough: + + gst_buffer_unmap (buffer, &map); + return GST_PAD_PROBE_OK; } static gpointer @@ -1704,18 +1719,19 @@ fs_rawudp_component_emit_candidate (FsRawUdpComponent *self, * This is a has "have-data" signal handler, so we return %TRUE to not * drop the buffer */ -static gboolean -buffer_recv_cb (GstPad *pad, GstBuffer *buffer, gpointer user_data) +static GstPadProbeReturn +buffer_recv_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { FsRawUdpComponent *self = FS_RAWUDP_COMPONENT (user_data); + GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info); + GstNetAddressMeta *netmeta = gst_buffer_get_net_address_meta (buffer); - if (GST_IS_NETBUFFER (buffer)) + if (netmeta) { - GstNetBuffer *netbuffer = (GstNetBuffer*) buffer; - FS_RAWUDP_COMPONENT_LOCK (self); if (self->priv->remote_is_unique && - gst_netaddress_equal (&self->priv->remote_address, &netbuffer->from)) + fs_g_inet_socket_address_equal (self->priv->remote_address, + netmeta->addr)) { FS_RAWUDP_COMPONENT_UNLOCK (self); g_signal_emit (self, signals[KNOWN_SOURCE_PACKET_RECEIVED], 0, @@ -1728,8 +1744,8 @@ buffer_recv_cb (GstPad *pad, GstBuffer *buffer, gpointer user_data) } else { - GST_WARNING ("received buffer thats not a NetBuffer"); + GST_WARNING ("received buffer that does not contain a GstNetAddressMeta"); } - return TRUE; + return GST_PAD_PROBE_OK; } diff --git a/transmitters/rawudp/fs-rawudp-transmitter.c b/transmitters/rawudp/fs-rawudp-transmitter.c index 377de54e..c81aaddb 100644 --- a/transmitters/rawudp/fs-rawudp-transmitter.c +++ b/transmitters/rawudp/fs-rawudp-transmitter.c @@ -281,13 +281,13 @@ fs_rawudp_transmitter_constructed (GObject *object) /* Lets create the RTP source funnel */ - self->priv->udpsrc_funnels[c] = gst_element_factory_make ("fsfunnel", NULL); + self->priv->udpsrc_funnels[c] = gst_element_factory_make ("funnel", NULL); if (!self->priv->udpsrc_funnels[c]) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not make the fsfunnel element"); + "Could not make the funnel element"); return; } @@ -296,11 +296,11 @@ fs_rawudp_transmitter_constructed (GObject *object) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not add the fsfunnel element to the transmitter src bin"); + "Could not add the funnel element to the transmitter src bin"); } pad = gst_element_get_static_pad (self->priv->udpsrc_funnels[c], "src"); - padname = g_strdup_printf ("src%d", c); + padname = g_strdup_printf ("src_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -330,7 +330,7 @@ fs_rawudp_transmitter_constructed (GObject *object) } pad = gst_element_get_static_pad (self->priv->udpsink_tees[c], "sink"); - padname = g_strdup_printf ("sink%d", c); + padname = g_strdup_printf ("sink_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -362,7 +362,7 @@ fs_rawudp_transmitter_constructed (GObject *object) "sync", FALSE, NULL); - pad = gst_element_get_request_pad (self->priv->udpsink_tees[c], "src%d"); + pad = gst_element_get_request_pad (self->priv->udpsink_tees[c], "src_%u"); pad2 = gst_element_get_static_pad (fakesink, "sink"); ret = gst_pad_link (pad, pad2); @@ -549,6 +549,7 @@ struct _UdpPort { guint port; gint fd; + GSocket *socket; /* These are just convenience pointers to our parent transmitter */ GstElement *funnel; @@ -564,7 +565,7 @@ struct _UdpPort { struct KnownAddress { FsRawUdpAddressUniqueCallbackFunc callback; gpointer user_data; - GstNetAddress addr; + GSocketAddress *addr; }; static gint @@ -645,7 +646,7 @@ _create_sinksource ( GstBin *bin, GstElement *teefunnel, GstElement *filter, - gint fd, + GSocket *socket, GstPadDirection direction, gboolean do_timestamp, GstPad **requested_pad, @@ -667,9 +668,9 @@ _create_sinksource ( } g_object_set (elem, - "sockfd", fd, "auto-multicast", FALSE, - "closefd", FALSE, + "close-socket", FALSE, + "socket", socket, NULL); if (direction == GST_PAD_SINK) @@ -692,9 +693,9 @@ _create_sinksource ( } if (direction == GST_PAD_SINK) - *requested_pad = gst_element_get_request_pad (teefunnel, "src%d"); + *requested_pad = gst_element_get_request_pad (teefunnel, "src_%u"); else - *requested_pad = gst_element_get_request_pad (teefunnel, "sink%d"); + *requested_pad = gst_element_get_request_pad (teefunnel, "sink_%u"); if (!*requested_pad) { @@ -885,6 +886,10 @@ fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans, if (udpport->fd < 0) goto error; + udpport->socket = g_socket_new_from_fd (udpport->fd, error); + if (!udpport->socket) + goto error; + /* Now lets create the elements */ udpport->tee = trans->priv->udpsink_tees[component_id]; @@ -892,14 +897,15 @@ fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans, udpport->udpsrc = _create_sinksource ("udpsrc", GST_BIN (trans->priv->gst_src), udpport->funnel, NULL, - udpport->fd, GST_PAD_SRC, trans->priv->do_timestamp, + udpport->socket, GST_PAD_SRC, trans->priv->do_timestamp, &udpport->udpsrc_requested_pad, error); if (!udpport->udpsrc) goto error; udpport->udpsink = _create_sinksource ("multiudpsink", GST_BIN (trans->priv->gst_sink), udpport->tee, NULL, - udpport->fd, GST_PAD_SINK, FALSE, &udpport->udpsink_requested_pad, error); + udpport->socket, GST_PAD_SINK, FALSE, &udpport->udpsink_requested_pad, + error); if (!udpport->udpsink) goto error; @@ -909,8 +915,9 @@ fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans, if (udpport->recvonly_filter) { udpport->recvonly_udpsink = _create_sinksource ("multiudpsink", - GST_BIN (trans->priv->gst_sink), udpport->tee, udpport->recvonly_filter, - udpport->fd, GST_PAD_SINK, FALSE, &udpport->recvonly_requested_pad, error); + GST_BIN (trans->priv->gst_sink), udpport->tee, + udpport->recvonly_filter, udpport->socket, GST_PAD_SINK, FALSE, + &udpport->recvonly_requested_pad, error); if (!udpport->recvonly_udpsink) goto error; } @@ -1018,13 +1025,21 @@ fs_rawudp_transmitter_put_udpport (FsRawUdpTransmitter *trans, GST_ERROR ("Could not remove udpsink element from transmitter source"); } + g_clear_object (&udpport->socket); + if (udpport->fd >= 0) close (udpport->fd); if (udpport->mutex) g_mutex_free (udpport->mutex); if (udpport->known_addresses) + { + guint i; + for (i = 0; i < udpport->known_addresses->len; i++) + g_object_unref (g_array_index (udpport->known_addresses, + struct KnownAddress, i).addr); g_array_free (udpport->known_addresses, TRUE); + } g_free (udpport->requested_ip); g_slice_free (UdpPort, udpport); @@ -1073,7 +1088,7 @@ fs_rawudp_transmitter_udpport_sendto (UdpPort *udpport, gulong fs_rawudp_transmitter_udpport_connect_recv (UdpPort *udpport, - GCallback callback, + GstPadProbeCallback callback, gpointer user_data) { GstPad *pad; @@ -1081,7 +1096,9 @@ fs_rawudp_transmitter_udpport_connect_recv (UdpPort *udpport, pad = gst_element_get_static_pad (udpport->udpsrc, "src"); - id = gst_pad_add_buffer_probe (pad, callback, user_data); + id = gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BUFFER, + callback, user_data, NULL); gst_object_unref (pad); @@ -1095,7 +1112,7 @@ fs_rawudp_transmitter_udpport_disconnect_recv (UdpPort *udpport, { GstPad *pad = gst_element_get_static_pad (udpport->udpsrc, "src"); - gst_pad_remove_buffer_probe (pad, id); + gst_pad_remove_probe (pad, id); gst_object_unref (pad); } @@ -1133,7 +1150,7 @@ fs_rawudp_transmitter_get_stream_transmitter_type (FsTransmitter *transmitter) /** * fs_rawudp_transmitter_udpport_add_known_address: * @udpport: a #UdpPort - * @address: the new #GstNetAddress that we know + * @address: the new #GSocketAddress that we know * @callback: a Callback that will be called if the uniqueness of an address * changes * @user_data: data passed back to the callback @@ -1146,7 +1163,7 @@ fs_rawudp_transmitter_get_stream_transmitter_type (FsTransmitter *transmitter) gboolean fs_rawudp_transmitter_udpport_add_known_address (UdpPort *udpport, - GstNetAddress *address, + GSocketAddress *address, FsRawUdpAddressUniqueCallbackFunc callback, gpointer user_data) { @@ -1159,11 +1176,13 @@ fs_rawudp_transmitter_udpport_add_known_address (UdpPort *udpport, g_mutex_lock (udpport->mutex); for (i = 0; - g_array_index (udpport->known_addresses, struct KnownAddress, i).callback; + g_array_index (udpport->known_addresses, + struct KnownAddress, i).callback; i++) { - struct KnownAddress *ka = &g_array_index (udpport->known_addresses, struct KnownAddress, i); - if (gst_netaddress_equal (address, &ka->addr)) + struct KnownAddress *ka = &g_array_index (udpport->known_addresses, + struct KnownAddress, i); + if (fs_g_inet_socket_address_equal (address, ka->addr)) { g_assert (!(ka->callback == callback && ka->user_data == user_data)); @@ -1179,10 +1198,10 @@ fs_rawudp_transmitter_udpport_add_known_address (UdpPort *udpport, else if (counter == 1) { if (prev_ka->callback) - prev_ka->callback (FALSE, &prev_ka->addr, prev_ka->user_data); + prev_ka->callback (FALSE, prev_ka->addr, prev_ka->user_data); } - memcpy (&newka.addr, address, sizeof (GstNetAddress)); + newka.addr = g_object_ref (address); newka.callback = callback; newka.user_data = user_data; @@ -1208,7 +1227,7 @@ fs_rawudp_transmitter_udpport_add_known_address (UdpPort *udpport, void fs_rawudp_transmitter_udpport_remove_known_address (UdpPort *udpport, - GstNetAddress *address, + GSocketAddress *address, FsRawUdpAddressUniqueCallbackFunc callback, gpointer user_data) { @@ -1223,8 +1242,9 @@ fs_rawudp_transmitter_udpport_remove_known_address (UdpPort *udpport, g_array_index (udpport->known_addresses, struct KnownAddress, i).callback; i++) { - struct KnownAddress *ka = &g_array_index (udpport->known_addresses, struct KnownAddress, i); - if (gst_netaddress_equal (address, &ka->addr)) + struct KnownAddress *ka = &g_array_index (udpport->known_addresses, + struct KnownAddress, i); + if (fs_g_inet_socket_address_equal (address, ka->addr)) { if (ka->callback == callback && ka->user_data == user_data) { @@ -1245,8 +1265,10 @@ fs_rawudp_transmitter_udpport_remove_known_address (UdpPort *udpport, } if (counter == 1) - prev_ka->callback (TRUE, &prev_ka->addr, prev_ka->user_data); + prev_ka->callback (TRUE, prev_ka->addr, prev_ka->user_data); + g_object_unref (g_array_index (udpport->known_addresses, + struct KnownAddress, remove_i).addr); g_array_remove_index_fast (udpport->known_addresses, remove_i); out: @@ -1307,3 +1329,26 @@ fs_rawudp_transmitter_set_type_of_service (FsRawUdpTransmitter *self, out: g_mutex_unlock (self->priv->mutex); } + + +/* TEMPORARY: should be in Glib */ +gboolean +fs_g_inet_socket_address_equal (GSocketAddress *addr1, GSocketAddress *addr2) +{ + GInetSocketAddress *inet1; + GInetSocketAddress *inet2; + + if (!G_IS_INET_SOCKET_ADDRESS (addr1) || !G_IS_INET_SOCKET_ADDRESS (addr2)) + return FALSE; + + inet1 = G_INET_SOCKET_ADDRESS (addr1); + inet2 = G_INET_SOCKET_ADDRESS (addr2); + + if (g_inet_socket_address_get_port (inet1) == + g_inet_socket_address_get_port (inet2) && + g_inet_address_equal (g_inet_socket_address_get_address (inet1), + g_inet_socket_address_get_address (inet2))) + return TRUE; + else + return FALSE; +} diff --git a/transmitters/rawudp/fs-rawudp-transmitter.h b/transmitters/rawudp/fs-rawudp-transmitter.h index c5c18de1..23ec312f 100644 --- a/transmitters/rawudp/fs-rawudp-transmitter.h +++ b/transmitters/rawudp/fs-rawudp-transmitter.h @@ -27,9 +27,8 @@ #include <farstream/fs-transmitter.h> -#include <gst/netbuffer/gstnetbuffer.h> - #include <gst/gst.h> +#include <gst/net/gstnetaddressmeta.h> #ifdef G_OS_WIN32 # include <ws2tcpip.h> @@ -93,7 +92,7 @@ struct _FsRawUdpTransmitter typedef struct _UdpPort UdpPort; typedef void (*FsRawUdpAddressUniqueCallbackFunc) (gboolean unique, - const GstNetAddress *address, gpointer user_data); + GSocketAddress *address, gpointer user_data); GType fs_rawudp_transmitter_get_type (void); @@ -123,7 +122,7 @@ gboolean fs_rawudp_transmitter_udpport_sendto (UdpPort *udpport, GError **error); gulong fs_rawudp_transmitter_udpport_connect_recv (UdpPort *udpport, - GCallback callback, + GstPadProbeCallback callback, gpointer user_data); void fs_rawudp_transmitter_udpport_disconnect_recv (UdpPort *udpport, gulong id); @@ -135,12 +134,12 @@ gint fs_rawudp_transmitter_udpport_get_port (UdpPort *udpport); gboolean fs_rawudp_transmitter_udpport_add_known_address (UdpPort *udpport, - GstNetAddress *address, + GSocketAddress *address, FsRawUdpAddressUniqueCallbackFunc callback, gpointer user_data); void fs_rawudp_transmitter_udpport_remove_known_address (UdpPort *udpport, - GstNetAddress *address, + GSocketAddress *address, FsRawUdpAddressUniqueCallbackFunc callback, gpointer user_data); @@ -152,6 +151,9 @@ void fs_rawudp_transmitter_udpport_remove_recvonly_dest (UdpPort *udpport, const gchar *ip, gint port); +gboolean fs_g_inet_socket_address_equal (GSocketAddress *addr1, + GSocketAddress *addr2); + G_END_DECLS #endif /* __FS_RAWUDP_TRANSMITTER_H__ */ diff --git a/transmitters/shm/fs-shm-transmitter.c b/transmitters/shm/fs-shm-transmitter.c index af658f80..b548608a 100644 --- a/transmitters/shm/fs-shm-transmitter.c +++ b/transmitters/shm/fs-shm-transmitter.c @@ -346,12 +346,12 @@ fs_shm_transmitter_constructed (GObject *object) /* Lets create the RTP source funnel */ - self->priv->funnels[c] = gst_element_factory_make ("fsfunnel", NULL); + self->priv->funnels[c] = gst_element_factory_make ("funnel", NULL); if (!self->priv->funnels[c]) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not make the fsfunnel element"); + "Could not make the funnel element"); return; } @@ -359,11 +359,11 @@ fs_shm_transmitter_constructed (GObject *object) self->priv->funnels[c])) { trans->construction_error = g_error_new (FS_ERROR, FS_ERROR_CONSTRUCTION, - "Could not add the fsfunnel element to the transmitter src bin"); + "Could not add the funnel element to the transmitter src bin"); } pad = gst_element_get_static_pad (self->priv->funnels[c], "src"); - padname = g_strdup_printf ("src%d", c); + padname = g_strdup_printf ("src_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -391,7 +391,7 @@ fs_shm_transmitter_constructed (GObject *object) } pad = gst_element_get_static_pad (self->priv->tees[c], "sink"); - padname = g_strdup_printf ("sink%d", c); + padname = g_strdup_printf ("sink_%u", c); ghostpad = gst_ghost_pad_new (padname, pad); g_free (padname); gst_object_unref (pad); @@ -422,7 +422,7 @@ fs_shm_transmitter_constructed (GObject *object) return; } - pad = gst_element_get_request_pad (self->priv->tees[c], "src%d"); + pad = gst_element_get_request_pad (self->priv->tees[c], "src_%u"); pad2 = gst_element_get_static_pad (fakesink, "sink"); ret = gst_pad_link (pad, pad2); @@ -570,9 +570,12 @@ struct _ShmSrc { }; -static gboolean -src_buffer_probe_cb (GstPad *pad, GstBuffer *buffer, ShmSrc *shm) +static GstPadProbeReturn +src_buffer_probe_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { + ShmSrc *shm = user_data; + GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info); + shm->got_buffer_func (buffer, shm->component, shm->cb_data); return TRUE; @@ -638,7 +641,7 @@ fs_shm_transmitter_get_shm_src (FsShmTransmitter *self, shm->src = elem; shm->funnelpad = gst_element_get_request_pad (self->priv->funnels[component], - "sink%d"); + "sink_%u"); if (!shm->funnelpad) { @@ -659,8 +662,9 @@ fs_shm_transmitter_get_shm_src (FsShmTransmitter *self, gst_object_unref (pad); if (got_buffer_func) - shm->buffer_probe = gst_pad_add_buffer_probe (shm->funnelpad, - G_CALLBACK (src_buffer_probe_cb), shm); + shm->buffer_probe = gst_pad_add_probe (shm->funnelpad, + GST_PAD_PROBE_TYPE_BUFFER, + src_buffer_probe_cb, shm, NULL); if (!gst_element_sync_state_with_parent (shm->src)) { @@ -688,7 +692,7 @@ fs_shm_transmitter_check_shm_src (FsShmTransmitter *self, ShmSrc *shm, return TRUE; if (shm->buffer_probe) - gst_pad_remove_buffer_probe (shm->funnelpad, shm->buffer_probe); + gst_pad_remove_probe (shm->funnelpad, shm->buffer_probe); shm->buffer_probe = 0; if (shm->src) @@ -845,7 +849,7 @@ fs_shm_transmitter_get_shm_sink (FsShmTransmitter *self, } shm->teepad = gst_element_get_request_pad (self->priv->tees[component], - "src%d"); + "src_%u"); if (!shm->teepad) { |