summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam@afuera.me.uk>2022-11-22 10:06:23 +0000
committerSam Thursfield <sam@afuera.me.uk>2022-11-22 10:06:23 +0000
commita3356860d467ef6b09ba56ae96a99f3056061726 (patch)
tree9ebbe3ee84d9830c9cc3f241056795328ccce51a
parent94c7e383cff4b45cb5eab7b7dfcc448828a616fc (diff)
parentcb4c34a71584073f63a4565a654a3cf5bc5ebe2e (diff)
downloadtracker-a3356860d467ef6b09ba56ae96a99f3056061726.tar.gz
Merge branch 'wip/carlosg/stalls' into 'master'
libtracker-sparql: Close descriptors asynchronously in TrackerEndpointDBus Closes #386 See merge request https://gitlab.gnome.org/GNOME/tracker/-/merge_requests/551
-rw-r--r--src/libtracker-sparql/tracker-endpoint-dbus.c107
1 files changed, 59 insertions, 48 deletions
diff --git a/src/libtracker-sparql/tracker-endpoint-dbus.c b/src/libtracker-sparql/tracker-endpoint-dbus.c
index 4bc486c76..40b096929 100644
--- a/src/libtracker-sparql/tracker-endpoint-dbus.c
+++ b/src/libtracker-sparql/tracker-endpoint-dbus.c
@@ -123,13 +123,6 @@ GParamSpec *props[N_PROPS] = { 0 };
static void tracker_endpoint_dbus_initable_iface_init (GInitableIface *iface);
-static void read_update_cb (GObject *object,
- GAsyncResult *res,
- gpointer user_data);
-static void read_update_blank_cb (GObject *object,
- GAsyncResult *res,
- gpointer user_data);
-
G_DEFINE_TYPE_WITH_CODE (TrackerEndpointDBus, tracker_endpoint_dbus, TRACKER_TYPE_ENDPOINT,
G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, tracker_endpoint_dbus_initable_iface_init))
@@ -269,8 +262,9 @@ query_request_free (QueryRequest *request)
g_source_unref (request->source);
g_object_unref (request->cancellable);
- g_output_stream_close (G_OUTPUT_STREAM (request->data_stream),
- NULL, NULL);
+ g_output_stream_close_async (G_OUTPUT_STREAM (request->data_stream),
+ G_PRIORITY_DEFAULT,
+ NULL, NULL, NULL);
g_object_unref (request->invocation);
g_object_unref (request->data_stream);
@@ -310,44 +304,53 @@ update_request_new (TrackerEndpointDBus *endpoint,
return request;
}
-static gboolean
-update_request_read_next (UpdateRequest *request,
- GAsyncReadyCallback cb)
+static void
+handle_read_updates (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
{
+ UpdateRequest *request = task_data;
gchar *buffer;
gint buffer_size, prologue_size = 0;
+ GError *error = NULL;
- if (request->cur_query >= request->num_queries)
- return FALSE;
+ while (request->cur_query < request->num_queries) {
+ if (request->prologue)
+ prologue_size = strlen (request->prologue) + 1;
+
+ request->cur_query++;
+ buffer_size = g_data_input_stream_read_int32 (request->input_stream, NULL, NULL);
+ buffer = g_new0 (char, prologue_size + 1 + buffer_size + 1);
- if (request->prologue)
- prologue_size = strlen (request->prologue) + 1;
+ if (request->prologue) {
+ strncpy (buffer, request->prologue, prologue_size - 1);
+ buffer[prologue_size - 1] = ' ';
+ }
- request->cur_query++;
- buffer_size = g_data_input_stream_read_int32 (request->input_stream, NULL, NULL);
- buffer = g_new0 (char, prologue_size + 1 + buffer_size + 1);
+ g_ptr_array_add (request->queries, buffer);
- if (request->prologue) {
- strncpy (buffer, request->prologue, prologue_size - 1);
- buffer[prologue_size - 1] = ' ';
+ if (!g_input_stream_read_all (G_INPUT_STREAM (request->input_stream),
+ &buffer[prologue_size],
+ buffer_size,
+ NULL,
+ cancellable,
+ &error))
+ goto error;
}
- g_ptr_array_add (request->queries, buffer);
-
- g_input_stream_read_all_async (G_INPUT_STREAM (request->input_stream),
- &buffer[prologue_size],
- buffer_size,
- G_PRIORITY_DEFAULT,
- request->endpoint->cancellable,
- cb, request);
- return TRUE;
+ g_task_return_boolean (task, TRUE);
+ return;
+ error:
+ g_task_return_error (task, error);
}
static void
update_request_free (UpdateRequest *request)
{
- g_input_stream_close (G_INPUT_STREAM (request->input_stream),
- NULL, NULL);
+ g_input_stream_close_async (G_INPUT_STREAM (request->input_stream),
+ G_PRIORITY_DEFAULT,
+ NULL, NULL, NULL);
g_ptr_array_unref (request->queries);
g_object_unref (request->invocation);
@@ -633,22 +636,19 @@ read_update_cb (GObject *object,
UpdateRequest *request = user_data;
GError *error = NULL;
- if (!g_input_stream_read_all_finish (G_INPUT_STREAM (object),
- res, NULL, &error)) {
+ if (!g_task_propagate_boolean (G_TASK (res), &error)) {
g_dbus_method_invocation_return_gerror (request->invocation, error);
update_request_free (request);
return;
}
- if (!update_request_read_next (request, read_update_cb)) {
- conn = tracker_endpoint_get_sparql_connection (TRACKER_ENDPOINT (request->endpoint));
- tracker_sparql_connection_update_array_async (conn,
- (gchar **) request->queries->pdata,
- request->queries->len,
- request->endpoint->cancellable,
- update_cb,
- request);
- }
+ conn = tracker_endpoint_get_sparql_connection (TRACKER_ENDPOINT (request->endpoint));
+ tracker_sparql_connection_update_array_async (conn,
+ (gchar **) request->queries->pdata,
+ request->queries->len,
+ request->endpoint->cancellable,
+ update_cb,
+ request);
}
static void
@@ -685,8 +685,7 @@ read_update_blank_cb (GObject *object,
UpdateRequest *request = user_data;
GError *error = NULL;
- if (!g_input_stream_read_all_finish (G_INPUT_STREAM (object),
- res, NULL, &error)) {
+ if (!g_task_propagate_boolean (G_TASK (res), &error)) {
g_dbus_method_invocation_return_gerror (request->invocation, error);
update_request_free (request);
return;
@@ -927,11 +926,17 @@ endpoint_dbus_iface_method_call (GDBusConnection *connection,
"Did not get a file descriptor");
} else {
UpdateRequest *request;
+ GTask *task;
request = update_request_new (endpoint_dbus, invocation,
g_strcmp0 (method_name, "UpdateArray") == 0,
fd);
- update_request_read_next (request, read_update_cb);
+
+ task = g_task_new (NULL, request->endpoint->cancellable,
+ read_update_cb, request);
+ g_task_set_task_data (task, request, NULL);
+ g_task_run_in_thread (task, handle_read_updates);
+ g_object_unref (task);
}
} else if (g_strcmp0 (method_name, "UpdateBlank") == 0) {
if (tracker_endpoint_dbus_forbid_operation (endpoint_dbus,
@@ -956,9 +961,15 @@ endpoint_dbus_iface_method_call (GDBusConnection *connection,
"Did not get a file descriptor");
} else {
UpdateRequest *request;
+ GTask *task;
request = update_request_new (endpoint_dbus, invocation, FALSE, fd);
- update_request_read_next (request, read_update_blank_cb);
+
+ task = g_task_new (NULL, request->endpoint->cancellable,
+ read_update_blank_cb, request);
+ g_task_set_task_data (task, request, NULL);
+ g_task_run_in_thread (task, handle_read_updates);
+ g_object_unref (task);
}
} else if (g_strcmp0 (method_name, "Deserialize") == 0) {
TrackerDeserializeFlags flags;