summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarlos Garnacho <carlos@lanedo.com>2011-07-14 15:03:19 +0200
committerJürg Billeter <j@bitron.ch>2011-07-22 14:26:40 +0200
commit0478db84a158bc4a4c99172c81004115d7fb0453 (patch)
treeeec0c51aab22b697af94d4b202c9cb61b4ae1cec
parentad173647549ac12b283c6cab2a142838e8193b18 (diff)
downloadtracker-0478db84a158bc4a4c99172c81004115d7fb0453.tar.gz
libtracker-miner: Add priorities to the sparql buffer
If a task is of priority G_PRIORITY_HIGH, it will issue an Update() instead of a UpdateArray(), so updates are most immediate.
-rw-r--r--src/libtracker-miner/tracker-miner-fs.c4
-rw-r--r--src/libtracker-miner/tracker-sparql-buffer.c84
-rw-r--r--src/libtracker-miner/tracker-sparql-buffer.h1
3 files changed, 73 insertions, 16 deletions
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 9ca23a581..229475d45 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1891,6 +1891,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
sparql_task,
+ ctxt->priority,
sparql_buffer_task_finished_cb,
fs);
@@ -1999,6 +2000,7 @@ item_remove (TrackerMinerFS *fs,
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
task,
+ G_PRIORITY_DEFAULT,
sparql_buffer_task_finished_cb,
fs);
@@ -2017,6 +2019,7 @@ item_remove (TrackerMinerFS *fs,
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
task,
+ G_PRIORITY_DEFAULT,
sparql_buffer_task_finished_cb,
fs);
@@ -2364,6 +2367,7 @@ item_move (TrackerMinerFS *fs,
FALSE));
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
task,
+ G_PRIORITY_DEFAULT,
sparql_buffer_task_finished_cb,
fs);
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index bd3abe9b5..cb35b44b7 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -26,6 +26,7 @@
typedef struct _TrackerSparqlBufferPrivate TrackerSparqlBufferPrivate;
typedef struct _SparqlTaskData SparqlTaskData;
typedef struct _UpdateArrayData UpdateArrayData;
+typedef struct _UpdateData UpdateData;
typedef struct _BulkOperationMerge BulkOperationMerge;
enum {
@@ -64,6 +65,11 @@ struct _SparqlTaskData
GSimpleAsyncResult *result;
};
+struct _UpdateData {
+ TrackerSparqlBuffer *buffer;
+ TrackerTask *task;
+};
+
struct _UpdateArrayData {
TrackerSparqlBuffer *buffer;
GPtrArray *tasks;
@@ -544,9 +550,30 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
return TRUE;
}
+static void
+tracker_sparql_buffer_update_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ UpdateData *update_data = user_data;
+ GError *error = NULL;
+
+ tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object),
+ result, &error);
+ if (error) {
+ g_critical ("Error in prioritized update: %s\n", error->message);
+ g_error_free (error);
+ }
+
+ tracker_task_pool_remove (TRACKER_TASK_POOL (update_data->buffer),
+ update_data->task);
+ g_slice_free (UpdateData, update_data);
+}
+
void
tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
TrackerTask *task,
+ gint priority,
GAsyncReadyCallback cb,
gpointer user_data)
{
@@ -558,27 +585,52 @@ tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
priv = buffer->priv;
- if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) {
- reset_flush_timeout (buffer);
- }
+ data = tracker_task_get_data (task);
+ data->result = g_simple_async_result_new (G_OBJECT (buffer),
+ cb, user_data, NULL);
- tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+ if (priority <= G_PRIORITY_HIGH &&
+ data->type != TASK_TYPE_BULK) {
+ UpdateData *update_data;
+ const gchar *sparql = NULL;
- if (!priv->tasks) {
- priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
- }
+ /* High priority task */
+ update_data = g_slice_new0 (UpdateData);
+ update_data->buffer = buffer;
+ update_data->task = task;
- g_ptr_array_add (priv->tasks, task);
+ if (data->type == TASK_TYPE_SPARQL_STR) {
+ sparql = data->data.str;
+ } else if (data->type == TASK_TYPE_SPARQL) {
+ sparql = tracker_sparql_builder_get_result (data->data.builder);
+ }
- data = tracker_task_get_data (task);
- data->result = g_simple_async_result_new (G_OBJECT (buffer),
- cb, user_data, NULL);
+ tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+ tracker_sparql_connection_update_async (priv->connection,
+ sparql,
+ G_PRIORITY_HIGH,
+ NULL,
+ tracker_sparql_buffer_update_cb,
+ update_data);
+ } else {
+ if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) {
+ reset_flush_timeout (buffer);
+ }
+
+ tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+
+ if (!priv->tasks) {
+ priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
+ }
+
+ g_ptr_array_add (priv->tasks, task);
- if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
- tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
- } else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) {
- /* We've filled half of the buffer, flush it as we receive more tasks */
- tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full");
+ if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
+ tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
+ } else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) {
+ /* We've filled half of the buffer, flush it as we receive more tasks */
+ tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full");
+ }
}
}
diff --git a/src/libtracker-miner/tracker-sparql-buffer.h b/src/libtracker-miner/tracker-sparql-buffer.h
index 6ee13b470..919ad9ae2 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.h
+++ b/src/libtracker-miner/tracker-sparql-buffer.h
@@ -65,6 +65,7 @@ gboolean tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
void tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
TrackerTask *task,
+ gint priority,
GAsyncReadyCallback cb,
gpointer user_data);