summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksander Morgado <aleksander@lanedo.com>2011-03-17 16:59:30 +0100
committerMartyn Russell <martyn@lanedo.com>2011-04-20 15:45:48 +0100
commitec4cc47b328e12826599edcbb5e732e0dbb09115 (patch)
tree71b8d4ceca2fa9102565813552cd909840ffa4db
parenta380106720f005b4b12bf99c53c8eb53018dc17d (diff)
downloadtracker-ec4cc47b328e12826599edcbb5e732e0dbb09115.tar.gz
libtracker-miner: Need to keep bulk ops if we are going to queue requests
-rw-r--r--src/libtracker-miner/tracker-miner-fs-processing-pool.c46
1 files changed, 29 insertions, 17 deletions
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 34e6d81e5..35f255d6d 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -211,6 +211,7 @@ typedef struct {
GPtrArray *tasks;
GArray *sparql_array;
GArray *error_map;
+ GPtrArray *bulk_ops;
guint n_bulk_operations;
} UpdateArrayData;
@@ -370,10 +371,12 @@ pool_status_trace_timeout_cb (gpointer data)
l = g_list_next (l);
}
}
- trace ("(Processing Pool %s) Requests being currently processed: %u (max: %u)",
+ trace ("(Processing Pool %s) Requests being currently processed: %u "
+ "(max: %u, pending: %u)",
G_OBJECT_TYPE_NAME (pool->miner),
pool->n_requests,
- pool->limit_n_requests);
+ pool->limit_n_requests,
+ pool->pending_requests->length);
return TRUE;
}
#endif /* PROCESSING_POOL_ENABLE_TRACE */
@@ -403,6 +406,12 @@ update_array_data_free (UpdateArrayData *update_data)
g_array_free (update_data->sparql_array, TRUE);
}
+ if (update_data->bulk_ops) {
+ /* The BulkOperationMerge structs which contain the sparql strings
+ * are deallocated here */
+ g_ptr_array_free (update_data->bulk_ops, TRUE);
+ }
+
g_ptr_array_free (update_data->tasks, TRUE);
g_array_free (update_data->error_map, TRUE);
g_slice_free (UpdateArrayData, update_data);
@@ -639,10 +648,11 @@ tracker_processing_pool_sparql_update_array_cb (GObject *object,
pool->n_requests--;
trace ("(Processing Pool) Finished array-update %p with %u tasks "
- "(%u requests pending)",
+ "(%u requests processing, %u requests queued)",
update_data->tasks,
update_data->tasks->len,
- pool->n_requests);
+ pool->n_requests,
+ pool->pending_requests->length);
sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
result,
@@ -881,7 +891,7 @@ void
tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
const gchar *reason)
{
- GPtrArray *bulk_ops;
+ GPtrArray *bulk_ops = NULL;
GArray *sparql_array, *error_map;
UpdateArrayData *update_data;
guint i, j;
@@ -895,9 +905,6 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
/* Loop buffer and construct array of strings */
sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
-
- /* TODO: Avoid preallocating this, as we may not have any bulk operation */
- bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
error_map = g_array_new (TRUE, TRUE, sizeof (gint));
for (i = 0; i < pool->sparql_buffer->len; i++) {
@@ -927,6 +934,10 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
BulkOperationMerge *bulk = NULL;
gint j;
+ if (G_UNLIKELY (!bulk_ops)) {
+ bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
+ }
+
for (j = 0; j < bulk_ops->len; j++) {
BulkOperationMerge *cur;
@@ -951,14 +962,16 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
g_array_append_val (error_map, pos);
}
- for (j = 0; j < bulk_ops->len; j++) {
- BulkOperationMerge *bulk;
+ if (bulk_ops) {
+ for (j = 0; j < bulk_ops->len; j++) {
+ BulkOperationMerge *bulk;
- bulk = g_ptr_array_index (bulk_ops, j);
- bulk_operation_merge_finish (bulk);
+ bulk = g_ptr_array_index (bulk_ops, j);
+ bulk_operation_merge_finish (bulk);
- if (bulk->sparql) {
- g_array_prepend_val (sparql_array, bulk->sparql);
+ if (bulk->sparql) {
+ g_array_prepend_val (sparql_array, bulk->sparql);
+ }
}
}
@@ -967,7 +980,8 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
update_data = g_slice_new0 (UpdateArrayData);
update_data->pool = pool;
update_data->tasks = pool->sparql_buffer;
- update_data->n_bulk_operations = bulk_ops->len;
+ update_data->bulk_ops = bulk_ops;
+ update_data->n_bulk_operations = bulk_ops ? bulk_ops->len : 0;
update_data->error_map = error_map;
update_data->sparql_array = sparql_array;
@@ -983,8 +997,6 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
g_object_unref (pool->sparql_buffer_current_parent);
pool->sparql_buffer_current_parent = NULL;
}
-
- g_ptr_array_free (bulk_ops, TRUE);
}
gboolean