diff options
Diffstat (limited to 'src/libtracker-sparql/bus/tracker-bus.vala')
-rw-r--r-- | src/libtracker-sparql/bus/tracker-bus.vala | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/src/libtracker-sparql/bus/tracker-bus.vala b/src/libtracker-sparql/bus/tracker-bus.vala new file mode 100644 index 000000000..72069873f --- /dev/null +++ b/src/libtracker-sparql/bus/tracker-bus.vala @@ -0,0 +1,295 @@ +/* + * Copyright (C) 2010, Nokia <ivan.frade@nokia.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +public class Tracker.Bus.Connection : Tracker.Sparql.Connection { + DBusConnection bus; + string dbus_name; + string object_path; + + private const string ENDPOINT_IFACE = "org.freedesktop.Tracker3.Endpoint"; + + public string bus_name { + get { return dbus_name; } + } + + public string bus_object_path { + get { return object_path; } + } + + public Connection (string dbus_name, string object_path, DBusConnection? dbus_connection) throws Sparql.Error, IOError, DBusError, GLib.Error { + Object (); + this.dbus_name = dbus_name; + this.bus = dbus_connection; + this.object_path = object_path; + + // ensure that error domain is registered with GDBus + new Sparql.Error.INTERNAL (""); + } + + static void pipe (out UnixInputStream input, out UnixOutputStream output) throws IOError { + int pipefd[2]; + if (Posix.pipe (pipefd) < 0) { + throw new IOError.FAILED ("Pipe creation failed"); + } + input = new UnixInputStream (pipefd[0], true); + output = new UnixOutputStream (pipefd[1], true); + } + + static void handle_error_reply (DBusMessage message) throws Sparql.Error, IOError, DBusError { + try { + message.to_gerror (); + } catch (IOError e_io) { + throw e_io; + } catch (Sparql.Error e_sparql) { + throw e_sparql; + } catch (DBusError e_dbus) { + throw e_dbus; + } catch (Error e) { + throw new IOError.FAILED (e.message); + } + } + + static void send_query (DBusConnection bus, string dbus_name, string object_path, string sparql, VariantBuilder? arguments, UnixOutputStream output, Cancellable? cancellable, AsyncReadyCallback? callback) throws GLib.IOError, GLib.Error { + var message = new DBusMessage.method_call (dbus_name, object_path, ENDPOINT_IFACE, "Query"); + var fd_list = new UnixFDList (); + message.set_body (new Variant ("(sha{sv})", sparql, fd_list.append (output.fd), arguments)); + message.set_unix_fd_list (fd_list); + + bus.send_message_with_reply.begin (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable, callback); + } + + public static async Sparql.Cursor perform_query_call (DBusConnection bus, string dbus_name, string object_path, string sparql, VariantBuilder? arguments, Cancellable? cancellable) throws GLib.IOError, GLib.Error { + UnixInputStream input; + UnixOutputStream output; + pipe (out input, out output); + + // send D-Bus request + AsyncResult dbus_res = null; + bool received_result = false; + send_query (bus, dbus_name, object_path, sparql, arguments, output, cancellable, (o, res) => { + dbus_res = res; + if (received_result) { + perform_query_call.callback (); + } + }); + + output = null; + + // receive query results via FD + var mem_stream = new MemoryOutputStream (null, GLib.realloc, GLib.free); + + try { + yield mem_stream.splice_async (input, OutputStreamSpliceFlags.CLOSE_SOURCE | OutputStreamSpliceFlags.CLOSE_TARGET, Priority.DEFAULT, cancellable); + } finally { + // wait for D-Bus reply + received_result = true; + if (dbus_res == null) { + yield; + } + } + + var reply = bus.send_message_with_reply.end (dbus_res); + handle_error_reply (reply); + + string[] variable_names = (string[]) reply.get_body ().get_child_value (0); + mem_stream.close (); + return new FDCursor (mem_stream.steal_data (), mem_stream.data_size, variable_names); + } + + public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { + // use separate main context for sync operation + var context = new MainContext (); + var loop = new MainLoop (context, false); + context.push_thread_default (); + AsyncResult async_res = null; + query_async.begin (sparql, cancellable, (o, res) => { + async_res = res; + loop.quit (); + }); + loop.run (); + context.pop_thread_default (); + return query_async.end (async_res); + } + + public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { + return yield perform_query_call (bus, dbus_name, object_path, sparql, null, cancellable); + } + + public override Sparql.Statement? query_statement (string sparql, GLib.Cancellable? cancellable = null) throws Sparql.Error { + return new Bus.Statement (bus, dbus_name, object_path, sparql); + } + + void send_update (string method, UnixInputStream input, Cancellable? cancellable, AsyncReadyCallback? callback) throws GLib.Error, GLib.IOError { + var message = new DBusMessage.method_call (dbus_name, object_path, ENDPOINT_IFACE, method); + var fd_list = new UnixFDList (); + message.set_body (new Variant ("(h)", fd_list.append (input.fd))); + message.set_unix_fd_list (fd_list); + + bus.send_message_with_reply.begin (message, DBusSendMessageFlags.NONE, int.MAX, null, cancellable, callback); + } + + public override void update (string sparql, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { + // use separate main context for sync operation + var context = new MainContext (); + var loop = new MainLoop (context, false); + context.push_thread_default (); + AsyncResult async_res = null; + update_async.begin (sparql, cancellable, (o, res) => { + async_res = res; + loop.quit (); + }); + loop.run (); + context.pop_thread_default (); + update_async.end (async_res); + } + + public async override void update_async (string sparql, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { + UnixInputStream input; + UnixOutputStream output; + pipe (out input, out output); + + // send D-Bus request + AsyncResult dbus_res = null; + bool sent_update = false; + send_update ("Update", input, cancellable, (o, res) => { + dbus_res = res; + if (sent_update) { + update_async.callback (); + } + }); + + // send sparql string via fd + var data_stream = new DataOutputStream (output); + data_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN); + data_stream.put_int32 ((int32) sparql.length); + data_stream.put_string (sparql); + data_stream = null; + + // wait for D-Bus reply + sent_update = true; + if (dbus_res == null) { + yield; + } + + var reply = bus.send_message_with_reply.end (dbus_res); + handle_error_reply (reply); + } + + public async override bool update_array_async (string[] sparql, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { + UnixInputStream input; + UnixOutputStream output; + pipe (out input, out output); + + // send D-Bus request + AsyncResult dbus_res = null; + bool sent_update = false; + send_update ("UpdateArray", input, cancellable, (o, res) => { + dbus_res = res; + if (sent_update) { + update_array_async.callback (); + } + }); + + // send sparql strings via fd + var data_stream = new DataOutputStream (output); + data_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN); + data_stream.put_int32 ((int32) sparql.length); + for (int i = 0; i < sparql.length; i++) { + data_stream.put_int32 ((int32) sparql[i].length); + data_stream.put_string (sparql[i]); + } + data_stream = null; + + // wait for D-Bus reply + sent_update = true; + if (dbus_res == null) { + yield; + } + + var reply = bus.send_message_with_reply.end (dbus_res); + handle_error_reply (reply); + + return true; + } + + public override GLib.Variant? update_blank (string sparql, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { + // use separate main context for sync operation + var context = new MainContext (); + var loop = new MainLoop (context, false); + context.push_thread_default (); + AsyncResult async_res = null; + update_blank_async.begin (sparql, cancellable, (o, res) => { + async_res = res; + loop.quit (); + }); + loop.run (); + context.pop_thread_default (); + return update_blank_async.end (async_res); + } + + public async override GLib.Variant? update_blank_async (string sparql, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { + UnixInputStream input; + UnixOutputStream output; + pipe (out input, out output); + + // send D-Bus request + AsyncResult dbus_res = null; + bool sent_update = false; + send_update ("UpdateBlank", input, cancellable, (o, res) => { + dbus_res = res; + if (sent_update) { + update_blank_async.callback (); + } + }); + + // send sparql strings via fd + var data_stream = new DataOutputStream (output); + data_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN); + data_stream.put_int32 ((int32) sparql.length); + data_stream.put_string (sparql); + data_stream = null; + + // wait for D-Bus reply + sent_update = true; + if (dbus_res == null) { + yield; + } + + var reply = bus.send_message_with_reply.end (dbus_res); + handle_error_reply (reply); + return reply.get_body ().get_child_value (0); + } + + public override Tracker.Notifier? create_notifier () { + var notifier = (Tracker.Notifier) Object.new (typeof (Tracker.Notifier), + "connection", this, + null); + + notifier.signal_subscribe (this.bus, this.dbus_name, null, null); + + return notifier; + } + + public override void close () { + } + + public async override bool close_async () throws GLib.IOError { + return true; + } +} |