diff options
author | Sam Thursfield <sam@afuera.me.uk> | 2022-11-22 10:06:23 +0000 |
---|---|---|
committer | Sam Thursfield <sam@afuera.me.uk> | 2022-11-22 10:06:23 +0000 |
commit | a3356860d467ef6b09ba56ae96a99f3056061726 (patch) | |
tree | 9ebbe3ee84d9830c9cc3f241056795328ccce51a | |
parent | 94c7e383cff4b45cb5eab7b7dfcc448828a616fc (diff) | |
parent | cb4c34a71584073f63a4565a654a3cf5bc5ebe2e (diff) | |
download | tracker-a3356860d467ef6b09ba56ae96a99f3056061726.tar.gz |
Merge branch 'wip/carlosg/stalls' into 'master'
libtracker-sparql: Close descriptors asynchronously in TrackerEndpointDBus
Closes #386
See merge request https://gitlab.gnome.org/GNOME/tracker/-/merge_requests/551
-rw-r--r-- | src/libtracker-sparql/tracker-endpoint-dbus.c | 107 |
1 files changed, 59 insertions, 48 deletions
diff --git a/src/libtracker-sparql/tracker-endpoint-dbus.c b/src/libtracker-sparql/tracker-endpoint-dbus.c index 4bc486c76..40b096929 100644 --- a/src/libtracker-sparql/tracker-endpoint-dbus.c +++ b/src/libtracker-sparql/tracker-endpoint-dbus.c @@ -123,13 +123,6 @@ GParamSpec *props[N_PROPS] = { 0 }; static void tracker_endpoint_dbus_initable_iface_init (GInitableIface *iface); -static void read_update_cb (GObject *object, - GAsyncResult *res, - gpointer user_data); -static void read_update_blank_cb (GObject *object, - GAsyncResult *res, - gpointer user_data); - G_DEFINE_TYPE_WITH_CODE (TrackerEndpointDBus, tracker_endpoint_dbus, TRACKER_TYPE_ENDPOINT, G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, tracker_endpoint_dbus_initable_iface_init)) @@ -269,8 +262,9 @@ query_request_free (QueryRequest *request) g_source_unref (request->source); g_object_unref (request->cancellable); - g_output_stream_close (G_OUTPUT_STREAM (request->data_stream), - NULL, NULL); + g_output_stream_close_async (G_OUTPUT_STREAM (request->data_stream), + G_PRIORITY_DEFAULT, + NULL, NULL, NULL); g_object_unref (request->invocation); g_object_unref (request->data_stream); @@ -310,44 +304,53 @@ update_request_new (TrackerEndpointDBus *endpoint, return request; } -static gboolean -update_request_read_next (UpdateRequest *request, - GAsyncReadyCallback cb) +static void +handle_read_updates (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) { + UpdateRequest *request = task_data; gchar *buffer; gint buffer_size, prologue_size = 0; + GError *error = NULL; - if (request->cur_query >= request->num_queries) - return FALSE; + while (request->cur_query < request->num_queries) { + if (request->prologue) + prologue_size = strlen (request->prologue) + 1; + + request->cur_query++; + buffer_size = g_data_input_stream_read_int32 (request->input_stream, NULL, NULL); + buffer = g_new0 (char, prologue_size + 1 + buffer_size + 1); - if (request->prologue) - prologue_size = strlen (request->prologue) + 1; + if (request->prologue) { + strncpy (buffer, request->prologue, prologue_size - 1); + buffer[prologue_size - 1] = ' '; + } - request->cur_query++; - buffer_size = g_data_input_stream_read_int32 (request->input_stream, NULL, NULL); - buffer = g_new0 (char, prologue_size + 1 + buffer_size + 1); + g_ptr_array_add (request->queries, buffer); - if (request->prologue) { - strncpy (buffer, request->prologue, prologue_size - 1); - buffer[prologue_size - 1] = ' '; + if (!g_input_stream_read_all (G_INPUT_STREAM (request->input_stream), + &buffer[prologue_size], + buffer_size, + NULL, + cancellable, + &error)) + goto error; } - g_ptr_array_add (request->queries, buffer); - - g_input_stream_read_all_async (G_INPUT_STREAM (request->input_stream), - &buffer[prologue_size], - buffer_size, - G_PRIORITY_DEFAULT, - request->endpoint->cancellable, - cb, request); - return TRUE; + g_task_return_boolean (task, TRUE); + return; + error: + g_task_return_error (task, error); } static void update_request_free (UpdateRequest *request) { - g_input_stream_close (G_INPUT_STREAM (request->input_stream), - NULL, NULL); + g_input_stream_close_async (G_INPUT_STREAM (request->input_stream), + G_PRIORITY_DEFAULT, + NULL, NULL, NULL); g_ptr_array_unref (request->queries); g_object_unref (request->invocation); @@ -633,22 +636,19 @@ read_update_cb (GObject *object, UpdateRequest *request = user_data; GError *error = NULL; - if (!g_input_stream_read_all_finish (G_INPUT_STREAM (object), - res, NULL, &error)) { + if (!g_task_propagate_boolean (G_TASK (res), &error)) { g_dbus_method_invocation_return_gerror (request->invocation, error); update_request_free (request); return; } - if (!update_request_read_next (request, read_update_cb)) { - conn = tracker_endpoint_get_sparql_connection (TRACKER_ENDPOINT (request->endpoint)); - tracker_sparql_connection_update_array_async (conn, - (gchar **) request->queries->pdata, - request->queries->len, - request->endpoint->cancellable, - update_cb, - request); - } + conn = tracker_endpoint_get_sparql_connection (TRACKER_ENDPOINT (request->endpoint)); + tracker_sparql_connection_update_array_async (conn, + (gchar **) request->queries->pdata, + request->queries->len, + request->endpoint->cancellable, + update_cb, + request); } static void @@ -685,8 +685,7 @@ read_update_blank_cb (GObject *object, UpdateRequest *request = user_data; GError *error = NULL; - if (!g_input_stream_read_all_finish (G_INPUT_STREAM (object), - res, NULL, &error)) { + if (!g_task_propagate_boolean (G_TASK (res), &error)) { g_dbus_method_invocation_return_gerror (request->invocation, error); update_request_free (request); return; @@ -927,11 +926,17 @@ endpoint_dbus_iface_method_call (GDBusConnection *connection, "Did not get a file descriptor"); } else { UpdateRequest *request; + GTask *task; request = update_request_new (endpoint_dbus, invocation, g_strcmp0 (method_name, "UpdateArray") == 0, fd); - update_request_read_next (request, read_update_cb); + + task = g_task_new (NULL, request->endpoint->cancellable, + read_update_cb, request); + g_task_set_task_data (task, request, NULL); + g_task_run_in_thread (task, handle_read_updates); + g_object_unref (task); } } else if (g_strcmp0 (method_name, "UpdateBlank") == 0) { if (tracker_endpoint_dbus_forbid_operation (endpoint_dbus, @@ -956,9 +961,15 @@ endpoint_dbus_iface_method_call (GDBusConnection *connection, "Did not get a file descriptor"); } else { UpdateRequest *request; + GTask *task; request = update_request_new (endpoint_dbus, invocation, FALSE, fd); - update_request_read_next (request, read_update_blank_cb); + + task = g_task_new (NULL, request->endpoint->cancellable, + read_update_blank_cb, request); + g_task_set_task_data (task, request, NULL); + g_task_run_in_thread (task, handle_read_updates); + g_object_unref (task); } } else if (g_strcmp0 (method_name, "Deserialize") == 0) { TrackerDeserializeFlags flags; |