diff options
author | Philip Withnall <pwithnall@endlessos.org> | 2023-04-25 15:58:46 +0100 |
---|---|---|
committer | Philip Withnall <pwithnall@endlessos.org> | 2023-04-27 12:23:25 +0100 |
commit | 7b18e6205a03058541391e02c013c893b144e1ed (patch) | |
tree | 6f1f51c0db42e65e580d6c213bd03196842fdb06 | |
parent | 84074ce757d090f52f6701f43b99a41517131433 (diff) | |
download | glib-7b18e6205a03058541391e02c013c893b144e1ed.tar.gz |
gthreadedresolver: Switch to using a separate thread pool
Rather than running lookups in the global shared thread pool belonging
to `GTask`, run them in a private thread pool.
This is needed because the global shared thread pool is constrained to
only 14 threads. If there are 14 ongoing calls to
`g_task_run_in_thread()` from any library/code in the process, and then
one of them asks to do a DNS lookup, the lookup will block forever.
Under certain circumstances, particularly where there are a couple of
deep chains of dependent tasks running with `g_task_run_in_thread()`,
this can livelock the program.
Since `GResolver` is likely to be called as a frequent leaf call in
certain workloads, and in particular there are likely to be several
lookups requested at the same time, it makes sense to move resolver
lookups to a private thread pool.
Signed-off-by: Philip Withnall <pwithnall@endlessos.org>
-rw-r--r-- | gio/gthreadedresolver.c | 198 |
1 files changed, 162 insertions, 36 deletions
diff --git a/gio/gthreadedresolver.c b/gio/gthreadedresolver.c index 4e5b20751..1277907e7 100644 --- a/gio/gthreadedresolver.c +++ b/gio/gthreadedresolver.c @@ -41,18 +41,39 @@ struct _GThreadedResolver { GResolver parent_instance; + + GThreadPool *thread_pool; /* (owned) */ }; G_DEFINE_TYPE (GThreadedResolver, g_threaded_resolver, G_TYPE_RESOLVER) -static void threaded_resolver_worker_cb (GTask *task, - gpointer source_object, - gpointer task_data, - GCancellable *cancellable); +static void run_task_in_thread_pool_async (GThreadedResolver *self, + GTask *task); +static void run_task_in_thread_pool_sync (GThreadedResolver *self, + GTask *task); +static void threaded_resolver_worker_cb (gpointer task_data, + gpointer user_data); static void -g_threaded_resolver_init (GThreadedResolver *gtr) +g_threaded_resolver_init (GThreadedResolver *self) { + self->thread_pool = g_thread_pool_new_full (threaded_resolver_worker_cb, + self, + (GDestroyNotify) g_object_unref, + 20, + FALSE, + NULL); +} + +static void +g_threaded_resolver_finalize (GObject *object) +{ + GThreadedResolver *self = G_THREADED_RESOLVER (object); + + g_thread_pool_free (self->thread_pool, TRUE, FALSE); + self->thread_pool = NULL; + + G_OBJECT_CLASS (g_threaded_resolver_parent_class)->finalize (object); } static GResolverError @@ -95,6 +116,23 @@ typedef struct { GResolverRecordType record_type; } lookup_records; }; + + GCond cond; /* used for signalling completion of the task when running it sync */ + GMutex lock; + + /* This enum indicates that a particular code path has claimed the + * task and is shortly about to call g_task_return_*() on it. + * This must be accessed with GThreadedResolver.lock held. */ + enum + { + NOT_YET, + COMPLETED, /* libc lookup call has completed successfully or errored */ + } will_return; + + /* Whether the thread pool thread executing this lookup has finished executing + * it and g_task_return_*() has been called on it already. + * This must be accessed with GThreadedResolver.lock held. */ + gboolean has_returned; } LookupData; static LookupData * @@ -103,6 +141,8 @@ lookup_data_new_by_name (const char *hostname, { LookupData *data = g_new0 (LookupData, 1); data->lookup_type = LOOKUP_BY_NAME; + g_cond_init (&data->cond); + g_mutex_init (&data->lock); data->lookup_by_name.hostname = g_strdup (hostname); data->lookup_by_name.address_family = address_family; return g_steal_pointer (&data); @@ -113,6 +153,8 @@ lookup_data_new_by_address (GInetAddress *address) { LookupData *data = g_new0 (LookupData, 1); data->lookup_type = LOOKUP_BY_ADDRESS; + g_cond_init (&data->cond); + g_mutex_init (&data->lock); data->lookup_by_address.address = g_object_ref (address); return g_steal_pointer (&data); } @@ -123,6 +165,8 @@ lookup_data_new_records (const gchar *rrname, { LookupData *data = g_new0 (LookupData, 1); data->lookup_type = LOOKUP_RECORDS; + g_cond_init (&data->cond); + g_mutex_init (&data->lock); data->lookup_records.rrname = g_strdup (rrname); data->lookup_records.record_type = record_type; return g_steal_pointer (&data); @@ -145,6 +189,9 @@ lookup_data_free (LookupData *data) g_assert_not_reached (); } + g_mutex_clear (&data->lock); + g_cond_clear (&data->cond); + g_free (data); } @@ -243,6 +290,7 @@ lookup_by_name (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; GList *addresses; LookupData *data; @@ -252,8 +300,9 @@ lookup_by_name (GResolver *resolver, g_task_set_source_tag (task, lookup_by_name); g_task_set_name (task, "[gio] resolver lookup"); g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, threaded_resolver_worker_cb); + + run_task_in_thread_pool_sync (self, task); + addresses = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -285,6 +334,7 @@ lookup_by_name_with_flags (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; GList *addresses; LookupData *data; @@ -294,8 +344,9 @@ lookup_by_name_with_flags (GResolver *resolver, g_task_set_source_tag (task, lookup_by_name_with_flags); g_task_set_name (task, "[gio] resolver lookup"); g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, threaded_resolver_worker_cb); + + run_task_in_thread_pool_sync (self, task); + addresses = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -310,6 +361,7 @@ lookup_by_name_with_flags_async (GResolver *resolver, GAsyncReadyCallback callback, gpointer user_data) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; LookupData *data; @@ -322,8 +374,9 @@ lookup_by_name_with_flags_async (GResolver *resolver, g_task_set_source_tag (task, lookup_by_name_with_flags_async); g_task_set_name (task, "[gio] resolver lookup"); g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread (task, threaded_resolver_worker_cb); + + run_task_in_thread_pool_async (self, task); + g_object_unref (task); } @@ -415,6 +468,7 @@ lookup_by_address (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); LookupData *data = NULL; GTask *task; gchar *name; @@ -424,8 +478,9 @@ lookup_by_address (GResolver *resolver, g_task_set_source_tag (task, lookup_by_address); g_task_set_name (task, "[gio] resolver lookup"); g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, threaded_resolver_worker_cb); + + run_task_in_thread_pool_sync (self, task); + name = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -439,6 +494,7 @@ lookup_by_address_async (GResolver *resolver, GAsyncReadyCallback callback, gpointer user_data) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); LookupData *data = NULL; GTask *task; @@ -447,8 +503,9 @@ lookup_by_address_async (GResolver *resolver, g_task_set_source_tag (task, lookup_by_address_async); g_task_set_name (task, "[gio] resolver lookup"); g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread (task, threaded_resolver_worker_cb); + + run_task_in_thread_pool_async (self, task); + g_object_unref (task); } @@ -1247,6 +1304,7 @@ lookup_records (GResolver *resolver, GCancellable *cancellable, GError **error) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; GList *records; LookupData *data = NULL; @@ -1258,8 +1316,8 @@ lookup_records (GResolver *resolver, data = lookup_data_new_records (rrname, record_type); g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread_sync (task, threaded_resolver_worker_cb); + run_task_in_thread_pool_sync (self, task); + records = g_task_propagate_pointer (task, error); g_object_unref (task); @@ -1274,6 +1332,7 @@ lookup_records_async (GResolver *resolver, GAsyncReadyCallback callback, gpointer user_data) { + GThreadedResolver *self = G_THREADED_RESOLVER (resolver); GTask *task; LookupData *data = NULL; @@ -1284,8 +1343,8 @@ lookup_records_async (GResolver *resolver, data = lookup_data_new_records (rrname, record_type); g_task_set_task_data (task, g_steal_pointer (&data), (GDestroyNotify) lookup_data_free); - g_task_set_return_on_cancel (task, TRUE); - g_task_run_in_thread (task, threaded_resolver_worker_cb); + run_task_in_thread_pool_async (self, task); + g_object_unref (task); } @@ -1300,13 +1359,41 @@ lookup_records_finish (GResolver *resolver, } static void -threaded_resolver_worker_cb (GTask *task, - gpointer source_object, - gpointer task_data, - GCancellable *cancellable) +run_task_in_thread_pool_async (GThreadedResolver *self, + GTask *task) +{ + LookupData *data = g_task_get_task_data (task); + + g_mutex_lock (&data->lock); + + g_thread_pool_push (self->thread_pool, g_object_ref (task), NULL); + + g_mutex_unlock (&data->lock); +} + +static void +run_task_in_thread_pool_sync (GThreadedResolver *self, + GTask *task) +{ + LookupData *data = g_task_get_task_data (task); + + run_task_in_thread_pool_async (self, task); + + g_mutex_lock (&data->lock); + while (!data->has_returned) + g_cond_wait (&data->cond, &data->lock); + g_mutex_unlock (&data->lock); +} + +static void +threaded_resolver_worker_cb (gpointer task_data, + gpointer user_data) { - LookupData *data = task_data; + GTask *task = G_TASK (g_steal_pointer (&task_data)); + LookupData *data = g_task_get_task_data (task); + GCancellable *cancellable = g_task_get_cancellable (task); GError *local_error = NULL; + gboolean should_return; switch (data->lookup_type) { case LOOKUP_BY_NAME: @@ -1316,10 +1403,19 @@ threaded_resolver_worker_cb (GTask *task, cancellable, &local_error); - if (addresses != NULL) - g_task_return_pointer (task, g_steal_pointer (&addresses), (GDestroyNotify) g_resolver_free_addresses); - else - g_task_return_error (task, g_steal_pointer (&local_error)); + g_mutex_lock (&data->lock); + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED); + g_mutex_unlock (&data->lock); + + if (should_return) + { + if (addresses != NULL) + g_task_return_pointer (task, g_steal_pointer (&addresses), (GDestroyNotify) g_resolver_free_addresses); + else + g_task_return_error (task, g_steal_pointer (&local_error)); + } + + g_clear_pointer (&addresses, g_resolver_free_addresses); } break; case LOOKUP_BY_ADDRESS: @@ -1328,10 +1424,19 @@ threaded_resolver_worker_cb (GTask *task, cancellable, &local_error); - if (name != NULL) - g_task_return_pointer (task, g_steal_pointer (&name), g_free); - else - g_task_return_error (task, g_steal_pointer (&local_error)); + g_mutex_lock (&data->lock); + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED); + g_mutex_unlock (&data->lock); + + if (should_return) + { + if (name != NULL) + g_task_return_pointer (task, g_steal_pointer (&name), g_free); + else + g_task_return_error (task, g_steal_pointer (&local_error)); + } + + g_clear_pointer (&name, g_free); } break; case LOOKUP_RECORDS: @@ -1341,22 +1446,43 @@ threaded_resolver_worker_cb (GTask *task, cancellable, &local_error); - if (records != NULL) - g_task_return_pointer (task, g_steal_pointer (&records), (GDestroyNotify) free_records); - else - g_task_return_error (task, g_steal_pointer (&local_error)); + g_mutex_lock (&data->lock); + should_return = g_atomic_int_compare_and_exchange (&data->will_return, NOT_YET, COMPLETED); + g_mutex_unlock (&data->lock); + + if (should_return) + { + if (records != NULL) + g_task_return_pointer (task, g_steal_pointer (&records), (GDestroyNotify) free_records); + else + g_task_return_error (task, g_steal_pointer (&local_error)); + } + + g_clear_pointer (&records, free_records); } break; default: g_assert_not_reached (); } + + /* Signal completion of a task. */ + g_mutex_lock (&data->lock); + g_assert (!data->has_returned); + data->has_returned = TRUE; + g_cond_broadcast (&data->cond); + g_mutex_unlock (&data->lock); + + g_object_unref (task); } static void g_threaded_resolver_class_init (GThreadedResolverClass *threaded_class) { + GObjectClass *object_class = G_OBJECT_CLASS (threaded_class); GResolverClass *resolver_class = G_RESOLVER_CLASS (threaded_class); + object_class->finalize = g_threaded_resolver_finalize; + resolver_class->lookup_by_name = lookup_by_name; resolver_class->lookup_by_name_async = lookup_by_name_async; resolver_class->lookup_by_name_finish = lookup_by_name_finish; |