/* * stream-tube-channel.h - high level API for StreamTube channels * * Copyright (C) 2010 Collabora Ltd. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ /** * SECTION:stream-tube-channel * @title: TpStreamTubeChannel * @short_description: proxy object for a stream tube channel * * #TpStreamTubeChannel is a sub-class of #TpChannel providing convenient API * to offer and accept a stream tube. * * Since: 0.13.2 */ /** * TpStreamTubeChannel: * * Data structure representing a #TpStreamTubeChannel. * * Since: 0.13.2 */ /** * TpStreamTubeChannelClass: * * The class of a #TpStreamTubeChannel. * * Since: 0.13.2 */ #include #include "telepathy-glib/stream-tube-channel.h" #include #include #include #include #include #include #include #include #include #include #define DEBUG_FLAG TP_DEBUG_CHANNEL #include "telepathy-glib/debug-internal.h" #include "_gen/signals-marshal.h" #include #include #ifdef HAVE_GIO_UNIX #include #include #endif /* HAVE_GIO_UNIX */ G_DEFINE_TYPE (TpStreamTubeChannel, tp_stream_tube_channel, TP_TYPE_CHANNEL) /* Used to store the data of a NewRemoteConnection signal while we are waiting * for the TCP connection identified by this signal */ typedef struct { TpHandle handle; GValue *param; guint connection_id; gboolean rejected; } SigWaitingConn; static SigWaitingConn * sig_waiting_conn_new (TpHandle handle, const GValue *param, guint connection_id, gboolean rejected) { SigWaitingConn *ret = g_slice_new0 (SigWaitingConn); ret->handle = handle; ret->param = tp_g_value_slice_dup (param); ret->connection_id = connection_id; ret->rejected = rejected; return ret; } static void sig_waiting_conn_free (SigWaitingConn *sig) { g_assert (sig != NULL); tp_g_value_slice_free (sig->param); g_slice_free (SigWaitingConn, sig); } typedef struct { GSocketConnection *conn; /* Used only with TP_SOCKET_ACCESS_CONTROL_CREDENTIALS to store the byte * read with the credentials. */ guchar byte; } ConnWaitingSig; static ConnWaitingSig * conn_waiting_sig_new (GSocketConnection *conn, guchar byte) { ConnWaitingSig *ret = g_slice_new0 (ConnWaitingSig); ret->conn = g_object_ref (conn); ret->byte = byte; return ret; } static void conn_waiting_sig_free (ConnWaitingSig *c) { g_assert (c != NULL); g_object_unref (c->conn); g_slice_free (ConnWaitingSig, c); } struct _TpStreamTubeChannelPrivate { GHashTable *parameters; /* Offering side */ GSocketService *service; GSocketAddress *address; /* GSocketConnection we have accepted but are still waiting a * NewRemoteConnection to identify them. Owned ConnWaitingSig. */ GSList *conn_waiting_sig; /* NewRemoteConnection signals we have received but didn't accept their TCP * connection yet. Owned SigWaitingConn. */ GSList *sig_waiting_conn; /* Accepting side */ GSocket *client_socket; /* The access_control_param we passed to Accept */ GValue *access_control_param; /* Connection to the CM while we are waiting for its * ID (NewLocalConnection) */ GSocketConnection *local_conn_waiting_id; /* ID received from NewLocalConnection stored while the connection has not * be connected yet. */ guint local_conn_id; /* TRUE if local_conn_id is meaningfull (0 can be a valid ID so we can't use * it to check if NewLocalConnection has been received :\ ) */ gboolean local_conn_id_set; TpSocketAddressType socket_type; TpSocketAccessControl access_control; GSimpleAsyncResult *result; /* (guint) connection ID => weakly reffed TpStreamTubeConnection */ GHashTable *tube_connections; }; enum { PROP_SERVICE = 1, PROP_PARAMETERS }; enum /* signals */ { INCOMING, LAST_SIGNAL }; static guint _signals[LAST_SIGNAL] = { 0, }; static void remote_connection_destroyed_cb (gpointer user_data, GObject *conn) { /* The GSocketConnection has been destroyed, removing it from the hash */ TpStreamTubeChannel *self = user_data; GHashTableIter iter; gpointer value; g_hash_table_iter_init (&iter, self->priv->tube_connections); while (g_hash_table_iter_next (&iter, NULL, &value)) { if (value == conn) { g_hash_table_iter_remove (&iter); break; } } } static void tp_stream_tube_channel_dispose (GObject *obj) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) obj; if (self->priv->service != NULL) { g_socket_service_stop (self->priv->service); tp_clear_object (&self->priv->service); } tp_clear_object (&self->priv->result); tp_clear_pointer (&self->priv->parameters, g_hash_table_unref); g_slist_foreach (self->priv->conn_waiting_sig, (GFunc) conn_waiting_sig_free, NULL); tp_clear_pointer (&self->priv->conn_waiting_sig, g_slist_free); g_slist_foreach (self->priv->sig_waiting_conn, (GFunc) sig_waiting_conn_free, NULL); tp_clear_pointer (&self->priv->sig_waiting_conn, g_slist_free); if (self->priv->tube_connections != NULL) { GHashTableIter iter; gpointer conn; g_hash_table_iter_init (&iter, self->priv->tube_connections); while (g_hash_table_iter_next (&iter, NULL, &conn)) { g_object_weak_unref (conn, remote_connection_destroyed_cb, self); } g_hash_table_unref (self->priv->tube_connections); self->priv->tube_connections = NULL; } if (self->priv->address != NULL) { #ifdef HAVE_GIO_UNIX /* check if we need to remove the temporary file we created */ if (G_IS_UNIX_SOCKET_ADDRESS (self->priv->address)) { const gchar *path; path = g_unix_socket_address_get_path ( G_UNIX_SOCKET_ADDRESS (self->priv->address)); g_unlink (path); } #endif /* HAVE_GIO_UNIX */ g_object_unref (self->priv->address); self->priv->address = NULL; } tp_clear_pointer (&self->priv->access_control_param, tp_g_value_slice_free); tp_clear_object (&self->priv->local_conn_waiting_id); tp_clear_object (&self->priv->client_socket); G_OBJECT_CLASS (tp_stream_tube_channel_parent_class)->dispose (obj); } static void tp_stream_tube_channel_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) object; switch (property_id) { case PROP_SERVICE: g_value_set_string (value, tp_stream_tube_channel_get_service (self)); break; case PROP_PARAMETERS: g_value_set_boxed (value, self->priv->parameters); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void connection_closed_cb (TpChannel *channel, guint connection_id, const gchar *err, const gchar *message, gpointer user_data, GObject *weak_object) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) weak_object; TpStreamTubeConnection *tube_conn; GError *error = NULL; DEBUG ("Got ConnectionClosed signal on connection %u: %s (%s)", connection_id, err, message); tube_conn = g_hash_table_lookup (self->priv->tube_connections, GUINT_TO_POINTER (connection_id)); if (tube_conn == NULL) { DEBUG ("No connection with ID %u; ignoring", connection_id); return; } tp_proxy_dbus_error_to_gerror (self, err, message, &error); _tp_stream_tube_connection_fire_closed (tube_conn, error); g_error_free (error); } static void tp_stream_tube_channel_constructed (GObject *obj) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) obj; void (*chain_up) (GObject *) = ((GObjectClass *) tp_stream_tube_channel_parent_class)->constructed; TpChannel *chan = (TpChannel *) obj; GHashTable *props; GError *err = NULL; if (chain_up != NULL) chain_up (obj); if (tp_channel_get_channel_type_id (chan) != TP_IFACE_QUARK_CHANNEL_TYPE_STREAM_TUBE) { GError error = { TP_DBUS_ERRORS, TP_DBUS_ERROR_INCONSISTENT, "Channel is not a stream tube" }; DEBUG ("Channel is not a stream tube: %s", tp_channel_get_channel_type ( chan)); tp_proxy_invalidate (TP_PROXY (self), &error); return; } props = tp_channel_borrow_immutable_properties (TP_CHANNEL (self)); if (tp_asv_get_string (props, TP_PROP_CHANNEL_TYPE_STREAM_TUBE_SERVICE) == NULL) { GError error = { TP_DBUS_ERRORS, TP_DBUS_ERROR_INCONSISTENT, "Tube doesn't have StreamTube.Service property" }; DEBUG ("%s", error.message); tp_proxy_invalidate (TP_PROXY (self), &error); return; } /* Tube.Parameters is immutable for incoming tubes. For outgoing ones, * it's defined when offering the tube. */ if (!tp_channel_get_requested (TP_CHANNEL (self))) { GHashTable *params; params = tp_asv_get_boxed (props, TP_PROP_CHANNEL_INTERFACE_TUBE_PARAMETERS, TP_HASH_TYPE_STRING_VARIANT_MAP); if (params == NULL) { DEBUG ("Incoming tube doesn't have Tube.Parameters property"); self->priv->parameters = tp_asv_new (NULL, NULL); } else { self->priv->parameters = g_boxed_copy ( TP_HASH_TYPE_STRING_VARIANT_MAP, params); } } tp_cli_channel_type_stream_tube_connect_to_connection_closed ( TP_CHANNEL (self), connection_closed_cb, NULL, NULL, G_OBJECT (self), &err); if (err != NULL) { DEBUG ("Failed to connect to ConnectionClosed signal: %s", err->message); g_error_free (err); } } static void tp_stream_tube_channel_class_init (TpStreamTubeChannelClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GParamSpec *param_spec; gobject_class->constructed = tp_stream_tube_channel_constructed; gobject_class->get_property = tp_stream_tube_channel_get_property; gobject_class->dispose = tp_stream_tube_channel_dispose; /** * TpStreamTubeChannel:service: * * A string representing the service name that will be used over the tube. * * Since: 0.13.2 */ param_spec = g_param_spec_string ("service", "Service", "The service of the stream tube", NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS); g_object_class_install_property (gobject_class, PROP_SERVICE, param_spec); /** * TpStreamTubeChannel:parameters: * * A string to #GValue #GHashTable representing the parameters of the tube. * * Will be %NULL for outgoing tubes until the tube has been offered. * * Since: 0.13.2 */ param_spec = g_param_spec_boxed ("parameters", "Parameters", "The parameters of the stream tube", TP_HASH_TYPE_STRING_VARIANT_MAP, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS); g_object_class_install_property (gobject_class, PROP_PARAMETERS, param_spec); /** * TpStreamTubeChannel::incoming * @self: the #TpStreamTubeChannel * @tube_connection: the #TpStreamTubeConnection for the connection * * The ::incoming signal is emitted on offered Tubes when a new incoming * connection is made from a remote user (one accepting the Tube). * * Consumers of this signal must take their own references to * @tube_connection */ _signals[INCOMING] = g_signal_new ("incoming", G_OBJECT_CLASS_TYPE (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, TP_TYPE_STREAM_TUBE_CONNECTION); g_type_class_add_private (gobject_class, sizeof (TpStreamTubeChannelPrivate)); } static void tp_stream_tube_channel_init (TpStreamTubeChannel *self) { self->priv = G_TYPE_INSTANCE_GET_PRIVATE ((self), TP_TYPE_STREAM_TUBE_CHANNEL, TpStreamTubeChannelPrivate); self->priv->tube_connections = g_hash_table_new (NULL, NULL); } /** * tp_stream_tube_channel_new: * @conn: a #TpConnection; may not be %NULL * @object_path: the object path of the channel; may not be %NULL * @immutable_properties: (transfer none) (element-type utf8 GObject.Value): * the immutable properties of the channel, * as signalled by the NewChannel D-Bus signal or returned by the * CreateChannel and EnsureChannel D-Bus methods: a mapping from * strings (D-Bus interface name + "." + property name) to #GValue instances * @error: used to indicate the error if %NULL is returned * * Convenient function to create a new #TpStreamTubeChannel * * Returns: (transfer full): a newly created #TpStreamTubeChannel * * Since: 0.13.2 */ TpStreamTubeChannel * tp_stream_tube_channel_new (TpConnection *conn, const gchar *object_path, const GHashTable *immutable_properties, GError **error) { TpProxy *conn_proxy = (TpProxy *) conn; g_return_val_if_fail (TP_IS_CONNECTION (conn), NULL); g_return_val_if_fail (object_path != NULL, NULL); g_return_val_if_fail (immutable_properties != NULL, NULL); if (!tp_dbus_check_valid_object_path (object_path, error)) return NULL; return g_object_new (TP_TYPE_STREAM_TUBE_CHANNEL, "connection", conn, "dbus-daemon", conn_proxy->dbus_daemon, "bus-name", conn_proxy->bus_name, "object-path", object_path, "handle-type", (guint) TP_UNKNOWN_HANDLE_TYPE, "channel-properties", immutable_properties, NULL); } static void operation_failed (TpStreamTubeChannel *self, const GError *error) { g_simple_async_result_set_from_error (self->priv->result, error); g_simple_async_result_complete (self->priv->result); tp_clear_object (&self->priv->result); } static void complete_accept_operation (TpStreamTubeChannel *self, TpStreamTubeConnection *tube_conn) { g_simple_async_result_set_op_res_gpointer (self->priv->result, g_object_ref (tube_conn), g_object_unref); g_simple_async_result_complete (self->priv->result); tp_clear_object (&self->priv->result); } static void new_local_connection_with_contact (TpConnection *conn, guint n_contacts, TpContact * const *contacts, guint n_failed, const TpHandle *failed, const GError *in_error, gpointer user_data, GObject *obj) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) obj; TpContact *contact; TpStreamTubeConnection *tube_conn = user_data; if (in_error != NULL) { DEBUG ("Failed to prepare TpContact: %s", in_error->message); return; } if (n_failed > 0) { DEBUG ("Failed to prepare TpContact (InvalidHandle)"); return; } contact = contacts[0]; _tp_stream_tube_connection_set_contact (tube_conn, contact); complete_accept_operation (self, tube_conn); } static void new_local_connection_identified (TpStreamTubeChannel *self, GSocketConnection *conn, guint connection_id) { TpHandle initiator_handle; TpStreamTubeConnection *tube_conn; tube_conn = _tp_stream_tube_connection_new (conn, self); g_hash_table_insert (self->priv->tube_connections, GUINT_TO_POINTER (connection_id), tube_conn); g_object_weak_ref (G_OBJECT (tube_conn), remote_connection_destroyed_cb, self); /* We are accepting a tube so the contact of the connection is the * initiator of the tube */ initiator_handle = tp_channel_get_initiator_handle (TP_CHANNEL (self)); /* Pass ownership of tube_conn to the function */ tp_connection_get_contacts_by_handle ( tp_channel_borrow_connection (TP_CHANNEL (self)), 1, &initiator_handle, 0, NULL, new_local_connection_with_contact, tube_conn, g_object_unref, G_OBJECT (self)); } static void client_socket_connected (TpStreamTubeChannel *self) { GSocketConnection *conn; conn = g_socket_connection_factory_create_connection ( self->priv->client_socket); g_assert (conn); DEBUG ("Stream Tube socket connected"); #ifdef HAVE_GIO_UNIX if (self->priv->access_control == TP_SOCKET_ACCESS_CONTROL_CREDENTIALS) { guchar byte; GError *error = NULL; byte = g_value_get_uchar (self->priv->access_control_param); /* FIXME: we should an async version of this API (bgo #629503) */ if (!tp_unix_connection_send_credentials_with_byte ( conn, byte, NULL, &error)) { DEBUG ("Failed to send credentials: %s", error->message); operation_failed (self, error); g_clear_error (&error); return; } } #endif if (self->priv->local_conn_id_set) { new_local_connection_identified (self, conn, self->priv->local_conn_id); self->priv->local_conn_id_set = FALSE; } else { /* Wait for NewLocalConnection signal */ /* This assume that we never connect more than once. Or at least that we * wait to have identify a connection before making a new connection. */ g_assert (self->priv->local_conn_waiting_id == NULL); self->priv->local_conn_waiting_id = g_object_ref (conn); } g_object_unref (conn); } static gboolean client_socket_cb (GSocket *socket, GIOCondition condition, TpStreamTubeChannel *self) { GError *error = NULL; if (!g_socket_check_connect_result (socket, &error)) { DEBUG ("Failed to connect to socket: %s", error->message); operation_failed (self, error); g_error_free (error); return FALSE; } client_socket_connected (self); return FALSE; } static void new_local_connection_cb (TpChannel *proxy, guint connection_id, gpointer user_data, GObject *weak_object) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) weak_object; if (self->priv->local_conn_waiting_id != NULL) { /* We got the ID of the connection */ new_local_connection_identified (self, self->priv->local_conn_waiting_id, connection_id); tp_clear_object (&self->priv->local_conn_waiting_id); return; } /* Wait that the connection is connected */ self->priv->local_conn_id = connection_id; self->priv->local_conn_id_set = TRUE; } static void _channel_accepted (TpChannel *channel, const GValue *addressv, const GError *in_error, gpointer user_data, GObject *obj) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) obj; GSocketAddress *remote_address; GError *error = NULL; if (in_error != NULL) { DEBUG ("Failed to Accept Stream Tube: %s", in_error->message); operation_failed (self, in_error); return; } tp_cli_channel_type_stream_tube_connect_to_new_local_connection ( TP_CHANNEL (self), new_local_connection_cb, NULL, NULL, G_OBJECT (self), &error); if (error != NULL) { DEBUG ("Failed to connect to NewLocalConnection signal"); operation_failed (self, error); g_error_free (error); return; } remote_address = tp_g_socket_address_from_variant (self->priv->socket_type, addressv, &error); if (error != NULL) { DEBUG ("Failed to convert address: %s", error->message); operation_failed (self, error); g_error_free (error); return; } /* Connect to CM */ g_socket_set_blocking (self->priv->client_socket, FALSE); g_socket_connect (self->priv->client_socket, remote_address, NULL, &error); if (error == NULL) { /* Socket is connected */ client_socket_connected (self); goto out; } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PENDING)) { /* We have to wait that the socket is connected */ GSource *source; source = g_socket_create_source (self->priv->client_socket, G_IO_OUT, NULL); g_source_attach (source, g_main_context_get_thread_default ()); g_source_set_callback (source, (GSourceFunc) client_socket_cb, self, NULL); g_error_free (error); g_source_unref (source); } else { DEBUG ("Failed to connect to CM: %s", error->message); operation_failed (self, error); g_error_free (error); } out: g_object_unref (remote_address); } /** * tp_stream_tube_channel_accept_async: * @self: an incoming #TpStreamTubeChannel * @callback: a callback to call when the tube has been accepted * @user_data: data to pass to @callback * * Accept an incoming stream tube. When the tube has been accepted, @callback * will be called. You can then call tp_stream_tube_channel_accept_finish() * to get a #TpStreamTubeConnection connected to the tube. * * Since: 0.13.2 */ void tp_stream_tube_channel_accept_async (TpStreamTubeChannel *self, GAsyncReadyCallback callback, gpointer user_data) { GHashTable *properties; GHashTable *supported_sockets; GError *error = NULL; g_return_if_fail (TP_IS_STREAM_TUBE_CHANNEL (self)); g_return_if_fail (self->priv->result == NULL); if (self->priv->access_control_param != NULL) { g_simple_async_report_error_in_idle (G_OBJECT (self), callback, user_data, TP_ERRORS, TP_ERROR_INVALID_ARGUMENT, "Tube has already be accepted"); return; } self->priv->result = g_simple_async_result_new (G_OBJECT (self), callback, user_data, tp_stream_tube_channel_accept_async); properties = tp_channel_borrow_immutable_properties (TP_CHANNEL (self)); supported_sockets = tp_asv_get_boxed (properties, TP_PROP_CHANNEL_TYPE_STREAM_TUBE_SUPPORTED_SOCKET_TYPES, TP_HASH_TYPE_SUPPORTED_SOCKET_MAP); if (!_tp_set_socket_address_type_and_access_control_type (supported_sockets, &self->priv->socket_type, &self->priv->access_control, &error)) { operation_failed (self, error); g_clear_error (&error); return; } DEBUG ("Using socket type %u with access control %u", self->priv->socket_type, self->priv->access_control); self->priv->client_socket = _tp_create_client_socket (self->priv->socket_type, &error); if (error != NULL) { DEBUG ("Failed to create socket: %s", error->message); operation_failed (self, error); g_clear_error (&error); return; } switch (self->priv->access_control) { case TP_SOCKET_ACCESS_CONTROL_LOCALHOST: /* Put a dummy value */ self->priv->access_control_param = tp_g_value_slice_new_uint (0); break; case TP_SOCKET_ACCESS_CONTROL_PORT: { GSocketAddress *addr; guint16 port; addr = g_socket_get_local_address (self->priv->client_socket, &error); if (addr == NULL) { DEBUG ("Failed to get local address of client socket: %s", error->message); operation_failed (self, error); g_error_free (error); return; } port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)); self->priv->access_control_param = tp_g_value_slice_new_uint (port); g_object_unref (addr); } break; case TP_SOCKET_ACCESS_CONTROL_CREDENTIALS: self->priv->access_control_param = tp_g_value_slice_new_byte ( g_random_int_range (0, G_MAXUINT8)); break; default: g_assert_not_reached (); } /* Call Accept */ tp_cli_channel_type_stream_tube_call_accept (TP_CHANNEL (self), -1, self->priv->socket_type, self->priv->access_control, self->priv->access_control_param, _channel_accepted, NULL, NULL, G_OBJECT (self)); } /** * tp_stream_tube_channel_accept_finish: * @self: a #TpStreamTubeChannel * @result: a #GAsyncResult * @error: a #GError to fill * * Finishes to accept an incoming stream tube. The returned * #TpStreamTubeConnection can then be used to exchange data through the tube. * * Returns: (transfer full): a newly created #TpStreamTubeConnection * * Since: 0.13.2 */ TpStreamTubeConnection * tp_stream_tube_channel_accept_finish (TpStreamTubeChannel *self, GAsyncResult *result, GError **error) { _tp_implement_finish_return_copy_pointer (self, tp_stream_tube_channel_accept_async, g_object_ref) } static void _new_remote_connection_with_contact (TpConnection *conn, guint n_contacts, TpContact * const *contacts, guint n_failed, const TpHandle *failed, const GError *in_error, gpointer user_data, GObject *obj) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) obj; TpContact *contact; TpStreamTubeConnection *tube_conn = user_data; if (in_error != NULL) { DEBUG ("Failed to prepare TpContact: %s", in_error->message); return; } if (n_failed > 0) { DEBUG ("Failed to prepare TpContact (InvalidHandle)"); return; } contact = contacts[0]; _tp_stream_tube_connection_set_contact (tube_conn, contact); DEBUG ("Accepting incoming GIOStream from %s", tp_contact_get_identifier (contact)); g_signal_emit (self, _signals[INCOMING], 0, tube_conn); /* anyone receiving the signal is required to hold their own reference */ } static gboolean sig_match_conn (TpStreamTubeChannel *self, SigWaitingConn *sig, ConnWaitingSig *c) { if (self->priv->access_control == TP_SOCKET_ACCESS_CONTROL_PORT) { /* Use the port to identify the connection */ guint port; GSocketAddress *address; GError *error = NULL; address = g_socket_connection_get_remote_address (c->conn, &error); if (address == NULL) { DEBUG ("Failed to get connection address: %s", error->message); g_error_free (error); return FALSE; } dbus_g_type_struct_get (sig->param, 1, &port, G_MAXINT); if (port == g_inet_socket_address_get_port ( G_INET_SOCKET_ADDRESS (address))) { DEBUG ("Identified connection %u using port %u", port, sig->connection_id); g_object_unref (address); return TRUE; } g_object_unref (address); } else if (self->priv->access_control == TP_SOCKET_ACCESS_CONTROL_CREDENTIALS) { guchar byte; byte = g_value_get_uchar (sig->param); return byte == c->byte; } else { DEBUG ("Can't properly identify connection as we are using " "access control %u. Assume it's the head of the list", self->priv->access_control); return TRUE; } return FALSE; } static gboolean can_identify_contact (TpStreamTubeChannel *self) { TpHandleType handle_type; tp_channel_get_handle (TP_CHANNEL (self), &handle_type); /* With contact stream tube, it's always the same contact connecting to the * tube */ if (handle_type == TP_HANDLE_TYPE_CONTACT) return TRUE; /* Room stream tube, we need either the Credentials or Port access control * to properly identify connections. */ if (self->priv->access_control == TP_SOCKET_ACCESS_CONTROL_CREDENTIALS || self->priv->access_control == TP_SOCKET_ACCESS_CONTROL_PORT) return TRUE; return FALSE; } static void connection_identified (TpStreamTubeChannel *self, GSocketConnection *conn, TpHandle handle, guint connection_id) { TpStreamTubeConnection *tube_conn; tube_conn = _tp_stream_tube_connection_new (conn, self); g_hash_table_insert (self->priv->tube_connections, GUINT_TO_POINTER (connection_id), tube_conn); g_object_weak_ref (G_OBJECT (tube_conn), remote_connection_destroyed_cb, self); if (can_identify_contact (self)) { /* Pass the ref on tube_conn to the function */ tp_connection_get_contacts_by_handle ( tp_channel_borrow_connection (TP_CHANNEL (self)), 1, &handle, 0, NULL, _new_remote_connection_with_contact, tube_conn, g_object_unref, G_OBJECT (self)); } else { g_signal_emit (self, _signals[INCOMING], 0, tube_conn); g_object_unref (tube_conn); } } static void stream_tube_connection_closed_cb (GObject *source, GAsyncResult *result, gpointer user_data) { GError *error = NULL; if (!g_io_stream_close_finish (G_IO_STREAM (source), result, &error)) { DEBUG ("Failed to close connection: %s", error->message); g_error_free (error); return; } } static void connection_rejected (TpStreamTubeChannel *self, GSocketConnection *conn, TpHandle handle, guint connection_id) { DEBUG ("Reject connection %u with contact %u", connection_id, handle); g_io_stream_close_async (G_IO_STREAM (conn), G_PRIORITY_DEFAULT, NULL, stream_tube_connection_closed_cb, self); } static void _new_remote_connection (TpChannel *channel, guint handle, const GValue *param, guint connection_id, gpointer user_data, GObject *obj) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) obj; GSList *l; ConnWaitingSig *found_conn = NULL; SigWaitingConn *sig; TpHandle chan_handle; TpHandleType handle_type; gboolean rejected = FALSE; chan_handle = tp_channel_get_handle (channel, &handle_type); if (handle_type == TP_HANDLE_TYPE_CONTACT && handle != chan_handle) { DEBUG ("CM claimed that handle %u connected to the stream tube, " "but as a contact stream tube we should only get connection from " "handle %u", handle, chan_handle); rejected = TRUE; } sig = sig_waiting_conn_new (handle, param, connection_id, rejected); for (l = self->priv->conn_waiting_sig; l != NULL && found_conn == NULL; l = g_slist_next (l)) { ConnWaitingSig *conn = l->data; if (sig_match_conn (self, sig, conn)) found_conn = conn; } if (found_conn == NULL) { DEBUG ("Didn't find any connection for %u. Waiting for more", connection_id); /* Pass ownership of sig to the list */ self->priv->sig_waiting_conn = g_slist_append ( self->priv->sig_waiting_conn, sig); return; } /* We found a connection */ self->priv->conn_waiting_sig = g_slist_remove ( self->priv->conn_waiting_sig, found_conn); if (rejected) connection_rejected (self, found_conn->conn, handle, connection_id); else connection_identified (self, found_conn->conn, handle, connection_id); sig_waiting_conn_free (sig); conn_waiting_sig_free (found_conn); } static void _channel_offered (TpChannel *channel, const GError *in_error, gpointer user_data, GObject *obj) { TpStreamTubeChannel *self = (TpStreamTubeChannel *) obj; if (in_error != NULL) { DEBUG ("Failed to Offer Stream Tube: %s", in_error->message); operation_failed (self, in_error); return; } DEBUG ("Stream Tube offered"); g_simple_async_result_complete (self->priv->result); tp_clear_object (&self->priv->result); } static void _offer_with_address (TpStreamTubeChannel *self, GHashTable *params) { GValue *addressv = NULL; GError *error = NULL; addressv = tp_address_variant_from_g_socket_address (self->priv->address, &self->priv->socket_type, &error); if (error != NULL) { operation_failed (self, error); g_clear_error (&error); goto finally; } /* Connect the NewRemoteConnection signal */ tp_cli_channel_type_stream_tube_connect_to_new_remote_connection ( TP_CHANNEL (self), _new_remote_connection, NULL, NULL, G_OBJECT (self), &error); if (error != NULL) { operation_failed (self, error); g_clear_error (&error); goto finally; } g_assert (self->priv->parameters == NULL); if (params != NULL) self->priv->parameters = g_hash_table_ref (params); else self->priv->parameters = tp_asv_new (NULL, NULL); g_object_notify (G_OBJECT (self), "parameters"); /* Call Offer */ tp_cli_channel_type_stream_tube_call_offer (TP_CHANNEL (self), -1, self->priv->socket_type, addressv, self->priv->access_control, self->priv->parameters, _channel_offered, NULL, NULL, G_OBJECT (self)); finally: if (addressv != NULL) tp_g_value_slice_free (addressv); } static SigWaitingConn * find_sig_for_conn (TpStreamTubeChannel *self, ConnWaitingSig *c) { GSList *l; for (l = self->priv->sig_waiting_conn; l != NULL; l = g_slist_next (l)) { SigWaitingConn *sig = l->data; if (sig_match_conn (self, sig, c)) return sig; } return NULL; } static void service_incoming_cb (GSocketService *service, GSocketConnection *conn, GObject *source_object, gpointer user_data) { TpStreamTubeChannel *self = user_data; SigWaitingConn *sig; ConnWaitingSig *c; guchar byte = 0; DEBUG ("New incoming connection"); #ifdef HAVE_GIO_UNIX /* Check the credentials if needed */ if (self->priv->access_control == TP_SOCKET_ACCESS_CONTROL_CREDENTIALS) { GCredentials *creds; uid_t uid; GError *error = NULL; /* FIXME: we should an async version of this API (bgo #629503) */ creds = tp_unix_connection_receive_credentials_with_byte ( conn, &byte, NULL, &error); if (creds == NULL) { DEBUG ("Failed to receive credentials: %s", error->message); g_error_free (error); return; } uid = g_credentials_get_unix_user (creds, &error); g_object_unref (creds); if (uid != geteuid ()) { DEBUG ("Wrong credentials received (user: %u)", uid); return; } } #endif c = conn_waiting_sig_new (conn, byte); sig = find_sig_for_conn (self, c); if (sig == NULL) { DEBUG ("Can't identify the connection, wait for NewRemoteConnection sig"); /* Pass ownership to the list */ self->priv->conn_waiting_sig = g_slist_append ( self->priv->conn_waiting_sig, c); return; } /* Connection has been identified */ self->priv->sig_waiting_conn = g_slist_remove (self->priv->sig_waiting_conn, sig); if (sig->rejected) connection_rejected (self, conn, sig->handle, sig->connection_id); else connection_identified (self, conn, sig->handle, sig->connection_id); sig_waiting_conn_free (sig); conn_waiting_sig_free (c); } /** * tp_stream_tube_channel_offer_async: * @self: an outgoing #TpStreamTubeChannel * @params: (allow-none) (transfer none): parameters of the tube, or %NULL * @callback: a callback to call when the tube has been offered * @user_data: data to pass to @callback * * Offer an outgoing stream tube. When the tube has been offered, @callback * will be called. You can then call tp_stream_tube_channel_offer_finish() * to get the result of the operation. * * You have to connect to the #TpStreamTubeChannel::incoming signal to get a * #TpStreamTubeConnection each time a contact establishes a connection to * the tube. * * Since: 0.13.2 */ void tp_stream_tube_channel_offer_async (TpStreamTubeChannel *self, GHashTable *params, GAsyncReadyCallback callback, gpointer user_data) { GHashTable *properties; GHashTable *supported_sockets; GError *error = NULL; g_return_if_fail (TP_IS_STREAM_TUBE_CHANNEL (self)); g_return_if_fail (self->priv->result == NULL); g_return_if_fail (tp_channel_get_requested (TP_CHANNEL (self))); if (self->priv->service != NULL) { g_critical ("Can't reoffer Tube!"); return; } self->priv->result = g_simple_async_result_new (G_OBJECT (self), callback, user_data, tp_stream_tube_channel_offer_async); properties = tp_channel_borrow_immutable_properties (TP_CHANNEL (self)); supported_sockets = tp_asv_get_boxed (properties, TP_PROP_CHANNEL_TYPE_STREAM_TUBE_SUPPORTED_SOCKET_TYPES, TP_HASH_TYPE_SUPPORTED_SOCKET_MAP); if (!_tp_set_socket_address_type_and_access_control_type (supported_sockets, &self->priv->socket_type, &self->priv->access_control, &error)) { operation_failed (self, error); g_clear_error (&error); return; } DEBUG ("Using socket type %u with access control %u", self->priv->socket_type, self->priv->access_control); self->priv->service = g_socket_service_new (); switch (self->priv->socket_type) { #ifdef HAVE_GIO_UNIX case TP_SOCKET_ADDRESS_TYPE_UNIX: { self->priv->address = _tp_create_temp_unix_socket ( self->priv->service, &error); /* check there wasn't an error on the final attempt */ if (self->priv->address == NULL) { operation_failed (self, error); g_clear_error (&error); return; } } break; #endif /* HAVE_GIO_UNIX */ case TP_SOCKET_ADDRESS_TYPE_IPV4: case TP_SOCKET_ADDRESS_TYPE_IPV6: { GInetAddress *localhost; GSocketAddress *in_address; localhost = g_inet_address_new_loopback ( self->priv->socket_type == TP_SOCKET_ADDRESS_TYPE_IPV4 ? G_SOCKET_FAMILY_IPV4 : G_SOCKET_FAMILY_IPV6); in_address = g_inet_socket_address_new (localhost, 0); g_socket_listener_add_address ( G_SOCKET_LISTENER (self->priv->service), in_address, G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_DEFAULT, NULL, &self->priv->address, &error); g_object_unref (localhost); g_object_unref (in_address); if (error != NULL) { operation_failed (self, error); g_clear_error (&error); return; } break; } default: /* should have already errored */ g_assert_not_reached (); break; } tp_g_signal_connect_object (self->priv->service, "incoming", G_CALLBACK (service_incoming_cb), self, 0); g_socket_service_start (self->priv->service); _offer_with_address (self, params); } /** * tp_stream_tube_channel_offer_finish: * @self: a #TpStreamTubeChannel * @result: a #GAsyncResult * @error: a #GError to fill * * Finishes to offer an outgoing stream tube. * * Returns: %TRUE when a Tube has been successfully offered; %FALSE otherwise * * Since: 0.13.2 */ gboolean tp_stream_tube_channel_offer_finish (TpStreamTubeChannel *self, GAsyncResult *result, GError **error) { _tp_implement_finish_void (self, tp_stream_tube_channel_offer_async) } /** * tp_stream_tube_channel_get_service: (skip) * @self: a #TpStreamTubeChannel * * Return the #TpStreamTubeChannel:service property * * Returns: (transfer none): the value of #TpStreamTubeChannel:service * * Since: 0.13.2 */ const gchar * tp_stream_tube_channel_get_service (TpStreamTubeChannel *self) { GHashTable *props; props = tp_channel_borrow_immutable_properties (TP_CHANNEL (self)); return tp_asv_get_string (props, TP_PROP_CHANNEL_TYPE_STREAM_TUBE_SERVICE); } /** * tp_stream_tube_channel_get_parameters: (skip) * @self: a #TpStreamTubeChannel * * Return the #TpStreamTubeChannel:parameters property * * Returns: (transfer none) (element-type utf8 GObject.Value): * the value of #TpStreamTubeChannel:parameters * * Since: 0.13.2 */ GHashTable * tp_stream_tube_channel_get_parameters (TpStreamTubeChannel *self) { return self->priv->parameters; }