summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libtracker-sparql/direct/tracker-direct.c2
-rw-r--r--src/libtracker-sparql/tracker-notifier-private.h3
-rw-r--r--src/libtracker-sparql/tracker-notifier.c132
3 files changed, 89 insertions, 48 deletions
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
index c44254bcb..b3172ea32 100644
--- a/src/libtracker-sparql/direct/tracker-direct.c
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -723,7 +723,7 @@ commit_statement_cb (gpointer user_data)
while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &cache)) {
g_hash_table_iter_steal (&iter);
- _tracker_notifier_event_cache_flush_events (cache);
+ _tracker_notifier_event_cache_flush_events (notifier, cache);
}
}
diff --git a/src/libtracker-sparql/tracker-notifier-private.h b/src/libtracker-sparql/tracker-notifier-private.h
index c9bc73e18..f277cb3f6 100644
--- a/src/libtracker-sparql/tracker-notifier-private.h
+++ b/src/libtracker-sparql/tracker-notifier-private.h
@@ -37,7 +37,8 @@ _tracker_notifier_event_cache_push_event (TrackerNotifierEventCache *cache,
gint64 id,
TrackerNotifierEventType event_type);
-void _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache);
+void _tracker_notifier_event_cache_flush_events (TrackerNotifier *notifier,
+ TrackerNotifierEventCache *cache);
const gchar * tracker_notifier_event_cache_get_graph (TrackerNotifierEventCache *cache);
diff --git a/src/libtracker-sparql/tracker-notifier.c b/src/libtracker-sparql/tracker-notifier.c
index 1ac26e019..0eae20681 100644
--- a/src/libtracker-sparql/tracker-notifier.c
+++ b/src/libtracker-sparql/tracker-notifier.c
@@ -48,6 +48,9 @@
* Similarly, when receiving an event of type %TRACKER_NOTIFIER_EVENT_UPDATE,
* the resource will have already changed, so the data previous to the update is
* no longer available.
+ *
+ * The [signal@Tracker.Notifier::events] signal is emitted in the thread-default
+ * main context of the thread where the `TrackerNotifier` instance was created.
*/
#include "config.h"
@@ -79,6 +82,7 @@ struct _TrackerNotifierPrivate {
GCancellable *cancellable;
TrackerSparqlStatement *local_statement;
GAsyncQueue *queue;
+ GMainContext *main_context;
gint n_local_statement_slots;
guint querying : 1;
guint urn_query_disabled : 1;
@@ -86,9 +90,11 @@ struct _TrackerNotifierPrivate {
};
struct _TrackerNotifierEventCache {
- TrackerNotifierSubscription *subscription;
+ gchar *service;
gchar *graph;
- TrackerNotifier *notifier;
+ GWeakRef notifier;
+ GCancellable *cancellable;
+ TrackerSparqlStatement *stmt;
GSequence *sequence;
GSequenceIter *first;
};
@@ -123,6 +129,12 @@ G_DEFINE_TYPE_WITH_CODE (TrackerNotifier, tracker_notifier, G_TYPE_OBJECT,
static void tracker_notifier_query_extra_info (TrackerNotifier *notifier,
TrackerNotifierEventCache *cache);
+static gchar * get_service_name (TrackerNotifier *notifier,
+ TrackerNotifierSubscription *subscription);
+
+static TrackerSparqlStatement * ensure_extra_info_statement (TrackerNotifier *notifier,
+ TrackerNotifierSubscription *subscription);
+
static TrackerNotifierSubscription *
tracker_notifier_subscription_new (TrackerNotifier *notifier,
GDBusConnection *connection,
@@ -200,12 +212,19 @@ _tracker_notifier_event_cache_new_full (TrackerNotifier *notifier,
const gchar *graph)
{
TrackerNotifierEventCache *event_cache;
+ TrackerNotifierPrivate *priv;
+
+ priv = tracker_notifier_get_instance_private (notifier);
event_cache = g_new0 (TrackerNotifierEventCache, 1);
- event_cache->notifier = g_object_ref (notifier);
- event_cache->subscription = subscription;
+ g_weak_ref_init (&event_cache->notifier, notifier);
event_cache->graph = g_strdup (graph);
+ event_cache->cancellable = g_object_ref (priv->cancellable);
event_cache->sequence = g_sequence_new ((GDestroyNotify) tracker_notifier_event_unref);
+ event_cache->stmt = ensure_extra_info_statement (notifier, subscription);
+
+ if (subscription)
+ event_cache->service = get_service_name (notifier, subscription);
return event_cache;
}
@@ -221,7 +240,9 @@ void
_tracker_notifier_event_cache_free (TrackerNotifierEventCache *event_cache)
{
g_sequence_free (event_cache->sequence);
- g_object_unref (event_cache->notifier);
+ g_weak_ref_clear (&event_cache->notifier);
+ g_object_unref (event_cache->cancellable);
+ g_free (event_cache->service);
g_free (event_cache->graph);
g_free (event_cache);
}
@@ -324,14 +345,12 @@ compose_uri (const gchar *service,
}
static gchar *
-get_service_name (TrackerNotifier *notifier,
- TrackerNotifierEventCache *cache)
+get_service_name (TrackerNotifier *notifier,
+ TrackerNotifierSubscription *subscription)
{
- TrackerNotifierSubscription *subscription;
TrackerNotifierPrivate *priv;
priv = tracker_notifier_get_instance_private (notifier);
- subscription = cache->subscription;
if (!subscription)
return NULL;
@@ -363,33 +382,47 @@ get_service_name (TrackerNotifier *notifier,
static gboolean
tracker_notifier_emit_events (TrackerNotifierEventCache *cache)
{
+ TrackerNotifier *notifier;
GPtrArray *events;
- gchar *service;
+
+ notifier = g_weak_ref_get (&cache->notifier);
+ if (!notifier)
+ return G_SOURCE_REMOVE;
events = tracker_notifier_event_cache_take_events (cache);
if (events) {
- service = get_service_name (cache->notifier, cache);
- g_signal_emit (cache->notifier, signals[EVENTS], 0, service, cache->graph, events);
+ g_signal_emit (notifier, signals[EVENTS], 0,
+ cache->service, cache->graph, events);
g_ptr_array_unref (events);
- g_free (service);
}
+ g_object_unref (notifier);
+
return G_SOURCE_REMOVE;
}
static void
-tracker_notifier_emit_events_in_idle (TrackerNotifierEventCache *cache)
+tracker_notifier_emit_events_in_idle (TrackerNotifier *notifier,
+ TrackerNotifierEventCache *cache)
{
- g_idle_add_full (G_PRIORITY_DEFAULT,
- (GSourceFunc) tracker_notifier_emit_events,
- cache,
- (GDestroyNotify) _tracker_notifier_event_cache_free);
+ TrackerNotifierPrivate *priv;
+ GSource *source;
+
+ priv = tracker_notifier_get_instance_private (notifier);
+
+ source = g_idle_source_new ();
+ g_source_set_callback (source,
+ (GSourceFunc) tracker_notifier_emit_events,
+ cache,
+ (GDestroyNotify) _tracker_notifier_event_cache_free);
+ g_source_attach (source, priv->main_context);
+ g_source_unref (source);
}
static gchar *
-create_extra_info_query (TrackerNotifier *notifier,
- TrackerNotifierEventCache *cache)
+create_extra_info_query (TrackerNotifier *notifier,
+ TrackerNotifierSubscription *subscription)
{
GString *sparql;
gchar *service;
@@ -397,7 +430,7 @@ create_extra_info_query (TrackerNotifier *notifier,
sparql = g_string_new ("SELECT ?id ?uri ");
- service = get_service_name (notifier, cache);
+ service = get_service_name (notifier, subscription);
if (service) {
g_string_append_printf (sparql,
@@ -428,8 +461,8 @@ create_extra_info_query (TrackerNotifier *notifier,
}
static TrackerSparqlStatement *
-ensure_extra_info_statement (TrackerNotifier *notifier,
- TrackerNotifierEventCache *cache)
+ensure_extra_info_statement (TrackerNotifier *notifier,
+ TrackerNotifierSubscription *subscription)
{
TrackerSparqlStatement **ptr;
TrackerNotifierPrivate *priv;
@@ -438,8 +471,8 @@ ensure_extra_info_statement (TrackerNotifier *notifier,
priv = tracker_notifier_get_instance_private (notifier);
- if (cache->subscription) {
- ptr = &cache->subscription->statement;
+ if (subscription) {
+ ptr = &subscription->statement;
} else {
ptr = &priv->local_statement;
}
@@ -448,7 +481,7 @@ ensure_extra_info_statement (TrackerNotifier *notifier,
return *ptr;
}
- sparql = create_extra_info_query (notifier, cache);
+ sparql = create_extra_info_query (notifier, subscription);
*ptr = tracker_sparql_connection_query_statement (priv->connection,
sparql,
priv->cancellable,
@@ -472,8 +505,8 @@ handle_cursor (GTask *task,
{
TrackerNotifierEventCache *cache = task_data;
TrackerSparqlCursor *cursor = source_object;
- TrackerNotifier *notifier = cache->notifier;
- TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier);
+ TrackerNotifier *notifier;
+ TrackerNotifierPrivate *priv;
TrackerNotifierEvent *event;
GSequenceIter *iter;
gint64 id;
@@ -485,7 +518,7 @@ handle_cursor (GTask *task,
* extracted from the GSequence, the latter because of the ORDER BY
* clause.
*/
- while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
+ while (tracker_sparql_cursor_next (cursor, cancellable, NULL)) {
id = tracker_sparql_cursor_get_integer (cursor, 0);
event = g_sequence_get (iter);
iter = g_sequence_iter_next (iter);
@@ -500,12 +533,25 @@ handle_cursor (GTask *task,
}
tracker_sparql_cursor_close (cursor);
+
+ if (g_task_return_error_if_cancelled (task)) {
+ _tracker_notifier_event_cache_free (cache);
+ return;
+ }
+
+ notifier = g_weak_ref_get (&cache->notifier);
+ if (!notifier) {
+ _tracker_notifier_event_cache_free (cache);
+ return;
+ }
+
+ priv = tracker_notifier_get_instance_private (notifier);
cache->first = iter;
if (g_sequence_iter_is_end (cache->first)) {
TrackerNotifierEventCache *next;
- tracker_notifier_emit_events_in_idle (cache);
+ tracker_notifier_emit_events_in_idle (notifier, cache);
g_async_queue_lock (priv->queue);
next = g_async_queue_try_pop_unlocked (priv->queue);
@@ -519,6 +565,7 @@ handle_cursor (GTask *task,
}
g_task_return_boolean (task, TRUE);
+ g_object_unref (notifier);
}
static void
@@ -548,14 +595,12 @@ query_extra_info_cb (GObject *object,
{
TrackerNotifierEventCache *cache = user_data;
TrackerSparqlStatement *statement;
- TrackerNotifierPrivate *priv;
TrackerSparqlCursor *cursor;
GError *error = NULL;
GTask *task;
statement = TRACKER_SPARQL_STATEMENT (object);
cursor = tracker_sparql_statement_execute_finish (statement, res, &error);
- priv = tracker_notifier_get_instance_private (cache->notifier);
if (!cursor) {
if (!g_error_matches (error,
@@ -569,7 +614,7 @@ query_extra_info_cb (GObject *object,
return;
}
- task = g_task_new (cursor, priv->cancellable, finish_query, NULL);
+ task = g_task_new (cursor, cache->cancellable, finish_query, NULL);
g_task_set_task_data (task, cache, NULL);
g_task_run_in_thread (task, handle_cursor);
g_object_unref (task);
@@ -612,30 +657,24 @@ tracker_notifier_query_extra_info (TrackerNotifier *notifier,
TrackerNotifierEventCache *cache)
{
TrackerNotifierPrivate *priv;
- TrackerSparqlStatement *statement;
priv = tracker_notifier_get_instance_private (notifier);
g_mutex_lock (&priv->mutex);
- statement = ensure_extra_info_statement (notifier, cache);
- if (!statement)
- goto out;
-
- bind_arguments (statement, cache);
- tracker_sparql_statement_execute_async (statement,
- priv->cancellable,
+ bind_arguments (cache->stmt, cache);
+ tracker_sparql_statement_execute_async (cache->stmt,
+ cache->cancellable,
query_extra_info_cb,
cache);
-out:
g_mutex_unlock (&priv->mutex);
}
void
-_tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache)
+_tracker_notifier_event_cache_flush_events (TrackerNotifier *notifier,
+ TrackerNotifierEventCache *cache)
{
- TrackerNotifier *notifier = cache->notifier;
TrackerNotifierPrivate *priv = tracker_notifier_get_instance_private (notifier);
if (g_sequence_is_empty (cache->sequence)) {
@@ -647,7 +686,7 @@ _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache)
g_async_queue_lock (priv->queue);
if (priv->urn_query_disabled) {
- tracker_notifier_emit_events_in_idle (cache);
+ tracker_notifier_emit_events_in_idle (notifier, cache);
} else if (priv->querying) {
g_async_queue_push_unlocked (priv->queue, cache);
} else {
@@ -678,7 +717,7 @@ graph_updated_cb (GDBusConnection *connection,
handle_events (notifier, cache, events);
g_variant_iter_free (events);
- _tracker_notifier_event_cache_flush_events (cache);
+ _tracker_notifier_event_cache_flush_events (notifier, cache);
}
static void
@@ -795,6 +834,7 @@ tracker_notifier_init (TrackerNotifier *notifier)
(GDestroyNotify) tracker_notifier_subscription_free);
priv->cancellable = g_cancellable_new ();
priv->queue = g_async_queue_new ();
+ priv->main_context = g_main_context_get_thread_default ();
}
/**