summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Nicholson <dbn@endlessos.org>2023-04-07 09:47:14 -0600
committerDan Nicholson <dbn@endlessos.org>2023-04-13 09:00:23 -0600
commita0407225f15415ddf207c25201c48445418ab964 (patch)
treef7f165dd36e920ba9d19c83f3dc1d19c175ff89a
parent945f0a860299e93985be02719c0c427162de551c (diff)
downloadostree-a0407225f15415ddf207c25201c48445418ab964.tar.gz
fetcher/soup3: Rewrite without threads
soup3 works best using only the async API from a single thread[1]. Rework the fetcher to stop using worker threads. In order to maximize session usage across requests, sessions will be reused for each main context. 1. https://libsoup.org/libsoup-3.0/client-thread-safety.html
-rw-r--r--src/libostree/ostree-fetcher-soup3.c1239
1 files changed, 413 insertions, 826 deletions
diff --git a/src/libostree/ostree-fetcher-soup3.c b/src/libostree/ostree-fetcher-soup3.c
index 9f495a72..b46ce34f 100644
--- a/src/libostree/ostree-fetcher-soup3.c
+++ b/src/libostree/ostree-fetcher-soup3.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2011 Colin Walters <walters@verbum.org>
* Copyright (C) 2022 Igalia S.L.
+ * Copyright (C) 2023 Endless OS Foundation, LLC
*
* SPDX-License-Identifier: LGPL-2.0+
*
@@ -19,12 +20,12 @@
*
* Author: Colin Walters <walters@verbum.org>
* Author: Daniel Kolesa <dkolesa@igalia.com>
+ * Author: Dan Nicholson <dbn@endlessos.org>
*/
#include "config.h"
#include <gio/gio.h>
-#include <gio/gfiledescriptorbased.h>
#include <gio/gunixoutputstream.h>
#include <libsoup/soup.h>
@@ -33,62 +34,24 @@
#include "ostree-fetcher-util.h"
#include "ostree-tls-cert-interaction-private.h"
#include "ostree-enumtypes.h"
-#include "ostree.h"
#include "ostree-repo-private.h"
-#include "otutil.h"
-
-typedef enum {
- OSTREE_FETCHER_STATE_PENDING,
- OSTREE_FETCHER_STATE_DOWNLOADING,
- OSTREE_FETCHER_STATE_COMPLETE
-} OstreeFetcherState;
-
-typedef struct {
- int ref_count; /* atomic */
-
- SoupSession *session; /* not referenced */
- GMainContext *main_context;
- volatile gint running;
- GError *initialization_error; /* Any failure to load the db */
-
- char *remote_name;
- int base_tmpdir_dfd;
-
- GVariant *extra_headers;
- gboolean transfer_gzip;
-
- /* Our active HTTP requests */
- GHashTable *outstanding;
-
- /* Shared across threads; be sure to lock. */
- GHashTable *output_stream_set; /* set<GOutputStream> */
- GMutex output_stream_set_lock;
-
- /* Also protected by output_stream_set_lock. */
- guint64 total_downloaded;
-
- GError *oob_error;
-} ThreadClosure;
typedef struct {
- int ref_count; /* atomic */
-
- ThreadClosure *thread_closure;
GPtrArray *mirrorlist; /* list of base URIs */
char *filename; /* relative name to fetch or NULL */
guint mirrorlist_idx;
- OstreeFetcherState state;
-
SoupMessage *message;
- GFile *file;
struct OstreeFetcher *fetcher;
+ GMainContext *mainctx;
+ SoupSession *session;
+ GFile *file;
gboolean is_membuf;
OstreeFetcherRequestFlags flags;
char *if_none_match; /* request ETag */
guint64 if_modified_since; /* seconds since the epoch */
- GInputStream *request_body;
+ GInputStream *response_body;
GLnxTmpfile tmpf;
GOutputStream *out_stream;
gboolean out_not_modified; /* TRUE if the server gave a HTTP 304 Not Modified response, which we don’t propagate as an error */
@@ -97,29 +60,26 @@ typedef struct {
guint64 max_size;
guint64 current_size;
- guint64 content_length;
-} OstreeFetcherPendingURI;
-
-/* Used by session_thread_idle_add() */
-typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure,
- gpointer data);
-
-/* Used by session_thread_idle_add() */
-typedef struct {
- ThreadClosure *thread_closure;
- SessionThreadFunc function;
- gpointer data;
- GDestroyNotify notify;
-} IdleClosure;
+ goffset content_length;
+} FetcherRequest;
struct OstreeFetcher
{
GObject parent_instance;
OstreeFetcherConfigFlags config_flags;
+ char *remote_name;
+ int tmpdir_dfd;
- GThread *session_thread;
- ThreadClosure *thread_closure;
+ GHashTable *sessions; /* (element-type GMainContext SoupSession ) */
+ GProxyResolver *proxy_resolver;
+ SoupCookieJar *cookie_jar;
+ OstreeTlsCertInteraction *client_cert;
+ GTlsDatabase *tls_database;
+ GVariant *extra_headers;
+ char *user_agent;
+
+ guint64 bytes_transferred;
};
enum {
@@ -129,399 +89,113 @@ enum {
G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)
-static ThreadClosure *
-thread_closure_ref (ThreadClosure *thread_closure)
-{
- int refcount;
- g_return_val_if_fail (thread_closure != NULL, NULL);
- refcount = g_atomic_int_add (&thread_closure->ref_count, 1);
- g_assert (refcount > 0);
- return thread_closure;
-}
-
-static void
-thread_closure_unref (ThreadClosure *thread_closure)
-{
- g_return_if_fail (thread_closure != NULL);
-
- if (g_atomic_int_dec_and_test (&thread_closure->ref_count))
- {
- /* The session thread should have cleared this by now. */
- g_assert (thread_closure->session == NULL);
-
- g_clear_pointer (&thread_closure->main_context, g_main_context_unref);
-
- g_clear_pointer (&thread_closure->extra_headers, g_variant_unref);
-
- g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
- g_mutex_clear (&thread_closure->output_stream_set_lock);
-
- g_clear_pointer (&thread_closure->oob_error, g_error_free);
-
- g_free (thread_closure->remote_name);
-
- g_slice_free (ThreadClosure, thread_closure);
- }
-}
-
static void
-idle_closure_free (IdleClosure *idle_closure)
-{
- g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref);
-
- if (idle_closure->notify != NULL)
- idle_closure->notify (idle_closure->data);
-
- g_slice_free (IdleClosure, idle_closure);
-}
-
-static OstreeFetcherPendingURI *
-pending_uri_ref (OstreeFetcherPendingURI *pending)
-{
- gint refcount;
- g_assert (pending);
- refcount = g_atomic_int_add (&pending->ref_count, 1);
- g_assert (refcount > 0);
- return pending;
+fetcher_request_free (FetcherRequest *request)
+{
+ g_debug ("Freeing request for %s", request->filename);
+ g_clear_pointer (&request->mirrorlist, g_ptr_array_unref);
+ g_clear_pointer (&request->filename, g_free);
+ g_clear_object (&request->message);
+ g_clear_pointer (&request->mainctx, g_main_context_unref);
+ g_clear_object (&request->session);
+ g_clear_object (&request->file);
+ g_clear_pointer (&request->if_none_match, g_free);
+ g_clear_object (&request->response_body);
+ glnx_tmpfile_clear (&request->tmpf);
+ g_clear_object (&request->out_stream);
+ g_clear_pointer (&request->out_etag, g_free);
+ g_free (request);
}
static void
-pending_uri_unref (OstreeFetcherPendingURI *pending)
-{
- if (!g_atomic_int_dec_and_test (&pending->ref_count))
- return;
-
- g_clear_pointer (&pending->thread_closure, thread_closure_unref);
-
- g_clear_pointer (&pending->mirrorlist, g_ptr_array_unref);
- g_free (pending->filename);
- g_clear_object (&pending->message);
- g_clear_object (&pending->file);
- g_clear_object (&pending->request_body);
- g_free (pending->if_none_match);
- glnx_tmpfile_clear (&pending->tmpf);
- g_clear_object (&pending->out_stream);
- g_free (pending->out_etag);
- g_free (pending);
-}
-
-static gboolean
-session_thread_idle_dispatch (gpointer data)
-{
- IdleClosure *idle_closure = data;
-
- idle_closure->function (idle_closure->thread_closure,
- idle_closure->data);
-
- return G_SOURCE_REMOVE;
-}
-
-static void
-session_thread_idle_add (ThreadClosure *thread_closure,
- SessionThreadFunc function,
- gpointer data,
- GDestroyNotify notify)
-{
- IdleClosure *idle_closure;
-
- g_return_if_fail (thread_closure != NULL);
- g_return_if_fail (function != NULL);
-
- idle_closure = g_slice_new (IdleClosure);
- idle_closure->thread_closure = thread_closure_ref (thread_closure);
- idle_closure->function = function;
- idle_closure->data = data;
- idle_closure->notify = notify;
-
- g_main_context_invoke_full (thread_closure->main_context,
- G_PRIORITY_DEFAULT,
- session_thread_idle_dispatch,
- idle_closure, /* takes ownership */
- (GDestroyNotify) idle_closure_free);
-}
-
-static void
-session_thread_add_logger (ThreadClosure *thread_closure,
- gpointer data)
-{
- glnx_unref_object SoupLogger *logger = NULL;
-
- logger = soup_logger_new (SOUP_LOGGER_LOG_BODY);
- soup_logger_set_max_body_size (logger, 500);
- soup_session_add_feature (thread_closure->session,
- SOUP_SESSION_FEATURE (logger));
-}
-
-static void
-session_thread_set_proxy_cb (ThreadClosure *thread_closure,
- gpointer data)
-{
- GProxyResolver *resolver = data;
-
- g_object_set (thread_closure->session,
- "proxy-resolver",
- g_object_ref (resolver), NULL);
-}
-
-static void
-session_thread_set_cookie_jar_cb (ThreadClosure *thread_closure,
- gpointer data)
-{
- SoupCookieJar *jar = data;
-
- soup_session_add_feature (thread_closure->session,
- SOUP_SESSION_FEATURE (jar));
-}
-
-static void
-session_thread_set_headers_cb (ThreadClosure *thread_closure,
- gpointer data)
-{
- GVariant *headers = data;
-
- g_clear_pointer (&thread_closure->extra_headers, g_variant_unref);
- thread_closure->extra_headers = g_variant_ref (headers);
-}
-
-static void
-session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure,
- gpointer data)
-{
- const char *cert_and_key_path = data; /* str\0str\0 in one malloc buf */
- const char *cert_path = cert_and_key_path;
- const char *key_path = cert_and_key_path + strlen (cert_and_key_path) + 1;
- g_autoptr(OstreeTlsCertInteraction) interaction = NULL;
-
- /* The GTlsInteraction instance must be created in the
- * session thread so it uses the correct GMainContext. */
- interaction = _ostree_tls_cert_interaction_new (cert_path, key_path);
-
- g_object_set (thread_closure->session,
- "tls-interaction",
- interaction, NULL);
-}
-
-static void
-session_thread_set_tls_database_cb (ThreadClosure *thread_closure,
- gpointer data)
-{
- const char *db_path = data;
-
- if (db_path != NULL)
- {
- glnx_unref_object GTlsDatabase *tlsdb = NULL;
-
- g_clear_error (&thread_closure->initialization_error);
- tlsdb = g_tls_file_database_new (db_path, &thread_closure->initialization_error);
-
- if (tlsdb)
- g_object_set (thread_closure->session,
- "tls-database",
- tlsdb, NULL);
- }
-}
-
-static void
-session_thread_set_extra_user_agent_cb (ThreadClosure *thread_closure,
- gpointer data)
-{
- const char *extra_user_agent = data;
- if (extra_user_agent != NULL)
- {
- g_autofree char *ua =
- g_strdup_printf ("%s %s", OSTREE_FETCHER_USERAGENT_STRING, extra_user_agent);
- g_object_set (thread_closure->session, "user-agent", ua, NULL);
- }
- else
- {
- g_object_set (thread_closure->session, "user-agent",
- OSTREE_FETCHER_USERAGENT_STRING, NULL);
- }
-}
-
-static void
-on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
-
-static void
-request_send_async (ThreadClosure *thread_closure,
- GTask *task)
-{
- OstreeFetcherPendingURI *pending;
- GCancellable *cancellable;
-
- pending = g_task_get_task_data (task);
- cancellable = g_task_get_cancellable (task);
-
- if (pending->file)
- g_file_read_async (pending->file, g_task_get_priority (task),
- cancellable, on_request_sent, task);
- else
- {
- soup_session_send_async (thread_closure->session, pending->message,
- g_task_get_priority (task),
- cancellable, on_request_sent, task);
- }
-}
-
-static void
-start_pending_request (ThreadClosure *thread_closure,
- GTask *task)
-{
-
- OstreeFetcherPendingURI *pending;
-
- pending = g_task_get_task_data (task);
-
- g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
- request_send_async (thread_closure, g_object_ref (task));
-}
+on_request_sent (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data);
static gboolean
_message_accept_cert_loose (SoupMessage *msg,
GTlsCertificate *tls_peer_certificate,
- GTlsCertificateFlags tls_peer_errors,
+ GTlsCertificateFlags tls_peer_errors,
gpointer data)
{
- OstreeFetcherConfigFlags config_flags;
-
- config_flags = GPOINTER_TO_UINT (data);
-
- return ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0);
+ return TRUE;
}
static void
-create_pending_soup_request (OstreeFetcherPendingURI *pending)
+create_request_message (FetcherRequest *request)
{
- OstreeFetcherURI *next_mirror = NULL;
- g_autoptr(OstreeFetcherURI) uri = NULL;
- GUri *guri = NULL;
+ g_assert (request->mirrorlist);
+ g_assert (request->mirrorlist_idx < request->mirrorlist->len);
- g_assert (pending->mirrorlist);
- g_assert (pending->mirrorlist_idx < pending->mirrorlist->len);
-
- next_mirror = g_ptr_array_index (pending->mirrorlist, pending->mirrorlist_idx);
- if (pending->filename)
- uri = _ostree_fetcher_uri_new_subpath (next_mirror, pending->filename);
+ OstreeFetcherURI *next_mirror = g_ptr_array_index (request->mirrorlist, request->mirrorlist_idx);
+ g_autoptr(OstreeFetcherURI) uri = NULL;
+ if (request->filename)
+ uri = _ostree_fetcher_uri_new_subpath (next_mirror, request->filename);
if (!uri)
- uri = (OstreeFetcherURI*)g_uri_ref ((GUri*)next_mirror);
+ uri = (OstreeFetcherURI *) g_uri_ref ((GUri *) next_mirror);
- g_clear_object (&pending->message);
- g_clear_object (&pending->file);
+ g_clear_object (&request->message);
+ g_clear_object (&request->file);
+ g_clear_object (&request->response_body);
- guri = (GUri*)(uri ? uri : next_mirror);
+ GUri *guri = (GUri *) uri;
/* file:// URI is handle via GFile */
if (!strcmp (g_uri_get_scheme (guri), "file"))
{
- char *str = g_uri_to_string (guri);
- pending->file = g_file_new_for_uri (str);
- g_free (str);
+ g_autofree char *str = g_uri_to_string (guri);
+ request->file = g_file_new_for_uri (str);
return;
}
- pending->message = soup_message_new_from_uri ("GET", guri);
+ request->message = soup_message_new_from_uri ("GET", guri);
- if (pending->if_none_match != NULL)
- {
- soup_message_headers_append (soup_message_get_request_headers (pending->message),
- "If-None-Match", pending->if_none_match);
- }
+ if (request->if_none_match != NULL)
+ soup_message_headers_append (soup_message_get_request_headers (request->message),
+ "If-None-Match", request->if_none_match);
- if (pending->if_modified_since > 0)
+ if (request->if_modified_since > 0)
{
- g_autoptr(GDateTime) date_time = g_date_time_new_from_unix_utc (pending->if_modified_since);
+ g_autoptr(GDateTime) date_time = g_date_time_new_from_unix_utc (request->if_modified_since);
g_autofree char *mod_date = g_date_time_format (date_time, "%a, %d %b %Y %H:%M:%S %Z");
- soup_message_headers_append (soup_message_get_request_headers (pending->message),
+ soup_message_headers_append (soup_message_get_request_headers (request->message),
"If-Modified-Since", mod_date);
}
- if (pending->message && pending->fetcher->config_flags != 0)
- {
- g_signal_connect (pending->message, "accept-certificate",
- G_CALLBACK (_message_accept_cert_loose),
- GUINT_TO_POINTER (pending->fetcher->config_flags));
- }
-}
+ if ((request->fetcher->config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) != 0)
+ g_signal_connect (request->message, "accept-certificate",
+ G_CALLBACK (_message_accept_cert_loose), NULL);
-static void
-session_thread_request_uri (ThreadClosure *thread_closure,
- gpointer data)
-{
- GTask *task = G_TASK (data);
- OstreeFetcherPendingURI *pending;
-
- pending = g_task_get_task_data (task);
-
- /* If we caught an error in init, re-throw it for every request */
- if (thread_closure->initialization_error)
+ if (request->fetcher->extra_headers)
{
- g_task_return_error (task, g_error_copy (thread_closure->initialization_error));
- return;
- }
-
- create_pending_soup_request (pending);
-
- if (pending->message && thread_closure->extra_headers)
- {
- g_autoptr(GVariantIter) viter = g_variant_iter_new (thread_closure->extra_headers);
+ g_autoptr(GVariantIter) viter = g_variant_iter_new (request->fetcher->extra_headers);
const char *key;
const char *value;
while (g_variant_iter_next (viter, "(&s&s)", &key, &value))
- soup_message_headers_append (soup_message_get_request_headers (pending->message), key, value);
+ soup_message_headers_append (soup_message_get_request_headers (request->message), key, value);
}
-
- if (pending->is_membuf)
- request_send_async (thread_closure, g_object_ref (task));
- else
- start_pending_request (thread_closure, task);
}
-static gpointer
-ostree_fetcher_session_thread (gpointer data)
+static void
+initiate_task_request (GTask *task)
{
- ThreadClosure *closure = data;
- g_autoptr(GMainContext) mainctx = g_main_context_ref (closure->main_context);
-
- /* This becomes the GMainContext that SoupSession schedules async
- * callbacks and emits signals from. Make it the thread-default
- * context for this thread before creating the session. */
- g_main_context_push_thread_default (mainctx);
-
- /* We retain ownership of the SoupSession reference. */
- closure->session = soup_session_new_with_options ("user-agent", OSTREE_FETCHER_USERAGENT_STRING,
- "timeout", 60,
- "idle-timeout", 60,
- "max-conns-per-host", _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS,
- NULL);
+ FetcherRequest *request = g_task_get_task_data (task);
+ create_request_message (request);
- /* SoupContentDecoder is included in the session by default. Remove it
- * if gzip compression isn't in use.
- */
- if (!closure->transfer_gzip)
- soup_session_remove_feature_by_type (closure->session, SOUP_TYPE_CONTENT_DECODER);
-
- /* This model ensures we don't hit a race using g_main_loop_quit();
- * see also what pull_termination_condition() in ostree-repo-pull.c
- * is doing.
- */
- while (g_atomic_int_get (&closure->running))
- g_main_context_iteration (closure->main_context, TRUE);
-
- /* Since the ThreadClosure may be finalized from any thread we
- * unreference all data related to the SoupSession ourself to ensure
- * it's freed in the same thread where it was created. */
- g_clear_pointer (&closure->outstanding, g_hash_table_unref);
- g_clear_pointer (&closure->session, g_object_unref);
-
- thread_closure_unref (closure);
-
- /* Do this last, since libsoup uses g_main_current_source() which
- * relies on it.
- */
- g_main_context_pop_thread_default (mainctx);
-
- return NULL;
+ GCancellable *cancellable = g_task_get_cancellable (task);
+ int priority = g_task_get_priority (task);
+ if (request->file)
+ g_file_read_async (request->file, priority, cancellable, on_request_sent, task);
+ else
+ {
+ g_autofree char *uri = g_uri_to_string (soup_message_get_uri (request->message));
+ const char *dest = request->is_membuf ? "memory" : "tmpfile";
+ g_debug ("Requesting %s to %s for session %p in main context %p",
+ uri, dest, request->session, request->mainctx);
+ soup_session_send_async (request->session, request->message,
+ priority, cancellable, on_request_sent, task);
+ }
}
static void
@@ -567,18 +241,14 @@ _ostree_fetcher_finalize (GObject *object)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
- /* Terminate the session thread. */
- g_atomic_int_set (&self->thread_closure->running, 0);
- g_main_context_wakeup (self->thread_closure->main_context);
- if (self->session_thread)
- {
- /* We need to explicitly synchronize to clean up TLS */
- if (self->session_thread != g_thread_self ())
- g_thread_join (self->session_thread);
- else
- g_clear_pointer (&self->session_thread, g_thread_unref);
- }
- g_clear_pointer (&self->thread_closure, thread_closure_unref);
+ g_clear_pointer (&self->remote_name, g_free);
+ g_clear_pointer (&self->sessions, g_hash_table_unref);
+ g_clear_object (&self->proxy_resolver);
+ g_clear_object (&self->cookie_jar);
+ g_clear_object (&self->client_cert);
+ g_clear_object (&self->tls_database);
+ g_clear_pointer (&self->extra_headers, g_variant_unref);
+ g_clear_pointer (&self->user_agent, g_free);
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
}
@@ -587,40 +257,11 @@ static void
_ostree_fetcher_constructed (GObject *object)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
- g_autoptr(GMainContext) main_context = NULL;
- const char *http_proxy;
-
- main_context = g_main_context_new ();
- self->thread_closure = g_slice_new0 (ThreadClosure);
- self->thread_closure->ref_count = 1;
- self->thread_closure->main_context = g_main_context_ref (main_context);
- self->thread_closure->running = 1;
- self->thread_closure->transfer_gzip = (self->config_flags & OSTREE_FETCHER_FLAGS_TRANSFER_GZIP) != 0;
-
- self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
- self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
- (GDestroyNotify) NULL,
- (GDestroyNotify) g_object_unref);
- g_mutex_init (&self->thread_closure->output_stream_set_lock);
-
- if (g_getenv ("OSTREE_DEBUG_HTTP"))
- {
- session_thread_idle_add (self->thread_closure,
- session_thread_add_logger,
- NULL, (GDestroyNotify) NULL);
- }
-
- http_proxy = g_getenv ("http_proxy");
+ const char *http_proxy = g_getenv ("http_proxy");
if (http_proxy != NULL && http_proxy[0] != '\0')
_ostree_fetcher_set_proxy (self, http_proxy);
- /* FIXME Maybe implement GInitableIface and use g_thread_try_new()
- * so we can try to handle thread creation errors gracefully? */
- self->session_thread = g_thread_new ("fetcher-session-thread",
- ostree_fetcher_session_thread,
- thread_closure_ref (self->thread_closure));
-
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object);
}
@@ -649,72 +290,56 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass)
static void
_ostree_fetcher_init (OstreeFetcher *self)
{
+ self->sessions = g_hash_table_new (g_direct_hash, g_direct_equal);
}
OstreeFetcher *
-_ostree_fetcher_new (int tmpdir_dfd,
- const char *remote_name,
- OstreeFetcherConfigFlags flags)
+_ostree_fetcher_new (int tmpdir_dfd,
+ const char *remote_name,
+ OstreeFetcherConfigFlags flags)
{
- OstreeFetcher *self;
-
- self = g_object_new (OSTREE_TYPE_FETCHER, "config-flags", flags, NULL);
- self->thread_closure->remote_name = g_strdup (remote_name);
- self->thread_closure->base_tmpdir_dfd = tmpdir_dfd;
+ OstreeFetcher *self = g_object_new (OSTREE_TYPE_FETCHER, "config-flags", flags, NULL);
+ self->remote_name = g_strdup (remote_name);
+ self->tmpdir_dfd = tmpdir_dfd;
return self;
}
int
-_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
+_ostree_fetcher_get_dfd (OstreeFetcher *self)
{
- return fetcher->thread_closure->base_tmpdir_dfd;
+ return self->tmpdir_dfd;
}
void
_ostree_fetcher_set_proxy (OstreeFetcher *self,
const char *http_proxy)
{
- GProxyResolver *resolver;
- GUri *guri;
-
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (http_proxy != NULL && http_proxy[0] != '\0');
/* validate first */
- guri = g_uri_parse (http_proxy, SOUP_HTTP_URI_FLAGS, NULL);
-
- if (!guri)
+ g_autoptr(GError) local_error = NULL;
+ g_autoptr(GUri) guri = g_uri_parse (http_proxy, G_URI_FLAGS_NONE, &local_error);
+ if (guri == NULL)
{
- g_warning ("Invalid proxy URI '%s'", http_proxy);
+ g_warning ("Invalid proxy URI '%s': %s", http_proxy, local_error->message);
return;
}
- g_uri_unref (guri);
-
- resolver = g_simple_proxy_resolver_new (http_proxy, NULL);
-
- session_thread_idle_add (self->thread_closure,
- session_thread_set_proxy_cb,
- resolver, /* takes ownership */
- (GDestroyNotify) g_object_unref);
- }
+ g_clear_object (&self->proxy_resolver);
+ self->proxy_resolver = g_simple_proxy_resolver_new (http_proxy, NULL);
+}
void
_ostree_fetcher_set_cookie_jar (OstreeFetcher *self,
const char *jar_path)
{
- SoupCookieJar *jar;
-
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (jar_path != NULL);
- jar = soup_cookie_jar_text_new (jar_path, TRUE);
-
- session_thread_idle_add (self->thread_closure,
- session_thread_set_cookie_jar_cb,
- jar, /* takes ownership */
- (GDestroyNotify) g_object_unref);
+ g_clear_object (&self->cookie_jar);
+ self->cookie_jar = soup_cookie_jar_text_new (jar_path, TRUE);
}
void
@@ -722,20 +347,10 @@ _ostree_fetcher_set_client_cert (OstreeFetcher *self,
const char *cert_path,
const char *key_path)
{
- g_autoptr(GString) buf = NULL;
g_return_if_fail (OSTREE_IS_FETCHER (self));
- if (cert_path)
- {
- buf = g_string_new (cert_path);
- g_string_append_c (buf, '\0');
- g_string_append (buf, key_path);
- }
-
- session_thread_idle_add (self->thread_closure,
- session_thread_set_tls_interaction_cb,
- g_string_free (g_steal_pointer (&buf), FALSE),
- (GDestroyNotify) g_free);
+ g_clear_object (&self->client_cert);
+ self->client_cert = _ostree_tls_cert_interaction_new (cert_path, key_path);
}
void
@@ -744,235 +359,201 @@ _ostree_fetcher_set_tls_database (OstreeFetcher *self,
{
g_return_if_fail (OSTREE_IS_FETCHER (self));
- session_thread_idle_add (self->thread_closure,
- session_thread_set_tls_database_cb,
- g_strdup (tlsdb_path),
- (GDestroyNotify) g_free);
+ g_autoptr(GError) local_error = NULL;
+ GTlsDatabase *tlsdb = g_tls_file_database_new (tlsdb_path, &local_error);
+ if (tlsdb == NULL)
+ {
+ g_warning ("Invalid TLS database '%s': %s", tlsdb_path, local_error->message);
+ return;
+ }
+
+ g_clear_object (&self->tls_database);
+ self->tls_database = tlsdb;
}
void
_ostree_fetcher_set_extra_headers (OstreeFetcher *self,
GVariant *extra_headers)
{
- session_thread_idle_add (self->thread_closure,
- session_thread_set_headers_cb,
- g_variant_ref (extra_headers),
- (GDestroyNotify) g_variant_unref);
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+
+ g_clear_pointer (&self->extra_headers, g_variant_unref);
+ self->extra_headers = g_variant_ref (extra_headers);
}
void
_ostree_fetcher_set_extra_user_agent (OstreeFetcher *self,
const char *extra_user_agent)
{
- session_thread_idle_add (self->thread_closure,
- session_thread_set_extra_user_agent_cb,
- g_strdup (extra_user_agent),
- (GDestroyNotify) g_free);
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+
+ g_clear_pointer (&self->user_agent, g_free);
+ if (extra_user_agent != NULL)
+ self->user_agent = g_strdup_printf ("%s %s",
+ OSTREE_FETCHER_USERAGENT_STRING,
+ extra_user_agent);
}
static gboolean
-finish_stream (OstreeFetcherPendingURI *pending,
- GCancellable *cancellable,
- GError **error)
+finish_stream (FetcherRequest *request,
+ GCancellable *cancellable,
+ GError **error)
{
- gboolean ret = FALSE;
- struct stat stbuf;
-
/* Close it here since we do an async fstat(), where we don't want
* to hit a bad fd.
*/
- if (pending->out_stream)
+ if (request->out_stream)
{
- if ((pending->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
+ if ((request->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
{
const guint8 nulchar = 0;
- gsize bytes_written;
- if (!g_output_stream_write_all (pending->out_stream, &nulchar, 1, &bytes_written,
+ if (!g_output_stream_write_all (request->out_stream, &nulchar, 1, NULL,
cancellable, error))
- goto out;
+ return FALSE;
}
- if (!g_output_stream_close (pending->out_stream, cancellable, error))
- goto out;
-
- g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
- g_hash_table_remove (pending->thread_closure->output_stream_set,
- pending->out_stream);
- g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+ if (!g_output_stream_close (request->out_stream, cancellable, error))
+ return FALSE;
}
- if (!pending->is_membuf)
+ if (!request->is_membuf)
{
- if (!glnx_fstat (pending->tmpf.fd, &stbuf, error))
- goto out;
- }
+ struct stat stbuf;
- pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+ if (!glnx_fstat (request->tmpf.fd, &stbuf, error))
+ return FALSE;
- if (!pending->is_membuf)
- {
- if (stbuf.st_size < pending->content_length)
+ if (request->content_length >= 0 && stbuf.st_size < request->content_length)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
- goto out;
- }
- else
- {
- g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
- pending->thread_closure->total_downloaded += stbuf.st_size;
- g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+ return FALSE;
}
}
- ret = TRUE;
- out:
- (void) g_input_stream_close (pending->request_body, NULL, NULL);
- return ret;
+ return TRUE;
}
static void
-on_stream_read (GObject *object,
- GAsyncResult *result,
- gpointer user_data);
+on_stream_read (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data);
static void
-remove_pending (OstreeFetcherPendingURI *pending)
+on_out_splice_complete (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
- /* Hold a temporary ref to ensure the reference to
- * pending->thread_closure is valid.
- */
- pending_uri_ref (pending);
- g_hash_table_remove (pending->thread_closure->outstanding, pending);
- pending_uri_unref (pending);
-}
-
-static void
-on_out_splice_complete (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
-{
- GTask *task = G_TASK (user_data);
- OstreeFetcherPendingURI *pending;
- GCancellable *cancellable;
- gssize bytes_written;
+ g_autoptr(GTask) task = G_TASK (user_data);
GError *local_error = NULL;
- pending = g_task_get_task_data (task);
- cancellable = g_task_get_cancellable (task);
-
- bytes_written = g_output_stream_splice_finish ((GOutputStream *)object,
- result,
- &local_error);
+ gssize bytes_written = g_output_stream_splice_finish ((GOutputStream *) object, result, &local_error);
if (bytes_written < 0)
- goto out;
+ {
+ g_task_return_error (task, local_error);
+ return;
+ }
+
+ FetcherRequest *request = g_task_get_task_data (task);
+ request->fetcher->bytes_transferred += bytes_written;
- g_input_stream_read_bytes_async (pending->request_body,
+ GCancellable *cancellable = g_task_get_cancellable (task);
+ g_input_stream_read_bytes_async (request->response_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
-
- out:
- if (local_error)
- {
- g_task_return_error (task, local_error);
- remove_pending (pending);
- }
-
- g_object_unref (task);
}
static void
-on_stream_read (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+on_stream_read (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
- GTask *task = G_TASK (user_data);
- OstreeFetcherPendingURI *pending;
- GCancellable *cancellable;
- g_autoptr(GBytes) bytes = NULL;
- gsize bytes_read;
+ g_autoptr(GTask) task = G_TASK (user_data);
+ FetcherRequest *request = g_task_get_task_data (task);
GError *local_error = NULL;
- pending = g_task_get_task_data (task);
- cancellable = g_task_get_cancellable (task);
-
/* Only open the output stream on demand to ensure we use as
* few file descriptors as possible.
*/
- if (!pending->out_stream)
+ if (!request->out_stream)
{
- if (!pending->is_membuf)
+ if (!request->is_membuf)
{
- if (!_ostree_fetcher_tmpf_from_flags (pending->flags, pending->thread_closure->base_tmpdir_dfd,
- &pending->tmpf, &local_error))
- goto out;
- pending->out_stream = g_unix_output_stream_new (pending->tmpf.fd, FALSE);
+ if (!_ostree_fetcher_tmpf_from_flags (request->flags, request->fetcher->tmpdir_dfd,
+ &request->tmpf, &local_error))
+ {
+ g_task_return_error (task, local_error);
+ return;
+ }
+ request->out_stream = g_unix_output_stream_new (request->tmpf.fd, FALSE);
}
else
{
- pending->out_stream = g_memory_output_stream_new_resizable ();
+ request->out_stream = g_memory_output_stream_new_resizable ();
}
-
- g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
- g_hash_table_add (pending->thread_closure->output_stream_set,
- g_object_ref (pending->out_stream));
- g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
/* Get a GBytes buffer */
- bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
+ g_autoptr(GBytes) bytes = g_input_stream_read_bytes_finish ((GInputStream *) object, result, &local_error);
if (!bytes)
- goto out;
- bytes_read = g_bytes_get_size (bytes);
+ {
+ g_task_return_error (task, local_error);
+ return;
+ }
/* Was this the end of the stream? */
+ GCancellable *cancellable = g_task_get_cancellable (task);
+ gsize bytes_read = g_bytes_get_size (bytes);
if (bytes_read == 0)
{
- if (!finish_stream (pending, cancellable, &local_error))
- goto out;
- if (pending->is_membuf)
+ if (!finish_stream (request, cancellable, &local_error))
+ {
+ g_task_return_error (task, local_error);
+ return;
+ }
+ if (request->is_membuf)
{
- g_task_return_pointer (task,
- g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream*)pending->out_stream),
- (GDestroyNotify) g_bytes_unref);
+ GBytes *mem_bytes = g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream *) request->out_stream);
+ g_task_return_pointer (task, mem_bytes, (GDestroyNotify) g_bytes_unref);
}
else
{
- if (lseek (pending->tmpf.fd, 0, SEEK_SET) < 0)
+ if (lseek (request->tmpf.fd, 0, SEEK_SET) < 0)
{
glnx_set_error_from_errno (&local_error);
g_task_return_error (task, g_steal_pointer (&local_error));
+ return;
}
- else
- g_task_return_boolean (task, TRUE);
+
+ g_task_return_boolean (task, TRUE);
}
- remove_pending (pending);
}
else
{
/* Verify max size */
- if (pending->max_size > 0)
+ if (request->max_size > 0)
{
- if (bytes_read > pending->max_size ||
- (bytes_read + pending->current_size) > pending->max_size)
+ if (bytes_read > request->max_size ||
+ (bytes_read + request->current_size) > request->max_size)
{
g_autofree char *uristr = NULL;
- if (pending->file)
- uristr = g_file_get_uri (pending->file);
+ if (request->file)
+ uristr = g_file_get_uri (request->file);
else
- uristr = g_uri_to_string (soup_message_get_uri (pending->message));
+ uristr = g_uri_to_string (soup_message_get_uri (request->message));
local_error = g_error_new (G_IO_ERROR, G_IO_ERROR_FAILED,
"URI %s exceeded maximum size of %" G_GUINT64_FORMAT " bytes",
- uristr, pending->max_size);
- goto out;
+ uristr, request->max_size);
+ g_task_return_error (task, local_error);
+ return;
}
}
- pending->current_size += bytes_read;
+ request->current_size += bytes_read;
/* We do this instead of _write_bytes_async() as that's not
* guaranteed to do a complete write.
@@ -980,7 +561,7 @@ on_stream_read (GObject *object,
{
g_autoptr(GInputStream) membuf =
g_memory_input_stream_new_from_bytes (bytes);
- g_output_stream_splice_async (pending->out_stream, membuf,
+ g_output_stream_splice_async (request->out_stream, membuf,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
G_PRIORITY_DEFAULT,
cancellable,
@@ -988,214 +569,251 @@ on_stream_read (GObject *object,
g_object_ref (task));
}
}
-
- out:
- if (local_error)
- {
- g_task_return_error (task, local_error);
- remove_pending (pending);
- }
-
- g_object_unref (task);
}
static void
-on_request_sent (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+on_request_sent (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
- GTask *task = G_TASK (user_data);
- /* Hold a ref to the pending across this function, since we remove
- * it from the hash early in some cases, not in others. */
- OstreeFetcherPendingURI *pending = pending_uri_ref (g_task_get_task_data (task));
- GCancellable *cancellable = g_task_get_cancellable (task);
+ g_autoptr(GTask) task = G_TASK (user_data);
+ FetcherRequest *request = g_task_get_task_data (task);
GError *local_error = NULL;
- glnx_unref_object SoupMessage *msg = NULL;
- SoupStatus status;
-
- pending->state = OSTREE_FETCHER_STATE_COMPLETE;
- if (pending->file)
- pending->request_body = (GInputStream *)g_file_read_finish ((GFile *) object,
- result,
- &local_error);
+
+ if (request->file)
+ request->response_body = (GInputStream *) g_file_read_finish ((GFile *) object,
+ result,
+ &local_error);
else
- pending->request_body = soup_session_send_finish ((SoupSession*) object,
- result, &local_error);
+ request->response_body = soup_session_send_finish ((SoupSession *) object,
+ result, &local_error);
- if (!pending->request_body)
- goto out;
+ if (!request->response_body)
+ {
+ g_task_return_error (task, local_error);
+ return;
+ }
- if (pending->message)
+ if (request->message)
{
- status = soup_message_get_status (pending->message);
+ SoupStatus status = soup_message_get_status (request->message);
if (status == SOUP_STATUS_NOT_MODIFIED &&
- (pending->if_none_match != NULL || pending->if_modified_since > 0))
+ (request->if_none_match != NULL || request->if_modified_since > 0))
{
/* Version on the server is unchanged from the version we have cached locally;
* report this as an out-argument, a zero-length response buffer, and no error */
- pending->out_not_modified = TRUE;
+ request->out_not_modified = TRUE;
}
else if (!SOUP_STATUS_IS_SUCCESSFUL (status))
{
/* is there another mirror we can try? */
- if (pending->mirrorlist_idx + 1 < pending->mirrorlist->len)
+ if (request->mirrorlist_idx + 1 < request->mirrorlist->len)
{
- pending->mirrorlist_idx++;
- create_pending_soup_request (pending);
-
- (void) g_input_stream_close (pending->request_body, NULL, NULL);
-
- start_pending_request (pending->thread_closure, task);
+ request->mirrorlist_idx++;
+ initiate_task_request (g_object_ref (task));
+ return;
}
else
{
- g_autofree char *uristring = g_uri_to_string (soup_message_get_uri (pending->message));
+ g_autofree char *uristring = g_uri_to_string (soup_message_get_uri (request->message));
GIOErrorEnum code = _ostree_fetcher_http_status_code_to_io_error (status);
{
g_autofree char *errmsg =
g_strdup_printf ("Server returned status %u: %s",
status,
soup_status_get_phrase (status));
-
- /* Let's make OOB errors be the final one since they're probably
- * the cause for the error here. */
- if (pending->thread_closure->oob_error)
- {
- local_error =
- g_error_copy (pending->thread_closure->oob_error);
- g_prefix_error (&local_error, "%s: ", errmsg);
- }
- else
- local_error = g_error_new_literal (G_IO_ERROR, code, errmsg);
+ local_error = g_error_new_literal (G_IO_ERROR, code, errmsg);
}
- if (pending->mirrorlist->len > 1)
+ if (request->mirrorlist->len > 1)
g_prefix_error (&local_error,
"All %u mirrors failed. Last error was: ",
- pending->mirrorlist->len);
- if (pending->thread_closure->remote_name &&
- !((pending->flags & OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT) > 0 &&
+ request->mirrorlist->len);
+ if (request->fetcher->remote_name &&
+ !((request->flags & OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT) > 0 &&
code == G_IO_ERROR_NOT_FOUND))
- _ostree_fetcher_journal_failure (pending->thread_closure->remote_name,
+ _ostree_fetcher_journal_failure (request->fetcher->remote_name,
uristring, local_error->message);
+ g_task_return_error (task, local_error);
+ return;
}
- goto out;
}
/* Grab cache properties from the response */
- pending->out_etag = g_strdup (soup_message_headers_get_one (soup_message_get_response_headers (pending->message), "ETag"));
- pending->out_last_modified = 0;
+ request->out_etag = g_strdup (soup_message_headers_get_one (soup_message_get_response_headers (request->message), "ETag"));
+ request->out_last_modified = 0;
- const char *last_modified_str = soup_message_headers_get_one (soup_message_get_response_headers (pending->message), "Last-Modified");
+ const char *last_modified_str = soup_message_headers_get_one (soup_message_get_response_headers (request->message), "Last-Modified");
if (last_modified_str != NULL)
{
GDateTime *soup_date = soup_date_time_new_from_http_string (last_modified_str);
if (soup_date != NULL)
{
- pending->out_last_modified = g_date_time_to_unix (soup_date);
+ request->out_last_modified = g_date_time_to_unix (soup_date);
g_date_time_unref (soup_date);
}
}
}
- pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
-
- if (pending->file)
+ if (request->file)
{
- GFileInfo *info = g_file_query_info (pending->file,
+ GFileInfo *info = g_file_query_info (request->file,
G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE ","
G_FILE_ATTRIBUTE_STANDARD_SIZE,
0, NULL, NULL);
if (info)
{
- pending->content_length = g_file_info_get_size (info);
+ request->content_length = g_file_info_get_size (info);
g_object_unref (info);
}
else
- pending->content_length = -1;
+ request->content_length = -1;
}
else
- pending->content_length = soup_message_headers_get_content_length (soup_message_get_response_headers (pending->message));
+ {
+ SoupMessageHeaders *headers = soup_message_get_response_headers (request->message);
+ if (soup_message_headers_get_list (headers, "Content-Encoding") == NULL)
+ request->content_length = soup_message_headers_get_content_length (headers);
+ else
+ request->content_length = -1;
+ }
- g_input_stream_read_bytes_async (pending->request_body,
+ GCancellable *cancellable = g_task_get_cancellable (task);
+ g_input_stream_read_bytes_async (request->response_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
+}
- out:
- if (local_error)
+static SoupSession *
+create_soup_session (OstreeFetcher *self)
+{
+ const char *user_agent = self->user_agent ?: OSTREE_FETCHER_USERAGENT_STRING;
+ SoupSession *session =
+ soup_session_new_with_options ("user-agent", user_agent,
+ "timeout", 60,
+ "idle-timeout", 60,
+ "max-conns-per-host", _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS,
+ NULL);
+
+ /* SoupContentDecoder is included in the session by default. Remove it
+ * if gzip compression isn't in use.
+ */
+ if ((self->config_flags & OSTREE_FETCHER_FLAGS_TRANSFER_GZIP) == 0)
+ soup_session_remove_feature_by_type (session, SOUP_TYPE_CONTENT_DECODER);
+
+ if (g_getenv ("OSTREE_DEBUG_HTTP"))
{
- if (pending->request_body)
- (void) g_input_stream_close (pending->request_body, NULL, NULL);
- g_task_return_error (task, local_error);
- remove_pending (pending);
+ glnx_unref_object SoupLogger *logger = soup_logger_new (SOUP_LOGGER_LOG_BODY);
+ soup_session_add_feature (session, SOUP_SESSION_FEATURE (logger));
}
- pending_uri_unref (pending);
- g_object_unref (task);
+ if (self->proxy_resolver != NULL)
+ soup_session_set_proxy_resolver (session, self->proxy_resolver);
+ if (self->cookie_jar != NULL)
+ soup_session_add_feature (session, SOUP_SESSION_FEATURE (self->cookie_jar));
+ if (self->client_cert != NULL)
+ soup_session_set_tls_interaction (session, (GTlsInteraction *) self->client_cert);
+ if (self->tls_database != NULL)
+ soup_session_set_tls_database (session, self->tls_database);
+
+ return session;
+}
+
+static gboolean
+match_value (gpointer key,
+ gpointer value,
+ gpointer user_data)
+{
+ return value == user_data;
}
static void
-_ostree_fetcher_request_async (OstreeFetcher *self,
- GPtrArray *mirrorlist,
- const char *filename,
- OstreeFetcherRequestFlags flags,
- const char *if_none_match,
- guint64 if_modified_since,
- gboolean is_membuf,
- guint64 max_size,
- int priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
+on_session_finalized (gpointer data,
+ GObject *object)
{
- g_autoptr(GTask) task = NULL;
- OstreeFetcherPendingURI *pending;
+ GHashTable *sessions = data;
+ g_debug ("Removing session %p from sessions hash table", object);
+ (void) g_hash_table_foreach_remove (sessions, match_value, object);
+}
+static void
+_ostree_fetcher_request_async (OstreeFetcher *self,
+ GPtrArray *mirrorlist,
+ const char *filename,
+ OstreeFetcherRequestFlags flags,
+ const char *if_none_match,
+ guint64 if_modified_since,
+ gboolean is_membuf,
+ guint64 max_size,
+ int priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (mirrorlist != NULL);
g_return_if_fail (mirrorlist->len > 0);
- /* SoupRequest is created in session thread. */
- pending = g_new0 (OstreeFetcherPendingURI, 1);
- pending->ref_count = 1;
- pending->thread_closure = thread_closure_ref (self->thread_closure);
- pending->mirrorlist = g_ptr_array_ref (mirrorlist);
- pending->filename = g_strdup (filename);
- pending->flags = flags;
- pending->if_none_match = g_strdup (if_none_match);
- pending->if_modified_since = if_modified_since;
- pending->max_size = max_size;
- pending->is_membuf = is_membuf;
- pending->fetcher = self;
-
- task = g_task_new (self, cancellable, callback, user_data);
+ FetcherRequest *request = g_new0 (FetcherRequest, 1);
+ request->mirrorlist = g_ptr_array_ref (mirrorlist);
+ request->filename = g_strdup (filename);
+ request->flags = flags;
+ request->if_none_match = g_strdup (if_none_match);
+ request->if_modified_since = if_modified_since;
+ request->max_size = max_size;
+ request->is_membuf = is_membuf;
+ request->fetcher = self;
+ request->mainctx = g_main_context_ref_thread_default ();
+
+ /* Ideally each fetcher would have a single soup session. However, each
+ * session needs to be used from a single main context and the fetcher
+ * doesn't have that limitation. Instead, a mapping from main context to
+ * session is kept in the fetcher.
+ */
+ g_debug ("Looking up session for main context %p", request->mainctx);
+ request->session = (SoupSession *) g_hash_table_lookup (self->sessions, request->mainctx);
+ if (request->session != NULL)
+ {
+ g_debug ("Using existing session %p", request->session);
+ g_object_ref (request->session);
+ }
+ else
+ {
+ request->session = create_soup_session (self);
+ g_debug ("Created new session %p", request->session);
+ g_hash_table_insert (self->sessions, request->mainctx, request->session);
+
+ /* Add a weak ref to the session so that when it's finalized it can be
+ * removed from the hash table.
+ */
+ g_object_weak_ref (G_OBJECT (request->session), on_session_finalized, self->sessions);
+ }
+
+ g_autoptr(GTask) task = g_task_new (self, cancellable, callback, user_data);
g_task_set_source_tag (task, _ostree_fetcher_request_async);
- g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref);
+ g_task_set_task_data (task, request, (GDestroyNotify) fetcher_request_free);
/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
- session_thread_idle_add (self->thread_closure,
- session_thread_request_uri,
- g_object_ref (task),
- (GDestroyNotify) g_object_unref);
+ initiate_task_request (g_object_ref (task));
}
void
-_ostree_fetcher_request_to_tmpfile (OstreeFetcher *self,
- GPtrArray *mirrorlist,
- const char *filename,
- OstreeFetcherRequestFlags flags,
- const char *if_none_match,
- guint64 if_modified_since,
- guint64 max_size,
- int priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
+_ostree_fetcher_request_to_tmpfile (OstreeFetcher *self,
+ GPtrArray *mirrorlist,
+ const char *filename,
+ OstreeFetcherRequestFlags flags,
+ const char *if_none_match,
+ guint64 if_modified_since,
+ guint64 max_size,
+ int priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
{
_ostree_fetcher_request_async (self, mirrorlist, filename, flags,
if_none_match, if_modified_since, FALSE,
@@ -1204,54 +822,49 @@ _ostree_fetcher_request_to_tmpfile (OstreeFetcher *self,
}
gboolean
-_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
- GAsyncResult *result,
- GLnxTmpfile *out_tmpf,
- gboolean *out_not_modified,
- char **out_etag,
- guint64 *out_last_modified,
- GError **error)
+_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
+ GAsyncResult *result,
+ GLnxTmpfile *out_tmpf,
+ gboolean *out_not_modified,
+ char **out_etag,
+ guint64 *out_last_modified,
+ GError **error)
{
- GTask *task;
- OstreeFetcherPendingURI *pending;
- gpointer ret;
-
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
- task = (GTask*)result;
- pending = g_task_get_task_data (task);
-
- ret = g_task_propagate_pointer (task, error);
+ GTask *task = (GTask *) result;
+ gpointer ret = g_task_propagate_pointer (task, error);
if (!ret)
return FALSE;
- g_assert (!pending->is_membuf);
- *out_tmpf = pending->tmpf;
- pending->tmpf.initialized = FALSE; /* Transfer ownership */
+ FetcherRequest *request = g_task_get_task_data (task);
+ g_assert (!request->is_membuf);
+ *out_tmpf = request->tmpf;
+ request->tmpf.initialized = FALSE; /* Transfer ownership */
if (out_not_modified != NULL)
- *out_not_modified = pending->out_not_modified;
+ *out_not_modified = request->out_not_modified;
if (out_etag != NULL)
- *out_etag = g_steal_pointer (&pending->out_etag);
+ *out_etag = g_steal_pointer (&request->out_etag);
if (out_last_modified != NULL)
- *out_last_modified = pending->out_last_modified;
+ *out_last_modified = request->out_last_modified;
return TRUE;
}
void
-_ostree_fetcher_request_to_membuf (OstreeFetcher *self,
- GPtrArray *mirrorlist,
- const char *filename,
- OstreeFetcherRequestFlags flags,
- const char *if_none_match,
- guint64 if_modified_since,
- guint64 max_size,
- int priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
+_ostree_fetcher_request_to_membuf (OstreeFetcher *self,
+ GPtrArray *mirrorlist,
+ const char *filename,
+ OstreeFetcherRequestFlags flags,
+ const char *if_none_match,
+ guint64 if_modified_since,
+ guint64 max_size,
+ int priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
{
_ostree_fetcher_request_async (self, mirrorlist, filename, flags,
if_none_match, if_modified_since, TRUE,
@@ -1260,66 +873,40 @@ _ostree_fetcher_request_to_membuf (OstreeFetcher *self,
}
gboolean
-_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
- GAsyncResult *result,
- GBytes **out_buf,
- gboolean *out_not_modified,
- char **out_etag,
- guint64 *out_last_modified,
- GError **error)
+_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
+ GAsyncResult *result,
+ GBytes **out_buf,
+ gboolean *out_not_modified,
+ char **out_etag,
+ guint64 *out_last_modified,
+ GError **error)
{
- GTask *task;
- OstreeFetcherPendingURI *pending;
- gpointer ret;
-
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
- task = (GTask*)result;
- pending = g_task_get_task_data (task);
-
- ret = g_task_propagate_pointer (task, error);
+ GTask *task = (GTask *) result;
+ gpointer ret = g_task_propagate_pointer (task, error);
if (!ret)
return FALSE;
- g_assert (pending->is_membuf);
+ FetcherRequest *request = g_task_get_task_data (task);
+ g_assert (request->is_membuf);
g_assert (out_buf);
*out_buf = ret;
if (out_not_modified != NULL)
- *out_not_modified = pending->out_not_modified;
+ *out_not_modified = request->out_not_modified;
if (out_etag != NULL)
- *out_etag = g_steal_pointer (&pending->out_etag);
+ *out_etag = g_steal_pointer (&request->out_etag);
if (out_last_modified != NULL)
- *out_last_modified = pending->out_last_modified;
+ *out_last_modified = request->out_last_modified;
return TRUE;
}
guint64
-_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
+_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
- g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0);
-
- g_mutex_lock (&self->thread_closure->output_stream_set_lock);
-
- guint64 ret = self->thread_closure->total_downloaded;
-
- GLNX_HASH_TABLE_FOREACH (self->thread_closure->output_stream_set,
- GFileOutputStream*, stream)
- {
- if (G_IS_FILE_DESCRIPTOR_BASED (stream))
- {
- int fd = g_file_descriptor_based_get_fd ((GFileDescriptorBased*)stream);
- struct stat stbuf;
-
- if (glnx_fstat (fd, &stbuf, NULL))
- ret += stbuf.st_size;
- }
- }
-
- g_mutex_unlock (&self->thread_closure->output_stream_set_lock);
-
- return ret;
+ return self->bytes_transferred;
}