summaryrefslogtreecommitdiff
path: root/src/libtracker-sparql/direct/tracker-direct.c
diff options
context:
space:
mode:
authorCarlos Garnacho <carlosg@gnome.org>2020-06-22 16:02:57 +0200
committerCarlos Garnacho <carlosg@gnome.org>2020-06-22 21:45:25 +0200
commita2d6b0820d72c98ab045e4dc6843cef6d9575b2e (patch)
tree2fab777499620f21f8a6ebbe7a2c2cb07c476a5f /src/libtracker-sparql/direct/tracker-direct.c
parent34b48e751c1f4bb7eab18ef4749d04388da83b7a (diff)
downloadtracker-a2d6b0820d72c98ab045e4dc6843cef6d9575b2e.tar.gz
Merge libtracker-sparql-backend with libtracker-sparql
One more step towards the unification of libtracker-sparql.
Diffstat (limited to 'src/libtracker-sparql/direct/tracker-direct.c')
-rw-r--r--src/libtracker-sparql/direct/tracker-direct.c1047
1 files changed, 1047 insertions, 0 deletions
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
new file mode 100644
index 000000000..355c5debf
--- /dev/null
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -0,0 +1,1047 @@
+/*
+ * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
+ * Copyright (C) 2017, Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "config.h"
+
+#include "tracker-direct.h"
+#include "tracker-direct-statement.h"
+#include "libtracker-sparql/tracker-private.h"
+#include <libtracker-data/tracker-data.h>
+#include <libtracker-data/tracker-sparql.h>
+#include <libtracker-sparql/tracker-notifier-private.h>
+
+typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate;
+
+struct _TrackerDirectConnectionPrivate
+{
+ TrackerSparqlConnectionFlags flags;
+ GFile *store;
+ GFile *ontology;
+
+ TrackerNamespaceManager *namespace_manager;
+ TrackerDataManager *data_manager;
+ GMutex mutex;
+
+ GThreadPool *update_thread; /* Contains 1 exclusive thread */
+ GThreadPool *select_pool;
+
+ GList *notifiers;
+
+ guint initialized : 1;
+ guint closing : 1;
+};
+
+enum {
+ PROP_0,
+ PROP_FLAGS,
+ PROP_STORE_LOCATION,
+ PROP_ONTOLOGY_LOCATION,
+ N_PROPS
+};
+
+static GParamSpec *props[N_PROPS] = { NULL };
+
+typedef enum {
+ TASK_TYPE_QUERY,
+ TASK_TYPE_UPDATE,
+ TASK_TYPE_UPDATE_BLANK,
+} TaskType;
+
+typedef struct {
+ TaskType type;
+ gchar *query;
+} TaskData;
+
+static void tracker_direct_connection_initable_iface_init (GInitableIface *iface);
+static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface);
+
+G_DEFINE_QUARK (TrackerDirectNotifier, tracker_direct_notifier)
+
+G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection,
+ TRACKER_TYPE_SPARQL_CONNECTION,
+ G_ADD_PRIVATE (TrackerDirectConnection)
+ G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
+ tracker_direct_connection_initable_iface_init)
+ G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE,
+ tracker_direct_connection_async_initable_iface_init))
+
+static TaskData *
+task_data_query_new (TaskType type,
+ const gchar *sparql)
+{
+ TaskData *data;
+
+ data = g_new0 (TaskData, 1);
+ data->type = type;
+ data->query = g_strdup (sparql);
+
+ return data;
+}
+
+static void
+task_data_free (TaskData *task)
+{
+ g_free (task->query);
+ g_free (task);
+}
+
+static void
+update_thread_func (gpointer data,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task = data;
+ TaskData *task_data = g_task_get_task_data (task);
+ TrackerData *tracker_data;
+ GError *error = NULL;
+ gpointer retval = NULL;
+ GDestroyNotify destroy_notify = NULL;
+
+ conn = user_data;
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ tracker_data = tracker_data_manager_get_data (priv->data_manager);
+
+ switch (task_data->type) {
+ case TASK_TYPE_QUERY:
+ g_warning ("Queries don't go through this thread");
+ break;
+ case TASK_TYPE_UPDATE:
+ tracker_data_update_sparql (tracker_data, task_data->query, &error);
+ break;
+ case TASK_TYPE_UPDATE_BLANK:
+ retval = tracker_data_update_sparql_blank (tracker_data, task_data->query, &error);
+ destroy_notify = (GDestroyNotify) g_variant_unref;
+ break;
+ }
+
+ if (error)
+ g_task_return_error (task, error);
+ else if (retval)
+ g_task_return_pointer (task, retval, destroy_notify);
+ else
+ g_task_return_boolean (task, TRUE);
+
+ g_object_unref (task);
+ g_mutex_unlock (&priv->mutex);
+}
+
+static void
+query_thread_pool_func (gpointer data,
+ gpointer user_data)
+{
+ TrackerDirectConnection *conn = user_data;
+ TrackerDirectConnectionPrivate *priv;
+ TrackerSparqlCursor *cursor;
+ GTask *task = data;
+ TaskData *task_data = g_task_get_task_data (task);
+ GError *error = NULL;
+
+ g_assert (task_data->type == TASK_TYPE_QUERY);
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ if (priv->closing) {
+ g_task_return_new_error (task,
+ G_IO_ERROR,
+ G_IO_ERROR_CONNECTION_CLOSED,
+ "Connection is closed");
+ g_object_unref (task);
+ return;
+ }
+
+ cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
+ task_data->query,
+ g_task_get_cancellable (task),
+ &error);
+ if (cursor)
+ g_task_return_pointer (task, cursor, g_object_unref);
+ else
+ g_task_return_error (task, error);
+
+ g_object_unref (task);
+}
+
+static gint
+task_compare_func (GTask *a,
+ GTask *b,
+ gpointer user_data)
+{
+ return g_task_get_priority (b) - g_task_get_priority (a);
+}
+
+static gboolean
+set_up_thread_pools (TrackerDirectConnection *conn,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ priv->select_pool = g_thread_pool_new (query_thread_pool_func,
+ conn, 16, FALSE, error);
+ if (!priv->select_pool)
+ return FALSE;
+
+ priv->update_thread = g_thread_pool_new (update_thread_func,
+ conn, 1, TRUE, error);
+ if (!priv->update_thread)
+ return FALSE;
+
+ g_thread_pool_set_sort_function (priv->select_pool,
+ (GCompareDataFunc) task_compare_func,
+ conn);
+ g_thread_pool_set_sort_function (priv->update_thread,
+ (GCompareDataFunc) task_compare_func,
+ conn);
+ return TRUE;
+}
+
+static TrackerDBManagerFlags
+translate_flags (TrackerSparqlConnectionFlags flags)
+{
+ TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES;
+
+ if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) != 0)
+ db_flags |= TRACKER_DB_MANAGER_READONLY;
+ if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_STEMMER) != 0)
+ db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_STEMMER;
+ if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_UNACCENT) != 0)
+ db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_UNACCENT;
+ if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_ENABLE_STOP_WORDS) != 0)
+ db_flags |= TRACKER_DB_MANAGER_FTS_ENABLE_STOP_WORDS;
+ if ((flags & TRACKER_SPARQL_CONNECTION_FLAGS_FTS_IGNORE_NUMBERS) != 0)
+ db_flags |= TRACKER_DB_MANAGER_FTS_IGNORE_NUMBERS;
+
+ return db_flags;
+}
+
+static GError *
+translate_error (GError *error)
+{
+ GError *new_error = NULL;
+
+ if (error->domain == TRACKER_DATA_ONTOLOGY_ERROR) {
+ /* This is an internal error domain, so translate to a libtracker-sparql error code. */
+ switch (error->code) {
+ case TRACKER_DATA_ONTOLOGY_NOT_FOUND:
+ new_error = g_error_new_literal (TRACKER_SPARQL_ERROR,
+ TRACKER_SPARQL_ERROR_ONTOLOGY_NOT_FOUND,
+ error->message);
+ break;
+ case TRACKER_DATA_UNSUPPORTED_LOCATION:
+ case TRACKER_DATA_UNSUPPORTED_ONTOLOGY_CHANGE:
+ new_error = g_error_new_literal (TRACKER_SPARQL_ERROR,
+ TRACKER_SPARQL_ERROR_UNSUPPORTED,
+ error->message);
+ break;
+ default:
+ new_error = g_error_new_literal (TRACKER_SPARQL_ERROR,
+ TRACKER_SPARQL_ERROR_INTERNAL,
+ error->message);
+ }
+ }
+
+ if (new_error) {
+ g_error_free (error);
+ return new_error;
+ } else {
+ return error;
+ }
+}
+
+
+static gboolean
+tracker_direct_connection_initable_init (GInitable *initable,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerDBManagerFlags db_flags;
+ GHashTable *namespaces;
+ GHashTableIter iter;
+ gchar *prefix, *ns;
+ GError *inner_error = NULL;
+
+ conn = TRACKER_DIRECT_CONNECTION (initable);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ tracker_locale_sanity_check ();
+
+ if (!set_up_thread_pools (conn, error))
+ return FALSE;
+
+ db_flags = translate_flags (priv->flags);
+
+ if (!priv->store) {
+ db_flags |= TRACKER_DB_MANAGER_IN_MEMORY;
+ }
+
+ priv->data_manager = tracker_data_manager_new (db_flags, priv->store,
+ priv->ontology,
+ 100, 100);
+ if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, &inner_error)) {
+ g_propagate_error (error, translate_error (inner_error));
+ g_clear_object (&priv->data_manager);
+ return FALSE;
+ }
+
+ /* Initialize namespace manager */
+ priv->namespace_manager = tracker_namespace_manager_new ();
+ namespaces = tracker_data_manager_get_namespaces (priv->data_manager);
+ g_hash_table_iter_init (&iter, namespaces);
+
+ while (g_hash_table_iter_next (&iter, (gpointer*) &prefix, (gpointer*) &ns)) {
+ tracker_namespace_manager_add_prefix (priv->namespace_manager,
+ prefix, ns);
+ }
+
+ g_hash_table_unref (namespaces);
+
+ return TRUE;
+}
+
+static void
+tracker_direct_connection_initable_iface_init (GInitableIface *iface)
+{
+ iface->init = tracker_direct_connection_initable_init;
+}
+
+static void
+async_initable_thread_func (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+
+ if (!g_initable_init (G_INITABLE (source_object), cancellable, &error))
+ g_task_return_error (task, error);
+ else
+ g_task_return_boolean (task, TRUE);
+
+ g_object_unref (task);
+}
+
+static void
+tracker_direct_connection_async_initable_init_async (GAsyncInitable *async_initable,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+
+ task = g_task_new (async_initable, cancellable, callback, user_data);
+ g_task_set_priority (task, priority);
+ g_task_run_in_thread (task, async_initable_thread_func);
+}
+
+static gboolean
+tracker_direct_connection_async_initable_init_finish (GAsyncInitable *async_initable,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_boolean (G_TASK (res), error);
+}
+
+static void
+tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface)
+{
+ iface->init_async = tracker_direct_connection_async_initable_init_async;
+ iface->init_finish = tracker_direct_connection_async_initable_init_finish;
+}
+
+static void
+tracker_direct_connection_init (TrackerDirectConnection *conn)
+{
+}
+
+static GHashTable *
+get_event_cache_ht (TrackerNotifier *notifier)
+{
+ GHashTable *events;
+
+ events = g_object_get_qdata (G_OBJECT (notifier), tracker_direct_notifier_quark ());
+ if (!events) {
+ events = g_hash_table_new_full (NULL, NULL, NULL,
+ (GDestroyNotify) _tracker_notifier_event_cache_free);
+ g_object_set_qdata_full (G_OBJECT (notifier), tracker_direct_notifier_quark (),
+ events, (GDestroyNotify) g_hash_table_unref);
+ }
+
+ return events;
+}
+
+static TrackerNotifierEventCache *
+lookup_event_cache (TrackerNotifier *notifier,
+ gint graph_id,
+ const gchar *graph)
+{
+ TrackerNotifierEventCache *cache;
+ GHashTable *events;
+
+ events = get_event_cache_ht (notifier);
+ cache = g_hash_table_lookup (events, GINT_TO_POINTER (graph_id));
+
+ if (!cache) {
+ cache = _tracker_notifier_event_cache_new (notifier, graph);
+ g_hash_table_insert (events, GINT_TO_POINTER (graph_id), cache);
+ }
+
+ return cache;
+}
+
+/* These callbacks will be called from a different thread
+ * (always the same one though), handle with care.
+ */
+static void
+insert_statement_cb (gint graph_id,
+ const gchar *graph,
+ gint subject_id,
+ const gchar *subject,
+ gint predicate_id,
+ gint object_id,
+ const gchar *object,
+ GPtrArray *rdf_types,
+ gpointer user_data)
+{
+ TrackerNotifier *notifier = user_data;
+ TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
+ TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
+ TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
+ TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
+ TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
+ TrackerNotifierEventCache *cache;
+ TrackerClass *new_class = NULL;
+ gint i;
+
+ cache = lookup_event_cache (notifier, graph_id, graph);
+
+ if (predicate_id == tracker_property_get_id (rdf_type)) {
+ const gchar *uri;
+
+ uri = tracker_ontologies_get_uri_by_id (ontologies, object_id);
+ new_class = tracker_ontologies_get_class_by_uri (ontologies, uri);
+ }
+
+ for (i = 0; i < rdf_types->len; i++) {
+ TrackerClass *class = g_ptr_array_index (rdf_types, i);
+ TrackerNotifierEventType event_type;
+
+ if (!tracker_class_get_notify (class))
+ continue;
+
+ if (class == new_class)
+ event_type = TRACKER_NOTIFIER_EVENT_CREATE;
+ else
+ event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
+
+ _tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
+ }
+}
+
+static void
+delete_statement_cb (gint graph_id,
+ const gchar *graph,
+ gint subject_id,
+ const gchar *subject,
+ gint predicate_id,
+ gint object_id,
+ const gchar *object,
+ GPtrArray *rdf_types,
+ gpointer user_data)
+{
+ TrackerNotifier *notifier = user_data;
+ TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
+ TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
+ TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
+ TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
+ TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
+ TrackerNotifierEventCache *cache;
+ TrackerClass *class_being_removed = NULL;
+ gint i;
+
+ cache = lookup_event_cache (notifier, graph_id, graph);
+
+ if (predicate_id == tracker_property_get_id (rdf_type)) {
+ class_being_removed = tracker_ontologies_get_class_by_uri (ontologies, object);
+ }
+
+ for (i = 0; i < rdf_types->len; i++) {
+ TrackerClass *class = g_ptr_array_index (rdf_types, i);
+ TrackerNotifierEventType event_type;
+
+ if (!tracker_class_get_notify (class))
+ continue;
+
+ if (class_being_removed && class == class_being_removed) {
+ event_type = TRACKER_NOTIFIER_EVENT_DELETE;
+ } else {
+ event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
+ }
+
+ _tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
+ }
+}
+
+static void
+commit_statement_cb (gpointer user_data)
+{
+ TrackerNotifierEventCache *cache;
+ TrackerNotifier *notifier = user_data;
+ GHashTable *events;
+ GHashTableIter iter;
+
+ events = get_event_cache_ht (notifier);
+ g_hash_table_iter_init (&iter, events);
+
+ while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &cache)) {
+ g_hash_table_iter_steal (&iter);
+ _tracker_notifier_event_cache_flush_events (cache);
+ }
+}
+
+static void
+rollback_statement_cb (gpointer user_data)
+{
+ TrackerNotifier *notifier = user_data;
+ GHashTable *events;
+
+ events = get_event_cache_ht (notifier);
+ g_hash_table_remove_all (events);
+}
+
+static void
+detach_notifier (TrackerDirectConnection *conn,
+ TrackerNotifier *notifier)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerData *tracker_data;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ priv->notifiers = g_list_remove (priv->notifiers, notifier);
+
+ tracker_data = tracker_data_manager_get_data (priv->data_manager);
+ tracker_data_remove_insert_statement_callback (tracker_data,
+ insert_statement_cb,
+ notifier);
+ tracker_data_remove_delete_statement_callback (tracker_data,
+ delete_statement_cb,
+ notifier);
+ tracker_data_remove_commit_statement_callback (tracker_data,
+ commit_statement_cb,
+ notifier);
+ tracker_data_remove_rollback_statement_callback (tracker_data,
+ rollback_statement_cb,
+ notifier);
+}
+
+static void
+weak_ref_notify (gpointer data,
+ GObject *prev_location)
+{
+ TrackerDirectConnection *conn = data;
+
+ detach_notifier (conn, (TrackerNotifier *) prev_location);
+}
+
+static void
+tracker_direct_connection_finalize (GObject *object)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (object);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_clear_object (&priv->store);
+ g_clear_object (&priv->ontology);
+ g_clear_object (&priv->namespace_manager);
+
+ G_OBJECT_CLASS (tracker_direct_connection_parent_class)->finalize (object);
+}
+
+static void
+tracker_direct_connection_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (object);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ switch (prop_id) {
+ case PROP_FLAGS:
+ priv->flags = g_value_get_flags (value);
+ break;
+ case PROP_STORE_LOCATION:
+ priv->store = g_value_dup_object (value);
+ break;
+ case PROP_ONTOLOGY_LOCATION:
+ priv->ontology = g_value_dup_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+tracker_direct_connection_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (object);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ switch (prop_id) {
+ case PROP_FLAGS:
+ g_value_set_flags (value, priv->flags);
+ break;
+ case PROP_STORE_LOCATION:
+ g_value_set_object (value, priv->store);
+ break;
+ case PROP_ONTOLOGY_LOCATION:
+ g_value_set_object (value, priv->ontology);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static TrackerSparqlCursor *
+tracker_direct_connection_query (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerSparql *query;
+ TrackerSparqlCursor *cursor;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ query = tracker_sparql_new (priv->data_manager, sparql);
+ cursor = tracker_sparql_execute_cursor (query, NULL, error);
+ g_object_unref (query);
+
+ if (cursor)
+ tracker_sparql_cursor_set_connection (cursor, self);
+ g_mutex_unlock (&priv->mutex);
+
+ return cursor;
+}
+
+static void
+tracker_direct_connection_query_async (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GError *error = NULL;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_QUERY, sparql),
+ (GDestroyNotify) task_data_free);
+
+ if (!g_thread_pool_push (priv->select_pool, task, &error)) {
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ }
+}
+
+static TrackerSparqlCursor *
+tracker_direct_connection_query_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_pointer (G_TASK (res), error);
+}
+
+static TrackerSparqlStatement *
+tracker_direct_connection_query_statement (TrackerSparqlConnection *self,
+ const gchar *query,
+ GCancellable *cancellable,
+ GError **error)
+{
+ return TRACKER_SPARQL_STATEMENT (tracker_direct_statement_new (self, query, error));
+}
+
+static void
+tracker_direct_connection_update (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerData *data;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ data = tracker_data_manager_get_data (priv->data_manager);
+ tracker_data_update_sparql (data, sparql, error);
+ g_mutex_unlock (&priv->mutex);
+}
+
+static void
+tracker_direct_connection_update_async (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_UPDATE, sparql),
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static void
+tracker_direct_connection_update_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ g_task_propagate_boolean (G_TASK (res), error);
+}
+
+static void
+tracker_direct_connection_update_array_async (TrackerSparqlConnection *self,
+ gchar **updates,
+ gint n_updates,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TaskData *task_data;
+ GTask *task;
+ gchar *concatenated;
+ gchar **array_copy;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ /* Make a NULL-terminated array and concatenate it */
+ array_copy = g_new0 (gchar *, n_updates + 1);
+ memcpy (array_copy, updates, n_updates * sizeof (gchar *));
+ concatenated = g_strjoinv ("\n", array_copy);
+ g_free (array_copy);
+
+ task_data = task_data_query_new (TASK_TYPE_UPDATE, NULL);
+ task_data->query = concatenated;
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task, task_data,
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static gboolean
+tracker_direct_connection_update_array_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_boolean (G_TASK (res), error);
+}
+
+static GVariant *
+tracker_direct_connection_update_blank (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerData *data;
+ GVariant *blank_nodes;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ data = tracker_data_manager_get_data (priv->data_manager);
+ blank_nodes = tracker_data_update_sparql_blank (data, sparql, error);
+ g_mutex_unlock (&priv->mutex);
+
+ return blank_nodes;
+}
+
+static void
+tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_UPDATE_BLANK, sparql),
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static GVariant *
+tracker_direct_connection_update_blank_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_pointer (G_TASK (res), error);
+}
+
+static TrackerNamespaceManager *
+tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self)
+{
+ TrackerDirectConnectionPrivate *priv;
+
+ priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
+
+ return priv->namespace_manager;
+}
+
+static TrackerNotifier *
+tracker_direct_connection_create_notifier (TrackerSparqlConnection *self)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerNotifier *notifier;
+ TrackerData *tracker_data;
+
+ priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
+
+ notifier = g_object_new (TRACKER_TYPE_NOTIFIER,
+ "connection", self,
+ NULL);
+
+ tracker_data = tracker_data_manager_get_data (priv->data_manager);
+ tracker_data_add_insert_statement_callback (tracker_data,
+ insert_statement_cb,
+ notifier);
+ tracker_data_add_delete_statement_callback (tracker_data,
+ delete_statement_cb,
+ notifier);
+ tracker_data_add_commit_statement_callback (tracker_data,
+ commit_statement_cb,
+ notifier);
+ tracker_data_add_rollback_statement_callback (tracker_data,
+ rollback_statement_cb,
+ notifier);
+
+ g_object_weak_ref (G_OBJECT (notifier), weak_ref_notify, self);
+ priv->notifiers = g_list_prepend (priv->notifiers, notifier);
+
+ return notifier;
+}
+
+static void
+tracker_direct_connection_close (TrackerSparqlConnection *self)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+ priv->closing = TRUE;
+
+ if (priv->update_thread) {
+ g_thread_pool_free (priv->update_thread, TRUE, TRUE);
+ priv->update_thread = NULL;
+ }
+
+ if (priv->select_pool) {
+ g_thread_pool_free (priv->select_pool, TRUE, TRUE);
+ priv->select_pool = NULL;
+ }
+
+ while (priv->notifiers) {
+ TrackerNotifier *notifier = priv->notifiers->data;
+
+ g_object_weak_unref (G_OBJECT (notifier),
+ weak_ref_notify,
+ conn);
+ detach_notifier (conn, notifier);
+ }
+
+ if (priv->data_manager) {
+ tracker_data_manager_shutdown (priv->data_manager);
+ g_clear_object (&priv->data_manager);
+ }
+}
+
+void
+async_close_thread_func (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ if (g_task_return_error_if_cancelled (task))
+ return;
+
+ tracker_sparql_connection_close (source_object);
+ g_task_return_boolean (task, TRUE);
+}
+
+void
+tracker_direct_connection_close_async (TrackerSparqlConnection *connection,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+
+ task = g_task_new (connection, cancellable, callback, user_data);
+ g_task_run_in_thread (task, async_close_thread_func);
+ g_object_unref (task);
+}
+
+gboolean
+tracker_direct_connection_close_finish (TrackerSparqlConnection *connection,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_boolean (G_TASK (res), error);
+}
+
+static void
+tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
+{
+ TrackerSparqlConnectionClass *sparql_connection_class;
+ GObjectClass *object_class;
+
+ object_class = G_OBJECT_CLASS (klass);
+ sparql_connection_class = TRACKER_SPARQL_CONNECTION_CLASS (klass);
+
+ object_class->finalize = tracker_direct_connection_finalize;
+ object_class->set_property = tracker_direct_connection_set_property;
+ object_class->get_property = tracker_direct_connection_get_property;
+
+ sparql_connection_class->query = tracker_direct_connection_query;
+ sparql_connection_class->query_async = tracker_direct_connection_query_async;
+ sparql_connection_class->query_finish = tracker_direct_connection_query_finish;
+ sparql_connection_class->query_statement = tracker_direct_connection_query_statement;
+ sparql_connection_class->update = tracker_direct_connection_update;
+ sparql_connection_class->update_async = tracker_direct_connection_update_async;
+ sparql_connection_class->update_finish = tracker_direct_connection_update_finish;
+ sparql_connection_class->update_array_async = tracker_direct_connection_update_array_async;
+ sparql_connection_class->update_array_finish = tracker_direct_connection_update_array_finish;
+ sparql_connection_class->update_blank = tracker_direct_connection_update_blank;
+ sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async;
+ sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish;
+ sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager;
+ sparql_connection_class->create_notifier = tracker_direct_connection_create_notifier;
+ sparql_connection_class->close = tracker_direct_connection_close;
+ sparql_connection_class->close_async = tracker_direct_connection_close_async;
+ sparql_connection_class->close_finish = tracker_direct_connection_close_finish;
+
+ props[PROP_FLAGS] =
+ g_param_spec_flags ("flags",
+ "Flags",
+ "Flags",
+ TRACKER_TYPE_SPARQL_CONNECTION_FLAGS,
+ TRACKER_SPARQL_CONNECTION_FLAGS_NONE,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY);
+ props[PROP_STORE_LOCATION] =
+ g_param_spec_object ("store-location",
+ "Store location",
+ "Store location",
+ G_TYPE_FILE,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY);
+ props[PROP_ONTOLOGY_LOCATION] =
+ g_param_spec_object ("ontology-location",
+ "Ontology location",
+ "Ontology location",
+ G_TYPE_FILE,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY);
+
+ g_object_class_install_properties (object_class, N_PROPS, props);
+}
+
+TrackerDirectConnection *
+tracker_direct_connection_new (TrackerSparqlConnectionFlags flags,
+ GFile *store,
+ GFile *ontology,
+ GError **error)
+{
+ g_return_val_if_fail (!store || G_IS_FILE (store), NULL);
+ g_return_val_if_fail (!ontology || G_IS_FILE (ontology), NULL);
+ g_return_val_if_fail (!error || !*error, NULL);
+
+ return g_object_new (TRACKER_TYPE_DIRECT_CONNECTION,
+ "flags", flags,
+ "store-location", store,
+ "ontology-location", ontology,
+ NULL);
+}
+
+TrackerDataManager *
+tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn)
+{
+ TrackerDirectConnectionPrivate *priv;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+ return priv->data_manager;
+}