diff options
-rw-r--r-- | src/libtracker-sparql/direct/tracker-direct.c | 2 | ||||
-rw-r--r-- | src/libtracker-sparql/tracker-notifier-private.h | 3 | ||||
-rw-r--r-- | src/libtracker-sparql/tracker-notifier.c | 132 |
3 files changed, 89 insertions, 48 deletions
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c index c44254bcb..b3172ea32 100644 --- a/src/libtracker-sparql/direct/tracker-direct.c +++ b/src/libtracker-sparql/direct/tracker-direct.c @@ -723,7 +723,7 @@ commit_statement_cb (gpointer user_data) while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &cache)) { g_hash_table_iter_steal (&iter); - _tracker_notifier_event_cache_flush_events (cache); + _tracker_notifier_event_cache_flush_events (notifier, cache); } } diff --git a/src/libtracker-sparql/tracker-notifier-private.h b/src/libtracker-sparql/tracker-notifier-private.h index c9bc73e18..f277cb3f6 100644 --- a/src/libtracker-sparql/tracker-notifier-private.h +++ b/src/libtracker-sparql/tracker-notifier-private.h @@ -37,7 +37,8 @@ _tracker_notifier_event_cache_push_event (TrackerNotifierEventCache *cache, gint64 id, TrackerNotifierEventType event_type); -void _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache); +void _tracker_notifier_event_cache_flush_events (TrackerNotifier *notifier, + TrackerNotifierEventCache *cache); const gchar * tracker_notifier_event_cache_get_graph (TrackerNotifierEventCache *cache); diff --git a/src/libtracker-sparql/tracker-notifier.c b/src/libtracker-sparql/tracker-notifier.c index 1ac26e019..0eae20681 100644 --- a/src/libtracker-sparql/tracker-notifier.c +++ b/src/libtracker-sparql/tracker-notifier.c @@ -48,6 +48,9 @@ * Similarly, when receiving an event of type %TRACKER_NOTIFIER_EVENT_UPDATE, * the resource will have already changed, so the data previous to the update is * no longer available. + * + * The [signal@Tracker.Notifier::events] signal is emitted in the thread-default + * main context of the thread where the `TrackerNotifier` instance was created. */ #include "config.h" @@ -79,6 +82,7 @@ struct _TrackerNotifierPrivate { GCancellable *cancellable; TrackerSparqlStatement *local_statement; GAsyncQueue *queue; + GMainContext *main_context; gint n_local_statement_slots; guint querying : 1; guint urn_query_disabled : 1; @@ -86,9 +90,11 @@ struct _TrackerNotifierPrivate { }; struct _TrackerNotifierEventCache { - TrackerNotifierSubscription *subscription; + gchar *service; gchar *graph; - TrackerNotifier *notifier; + GWeakRef notifier; + GCancellable *cancellable; + TrackerSparqlStatement *stmt; GSequence *sequence; GSequenceIter *first; }; @@ -123,6 +129,12 @@ G_DEFINE_TYPE_WITH_CODE (TrackerNotifier, tracker_notifier, G_TYPE_OBJECT, static void tracker_notifier_query_extra_info (TrackerNotifier *notifier, TrackerNotifierEventCache *cache); +static gchar * get_service_name (TrackerNotifier *notifier, + TrackerNotifierSubscription *subscription); + +static TrackerSparqlStatement * ensure_extra_info_statement (TrackerNotifier *notifier, + TrackerNotifierSubscription *subscription); + static TrackerNotifierSubscription * tracker_notifier_subscription_new (TrackerNotifier *notifier, GDBusConnection *connection, @@ -200,12 +212,19 @@ _tracker_notifier_event_cache_new_full (TrackerNotifier *notifier, const gchar *graph) { TrackerNotifierEventCache *event_cache; + TrackerNotifierPrivate *priv; + + priv = tracker_notifier_get_instance_private (notifier); event_cache = g_new0 (TrackerNotifierEventCache, 1); - event_cache->notifier = g_object_ref (notifier); - event_cache->subscription = subscription; + g_weak_ref_init (&event_cache->notifier, notifier); event_cache->graph = g_strdup (graph); + event_cache->cancellable = g_object_ref (priv->cancellable); event_cache->sequence = g_sequence_new ((GDestroyNotify) tracker_notifier_event_unref); + event_cache->stmt = ensure_extra_info_statement (notifier, subscription); + + if (subscription) + event_cache->service = get_service_name (notifier, subscription); return event_cache; } @@ -221,7 +240,9 @@ void _tracker_notifier_event_cache_free (TrackerNotifierEventCache *event_cache) { g_sequence_free (event_cache->sequence); - g_object_unref (event_cache->notifier); + g_weak_ref_clear (&event_cache->notifier); + g_object_unref (event_cache->cancellable); + g_free (event_cache->service); g_free (event_cache->graph); g_free (event_cache); } @@ -324,14 +345,12 @@ compose_uri (const gchar *service, } static gchar * -get_service_name (TrackerNotifier *notifier, - TrackerNotifierEventCache *cache) +get_service_name (TrackerNotifier *notifier, + TrackerNotifierSubscription *subscription) { - TrackerNotifierSubscription *subscription; TrackerNotifierPrivate *priv; priv = tracker_notifier_get_instance_private (notifier); - subscription = cache->subscription; if (!subscription) return NULL; @@ -363,33 +382,47 @@ get_service_name (TrackerNotifier *notifier, static gboolean tracker_notifier_emit_events (TrackerNotifierEventCache *cache) { + TrackerNotifier *notifier; GPtrArray *events; - gchar *service; + + notifier = g_weak_ref_get (&cache->notifier); + if (!notifier) + return G_SOURCE_REMOVE; events = tracker_notifier_event_cache_take_events (cache); if (events) { - service = get_service_name (cache->notifier, cache); - g_signal_emit (cache->notifier, signals[EVENTS], 0, service, cache->graph, events); + g_signal_emit (notifier, signals[EVENTS], 0, + cache->service, cache->graph, events); g_ptr_array_unref (events); - g_free (service); } + g_object_unref (notifier); + return G_SOURCE_REMOVE; } static void -tracker_notifier_emit_events_in_idle (TrackerNotifierEventCache *cache) +tracker_notifier_emit_events_in_idle (TrackerNotifier *notifier, + TrackerNotifierEventCache *cache) { - g_idle_add_full (G_PRIORITY_DEFAULT, - (GSourceFunc) tracker_notifier_emit_events, - cache, - (GDestroyNotify) _tracker_notifier_event_cache_free); + TrackerNotifierPrivate *priv; + GSource *source; + + priv = tracker_notifier_get_instance_private (notifier); + + source = g_idle_source_new (); + g_source_set_callback (source, + (GSourceFunc) tracker_notifier_emit_events, + cache, + (GDestroyNotify) _tracker_notifier_event_cache_free); + g_source_attach (source, priv->main_context); + g_source_unref (source); } static gchar * -create_extra_info_query (TrackerNotifier *notifier, - TrackerNotifierEventCache *cache) +create_extra_info_query (TrackerNotifier *notifier, + TrackerNotifierSubscription *subscription) { GString *sparql; gchar *service; @@ -397,7 +430,7 @@ create_extra_info_query (TrackerNotifier *notifier, sparql = g_string_new ("SELECT ?id ?uri "); - service = get_service_name (notifier, cache); + service = get_service_name (notifier, subscription); if (service) { g_string_append_printf (sparql, @@ -428,8 +461,8 @@ create_extra_info_query (TrackerNotifier *notifier, } static TrackerSparqlStatement * -ensure_extra_info_statement (TrackerNotifier *notifier, - TrackerNotifierEventCache *cache) +ensure_extra_info_statement (TrackerNotifier *notifier, + TrackerNotifierSubscription *subscription) { TrackerSparqlStatement **ptr; TrackerNotifierPrivate *priv; @@ -438,8 +471,8 @@ ensure_extra_info_statement (TrackerNotifier *notifier, priv = tracker_notifier_get_instance_private (notifier); - if (cache->subscription) { - ptr = &cache->subscription->statement; + if (subscription) { + ptr = &subscription->statement; } else { ptr = &priv->local_statement; } @@ -448,7 +481,7 @@ ensure_extra_info_statement (TrackerNotifier *notifier, return *ptr; } - sparql = create_extra_info_query (notifier, cache); + sparql = create_extra_info_query (notifier, subscription); *ptr = tracker_sparql_connection_query_statement (priv->connection, sparql, priv->cancellable, @@ -472,8 +505,8 @@ handle_cursor (GTask *task, { TrackerNotifierEventCache *cache = task_data; TrackerSparqlCursor *cursor = source_object; - TrackerNotifier *notifier = cache->notifier; - TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier); + TrackerNotifier *notifier; + TrackerNotifierPrivate *priv; TrackerNotifierEvent *event; GSequenceIter *iter; gint64 id; @@ -485,7 +518,7 @@ handle_cursor (GTask *task, * extracted from the GSequence, the latter because of the ORDER BY * clause. */ - while (tracker_sparql_cursor_next (cursor, NULL, NULL)) { + while (tracker_sparql_cursor_next (cursor, cancellable, NULL)) { id = tracker_sparql_cursor_get_integer (cursor, 0); event = g_sequence_get (iter); iter = g_sequence_iter_next (iter); @@ -500,12 +533,25 @@ handle_cursor (GTask *task, } tracker_sparql_cursor_close (cursor); + + if (g_task_return_error_if_cancelled (task)) { + _tracker_notifier_event_cache_free (cache); + return; + } + + notifier = g_weak_ref_get (&cache->notifier); + if (!notifier) { + _tracker_notifier_event_cache_free (cache); + return; + } + + priv = tracker_notifier_get_instance_private (notifier); cache->first = iter; if (g_sequence_iter_is_end (cache->first)) { TrackerNotifierEventCache *next; - tracker_notifier_emit_events_in_idle (cache); + tracker_notifier_emit_events_in_idle (notifier, cache); g_async_queue_lock (priv->queue); next = g_async_queue_try_pop_unlocked (priv->queue); @@ -519,6 +565,7 @@ handle_cursor (GTask *task, } g_task_return_boolean (task, TRUE); + g_object_unref (notifier); } static void @@ -548,14 +595,12 @@ query_extra_info_cb (GObject *object, { TrackerNotifierEventCache *cache = user_data; TrackerSparqlStatement *statement; - TrackerNotifierPrivate *priv; TrackerSparqlCursor *cursor; GError *error = NULL; GTask *task; statement = TRACKER_SPARQL_STATEMENT (object); cursor = tracker_sparql_statement_execute_finish (statement, res, &error); - priv = tracker_notifier_get_instance_private (cache->notifier); if (!cursor) { if (!g_error_matches (error, @@ -569,7 +614,7 @@ query_extra_info_cb (GObject *object, return; } - task = g_task_new (cursor, priv->cancellable, finish_query, NULL); + task = g_task_new (cursor, cache->cancellable, finish_query, NULL); g_task_set_task_data (task, cache, NULL); g_task_run_in_thread (task, handle_cursor); g_object_unref (task); @@ -612,30 +657,24 @@ tracker_notifier_query_extra_info (TrackerNotifier *notifier, TrackerNotifierEventCache *cache) { TrackerNotifierPrivate *priv; - TrackerSparqlStatement *statement; priv = tracker_notifier_get_instance_private (notifier); g_mutex_lock (&priv->mutex); - statement = ensure_extra_info_statement (notifier, cache); - if (!statement) - goto out; - - bind_arguments (statement, cache); - tracker_sparql_statement_execute_async (statement, - priv->cancellable, + bind_arguments (cache->stmt, cache); + tracker_sparql_statement_execute_async (cache->stmt, + cache->cancellable, query_extra_info_cb, cache); -out: g_mutex_unlock (&priv->mutex); } void -_tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache) +_tracker_notifier_event_cache_flush_events (TrackerNotifier *notifier, + TrackerNotifierEventCache *cache) { - TrackerNotifier *notifier = cache->notifier; TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier); if (g_sequence_is_empty (cache->sequence)) { @@ -647,7 +686,7 @@ _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache) g_async_queue_lock (priv->queue); if (priv->urn_query_disabled) { - tracker_notifier_emit_events_in_idle (cache); + tracker_notifier_emit_events_in_idle (notifier, cache); } else if (priv->querying) { g_async_queue_push_unlocked (priv->queue, cache); } else { @@ -678,7 +717,7 @@ graph_updated_cb (GDBusConnection *connection, handle_events (notifier, cache, events); g_variant_iter_free (events); - _tracker_notifier_event_cache_flush_events (cache); + _tracker_notifier_event_cache_flush_events (notifier, cache); } static void @@ -795,6 +834,7 @@ tracker_notifier_init (TrackerNotifier *notifier) (GDestroyNotify) tracker_notifier_subscription_free); priv->cancellable = g_cancellable_new (); priv->queue = g_async_queue_new (); + priv->main_context = g_main_context_get_thread_default (); } /** |