summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Haller <thaller@redhat.com>2023-03-30 20:09:35 +0200
committerThomas Haller <thaller@redhat.com>2023-04-04 08:43:21 +0200
commit7e12d437fe6302acdc4816d0dcf0c7a4ac778c45 (patch)
tree19e23bbb7796728e144599f9be678a4e682e604e
parentf862d4bbcef704e539fa98d98eacd398e8c6c57a (diff)
downloadNetworkManager-7e12d437fe6302acdc4816d0dcf0c7a4ac778c45.tar.gz
ovsdb: use the FD directly instead of GSocketConnection/GOutputStream
GSocketConnection/GOutputStream/GInputStream seems rather unnecessary. Maybe they make sense when you want to write portable code (for Windows). Otherwise, watching a file descriptor and reading/writing it directly is simpler (and also more efficient). For example, we passed no GCancellable to g_input_stream_read_async(). What does that mean w.r.t. destroying the NMOvsdb instance? I suspect it's wrong, but it's hard to say, because there are so many layers of code. Note that we anyway keep state in NMOvsdb, namely the data we want to send (output_buf) and the data we partially received (input_buf). All we need, are poll notifications when the file descriptor is ready. To those, we hook up the read/write callbacks. Also before was the code async, and there were callbacks when when read/write was done. That does not simplify the code in any way. - we no longer use separate NMOvsdbPrivate.buf and NMOvsdbPrivate.input buffers. There is just a NMOvsdbPrivate.input_buf that can we can fill directly.
-rw-r--r--src/core/devices/ovs/nm-ovsdb.c224
1 files changed, 110 insertions, 114 deletions
diff --git a/src/core/devices/ovs/nm-ovsdb.c b/src/core/devices/ovs/nm-ovsdb.c
index bac6235429..2d080b3313 100644
--- a/src/core/devices/ovs/nm-ovsdb.c
+++ b/src/core/devices/ovs/nm-ovsdb.c
@@ -12,6 +12,7 @@
#include "libnm-glib-aux/nm-jansson.h"
#include "libnm-glib-aux/nm-str-buf.h"
+#include "libnm-glib-aux/nm-io-utils.h"
#include "nm-core-utils.h"
#include "libnm-core-intern/nm-core-internal.h"
#include "devices/nm-device.h"
@@ -134,13 +135,16 @@ enum {
static guint signals[LAST_SIGNAL] = {0};
typedef struct {
- NMPlatform *platform;
- GSocketConnection *conn;
- GCancellable *conn_cancellable;
- char buf[4096]; /* Input buffer */
- GString *input; /* JSON stream waiting for decoding. */
- GString *output; /* JSON stream to be sent. */
- guint64 call_id_counter;
+ NMPlatform *platform;
+ int conn_fd;
+ GSource *conn_fd_in_source;
+ GSource *conn_fd_out_source;
+ GCancellable *conn_cancellable;
+
+ NMStrBuf input_buf;
+ NMStrBuf output_buf;
+
+ guint64 call_id_counter;
CList calls_lst_head;
@@ -176,12 +180,13 @@ NM_DEFINE_SINGLETON_GETTER(NMOvsdb, nm_ovsdb_get, NM_TYPE_OVSDB);
/*****************************************************************************/
-static void ovsdb_try_connect(NMOvsdb *self);
-static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing);
-static void ovsdb_read(NMOvsdb *self);
-static void ovsdb_write(NMOvsdb *self);
-static void ovsdb_next_command(NMOvsdb *self);
-static void cleanup_check_ready(NMOvsdb *self);
+static void ovsdb_try_connect(NMOvsdb *self);
+static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing);
+static void ovsdb_read(NMOvsdb *self);
+static void ovsdb_write_try(NMOvsdb *self);
+static gboolean ovsdb_write_cb(int fd, GIOCondition condition, gpointer user_data);
+static void ovsdb_next_command(NMOvsdb *self);
+static void cleanup_check_ready(NMOvsdb *self);
/*****************************************************************************/
@@ -1448,7 +1453,7 @@ ovsdb_next_command(NMOvsdb *self)
nm_auto_free char *cmd = NULL;
nm_auto_decref_json json_t *msg = NULL;
- if (!priv->conn)
+ if (priv->conn_fd < 0)
return;
if (c_list_is_empty(&priv->calls_lst_head))
@@ -1585,9 +1590,9 @@ ovsdb_next_command(NMOvsdb *self)
cmd = json_dumps(msg, 0);
_LOGT_call(call, "send: call-id=%" G_GUINT64_FORMAT ", %s", call->call_id, cmd);
- g_string_append(priv->output, cmd);
+ nm_str_buf_append(&priv->output_buf, cmd);
- ovsdb_write(self);
+ ovsdb_write_try(self);
}
/**
@@ -2189,16 +2194,12 @@ ovsdb_got_echo(NMOvsdb *self, json_int_t id, json_t *data)
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
nm_auto_decref_json json_t *msg = NULL;
nm_auto_free char *reply = NULL;
- gboolean output_was_empty;
-
- output_was_empty = priv->output->len == 0;
msg = json_pack("{s:I, s:O}", "id", id, "result", data);
reply = json_dumps(msg, 0);
- g_string_append(priv->output, reply);
+ nm_str_buf_append(&priv->output_buf, reply);
- if (output_was_empty)
- ovsdb_write(self);
+ ovsdb_write_try(self);
}
/**
@@ -2302,7 +2303,7 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg)
/* Don't progress further commands in case the callback hit an error
* and disconnected us. */
- if (!priv->conn)
+ if (priv->conn_fd < 0)
return;
/* Now we're free to serialize and send the next command, if any. */
@@ -2318,8 +2319,8 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg)
/*****************************************************************************/
typedef struct {
- gsize bufp;
- GString *input;
+ gsize bufp;
+ NMStrBuf *input;
} JsonReadMsgData;
/* Lower level marshalling and demarshalling of the JSON-RPC traffic on the
@@ -2339,13 +2340,13 @@ _json_read_msg_cb(void *buffer, size_t buflen, void *user_data)
}
/* Pass one more byte to the JSON decoder. */
- *(char *) buffer = data->input->str[data->bufp];
+ *(char *) buffer = nm_str_buf_get_char(data->input, data->bufp);
data->bufp++;
return 1;
}
static json_t *
-_json_read_msg(GString *input)
+_json_read_msg(NMStrBuf *input)
{
JsonReadMsgData data = {
.bufp = 0,
@@ -2364,114 +2365,112 @@ _json_read_msg(GString *input)
return NULL;
nm_assert(data.bufp > 0);
- g_string_erase(input, 0, data.bufp);
+ nm_str_buf_erase(input, 0, data.bufp, FALSE);
return msg;
}
-/**
- * ovsdb_read_cb:
- *
- * Read out the data available from the ovsdb socket and try to deserialize
- * the JSON. If we see a complete object, pass it upwards to ovsdb_got_msg().
- */
static void
-ovsdb_read_cb(GObject *source_object, GAsyncResult *res, gpointer user_data)
+ovsdb_read(NMOvsdb *self)
{
- NMOvsdb *self = NM_OVSDB(user_data);
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- GInputStream *stream = G_INPUT_STREAM(source_object);
- GError *error = NULL;
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
gssize size;
- size = g_input_stream_read_finish(stream, res, &error);
- if (size == -1) {
+again:
+ size = nm_utils_fd_read(priv->conn_fd, &priv->input_buf);
+
+ if (size <= 0) {
+ if (size == -EAGAIN)
+ return;
+
/* ovsdb-server was possibly restarted */
- _LOGW("short read from ovsdb: %s", error->message);
+ _LOGW("short read from ovsdb: %s", nm_strerror_native(-size));
priv->num_failures++;
- g_clear_error(&error);
ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
return;
}
- g_string_append_len(priv->input, priv->buf, size);
+ nm_assert(priv->input_buf.len > 0);
+
while (TRUE) {
nm_auto_decref_json json_t *msg = NULL;
- msg = _json_read_msg(priv->input);
+ msg = _json_read_msg(&priv->input_buf);
if (!msg)
break;
ovsdb_got_msg(self, msg);
+
+ if (priv->input_buf.len == 0)
+ break;
}
- if (!priv->conn)
- return;
+ if (priv->input_buf.len > 0) {
+ /* We have an incomplete message in the message buffer. Don't wait for another round
+ * of "poll", instead try to read it again. */
+ goto again;
+ }
+}
- if (size)
- ovsdb_read(self);
+static gboolean
+ovsdb_read_cb(int fd, GIOCondition condition, gpointer user_data)
+{
+ ovsdb_read(user_data);
+ return G_SOURCE_CONTINUE;
}
static void
-ovsdb_read(NMOvsdb *self)
+ovsdb_write(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+ gssize n;
- g_input_stream_read_async(g_io_stream_get_input_stream(G_IO_STREAM(priv->conn)),
- priv->buf,
- sizeof(priv->buf),
- G_PRIORITY_DEFAULT,
- NULL,
- ovsdb_read_cb,
- self);
-}
+again:
+ if (priv->output_buf.len == 0) {
+ nm_clear_g_source_inst(&priv->conn_fd_out_source);
+ return;
+ }
-static void
-ovsdb_write_cb(GObject *source_object, GAsyncResult *res, gpointer user_data)
-{
- GOutputStream *stream = G_OUTPUT_STREAM(source_object);
- NMOvsdb *self = NM_OVSDB(user_data);
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- GError *error = NULL;
- gssize size;
+ n = write(priv->conn_fd,
+ nm_str_buf_get_str_at_unsafe(&priv->output_buf, 0),
+ priv->output_buf.len);
+
+ if (n < 0)
+ n = -NM_ERRNO_NATIVE(errno);
+
+ if (n == -EAGAIN) {
+ if (!priv->conn_fd_out_source) {
+ priv->conn_fd_out_source =
+ nm_g_unix_fd_add_source(priv->conn_fd, G_IO_OUT, ovsdb_write_cb, self);
+ }
+ return;
+ }
- size = g_output_stream_write_finish(stream, res, &error);
- if (size == -1) {
+ if (n <= 0) {
/* ovsdb-server was possibly restarted */
- _LOGW("short write to ovsdb: %s", error->message);
+ _LOGW("short write to ovsdb: %s", nm_strerror_native(-n));
priv->num_failures++;
- g_clear_error(&error);
ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
return;
}
- if (!priv->conn)
- return;
-
- g_string_erase(priv->output, 0, size);
-
- ovsdb_write(self);
+ nm_str_buf_erase(&priv->output_buf, 0, n, FALSE);
+ goto again;
}
static void
-ovsdb_write(NMOvsdb *self)
+ovsdb_write_try(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- GOutputStream *stream;
- if (!priv->output->len)
- return;
-
- stream = g_io_stream_get_output_stream(G_IO_STREAM(priv->conn));
- if (g_output_stream_has_pending(stream))
- return;
+ if (priv->conn_fd >= 0 && !priv->conn_fd_out_source)
+ ovsdb_write(self);
+}
- g_output_stream_write_async(stream,
- priv->output->str,
- priv->output->len,
- G_PRIORITY_DEFAULT,
- NULL,
- ovsdb_write_cb,
- self);
+static gboolean
+ovsdb_write_cb(int fd, GIOCondition condition, gpointer user_data)
+{
+ ovsdb_write(user_data);
+ return G_SOURCE_CONTINUE;
}
/*****************************************************************************/
@@ -2494,7 +2493,7 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing)
nm_assert(!retry || !is_disposing);
- if (!priv->conn && !priv->conn_cancellable)
+ if (priv->conn_fd < 0 && !priv->conn_cancellable)
return;
_LOGD("disconnecting from ovsdb, retry %d", retry);
@@ -2518,9 +2517,11 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing)
_call_complete(call, NULL, error);
}
- g_string_truncate(priv->input, 0);
- g_string_truncate(priv->output, 0);
- g_clear_object(&priv->conn);
+ nm_str_buf_reset(&priv->input_buf);
+ nm_str_buf_reset(&priv->output_buf);
+ nm_clear_fd(&priv->conn_fd);
+ nm_clear_g_source_inst(&priv->conn_fd_in_source);
+ nm_clear_g_source_inst(&priv->conn_fd_out_source);
nm_clear_g_free(&priv->db_uuid);
nm_clear_g_cancellable(&priv->conn_cancellable);
@@ -2721,15 +2722,12 @@ _ovsdb_connect_complete_with_fd(NMOvsdb *self, int fd_take)
gs_unref_object GSocket *socket = NULL;
gs_free_error GError *error = NULL;
- socket = g_socket_new_from_fd(nm_steal_fd(&fd_take), &error);
- if (!socket) {
- _LOGT("connect: failure to open socket for new FD: %s", error->message);
- ovsdb_disconnect(self, FALSE, FALSE);
- return;
- }
+ nm_clear_g_cancellable(&priv->conn_cancellable);
+
+ nm_io_fcntl_setfl_update_nonblock(fd_take);
- priv->conn = g_socket_connection_factory_create_connection(socket);
- g_clear_object(&priv->conn_cancellable);
+ priv->conn_fd = nm_steal_fd(&fd_take);
+ priv->conn_fd_in_source = nm_g_unix_fd_add_source(priv->conn_fd, G_IO_IN, ovsdb_read_cb, self);
ovsdb_read(self);
ovsdb_next_command(self);
@@ -2803,7 +2801,7 @@ ovsdb_try_connect(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- if (priv->conn || priv->conn_cancellable)
+ if (priv->conn_fd >= 0 || priv->conn_cancellable)
return;
_LOGT("connect: start connecting socket %s on idle", NM_OVSDB_SOCKET);
@@ -2983,11 +2981,15 @@ nm_ovsdb_init(NMOvsdb *self)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+ priv->conn_fd = -1;
+
+ priv->input_buf = NM_STR_BUF_INIT(0, FALSE);
+ priv->output_buf = NM_STR_BUF_INIT(0, FALSE);
+
c_list_init(&priv->calls_lst_head);
priv->platform = g_object_ref(NM_PLATFORM_GET);
- priv->input = g_string_new(NULL);
- priv->output = g_string_new(NULL);
+
priv->bridges =
g_hash_table_new_full(nm_pstr_hash, nm_pstr_equal, (GDestroyNotify) _free_bridge, NULL);
priv->ports =
@@ -3008,14 +3010,8 @@ dispose(GObject *object)
nm_assert(c_list_is_empty(&priv->calls_lst_head));
- if (priv->input) {
- g_string_free(priv->input, TRUE);
- priv->input = NULL;
- }
- if (priv->output) {
- g_string_free(priv->output, TRUE);
- priv->output = NULL;
- }
+ nm_str_buf_destroy(&priv->input_buf);
+ nm_str_buf_destroy(&priv->output_buf);
g_clear_object(&priv->platform);
nm_clear_pointer(&priv->bridges, g_hash_table_destroy);