From f3e93d4ee4a0479bfd4bceca9c2509a2b370a1b3 Mon Sep 17 00:00:00 2001 From: Elliott Sales de Andrade Date: Mon, 18 May 2020 19:58:19 -0400 Subject: simple: Convert TCP writing to gio. --- libpurple/protocols/simple/simple.c | 111 +++++++++++++++--------------------- libpurple/protocols/simple/simple.h | 2 +- 2 files changed, 48 insertions(+), 65 deletions(-) diff --git a/libpurple/protocols/simple/simple.c b/libpurple/protocols/simple/simple.c index 0e62346ce1..35aeac81d8 100644 --- a/libpurple/protocols/simple/simple.c +++ b/libpurple/protocols/simple/simple.c @@ -474,38 +474,26 @@ static void fill_auth(struct simple_account_data *sip, const gchar *hdr, struct } -static void simple_canwrite_cb(gpointer data, gint source, PurpleInputCondition cond) { - PurpleConnection *gc = data; - struct simple_account_data *sip = purple_connection_get_protocol_data(gc); - gsize max_write; - gssize written; - const gchar *output = NULL; - - max_write = purple_circular_buffer_get_max_read(sip->txbuf); - - if(max_write == 0) { - purple_input_remove(sip->tx_handler); - sip->tx_handler = 0; - return; - } +static void +simple_push_bytes_cb(GObject *sender, GAsyncResult *res, gpointer data) +{ + PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(sender); + struct simple_account_data *sip = data; + gboolean result; + GError *error = NULL; - output = purple_circular_buffer_get_output(sip->txbuf); + result = purple_queued_output_stream_push_bytes_finish(stream, res, &error); - written = write(sip->fd, output, max_write); + if (!result) { + purple_queued_output_stream_clear_queue(stream); - if(written < 0 && errno == EAGAIN) - written = 0; - else if (written <= 0) { - /*TODO: do we really want to disconnect on a failure to write?*/ - gchar *tmp = g_strdup_printf(_("Lost connection with server: %s"), - g_strerror(errno)); - purple_connection_error(gc, - PURPLE_CONNECTION_ERROR_NETWORK_ERROR, tmp); - g_free(tmp); - return; + if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_error_free(error); + } else { + g_prefix_error(&error, "%s", _("Lost connection with server: ")); + purple_connection_take_error(sip->gc, error); + } } - - purple_circular_buffer_mark_read(sip->txbuf, written); } static void simple_input_cb(gpointer data, gint source, PurpleInputCondition cond); @@ -519,6 +507,7 @@ send_later_cb(GObject *sender, GAsyncResult *res, gpointer data) GSocketConnection *sockconn; GSocket *socket; gint fd; + gsize writelen; GError *error = NULL; sockconn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(sender), @@ -538,14 +527,25 @@ send_later_cb(GObject *sender, GAsyncResult *res, gpointer data) sip = purple_connection_get_protocol_data(gc); sip->fd = fd; + sip->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(G_IO_STREAM(sockconn))); sip->connecting = FALSE; - simple_canwrite_cb(gc, sip->fd, PURPLE_INPUT_WRITE); + writelen = purple_circular_buffer_get_max_read(sip->txbuf); + if (writelen != 0) { + const gchar *buf; + GBytes *output; - /* If there is more to write now, we need to register a handler */ - if(purple_circular_buffer_get_used(sip->txbuf) > 0) - sip->tx_handler = purple_input_add(sip->fd, PURPLE_INPUT_WRITE, - simple_canwrite_cb, gc); + buf = purple_circular_buffer_get_output(sip->txbuf); + + output = g_bytes_new(buf, writelen); + purple_queued_output_stream_push_bytes_async( + sip->output, output, G_PRIORITY_DEFAULT, sip->cancellable, + simple_push_bytes_cb, sip); + g_bytes_unref(output); + + purple_circular_buffer_mark_read(sip->txbuf, writelen); + } conn = connection_create(sip, sockconn, fd); conn->inputhandler = purple_input_add(sip->fd, PURPLE_INPUT_READ, simple_input_cb, gc); @@ -591,39 +591,18 @@ static void sendout_pkt(PurpleConnection *gc, const char *buf) { purple_debug_info("simple", "could not send packet\n"); } } else { - int ret; - if(sip->fd < 0) { - sendlater(gc, buf); - return; - } - - if(sip->tx_handler) { - ret = -1; - errno = EAGAIN; - } else - ret = write(sip->fd, buf, writelen); + GBytes *output; - if (ret < 0 && errno == EAGAIN) - ret = 0; - else if(ret <= 0) { /* XXX: When does this happen legitimately? */ + if (sip->output == NULL) { sendlater(gc, buf); return; } - if (ret < writelen) { - if(!sip->tx_handler) - sip->tx_handler = purple_input_add(sip->fd, - PURPLE_INPUT_WRITE, simple_canwrite_cb, - gc); - - /* XXX: is it OK to do this? You might get part of a request sent - with part of another. */ - if(purple_circular_buffer_get_used(sip->txbuf) > 0) - purple_circular_buffer_append(sip->txbuf, "\r\n", 2); - - purple_circular_buffer_append(sip->txbuf, buf + ret, - writelen - ret); - } + output = g_bytes_new(buf, writelen); + purple_queued_output_stream_push_bytes_async( + sip->output, output, G_PRIORITY_DEFAULT, sip->cancellable, + simple_push_bytes_cb, sip); + g_bytes_unref(output); } } @@ -1792,8 +1771,11 @@ static void simple_input_cb(gpointer data, gint source, PurpleInputCondition con return; else if(len <= 0) { purple_debug_info("simple", "simple_input_cb: read error\n"); + if (sip->fd == source) { + sip->fd = -1; + g_clear_object(&sip->output); + } connection_remove(sip, source); - if(sip->fd == source) sip->fd = -1; return; } purple_connection_update_last_received(gc); @@ -1855,6 +1837,8 @@ login_cb(GObject *sender, GAsyncResult *res, gpointer data) sip = purple_connection_get_protocol_data(gc); sip->fd = fd; + sip->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(G_IO_STREAM(sockconn))); conn = connection_create(sip, sockconn, fd); @@ -2128,8 +2112,6 @@ static void simple_close(PurpleConnection *gc) if (sip->listenpa) purple_input_remove(sip->listenpa); - if (sip->tx_handler) - purple_input_remove(sip->tx_handler); if (sip->resendtimeout) g_source_remove(sip->resendtimeout); if (sip->registertimeout) @@ -2146,6 +2128,7 @@ static void simple_close(PurpleConnection *gc) if (sip->listen_data != NULL) purple_network_listen_cancel(sip->listen_data); + g_clear_object(&sip->output); if (sip->fd >= 0) close(sip->fd); diff --git a/libpurple/protocols/simple/simple.h b/libpurple/protocols/simple/simple.h index bf52a7d754..55d596d30d 100644 --- a/libpurple/protocols/simple/simple.h +++ b/libpurple/protocols/simple/simple.h @@ -94,6 +94,7 @@ struct simple_account_data { gchar *username; gchar *password; GCancellable *cancellable; + PurpleQueuedOutputStream *output; GSocketService *service; PurpleNetworkListenData *listen_data; int fd; @@ -112,7 +113,6 @@ struct simple_account_data { gboolean connecting; PurpleAccount *account; PurpleCircularBuffer *txbuf; - guint tx_handler; gchar *regcallid; GSList *transactions; GSList *watcher; -- cgit v1.2.1 From 26bbace872ea40aa49e341088391bda84f891d2b Mon Sep 17 00:00:00 2001 From: Elliott Sales de Andrade Date: Mon, 18 May 2020 23:34:20 -0400 Subject: simple: Convert TCP reading to gio. --- libpurple/protocols/simple/simple.c | 143 ++++++++++++++++++++---------------- libpurple/protocols/simple/simple.h | 1 + 2 files changed, 82 insertions(+), 62 deletions(-) diff --git a/libpurple/protocols/simple/simple.c b/libpurple/protocols/simple/simple.c index 35aeac81d8..cf879729c9 100644 --- a/libpurple/protocols/simple/simple.c +++ b/libpurple/protocols/simple/simple.c @@ -164,17 +164,6 @@ static void simple_set_status(PurpleAccount *account, PurpleStatus *status) { } } -static struct sip_connection *connection_find(struct simple_account_data *sip, int fd) { - struct sip_connection *ret = NULL; - GSList *entry = sip->openconns; - while(entry) { - ret = entry->data; - if(ret->fd == fd) return ret; - entry = entry->next; - } - return NULL; -} - static struct simple_watcher *watcher_find(struct simple_account_data *sip, const gchar *name) { struct simple_watcher *watcher; @@ -211,12 +200,10 @@ watcher_destroy(struct simple_watcher *watcher) } static struct sip_connection * -connection_create(struct simple_account_data *sip, GSocketConnection *sockconn, - int fd) +connection_create(struct simple_account_data *sip, GSocketConnection *sockconn) { struct sip_connection *ret = g_new0(struct sip_connection, 1); ret->sockconn = sockconn; - ret->fd = fd; sip->openconns = g_slist_append(sip->openconns, ret); return ret; } @@ -225,17 +212,32 @@ static void connection_destroy(struct sip_connection *conn) { if (conn->inputhandler) { - purple_input_remove(conn->inputhandler); + g_source_remove(conn->inputhandler); } g_clear_pointer(&conn->inbuf, g_free); g_clear_object(&conn->sockconn); g_free(conn); } +static struct sip_connection * +connection_find(struct simple_account_data *sip, GInputStream *input) +{ + struct sip_connection *ret = NULL; + GSList *entry = sip->openconns; + while (entry) { + ret = entry->data; + if (g_io_stream_get_input_stream(G_IO_STREAM(ret->sockconn)) == input) { + return ret; + } + entry = entry->next; + } + return NULL; +} + static void -connection_remove(struct simple_account_data *sip, int fd) +connection_remove(struct simple_account_data *sip, GInputStream *input) { - struct sip_connection *conn = connection_find(sip, fd); + struct sip_connection *conn = connection_find(sip, input); sip->openconns = g_slist_remove(sip->openconns, conn); connection_destroy(conn); } @@ -496,7 +498,7 @@ simple_push_bytes_cb(GObject *sender, GAsyncResult *res, gpointer data) } } -static void simple_input_cb(gpointer data, gint source, PurpleInputCondition cond); +static gboolean simple_input_cb(GObject *stream, gpointer data); static void send_later_cb(GObject *sender, GAsyncResult *res, gpointer data) @@ -505,9 +507,8 @@ send_later_cb(GObject *sender, GAsyncResult *res, gpointer data) struct simple_account_data *sip; struct sip_connection *conn; GSocketConnection *sockconn; - GSocket *socket; - gint fd; gsize writelen; + GSource *source; GError *error = NULL; sockconn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(sender), @@ -521,12 +522,8 @@ send_later_cb(GObject *sender, GAsyncResult *res, gpointer data) return; } - socket = g_socket_connection_get_socket(sockconn); - g_assert(socket != NULL); - fd = g_socket_get_fd(socket); - sip = purple_connection_get_protocol_data(gc); - sip->fd = fd; + sip->input = g_io_stream_get_input_stream(G_IO_STREAM(sockconn)); sip->output = purple_queued_output_stream_new( g_io_stream_get_output_stream(G_IO_STREAM(sockconn))); sip->connecting = FALSE; @@ -547,8 +544,13 @@ send_later_cb(GObject *sender, GAsyncResult *res, gpointer data) purple_circular_buffer_mark_read(sip->txbuf, writelen); } - conn = connection_create(sip, sockconn, fd); - conn->inputhandler = purple_input_add(sip->fd, PURPLE_INPUT_READ, simple_input_cb, gc); + conn = connection_create(sip, sockconn); + + source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable); + g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL); + conn->inputhandler = g_source_attach(source, NULL); + g_source_unref(source); } static void sendlater(PurpleConnection *gc, const char *buf) { @@ -1749,15 +1751,19 @@ static void simple_udp_process(gpointer data, gint source, PurpleInputCondition } } -static void simple_input_cb(gpointer data, gint source, PurpleInputCondition cond) +static gboolean +simple_input_cb(GObject *stream, gpointer data) { + GInputStream *input = G_INPUT_STREAM(stream); PurpleConnection *gc = data; struct simple_account_data *sip = purple_connection_get_protocol_data(gc); - int len; - struct sip_connection *conn = connection_find(sip, source); - if(!conn) { + gssize len; + struct sip_connection *conn = connection_find(sip, input); + GError *error = NULL; + + if (!conn) { purple_debug_error("simple", "Connection not found!\n"); - return; + return G_SOURCE_REMOVE; } if(conn->inbuflen < conn->inbufused + SIMPLE_BUF_INC) { @@ -1765,24 +1771,41 @@ static void simple_input_cb(gpointer data, gint source, PurpleInputCondition con conn->inbuf = g_realloc(conn->inbuf, conn->inbuflen); } - len = read(source, conn->inbuf + conn->inbufused, SIMPLE_BUF_INC - 1); - - if(len < 0 && errno == EAGAIN) - return; - else if(len <= 0) { - purple_debug_info("simple", "simple_input_cb: read error\n"); - if (sip->fd == source) { - sip->fd = -1; + len = g_pollable_input_stream_read_nonblocking( + G_POLLABLE_INPUT_STREAM(stream), conn->inbuf + conn->inbufused, + SIMPLE_BUF_INC - 1, sip->cancellable, &error); + if (len < 0) { + if (error->code == G_IO_ERROR_WOULD_BLOCK) { + g_error_free(error); + return G_SOURCE_CONTINUE; + } else if (error->code != G_IO_ERROR_CANCELLED) { + /* There has been an error reading from the socket */ + purple_debug_info("simple", "simple_input_cb: read error"); + if (sip->input == input) { + g_clear_object(&sip->input); + g_clear_object(&sip->output); + } + connection_remove(sip, input); + } + g_clear_error(&error); + return G_SOURCE_REMOVE; + } else if (len == 0) { /* The other end has closed the socket */ + purple_debug_warning("simple", "simple_input_cb: connection closed"); + if (sip->input == input) { + g_clear_object(&sip->input); g_clear_object(&sip->output); } - connection_remove(sip, source); - return; + connection_remove(sip, input); + return G_SOURCE_REMOVE; } + purple_connection_update_last_received(gc); conn->inbufused += len; conn->inbuf[conn->inbufused] = '\0'; process_input(sip, conn); + + return G_SOURCE_CONTINUE; } /* Callback for new connections on incoming TCP port */ @@ -1794,19 +1817,15 @@ simple_newconn_cb(G_GNUC_UNUSED GSocketService *service, PurpleConnection *gc = PURPLE_CONNECTION(source_object); struct simple_account_data *sip = purple_connection_get_protocol_data(gc); struct sip_connection *conn; - GSocket *socket; - gint fd; - - socket = g_socket_connection_get_socket(connection); - g_assert(socket != NULL); - fd = g_socket_get_fd(socket); + GSource *source; - _purple_network_set_common_socket_flags(fd); + conn = connection_create(sip, g_object_ref(connection)); - conn = connection_create(sip, g_object_ref(connection), fd); - - conn->inputhandler = - purple_input_add(fd, PURPLE_INPUT_READ, simple_input_cb, gc); + source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable); + g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL); + conn->inputhandler = g_source_attach(source, NULL); + g_source_unref(source); } static void @@ -1816,8 +1835,7 @@ login_cb(GObject *sender, GAsyncResult *res, gpointer data) struct simple_account_data *sip; struct sip_connection *conn; GSocketConnection *sockconn; - GSocket *socket; - gint fd; + GSource *source; GError *error = NULL; sockconn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(sender), @@ -1831,22 +1849,22 @@ login_cb(GObject *sender, GAsyncResult *res, gpointer data) return; } - socket = g_socket_connection_get_socket(sockconn); - g_assert(socket != NULL); - fd = g_socket_get_fd(socket); - sip = purple_connection_get_protocol_data(gc); - sip->fd = fd; + sip->input = g_io_stream_get_input_stream(G_IO_STREAM(sockconn)); sip->output = purple_queued_output_stream_new( g_io_stream_get_output_stream(G_IO_STREAM(sockconn))); - conn = connection_create(sip, sockconn, fd); + conn = connection_create(sip, sockconn); sip->registertimeout = g_timeout_add(g_random_int_range(10000, 100000), (GSourceFunc)subscribe_timeout, sip); do_register(sip); - conn->inputhandler = purple_input_add(sip->fd, PURPLE_INPUT_READ, simple_input_cb, gc); + source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable); + g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL); + conn->inputhandler = g_source_attach(source, NULL); + g_source_unref(source); } static guint simple_ht_hash_nick(const char *nick) { @@ -2128,6 +2146,7 @@ static void simple_close(PurpleConnection *gc) if (sip->listen_data != NULL) purple_network_listen_cancel(sip->listen_data); + g_clear_object(&sip->input); g_clear_object(&sip->output); if (sip->fd >= 0) close(sip->fd); diff --git a/libpurple/protocols/simple/simple.h b/libpurple/protocols/simple/simple.h index 55d596d30d..9390a43db4 100644 --- a/libpurple/protocols/simple/simple.h +++ b/libpurple/protocols/simple/simple.h @@ -94,6 +94,7 @@ struct simple_account_data { gchar *username; gchar *password; GCancellable *cancellable; + GInputStream *input; PurpleQueuedOutputStream *output; GSocketService *service; PurpleNetworkListenData *listen_data; -- cgit v1.2.1