diff options
author | Sam Thursfield <sam@afuera.me.uk> | 2020-10-18 22:35:08 +0000 |
---|---|---|
committer | Sam Thursfield <sam@afuera.me.uk> | 2020-10-18 22:35:08 +0000 |
commit | e181a35484cdf2455d0f76f0e6085217b89edf7a (patch) | |
tree | 8e34d865ab533a49b88258e07b422db0285d2737 | |
parent | 20b52dc7cdce200771c4ce5a495fb32c2ee539ad (diff) | |
parent | 2d9743a93b9046b4ad17f4a51f408f1337dd5ba1 (diff) | |
download | tracker-e181a35484cdf2455d0f76f0e6085217b89edf7a.tar.gz |
Merge branch 'wip/carlosg/parallel-stmts' into 'master'
Improve behavior on parallel queries
See merge request GNOME/tracker!333
-rw-r--r-- | src/libtracker-data/tracker-db-manager.c | 26 | ||||
-rw-r--r-- | src/libtracker-sparql/tracker-notifier.c | 29 |
2 files changed, 46 insertions, 9 deletions
diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c index 4eec98e11..5e98337de 100644 --- a/src/libtracker-data/tracker-db-manager.c +++ b/src/libtracker-data/tracker-db-manager.c @@ -819,7 +819,8 @@ TrackerDBInterface * tracker_db_manager_get_db_interface (TrackerDBManager *db_manager) { GError *internal_error = NULL; - TrackerDBInterface *interface; + TrackerDBInterface *interface = NULL; + guint len, i; /* The interfaces never actually leave the async queue, * we use it as a thread synchronized LRU, which doesn't @@ -830,19 +831,30 @@ tracker_db_manager_get_db_interface (TrackerDBManager *db_manager) * in the interface lock). */ g_async_queue_lock (db_manager->interfaces); - interface = g_async_queue_try_pop_unlocked (db_manager->interfaces); + len = g_async_queue_length_unlocked (db_manager->interfaces); + + /* 1st. Find a free interface */ + for (i = 0; i < len; i++) { + interface = g_async_queue_try_pop_unlocked (db_manager->interfaces); + + if (!interface) + break; + if (!tracker_db_interface_get_is_used (interface)) + break; - if (interface && tracker_db_interface_get_is_used (interface) && - g_async_queue_length_unlocked (db_manager->interfaces) < MAX_INTERFACES) { - /* Put it back and go at creating a new one */ - g_async_queue_push_front_unlocked (db_manager->interfaces, interface); + g_async_queue_push_unlocked (db_manager->interfaces, interface); interface = NULL; } + /* 2nd. If no more interfaces can be created, pick one */ + if (!interface && len >= MAX_INTERFACES) { + interface = g_async_queue_try_pop_unlocked (db_manager->interfaces); + } + if (interface) { g_signal_emit (db_manager, signals[UPDATE_INTERFACE], 0, interface); } else { - /* Create a new one to satisfy the request */ + /* 3rd. Create a new interface to satisfy the request */ interface = tracker_db_manager_create_db_interface (db_manager, TRUE, &internal_error); diff --git a/src/libtracker-sparql/tracker-notifier.c b/src/libtracker-sparql/tracker-notifier.c index 80f5867f4..6e2989bf8 100644 --- a/src/libtracker-sparql/tracker-notifier.c +++ b/src/libtracker-sparql/tracker-notifier.c @@ -70,7 +70,9 @@ struct _TrackerNotifierPrivate { GHashTable *subscriptions; /* guint -> TrackerNotifierSubscription */ GCancellable *cancellable; TrackerSparqlStatement *local_statement; + GAsyncQueue *queue; gint n_local_statement_slots; + gboolean querying; GMutex mutex; }; @@ -454,6 +456,8 @@ handle_cursor (GTask *task, { TrackerNotifierEventCache *cache = task_data; TrackerSparqlCursor *cursor = source_object; + TrackerNotifier *notifier = cache->notifier; + TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier); TrackerNotifierEvent *event; GSequenceIter *iter; gint64 id; @@ -486,9 +490,19 @@ handle_cursor (GTask *task, cache->first = iter; if (g_sequence_iter_is_end (cache->first)) { + TrackerNotifierEventCache *next; + tracker_notifier_emit_events_in_idle (cache); + + g_async_queue_lock (priv->queue); + next = g_async_queue_try_pop_unlocked (priv->queue); + if (next) + tracker_notifier_query_extra_info (notifier, next); + else + priv->querying = FALSE; + g_async_queue_unlock (priv->queue); } else { - tracker_notifier_query_extra_info (cache->notifier, cache); + tracker_notifier_query_extra_info (notifier, cache); } g_task_return_boolean (task, TRUE); @@ -609,6 +623,7 @@ void _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache) { TrackerNotifier *notifier = cache->notifier; + TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier); if (g_sequence_is_empty (cache->sequence)) { _tracker_notifier_event_cache_free (cache); @@ -616,7 +631,15 @@ _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache) } cache->first = g_sequence_get_begin_iter (cache->sequence); - tracker_notifier_query_extra_info (notifier, cache); + + g_async_queue_lock (priv->queue); + if (priv->querying) { + g_async_queue_push_unlocked (priv->queue, cache); + } else { + priv->querying = TRUE; + tracker_notifier_query_extra_info (notifier, cache); + } + g_async_queue_unlock (priv->queue); } static void @@ -691,6 +714,7 @@ tracker_notifier_finalize (GObject *object) g_cancellable_cancel (priv->cancellable); g_clear_object (&priv->cancellable); g_clear_object (&priv->local_statement); + g_async_queue_unref (priv->queue); if (priv->connection) g_object_unref (priv->connection); @@ -755,6 +779,7 @@ tracker_notifier_init (TrackerNotifier *notifier) priv->subscriptions = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) tracker_notifier_subscription_free); priv->cancellable = g_cancellable_new (); + priv->queue = g_async_queue_new (); } /** |