summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam@afuera.me.uk>2020-10-18 22:35:08 +0000
committerSam Thursfield <sam@afuera.me.uk>2020-10-18 22:35:08 +0000
commite181a35484cdf2455d0f76f0e6085217b89edf7a (patch)
tree8e34d865ab533a49b88258e07b422db0285d2737
parent20b52dc7cdce200771c4ce5a495fb32c2ee539ad (diff)
parent2d9743a93b9046b4ad17f4a51f408f1337dd5ba1 (diff)
downloadtracker-e181a35484cdf2455d0f76f0e6085217b89edf7a.tar.gz
Merge branch 'wip/carlosg/parallel-stmts' into 'master'
Improve behavior on parallel queries See merge request GNOME/tracker!333
-rw-r--r--src/libtracker-data/tracker-db-manager.c26
-rw-r--r--src/libtracker-sparql/tracker-notifier.c29
2 files changed, 46 insertions, 9 deletions
diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c
index 4eec98e11..5e98337de 100644
--- a/src/libtracker-data/tracker-db-manager.c
+++ b/src/libtracker-data/tracker-db-manager.c
@@ -819,7 +819,8 @@ TrackerDBInterface *
tracker_db_manager_get_db_interface (TrackerDBManager *db_manager)
{
GError *internal_error = NULL;
- TrackerDBInterface *interface;
+ TrackerDBInterface *interface = NULL;
+ guint len, i;
/* The interfaces never actually leave the async queue,
* we use it as a thread synchronized LRU, which doesn't
@@ -830,19 +831,30 @@ tracker_db_manager_get_db_interface (TrackerDBManager *db_manager)
* in the interface lock).
*/
g_async_queue_lock (db_manager->interfaces);
- interface = g_async_queue_try_pop_unlocked (db_manager->interfaces);
+ len = g_async_queue_length_unlocked (db_manager->interfaces);
+
+ /* 1st. Find a free interface */
+ for (i = 0; i < len; i++) {
+ interface = g_async_queue_try_pop_unlocked (db_manager->interfaces);
+
+ if (!interface)
+ break;
+ if (!tracker_db_interface_get_is_used (interface))
+ break;
- if (interface && tracker_db_interface_get_is_used (interface) &&
- g_async_queue_length_unlocked (db_manager->interfaces) < MAX_INTERFACES) {
- /* Put it back and go at creating a new one */
- g_async_queue_push_front_unlocked (db_manager->interfaces, interface);
+ g_async_queue_push_unlocked (db_manager->interfaces, interface);
interface = NULL;
}
+ /* 2nd. If no more interfaces can be created, pick one */
+ if (!interface && len >= MAX_INTERFACES) {
+ interface = g_async_queue_try_pop_unlocked (db_manager->interfaces);
+ }
+
if (interface) {
g_signal_emit (db_manager, signals[UPDATE_INTERFACE], 0, interface);
} else {
- /* Create a new one to satisfy the request */
+ /* 3rd. Create a new interface to satisfy the request */
interface = tracker_db_manager_create_db_interface (db_manager,
TRUE, &internal_error);
diff --git a/src/libtracker-sparql/tracker-notifier.c b/src/libtracker-sparql/tracker-notifier.c
index 80f5867f4..6e2989bf8 100644
--- a/src/libtracker-sparql/tracker-notifier.c
+++ b/src/libtracker-sparql/tracker-notifier.c
@@ -70,7 +70,9 @@ struct _TrackerNotifierPrivate {
GHashTable *subscriptions; /* guint -> TrackerNotifierSubscription */
GCancellable *cancellable;
TrackerSparqlStatement *local_statement;
+ GAsyncQueue *queue;
gint n_local_statement_slots;
+ gboolean querying;
GMutex mutex;
};
@@ -454,6 +456,8 @@ handle_cursor (GTask *task,
{
TrackerNotifierEventCache *cache = task_data;
TrackerSparqlCursor *cursor = source_object;
+ TrackerNotifier *notifier = cache->notifier;
+ TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier);
TrackerNotifierEvent *event;
GSequenceIter *iter;
gint64 id;
@@ -486,9 +490,19 @@ handle_cursor (GTask *task,
cache->first = iter;
if (g_sequence_iter_is_end (cache->first)) {
+ TrackerNotifierEventCache *next;
+
tracker_notifier_emit_events_in_idle (cache);
+
+ g_async_queue_lock (priv->queue);
+ next = g_async_queue_try_pop_unlocked (priv->queue);
+ if (next)
+ tracker_notifier_query_extra_info (notifier, next);
+ else
+ priv->querying = FALSE;
+ g_async_queue_unlock (priv->queue);
} else {
- tracker_notifier_query_extra_info (cache->notifier, cache);
+ tracker_notifier_query_extra_info (notifier, cache);
}
g_task_return_boolean (task, TRUE);
@@ -609,6 +623,7 @@ void
_tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache)
{
TrackerNotifier *notifier = cache->notifier;
+ TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier);
if (g_sequence_is_empty (cache->sequence)) {
_tracker_notifier_event_cache_free (cache);
@@ -616,7 +631,15 @@ _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache)
}
cache->first = g_sequence_get_begin_iter (cache->sequence);
- tracker_notifier_query_extra_info (notifier, cache);
+
+ g_async_queue_lock (priv->queue);
+ if (priv->querying) {
+ g_async_queue_push_unlocked (priv->queue, cache);
+ } else {
+ priv->querying = TRUE;
+ tracker_notifier_query_extra_info (notifier, cache);
+ }
+ g_async_queue_unlock (priv->queue);
}
static void
@@ -691,6 +714,7 @@ tracker_notifier_finalize (GObject *object)
g_cancellable_cancel (priv->cancellable);
g_clear_object (&priv->cancellable);
g_clear_object (&priv->local_statement);
+ g_async_queue_unref (priv->queue);
if (priv->connection)
g_object_unref (priv->connection);
@@ -755,6 +779,7 @@ tracker_notifier_init (TrackerNotifier *notifier)
priv->subscriptions = g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) tracker_notifier_subscription_free);
priv->cancellable = g_cancellable_new ();
+ priv->queue = g_async_queue_new ();
}
/**