diff options
author | Martyn Russell <martyn@lanedo.com> | 2010-10-14 17:44:23 +0100 |
---|---|---|
committer | Martyn Russell <martyn@lanedo.com> | 2010-10-14 17:44:23 +0100 |
commit | 4c203acb0507965ece6358e1db8297d0c0a35abe (patch) | |
tree | f3c1f1ddf013f6c50164e11beb8af906a0af6033 | |
parent | 69f705a51fb069dc0462c72a246c2d850311fefb (diff) | |
parent | 1798d43798d1a1bd4a3693e88fd9a75b2e188c58 (diff) | |
download | tracker-4c203acb0507965ece6358e1db8297d0c0a35abe.tar.gz |
Merge branch 'multi-insert'
-rwxr-xr-x | autogen.sh | 2 | ||||
-rw-r--r-- | src/libtracker-bus/tracker-bus-fd-update.c | 615 | ||||
-rw-r--r-- | src/libtracker-bus/tracker-bus-fd-update.h | 75 | ||||
-rw-r--r-- | src/libtracker-bus/tracker-bus-fd-update.vapi | 5 | ||||
-rw-r--r-- | src/libtracker-bus/tracker-bus.vala | 15 | ||||
-rw-r--r-- | src/libtracker-sparql/tracker-backend.vala | 5 | ||||
-rw-r--r-- | src/libtracker-sparql/tracker-connection.vala | 70 | ||||
-rw-r--r-- | src/tracker-store/tracker-steroids.c | 281 | ||||
-rw-r--r-- | tests/functional-tests/.gitignore | 1 | ||||
-rw-r--r-- | tests/functional-tests/Makefile.am | 14 | ||||
-rw-r--r-- | tests/functional-tests/update-array-performance-test.c | 283 | ||||
-rw-r--r-- | tests/tracker-steroids/tracker-test.c | 93 |
12 files changed, 1268 insertions, 191 deletions
diff --git a/autogen.sh b/autogen.sh index b2cda32fa..4e919ac2e 100755 --- a/autogen.sh +++ b/autogen.sh @@ -8,7 +8,7 @@ test -z "$srcdir" && srcdir=. PKG_NAME="tracker" REQUIRED_AUTOMAKE_VERSION=1.11 -REQUIRED_VALA_VERSION=0.9.4 +REQUIRED_VALA_VERSION=0.9.5 (test -f $srcdir/configure.ac \ && test -f $srcdir/README) || { diff --git a/src/libtracker-bus/tracker-bus-fd-update.c b/src/libtracker-bus/tracker-bus-fd-update.c index a7baa1330..76cc8e7a6 100644 --- a/src/libtracker-bus/tracker-bus-fd-update.c +++ b/src/libtracker-bus/tracker-bus-fd-update.c @@ -52,7 +52,6 @@ typedef struct { gulong cancelid; } FastAsyncData; - static void fast_async_data_free (gpointer data) { @@ -89,7 +88,6 @@ on_cancel_idle (gpointer data) "Operation was cancelled"); g_simple_async_result_set_from_error (fad->res, error); - g_simple_async_result_complete (fad->res); g_error_free (error); @@ -123,11 +121,15 @@ fast_async_data_new (DBusConnection *connection, data->connection = dbus_connection_ref (connection); data->operation_type = operation_type; + if (cancellable) { data->cancellable = g_object_ref (cancellable); - - data->cancelid = g_cancellable_connect (cancellable, G_CALLBACK (on_cancel), data, NULL); + data->cancelid = g_cancellable_connect (cancellable, + G_CALLBACK (on_cancel), + data, + NULL); } + data->user_data = user_data; return data; @@ -165,13 +167,9 @@ sparql_update_fast_callback (DBusPendingCall *call, error = sparql_error_from_dbus_message (reply); g_simple_async_result_set_from_error (fad->res, error); - dbus_message_unref (reply); - g_simple_async_result_complete (fad->res); - fast_async_data_free (fad); - dbus_pending_call_unref (call); return; @@ -185,8 +183,11 @@ sparql_update_fast_callback (DBusPendingCall *call, break; case FAST_UPDATE_BLANK: result = tracker_bus_message_to_variant (reply); - g_simple_async_result_set_op_res_gpointer (fad->res, result, NULL); + g_simple_async_result_set_op_res_gpointer (fad->res, + g_variant_ref (result), + (GDestroyNotify) g_variant_unref); g_simple_async_result_complete (fad->res); + fad->res = NULL; g_variant_unref (result); break; @@ -203,6 +204,89 @@ sparql_update_fast_callback (DBusPendingCall *call, dbus_pending_call_unref (call); } +static void +sparql_update_array_error_free (gpointer data) +{ + if (data) { + g_error_free (data); + } +} + +static void +sparql_update_array_fast_callback (DBusPendingCall *call, + void *user_data) +{ + FastAsyncData *fad = user_data; + DBusMessage *reply; + GError *error = NULL; + GPtrArray *errors; + DBusMessageIter iter, subiter; + + /* Check for errors */ + reply = dbus_pending_call_steal_reply (call); + + if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) { + error = sparql_error_from_dbus_message (reply); + + g_simple_async_result_set_from_error (fad->res, error); + + dbus_message_unref (reply); + + g_simple_async_result_complete (fad->res); + + fast_async_data_free (fad); + + dbus_pending_call_unref (call); + + return; + } + + /* Call iterator callback */ + switch (fad->operation_type) { + case FAST_UPDATE: + case FAST_UPDATE_BATCH: + dbus_message_iter_init (reply, &iter); + + dbus_message_iter_recurse (&iter, &subiter); + + errors = g_ptr_array_new_with_free_func (sparql_update_array_error_free); + + while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) { + gchar *code, *message; + GError *error; + + dbus_message_iter_get_basic (&subiter, &code); + dbus_message_iter_next (&subiter); + dbus_message_iter_get_basic (&subiter, &message); + dbus_message_iter_next (&subiter); + + if (code && code[0] != '\0' && message && message[0] != '\0') { + error = g_error_new_literal (TRACKER_SPARQL_ERROR, 0, message); + } else { + error = NULL; + } + + g_ptr_array_add (errors, error); + } + + g_simple_async_result_set_op_res_gpointer (fad->res, + g_ptr_array_ref (errors), + (GDestroyNotify) g_ptr_array_unref); + g_simple_async_result_complete (fad->res); + fad->res = NULL; + g_ptr_array_unref (errors); + break; + default: + g_assert_not_reached (); + break; + } + + /* Clean up */ + dbus_message_unref (reply); + fast_async_data_free (fad); + dbus_pending_call_unref (call); +} + static DBusPendingCall * sparql_update_fast_send (DBusConnection *connection, const gchar *query, @@ -273,119 +357,295 @@ sparql_update_fast_send (DBusConnection *connection, g_propagate_error (error, inner_error); g_object_unref (data_output_stream); g_object_unref (buffered_output_stream); - g_object_unref (output_stream); - return NULL; - } - - g_data_output_stream_put_string (data_output_stream, - query, - NULL, - &inner_error); - - if (inner_error) { - g_propagate_error (error, inner_error); - g_object_unref (data_output_stream); - g_object_unref (buffered_output_stream); - g_object_unref (output_stream); - return NULL; - } - - g_object_unref (data_output_stream); - g_object_unref (buffered_output_stream); - g_object_unref (output_stream); - - return call; -} - -static DBusMessage * -sparql_update_fast (DBusConnection *connection, - const gchar *query, - FastOperationType type, - GError **error) -{ - DBusPendingCall *call; - DBusMessage *reply; - - call = sparql_update_fast_send (connection, query, type, error); - if (!call) { - return NULL; - } - - dbus_pending_call_block (call); - - reply = dbus_pending_call_steal_reply (call); - - if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) { - g_propagate_error (error, sparql_error_from_dbus_message (reply)); - - return NULL; - } - - dbus_pending_call_unref (call); - - return reply; -} - -static void -sparql_update_fast_async (DBusConnection *connection, - const gchar *query, - FastAsyncData *fad, - GError **error) -{ - DBusPendingCall *call; - - call = sparql_update_fast_send (connection, query, fad->operation_type, error); - if (!call) { - /* Do some clean up ?*/ - return; - } - - fad->dbus_call = call; - - dbus_pending_call_set_notify (call, sparql_update_fast_callback, fad, NULL); -} - -/* Public API */ - -void -tracker_bus_fd_sparql_update (DBusGConnection *connection, - const char *query, - GError **error) -{ - DBusMessage *reply; - - g_return_if_fail (query != NULL); - - reply = sparql_update_fast (dbus_g_connection_get_connection (connection), - query, FAST_UPDATE, error); - - if (!reply) { - return; - } - - dbus_message_unref (reply); -} - -void -tracker_bus_fd_sparql_update_async (DBusGConnection *connection, - const char *query, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) -{ - FastAsyncData *fad; - GError *error = NULL; - - g_return_if_fail (query != NULL); - - fad = fast_async_data_new (dbus_g_connection_get_connection (connection), - FAST_UPDATE, cancellable, user_data); - - fad->res = g_simple_async_result_new (NULL, callback, user_data, - tracker_bus_fd_sparql_update_async); - - sparql_update_fast_async (dbus_g_connection_get_connection (connection), - query, fad, &error); + g_object_unref (output_stream); + + return NULL; + } + + g_data_output_stream_put_string (data_output_stream, + query, + NULL, + &inner_error); + + if (inner_error) { + g_propagate_error (error, inner_error); + g_object_unref (data_output_stream); + g_object_unref (buffered_output_stream); + g_object_unref (output_stream); + return NULL; + } + + g_object_unref (data_output_stream); + g_object_unref (buffered_output_stream); + g_object_unref (output_stream); + + return call; + } + + static DBusPendingCall * + sparql_update_array_fast_send (DBusConnection *connection, + const gchar **queries, + guint queries_len, + FastOperationType type, + GError **error) + { + const gchar *dbus_method; + DBusMessage *message; + DBusMessageIter iter; + DBusPendingCall *call; + int pipefd[2], i; + GOutputStream *output_stream; + GOutputStream *buffered_output_stream; + GDataOutputStream *data_output_stream; + GError *inner_error = NULL; + + g_return_val_if_fail (queries != NULL, NULL); + g_return_val_if_fail (queries_len != 0, NULL); + + if (pipe (pipefd) < 0) { + g_set_error (error, + TRACKER_SPARQL_ERROR, + TRACKER_SPARQL_ERROR_UNSUPPORTED, + "Cannot open pipe"); + return NULL; + } + + switch (type) { + case FAST_UPDATE: + dbus_method = "UpdateArray"; + break; + case FAST_UPDATE_BATCH: + dbus_method = "BatchUpdateArray"; + break; + default: + g_assert_not_reached (); + } + + message = dbus_message_new_method_call (TRACKER_DBUS_SERVICE, + TRACKER_DBUS_OBJECT_STEROIDS, + TRACKER_DBUS_INTERFACE_STEROIDS, + dbus_method); + dbus_message_iter_init_append (message, &iter); + dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[0]); + dbus_connection_send_with_reply (connection, message, &call, -1); + dbus_message_unref (message); + close (pipefd[0]); + + if (!call) { + g_set_error (error, + TRACKER_SPARQL_ERROR, + TRACKER_SPARQL_ERROR_UNSUPPORTED, + "FD passing unsupported or connection disconnected"); + return NULL; + } + + output_stream = g_unix_output_stream_new (pipefd[1], TRUE); + buffered_output_stream = g_buffered_output_stream_new_sized (output_stream, + TRACKER_DBUS_PIPE_BUFFER_SIZE); + data_output_stream = g_data_output_stream_new (buffered_output_stream); + + g_data_output_stream_put_uint32 (data_output_stream, + queries_len, + NULL, + &inner_error); + + for (i = 0; i < queries_len; i++) { + const gchar *query = queries[i]; + + g_data_output_stream_put_int32 (data_output_stream, + strlen (query), + NULL, + &inner_error); + + if (inner_error) { + g_propagate_error (error, inner_error); + g_object_unref (data_output_stream); + g_object_unref (buffered_output_stream); + g_object_unref (output_stream); + return NULL; + } + + g_data_output_stream_put_string (data_output_stream, + query, + NULL, + &inner_error); + + if (inner_error) { + g_propagate_error (error, inner_error); + g_object_unref (data_output_stream); + g_object_unref (buffered_output_stream); + g_object_unref (output_stream); + return NULL; + } + } + + g_object_unref (data_output_stream); + g_object_unref (buffered_output_stream); + g_object_unref (output_stream); + + return call; + } + + static DBusMessage * + sparql_update_fast (DBusConnection *connection, + const gchar *query, + FastOperationType type, + GError **error) + { + DBusPendingCall *call; + DBusMessage *reply; + + call = sparql_update_fast_send (connection, query, type, error); + if (!call) { + return NULL; + } + + dbus_pending_call_block (call); + + reply = dbus_pending_call_steal_reply (call); + + if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) { + g_propagate_error (error, sparql_error_from_dbus_message (reply)); + + return NULL; + } + + dbus_pending_call_unref (call); + + return reply; + } + + static void + sparql_update_fast_async (DBusConnection *connection, + const gchar *query, + FastAsyncData *fad, + GError **error) + { + DBusPendingCall *call; + + call = sparql_update_fast_send (connection, query, fad->operation_type, error); + if (!call) { + /* Do some clean up ?*/ + return; + } + + fad->dbus_call = call; + + dbus_pending_call_set_notify (call, sparql_update_fast_callback, fad, NULL); + } + + static void + sparql_update_array_fast_async (DBusConnection *connection, + const gchar **queries, + guint queries_len, + FastAsyncData *fad, + GError **error) + { + DBusPendingCall *call; + + call = sparql_update_array_fast_send (connection, + queries, + queries_len, + fad->operation_type, + error); + + if (!call) { + /* Do some clean up ?*/ + return; + } + + fad->dbus_call = call; + + dbus_pending_call_set_notify (call, sparql_update_array_fast_callback, fad, NULL); + } + + /* Public API */ + + void + tracker_bus_fd_sparql_update (DBusGConnection *connection, + const char *query, + GError **error) + { + DBusMessage *reply; + + g_return_if_fail (query != NULL); + + reply = sparql_update_fast (dbus_g_connection_get_connection (connection), + query, + FAST_UPDATE, + error); + + if (!reply) { + return; + } + + dbus_message_unref (reply); + } + + void + tracker_bus_fd_sparql_update_async (DBusGConnection *connection, + const char *query, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) + { + FastAsyncData *fad; + GError *error = NULL; + + g_return_if_fail (query != NULL); + + fad = fast_async_data_new (dbus_g_connection_get_connection (connection), + FAST_UPDATE, + cancellable, + user_data); + + fad->res = g_simple_async_result_new (NULL, + callback, + user_data, + tracker_bus_fd_sparql_update_async); + + sparql_update_fast_async (dbus_g_connection_get_connection (connection), + query, + fad, + &error); + + if (error) { + g_critical ("Could not initiate update: %s", error->message); + g_error_free (error); + g_object_unref (fad->res); + fast_async_data_free (fad); + } + } + + void + tracker_bus_fd_sparql_update_array_async (DBusGConnection *connection, + const char **queries, + guint queries_len, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) + { + FastAsyncData *fad; + GError *error = NULL; + + g_return_if_fail (queries != NULL); + g_return_if_fail (queries_len != 0); + + fad = fast_async_data_new (dbus_g_connection_get_connection (connection), + FAST_UPDATE, + cancellable, + user_data); + + fad->res = g_simple_async_result_new (NULL, + callback, + user_data, + tracker_bus_fd_sparql_update_async); + + sparql_update_array_fast_async (dbus_g_connection_get_connection (connection), + queries, + queries_len, + fad, + &error); if (error) { g_critical ("Could not initiate update: %s", error->message); @@ -396,8 +656,8 @@ tracker_bus_fd_sparql_update_async (DBusGConnection *connection, } void -tracker_bus_fd_sparql_update_finish (GAsyncResult *res, - GError **error) +tracker_bus_fd_sparql_update_finish (GAsyncResult *res, + GError **error) { g_return_if_fail (res != NULL); @@ -405,16 +665,23 @@ tracker_bus_fd_sparql_update_finish (GAsyncResult *res, } GVariant * -tracker_bus_fd_sparql_update_blank_finish (GAsyncResult *res, - GError **error) +tracker_bus_fd_sparql_update_blank_finish (GAsyncResult *res, + GError **error) { + GSimpleAsyncResult *simple_res; + gpointer p; + g_return_val_if_fail (res != NULL, NULL); - if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error)) { + simple_res = G_SIMPLE_ASYNC_RESULT (res); + + if (g_simple_async_result_propagate_error (simple_res, error)) { return NULL; } - return g_variant_ref (g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res))); + p = g_simple_async_result_get_op_res_gpointer (simple_res); + + return g_variant_ref (p); } GVariant * @@ -428,7 +695,9 @@ tracker_bus_fd_sparql_update_blank (DBusGConnection *connection, g_return_val_if_fail (query != NULL, NULL); reply = sparql_update_fast (dbus_g_connection_get_connection (connection), - query, FAST_UPDATE_BLANK, error); + query, + FAST_UPDATE_BLANK, + error); if (!reply) { return NULL; @@ -450,11 +719,11 @@ tracker_bus_fd_sparql_update_blank (DBusGConnection *connection, } void -tracker_bus_fd_sparql_update_blank_async (DBusGConnection *connection, - const gchar *query, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) +tracker_bus_fd_sparql_update_blank_async (DBusGConnection *connection, + const gchar *query, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { FastAsyncData *fad; GError *error = NULL; @@ -467,11 +736,15 @@ tracker_bus_fd_sparql_update_blank_async (DBusGConnection *connection, cancellable, user_data); - fad->res = g_simple_async_result_new (NULL, callback, user_data, + fad->res = g_simple_async_result_new (NULL, + callback, + user_data, tracker_bus_fd_sparql_update_blank_async); sparql_update_fast_async (dbus_g_connection_get_connection (connection), - query, fad, &error); + query, + fad, + &error); if (error) { g_critical ("Could not initiate update: %s", error->message); @@ -491,7 +764,9 @@ tracker_bus_fd_sparql_batch_update (DBusGConnection *connection, g_return_if_fail (query != NULL); reply = sparql_update_fast (dbus_g_connection_get_connection (connection), - query, FAST_UPDATE_BATCH, error); + query, + FAST_UPDATE_BATCH, + error); if (!reply) { return; @@ -530,10 +805,72 @@ tracker_bus_fd_sparql_batch_update_async (DBusGConnection *connection, } void -tracker_bus_fd_sparql_batch_update_finish (GAsyncResult *res, - GError **error) +tracker_bus_fd_sparql_batch_update_array_async (DBusGConnection *connection, + const char **queries, + guint queries_len, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + FastAsyncData *fad; + GError *error = NULL; + + g_return_if_fail (queries != NULL); + g_return_if_fail (queries_len != 0); + + fad = fast_async_data_new (dbus_g_connection_get_connection (connection), + FAST_UPDATE_BATCH, + cancellable, + user_data); + + fad->res = g_simple_async_result_new (NULL, + callback, + user_data, + tracker_bus_fd_sparql_batch_update_async); + + sparql_update_array_fast_async (dbus_g_connection_get_connection (connection), + queries, + queries_len, + fad, + &error); + + if (error) { + g_critical ("Could not initiate update: %s", error->message); + g_error_free (error); + g_object_unref (fad->res); + fast_async_data_free (fad); + } +} + +void +tracker_bus_fd_sparql_batch_update_finish (GAsyncResult *res, + GError **error) { g_return_if_fail (res != NULL); g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error); } + +GPtrArray * +tracker_bus_fd_sparql_update_array_finish (GAsyncResult *res) +{ + gpointer p; + + g_return_val_if_fail (res != NULL, NULL); + + p = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res)); + + return g_ptr_array_ref (p); +} + +GPtrArray * +tracker_bus_fd_sparql_batch_update_array_finish (GAsyncResult *res) +{ + gpointer p; + + g_return_val_if_fail (res != NULL, NULL); + + p = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res)); + + return g_ptr_array_ref (p); +} diff --git a/src/libtracker-bus/tracker-bus-fd-update.h b/src/libtracker-bus/tracker-bus-fd-update.h index b844a13db..46fdd4635 100644 --- a/src/libtracker-bus/tracker-bus-fd-update.h +++ b/src/libtracker-bus/tracker-bus-fd-update.h @@ -23,35 +23,50 @@ G_BEGIN_DECLS -void tracker_bus_fd_sparql_update (DBusGConnection *connection, - const char *query, - GError **error); -void tracker_bus_fd_sparql_update_async (DBusGConnection *connection, - const char *query, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -GVariant * tracker_bus_fd_sparql_update_blank (DBusGConnection *connection, - const gchar *query, - GError **error); -void tracker_bus_fd_sparql_update_blank_async (DBusGConnection *connection, - const gchar *query, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -void tracker_bus_fd_sparql_batch_update (DBusGConnection *connection, - const char *query, - GError **error); -void tracker_bus_fd_sparql_batch_update_async (DBusGConnection *connection, - const char *query, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -void tracker_bus_fd_sparql_update_finish (GAsyncResult *res, - GError **error); -GVariant * tracker_bus_fd_sparql_update_blank_finish (GAsyncResult *res, - GError **error); -void tracker_bus_fd_sparql_batch_update_finish (GAsyncResult *res, - GError **error); +void tracker_bus_fd_sparql_update (DBusGConnection *connection, + const char *query, + GError **error); +void tracker_bus_fd_sparql_update_async (DBusGConnection *connection, + const char *query, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +void tracker_bus_fd_sparql_update_array_async (DBusGConnection *connection, + const char **queries, + guint queries_len, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +GVariant * tracker_bus_fd_sparql_update_blank (DBusGConnection *connection, + const gchar *query, + GError **error); +void tracker_bus_fd_sparql_update_blank_async (DBusGConnection *connection, + const gchar *query, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +void tracker_bus_fd_sparql_batch_update (DBusGConnection *connection, + const char *query, + GError **error); +void tracker_bus_fd_sparql_batch_update_async (DBusGConnection *connection, + const char *query, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +void tracker_bus_fd_sparql_batch_update_array_async (DBusGConnection *connection, + const char **queries, + guint queries_len, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +void tracker_bus_fd_sparql_update_finish (GAsyncResult *res, + GError **error); +GVariant * tracker_bus_fd_sparql_update_blank_finish (GAsyncResult *res, + GError **error); +void tracker_bus_fd_sparql_batch_update_finish (GAsyncResult *res, + GError **error); + +GPtrArray* tracker_bus_fd_sparql_update_array_finish (GAsyncResult *res); +GPtrArray* tracker_bus_fd_sparql_batch_update_array_finish (GAsyncResult *res); G_END_DECLS diff --git a/src/libtracker-bus/tracker-bus-fd-update.vapi b/src/libtracker-bus/tracker-bus-fd-update.vapi index a94ed3d23..e5f1dc20b 100644 --- a/src/libtracker-bus/tracker-bus-fd-update.vapi +++ b/src/libtracker-bus/tracker-bus-fd-update.vapi @@ -29,3 +29,8 @@ public extern async void tracker_bus_fd_sparql_update_async (DBus.Connection con public extern async GLib.Variant tracker_bus_fd_sparql_update_blank_async (DBus.Connection connection, string query, GLib.Cancellable? cancellable = null) throws Tracker.Sparql.Error, DBus.Error, GLib.IOError; [CCode (cheader_filename = "tracker-bus-fd-update.h")] public extern async void tracker_bus_fd_sparql_batch_update_async (DBus.Connection connection, string query, GLib.Cancellable? cancellable = null) throws Tracker.Sparql.Error, DBus.Error, GLib.IOError; + +[CCode (cheader_filename = "tracker-bus-fd-update.h")] +public extern async GLib.PtrArray? tracker_bus_fd_sparql_update_array_async (DBus.Connection connection, string query[], GLib.Cancellable? cancellable = null); +[CCode (cheader_filename = "tracker-bus-fd-update.h")] +public extern async GLib.PtrArray? tracker_bus_fd_sparql_batch_update_array_async (DBus.Connection connection, string[] query, GLib.Cancellable? cancellable = null); diff --git a/src/libtracker-bus/tracker-bus.vala b/src/libtracker-bus/tracker-bus.vala index ce3f9410e..60081341c 100644 --- a/src/libtracker-bus/tracker-bus.vala +++ b/src/libtracker-bus/tracker-bus.vala @@ -107,6 +107,21 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection { } } + public async override GLib.PtrArray? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError { + try { + // helper variable necessary to work around bug in vala < 0.11 + PtrArray result; + if (priority >= GLib.Priority.DEFAULT) { + result = yield tracker_bus_fd_sparql_update_array_async (connection, sparql, cancellable); + } else { + result = yield tracker_bus_fd_sparql_batch_update_array_async (connection, sparql, cancellable); + } + return result; + } catch (DBus.Error e) { + throw new Sparql.Error.INTERNAL (e.message); + } + } + public override GLib.Variant? update_blank (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError { try { GLib.Variant res = null; diff --git a/src/libtracker-sparql/tracker-backend.vala b/src/libtracker-sparql/tracker-backend.vala index 64c30dfbd..85955963f 100644 --- a/src/libtracker-sparql/tracker-backend.vala +++ b/src/libtracker-sparql/tracker-backend.vala @@ -135,6 +135,11 @@ class Tracker.Sparql.Backend : Connection { yield bus.update_async (sparql, priority, cancellable); } + public async override GLib.PtrArray? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError + requires (bus != null) { + return yield bus.update_array_async (sparql, priority, cancellable); + } + public async override GLib.Variant? update_blank_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError requires (bus != null) { debug ("%s(priority:%d): '%s'", Log.METHOD, priority, sparql); diff --git a/src/libtracker-sparql/tracker-connection.vala b/src/libtracker-sparql/tracker-connection.vala index cb87393a8..024714b84 100644 --- a/src/libtracker-sparql/tracker-connection.vala +++ b/src/libtracker-sparql/tracker-connection.vala @@ -402,10 +402,10 @@ public abstract class Tracker.Sparql.Connection : Object { * @self: a #TrackerSparqlConnection * @sparql: string containing the SPARQL update query * @priority: the priority for the asynchronous operation + * @cancellable: a #GCancellable used to cancel the operation * @_callback_: user-defined #GAsyncReadyCallback to be called when * asynchronous operation is finished. * @_user_data_: user-defined data to be passed to @_callback_ - * @cancellable: a #GCancellable used to cancel the operation * * Executes asynchronously a SPARQL update. * @@ -427,6 +427,68 @@ public abstract class Tracker.Sparql.Connection : Object { } /** + * tracker_sparql_connection_update_array_async: + * @self: a #TrackerSparqlConnection + * @sparql: an array of strings containing the SPARQL update queries + * @sparql_length1: the amount of strings you pass as @sparql + * @priority: the priority for the asynchronous operation + * @cancellable: a #GCancellable used to cancel the operation + * @_callback_: user-defined #GAsyncReadyCallback to be called when + * asynchronous operation is finished. + * @_user_data_: user-defined data to be passed to @_callback_ + * + * Executes asynchronously an array of SPARQL updates. Each update in the + * array is its own transaction. This means that update n+1 is not halted + * due to an error in update n. + * + * Since: 0.10 + */ + + /** + * tracker_sparql_connection_update_array_finish: + * @self: a #TrackerSparqlConnection + * @_res_: a #GAsyncResult with the result of the operation + * @error: #GError for error reporting. + * + * Finishes the asynchronous SPARQL update_array operation. + * + * <example> + * <programlisting> + * static void + * async_update_array_callback (GObject *source_object, + * GAsyncResult *result, + * gpointer user_data) + * { + * GError *error = NULL; + * GPtrArray *errors; + * guint i; + * errors = tracker_sparql_connection_update_array_finish (connection, result, &error); + * g_assert_no_error (error); + * for (i = 0; i < errors->len; i++) { + * const GError *a_error = g_ptr_array_index (errors, i); + * } + * g_ptr_array_unref (errors); + * } + * </programlisting> + * </example> + * + * Returns: a #GPtrArray of size @sparql_length1 with elements that are + * either NULL or a GError instance. The returned array should be freed with + * g_ptr_array_unref when no longer used, not with g_ptr_array_free. When + * you use errors of the array, you must g_error_copy them. Errors inside of + * the array must be considered as const data and not freed. The index of + * the error corresponds to the index of the update query in the array that + * you passed to tracker_sparql_connection_update_array_async. + * + * Since: 0.10 + */ + + public async virtual GLib.PtrArray? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError { + warning ("Interface 'update_array_async' not implemented"); + return null; + } + + /** * tracker_sparql_connection_update_blank: * @self: a #TrackerSparqlConnection * @sparql: string containing the SPARQL update query @@ -452,10 +514,10 @@ public abstract class Tracker.Sparql.Connection : Object { * @self: a #TrackerSparqlConnection * @sparql: string containing the SPARQL update query * @priority: the priority for the asynchronous operation + * @cancellable: a #GCancellable used to cancel the operation * @_callback_: user-defined #GAsyncReadyCallback to be called when * asynchronous operation is finished. * @_user_data_: user-defined data to be passed to @_callback_ - * @cancellable: a #GCancellable used to cancel the operation * * Executes asynchronously a SPARQL update. * @@ -501,10 +563,10 @@ public abstract class Tracker.Sparql.Connection : Object { * tracker_sparql_connection_load_async: * @self: a #TrackerSparqlConnection * @file: a #GFile + * @cancellable: a #GCancellable used to cancel the operation * @_callback_: user-defined #GAsyncReadyCallback to be called when * asynchronous operation is finished. * @_user_data_: user-defined data to be passed to @_callback_ - * @cancellable: a #GCancellable used to cancel the operation * * Loads, asynchronously, a Turtle file (TTL) into the store. * @@ -548,10 +610,10 @@ public abstract class Tracker.Sparql.Connection : Object { /** * tracker_sparql_connection_statistics_async: * @self: a #TrackerSparqlConnection + * @cancellable: a #GCancellable used to cancel the operation * @_callback_: user-defined #GAsyncReadyCallback to be called when * asynchronous operation is finished. * @_user_data_: user-defined data to be passed to @_callback_ - * @cancellable: a #GCancellable used to cancel the operation * * Retrieves, asynchronously, the statistics from the Store. * diff --git a/src/tracker-store/tracker-steroids.c b/src/tracker-store/tracker-steroids.c index 59de111e6..4b5e8d16d 100644 --- a/src/tracker-store/tracker-steroids.c +++ b/src/tracker-store/tracker-steroids.c @@ -51,6 +51,11 @@ typedef struct { int fd; guint request_id; DBusConnection *connection; + struct { + int query_count; + int seen; + GPtrArray *errors; + } array_info; } ClientInfo; typedef struct { @@ -174,6 +179,63 @@ update_callback (GError *error, gpointer user_data) } static void +update_array_callback (GError *error, gpointer user_data) +{ + ClientInfo *info = user_data; + DBusMessage *reply; + + info->array_info.seen++; + + if (!info->array_info.errors) { + info->array_info.errors = g_ptr_array_new (); + } + + if (error) { + g_ptr_array_add (info->array_info.errors, g_error_copy (error)); + } else { + g_ptr_array_add (info->array_info.errors, NULL); + } + + if (info->array_info.seen == info->array_info.query_count) { + guint i; + DBusMessageIter iter, subiter; + + tracker_dbus_request_success (info->request_id, NULL); + reply = dbus_message_new_method_return (info->call_message); + + dbus_message_iter_init_append (reply, &iter); + dbus_message_iter_open_container (&iter, DBUS_TYPE_ARRAY, "ss", &subiter); + + for (i = 0; i < info->array_info.errors->len; i++) { + GError *error = g_ptr_array_index (info->array_info.errors, i); + const gchar *str = ""; + const gchar *message = ""; + + if (error) { + str = TRACKER_STEROIDS_INTERFACE ".UpdateError"; + message = error->message; + } + + dbus_message_iter_append_basic (&subiter, DBUS_TYPE_STRING, &str); + dbus_message_iter_append_basic (&subiter, DBUS_TYPE_STRING, &message); + + if (error) { + g_error_free (error); + } + } + + g_ptr_array_free (info->array_info.errors, TRUE); + + dbus_message_iter_close_container (&iter, &subiter); + + dbus_connection_send (info->connection, reply, NULL); + dbus_message_unref (reply); + + client_info_destroy (info); + } +} + +static void marshal_hash_table_item (gpointer key, gpointer value, gpointer user_data) @@ -468,7 +530,7 @@ steroids_query (TrackerSteroids *steroids, return; } - info = g_slice_new (ClientInfo); + info = g_slice_new0 (ClientInfo); info->connection = dbus_connection_ref (connection); info->call_message = dbus_message_ref (message); info->request_id = request_id; @@ -574,7 +636,7 @@ steroids_update (TrackerSteroids *steroids, __FUNCTION__, fd); - info = g_slice_new (ClientInfo); + info = g_slice_new0 (ClientInfo); info->connection = dbus_connection_ref (connection); info->call_message = dbus_message_ref (message); info->request_id = request_id; @@ -669,6 +731,211 @@ steroids_update (TrackerSteroids *steroids, g_free (query); } + +static void +steroids_update_array (TrackerSteroids *steroids, + DBusConnection *connection, + DBusMessage *message, + gboolean batch) +{ + DBusError dbus_error; + ClientInfo *info; + GInputStream *input_stream; + GDataInputStream *data_input_stream; + GError *error = NULL; + guint request_id; + const gchar *sender; + int i; + DBusMessage *reply; + int fd; + gchar **query_array; + + request_id = tracker_dbus_get_next_request_id (); + + if (g_strcmp0 (dbus_message_get_signature (message), DBUS_TYPE_UNIX_FD_AS_STRING)) { + tracker_dbus_request_new (request_id, + NULL, + "%s()", + __FUNCTION__); + + reply = dbus_message_new_error_printf (message, + DBUS_ERROR_UNKNOWN_METHOD, + UNKNOWN_METHOD_MESSAGE, + "Update", + dbus_message_get_signature (message), + dbus_message_get_interface (message), + DBUS_TYPE_UNIX_FD_AS_STRING); + dbus_connection_send (connection, reply, NULL); + dbus_message_unref (reply); + + tracker_dbus_request_failed (request_id, + NULL, + NULL, + UNKNOWN_METHOD_MESSAGE, + "Update", + dbus_message_get_signature (message), + dbus_message_get_interface (message), + DBUS_TYPE_UNIX_FD_AS_STRING); + + return; + } + + dbus_error_init (&dbus_error); + + dbus_message_get_args (message, + &dbus_error, + DBUS_TYPE_UNIX_FD, &fd, + DBUS_TYPE_INVALID); + + if (dbus_error_is_set (&dbus_error)) { + tracker_dbus_request_new (request_id, + NULL, + "%s()", + __FUNCTION__); + + reply = dbus_message_new_error (message, dbus_error.name, dbus_error.message); + dbus_connection_send (connection, reply, NULL); + + tracker_dbus_request_failed (request_id, + NULL, + NULL, + dbus_error.message); + + dbus_message_unref (reply); + dbus_error_free (&dbus_error); + + return; + } + + tracker_dbus_request_new (request_id, + NULL, + "%s(fd:%d)", + __FUNCTION__, + fd); + + info = g_slice_new0 (ClientInfo); + info->connection = dbus_connection_ref (connection); + info->call_message = dbus_message_ref (message); + info->request_id = request_id; + info->fd = fd; + + sender = dbus_message_get_sender (message); + + input_stream = g_unix_input_stream_new (info->fd, TRUE); + data_input_stream = g_data_input_stream_new (input_stream); + g_buffered_input_stream_set_buffer_size (G_BUFFERED_INPUT_STREAM (data_input_stream), + TRACKER_STEROIDS_BUFFER_SIZE); + + info->array_info.query_count = g_data_input_stream_read_uint32 (data_input_stream, + NULL, + &error); + + if (error) { + reply = dbus_message_new_error (info->call_message, + TRACKER_STEROIDS_INTERFACE ".UpdateError", + error->message); + dbus_connection_send (connection, reply, NULL); + dbus_message_unref (reply); + + tracker_dbus_request_failed (request_id, + NULL, + NULL, + error->message); + + g_object_unref (data_input_stream); + g_object_unref (input_stream); + g_error_free (error); + client_info_destroy (info); + + return; + } + + info->array_info.seen = 0; + query_array = g_new0 (gchar*, info->array_info.query_count + 1); + + for (i = 0; i < info->array_info.query_count; i++) { + gsize bytes_read; + int query_size; + + query_size = g_data_input_stream_read_int32 (data_input_stream, + NULL, + &error); + + if (error) { + reply = dbus_message_new_error (info->call_message, + TRACKER_STEROIDS_INTERFACE ".UpdateError", + error->message); + dbus_connection_send (connection, reply, NULL); + dbus_message_unref (reply); + + tracker_dbus_request_failed (request_id, + NULL, + NULL, + error->message); + + g_strfreev (query_array); + g_object_unref (data_input_stream); + g_object_unref (input_stream); + g_error_free (error); + client_info_destroy (info); + + return; + } + + /* We malloc one more char to ensure string is 0 terminated */ + query_array[i] = g_malloc0 ((1 + query_size) * sizeof (char)); + + g_input_stream_read_all (input_stream, + query_array[i], + query_size, + &bytes_read, + NULL, + &error); + + if (error) { + reply = dbus_message_new_error (info->call_message, + TRACKER_STEROIDS_INTERFACE ".UpdateError", + error->message); + dbus_connection_send (connection, reply, NULL); + dbus_message_unref (reply); + + tracker_dbus_request_failed (request_id, + NULL, + NULL, + error->message); + + g_strfreev (query_array); + g_object_unref (data_input_stream); + g_object_unref (input_stream); + g_error_free (error); + client_info_destroy (info); + + return; + } + + } + + g_object_unref (data_input_stream); + g_object_unref (input_stream); + + for (i = 0; query_array[i] != NULL; i++) { + + tracker_dbus_request_debug (request_id, + NULL, + "query: '%s'", + query_array[i]); + + tracker_store_sparql_update (query_array[i], + batch ? TRACKER_STORE_PRIORITY_LOW : TRACKER_STORE_PRIORITY_HIGH, + update_array_callback, + sender, + info, + NULL); + } + + g_strfreev (query_array); +} + DBusHandlerResult tracker_steroids_connection_filter (DBusConnection *connection, DBusMessage *message, @@ -697,6 +964,11 @@ tracker_steroids_connection_filter (DBusConnection *connection, return DBUS_HANDLER_RESULT_HANDLED; } + if (!g_strcmp0 ("UpdateArray", dbus_message_get_member (message))) { + steroids_update_array (steroids, connection, message, FALSE); + return DBUS_HANDLER_RESULT_HANDLED; + } + if (!g_strcmp0 ("Update", dbus_message_get_member (message))) { steroids_update (steroids, connection, message, FALSE, FALSE); return DBUS_HANDLER_RESULT_HANDLED; @@ -712,5 +984,10 @@ tracker_steroids_connection_filter (DBusConnection *connection, return DBUS_HANDLER_RESULT_HANDLED; } + if (!g_strcmp0 ("BatchUpdateArray", dbus_message_get_member (message))) { + steroids_update_array (steroids, connection, message, TRUE); + return DBUS_HANDLER_RESULT_HANDLED; + } + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; } diff --git a/tests/functional-tests/.gitignore b/tests/functional-tests/.gitignore index fb75dfea9..9851bf468 100644 --- a/tests/functional-tests/.gitignore +++ b/tests/functional-tests/.gitignore @@ -1,3 +1,4 @@ +update-array-performance-test class-signal-performance-test class-signal-performance-test.c class-signal-test diff --git a/tests/functional-tests/Makefile.am b/tests/functional-tests/Makefile.am index 4d0269b5f..edacaa8e3 100644 --- a/tests/functional-tests/Makefile.am +++ b/tests/functional-tests/Makefile.am @@ -88,7 +88,8 @@ noinst_PROGRAMS = busy-handling-test \ default-update-test \ bus-update-test \ class-signal-test \ - class-signal-performance-test + class-signal-performance-test \ + update-array-performance-test busy_handling_test_VALASOURCES = busy-handling-test.vala @@ -119,6 +120,8 @@ default_update_test_SOURCES = \ bus_query_test_VALASOURCES = shared-query-test.vala bus-query-test.vala +update_array_performance_test_SOURCES = update-array-performance-test.c + bus_query_test_SOURCES = \ bus-query-test.vala.stamp \ $(bus_query_test_VALASOURCES:.vala=.c) @@ -209,6 +212,15 @@ class_signal_performance_test_LDADD = \ $(GLIB2_LIBS) \ $(DBUS_LIBS) +update_array_performance_test_LDADD = \ + $(top_builddir)/src/libtracker-direct/libtracker-direct.la \ + $(top_builddir)/src/libtracker-bus/libtracker-bus.la \ + $(top_builddir)/src/libtracker-sparql/libtracker-sparql-$(TRACKER_API_VERSION).la \ + $(top_builddir)/src/libtracker-common/libtracker-common.la \ + $(GIO_LIBS) \ + $(GLIB2_LIBS) \ + $(DBUS_LIBS) + BUILT_SOURCES = \ busy-handling-test.vala.stamp \ class-signal-test.vala.stamp \ diff --git a/tests/functional-tests/update-array-performance-test.c b/tests/functional-tests/update-array-performance-test.c new file mode 100644 index 000000000..bfdfa2ca2 --- /dev/null +++ b/tests/functional-tests/update-array-performance-test.c @@ -0,0 +1,283 @@ +/* + * Copyright (C) 2010, Codeminded BVBA <abustany@gnome.org> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + * + * Copied from ../tracker-steroids/tracker-test.c + */ + +#include <stdlib.h> +#include <string.h> + +#include <tracker-bus.h> +#include <tracker-sparql.h> + +typedef struct { + GMainLoop *main_loop; + const gchar *query; + guint len, cur; +} AsyncData; + +static TrackerSparqlConnection *connection; +#define MSIZE 90 +#define TEST_STR "Brrr0092323" + +static const gchar *queries[90] = { + "INSERT { _:a0 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:a9 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:a11 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b0 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b9 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b11 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c0 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c9 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c12 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d0 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d9 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d12 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e0 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e9 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e11 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f0 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f9 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f11 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b1 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b8 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b13 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c1 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c8 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c13 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d1 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d8 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d14 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e1 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e8 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e14 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f1 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f8 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f15 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b7 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b15 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c7 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c15 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d7 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d16 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e7 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e16 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f7 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f17 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b3 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b6 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b16 a nmo:Message; nie:title '" TEST_STR "'}", + "INSERT { _:c3 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c6 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c18 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d3 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d6 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d19 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e3 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e6 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e20 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f3 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f6 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f21 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b4 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:b22 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c4 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c23 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d4 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d24 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e4 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e24 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f4 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f25 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:c26 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:d28 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:e29 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f5 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f2 a nmo:Message; nie:title '" TEST_STR "' }", + "INSERT { _:f33 a nmo:Message; nie:title '" TEST_STR "' }" }; + +static void +async_update_array_callback (GObject *source_object, + GAsyncResult *result, + gpointer user_data) +{ + AsyncData *data = user_data; + GError *error = NULL; + GPtrArray *errors; + + errors = tracker_sparql_connection_update_array_finish (connection, result, &error); + g_assert_no_error (error); + g_ptr_array_unref (errors); + g_main_loop_quit (data->main_loop); +} + + +static void +test_tracker_sparql_update_array_async () +{ + GMainLoop *main_loop; + AsyncData *data; + + main_loop = g_main_loop_new (NULL, FALSE); + + data = g_slice_new (AsyncData); + data->main_loop = main_loop; + + /* Cast here is because vala doesn't make const-char-** possible :( */ + tracker_sparql_connection_update_array_async (connection, + (char**) queries, MSIZE, + 0, NULL, + async_update_array_callback, + data); + + g_main_loop_run (main_loop); + + g_slice_free (AsyncData, data); + g_main_loop_unref (main_loop); + +} + +static void +async_update_callback (GObject *source_object, + GAsyncResult *result, + gpointer user_data) +{ + AsyncData *data = user_data; + GError *error = NULL; + + data->cur++; + + tracker_sparql_connection_update_finish (connection, result, &error); + if (error) + g_error_free (error); + + if (data->cur == data->len) + g_main_loop_quit (data->main_loop); +} + +static void +test_tracker_sparql_update_async () +{ + guint i; + GMainLoop *main_loop; + AsyncData *data; + + main_loop = g_main_loop_new (NULL, FALSE); + + data = g_slice_new (AsyncData); + data->len = MSIZE; + data->main_loop = main_loop; + data->cur = 0; + + for (i = 0; i < data->len; i++) { + tracker_sparql_connection_update_async (connection, + queries[i], + 0, NULL, + async_update_callback, + data); + } + + g_main_loop_run (main_loop); + + g_slice_free (AsyncData, data); + g_main_loop_unref (main_loop); + +} + + +gint +main (gint argc, gchar **argv) +{ + GTimer *array_t, *update_t; + + g_type_init (); + + /* do not require prior installation */ + g_setenv ("TRACKER_SPARQL_MODULE_PATH", "../../src/libtracker-bus/.libs", TRUE); + + connection = tracker_sparql_connection_get (NULL, NULL); + + g_print ("First run (first update then array)\n"); + + tracker_sparql_connection_update (connection, + "DELETE { ?r a rdfs:Resource } WHERE { ?r nie:title '" TEST_STR "' }", + 0, NULL, NULL); + + update_t = g_timer_new (); + test_tracker_sparql_update_async (); + g_timer_stop (update_t); + + tracker_sparql_connection_update (connection, + "DELETE { ?r a rdfs:Resource } WHERE { ?r nie:title '" TEST_STR "' }", + 0, NULL, NULL); + + array_t = g_timer_new (); + test_tracker_sparql_update_array_async (); + g_timer_stop (array_t); + + tracker_sparql_connection_update (connection, + "DELETE { ?r a rdfs:Resource } WHERE { ?r nie:title '" TEST_STR "' }", + 0, NULL, NULL); + + g_print ("Array: %f, Update: %f\n", g_timer_elapsed (array_t, NULL), g_timer_elapsed (update_t, NULL)); + + g_print ("Reversing run (first array then update)\n"); + + g_timer_destroy (array_t); + g_timer_destroy (update_t); + + array_t = g_timer_new (); + test_tracker_sparql_update_array_async (); + g_timer_stop (array_t); + + tracker_sparql_connection_update (connection, + "DELETE { ?r a rdfs:Resource } WHERE { ?r nie:title '" TEST_STR "' }", + 0, NULL, NULL); + + update_t = g_timer_new (); + test_tracker_sparql_update_async (); + g_timer_stop (update_t); + + tracker_sparql_connection_update (connection, + "DELETE { ?r a rdfs:Resource } WHERE { ?r nie:title '" TEST_STR "' }", + 0, NULL, NULL); + + g_print ("Array: %f, Update: %f\n", g_timer_elapsed (array_t, NULL), g_timer_elapsed (update_t, NULL)); + + g_timer_destroy (array_t); + g_timer_destroy (update_t); + g_object_unref (connection); + + return 0; +} diff --git a/tests/tracker-steroids/tracker-test.c b/tests/tracker-steroids/tracker-test.c index 7a2ddfb21..2285d8a50 100644 --- a/tests/tracker-steroids/tracker-test.c +++ b/tests/tracker-steroids/tracker-test.c @@ -38,11 +38,11 @@ insert_test_data () { GError *error = NULL; const char *delete_query = "DELETE { " - "<urn:testdata1> a rdfs:Resource ." - "<urn:testdata2> a rdfs:Resource ." - "<urn:testdata3> a rdfs:Resource ." - "<urn:testdata4> a rdfs:Resource ." - "}"; + "<urn:testdata1> a rdfs:Resource ." + "<urn:testdata2> a rdfs:Resource ." + "<urn:testdata3> a rdfs:Resource ." + "<urn:testdata4> a rdfs:Resource ." + "}"; char *longName = g_malloc (LONG_NAME_SIZE); char *filled_query; @@ -295,6 +295,69 @@ test_tracker_sparql_update_fast_large () } static void +async_update_array_callback (GObject *source_object, + GAsyncResult *result, + gpointer user_data) +{ + GError *error = NULL; + AsyncData *data = user_data; + GPtrArray *errors; + + errors = tracker_sparql_connection_update_array_finish (connection, result, &error); + + /* main error is only set on fatal (D-Bus) errors that apply to the whole update */ + g_assert_no_error (error); + + g_assert (errors->len == 6); + + g_assert (g_ptr_array_index (errors, 0) == NULL); + g_assert (g_ptr_array_index (errors, 1) == NULL); + g_assert (g_ptr_array_index (errors, 2) == NULL); + g_assert (g_ptr_array_index (errors, 3) != NULL); + g_assert (g_ptr_array_index (errors, 4) == NULL); + g_assert (g_ptr_array_index (errors, 5) == NULL); + + g_ptr_array_unref (errors); + + g_main_loop_quit (data->main_loop); +} + + +static void +test_tracker_sparql_update_array_async (void) +{ + const gchar *queries[6] = { "INSERT { _:a a nmo:Message }", + "INSERT { _:b a nmo:Message }", + "INSERT { _:c a nmo:Message }", + "INSERT { _:d syntax error a nmo:Message }", + "INSERT { _:e a nmo:Message }", + "INSERT { _:f a nmo:Message }" }; + + GMainLoop *main_loop; + AsyncData *data; + + main_loop = g_main_loop_new (NULL, FALSE); + + data = g_slice_new (AsyncData); + data->main_loop = main_loop; + + /* Cast here is because vala doesn't make const-char-** possible :( */ + tracker_sparql_connection_update_array_async (connection, + (char**) queries, + 6, + 0, + NULL, + async_update_array_callback, + data); + + g_main_loop_run (main_loop); + + g_slice_free (AsyncData, data); + g_main_loop_unref (main_loop); + +} + +static void test_tracker_sparql_update_fast_error () { GError *error = NULL; @@ -347,7 +410,7 @@ test_tracker_sparql_update_blank_fast_large () } static void -test_tracker_sparql_update_blank_fast_error () +test_tracker_sparql_update_blank_fast_error (void) { GError *error = NULL; const gchar *query = "blork blork blork"; @@ -362,7 +425,7 @@ test_tracker_sparql_update_blank_fast_error () } static void -test_tracker_sparql_update_blank_fast_no_blanks () +test_tracker_sparql_update_blank_fast_no_blanks (void) { GError *error = NULL; const gchar *query = "INSERT { <urn:not_blank> a nmo:Message }"; @@ -377,7 +440,7 @@ test_tracker_sparql_update_blank_fast_no_blanks () } static void -test_tracker_batch_sparql_update_fast () +test_tracker_batch_sparql_update_fast (void) { /* GError *error = NULL; */ /* const gchar *query = "INSERT { _:x a nmo:Message }"; */ @@ -412,7 +475,8 @@ async_query_cb (GObject *source_object, g_assert_no_error (error); g_assert (cursor_glib != NULL); - while (tracker_sparql_cursor_next (cursor_fd, NULL, NULL) && tracker_sparql_cursor_next (cursor_glib, NULL, NULL)) { + while (tracker_sparql_cursor_next (cursor_fd, NULL, NULL) && + tracker_sparql_cursor_next (cursor_glib, NULL, NULL)) { g_assert_cmpstr (tracker_sparql_cursor_get_string (cursor_fd, 0, NULL), ==, tracker_sparql_cursor_get_string (cursor_glib, 0, NULL)); @@ -426,7 +490,7 @@ async_query_cb (GObject *source_object, } static void -test_tracker_sparql_query_iterate_async () +test_tracker_sparql_query_iterate_async (void) { const gchar *query = "SELECT ?r nie:url(?r) WHERE {?r a nfo:FileDataObject}"; GMainLoop *main_loop; @@ -467,7 +531,7 @@ cancel_query_cb (GObject *source_object, } static void -test_tracker_sparql_query_iterate_async_cancel () +test_tracker_sparql_query_iterate_async_cancel (void) { const gchar *query = "SELECT ?r nie:url(?r) WHERE {?r a nfo:FileDataObject}"; GMainLoop *main_loop; @@ -504,7 +568,7 @@ async_update_callback (GObject *source_object, } static void -test_tracker_sparql_update_async () +test_tracker_sparql_update_async (void) { const gchar *query = "INSERT { _:x a nmo:Message }"; GMainLoop *main_loop; @@ -545,7 +609,7 @@ cancel_update_cb (GObject *source_object, } static void -test_tracker_sparql_update_async_cancel () +test_tracker_sparql_update_async_cancel (void) { GCancellable *cancellable = g_cancellable_new (); const gchar *query = "INSERT { _:x a nmo:Message }"; @@ -584,7 +648,7 @@ async_update_blank_callback (GObject *source_object, } static void -test_tracker_sparql_update_blank_async () +test_tracker_sparql_update_blank_async (void) { const gchar *query = "INSERT { _:x a nmo:Message }"; GMainLoop *main_loop; @@ -639,6 +703,7 @@ main (gint argc, gchar **argv) g_test_add_func ("/steroids/tracker/tracker_sparql_update_async", test_tracker_sparql_update_async); g_test_add_func ("/steroids/tracker/tracker_sparql_update_async_cancel", test_tracker_sparql_update_async_cancel); g_test_add_func ("/steroids/tracker/tracker_sparql_update_blank_async", test_tracker_sparql_update_blank_async); + g_test_add_func ("/steroids/tracker/tracker_sparql_update_array_async", test_tracker_sparql_update_array_async); return g_test_run (); } |