diff options
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/Makefile.am | 3 | ||||
-rw-r--r-- | daemon/gvfsbackendtest.c | 24 | ||||
-rw-r--r-- | daemon/gvfsdaemon.c | 70 | ||||
-rw-r--r-- | daemon/gvfsjob.c | 73 | ||||
-rw-r--r-- | daemon/gvfsjob.h | 3 | ||||
-rw-r--r-- | daemon/gvfsjobcloseread.c | 18 | ||||
-rw-r--r-- | daemon/gvfsjobcloseread.h | 6 | ||||
-rw-r--r-- | daemon/gvfsjobopenforread.c | 83 | ||||
-rw-r--r-- | daemon/gvfsjobopenforread.h | 29 | ||||
-rw-r--r-- | daemon/gvfsjobread.c | 22 | ||||
-rw-r--r-- | daemon/gvfsjobread.h | 6 | ||||
-rw-r--r-- | daemon/gvfsjobseekread.c | 20 | ||||
-rw-r--r-- | daemon/gvfsjobseekread.h | 6 | ||||
-rw-r--r-- | daemon/gvfsreadchannel.c | 552 | ||||
-rw-r--r-- | daemon/gvfsreadchannel.h | 63 | ||||
-rw-r--r-- | daemon/gvfsreadstream.c | 552 | ||||
-rw-r--r-- | daemon/gvfsreadstream.h | 63 |
17 files changed, 846 insertions, 747 deletions
diff --git a/daemon/Makefile.am b/daemon/Makefile.am index 51748e5a..58c85066 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -19,9 +19,10 @@ gvfs_daemon_SOURCES = \ gvfsdaemon.c gvfsdaemon.h \ gvfsbackend.c gvfsbackend.h \ gvfsbackendtest.c gvfsbackendtest.h \ - gvfsreadstream.c gvfsreadstream.h \ + gvfsreadchannel.c gvfsreadchannel.h \ gvfsdaemonutils.c gvfsdaemonutils.h \ gvfsjob.c gvfsjob.h \ + gvfsjobdbus.c gvfsjobdbus.h \ gvfsjobopenforread.c gvfsjobopenforread.h \ gvfsjobread.c gvfsjobread.h \ gvfsjobseekread.c gvfsjobseekread.h \ diff --git a/daemon/gvfsbackendtest.c b/daemon/gvfsbackendtest.c index 92b6ffc6..3ce5a6c9 100644 --- a/daemon/gvfsbackendtest.c +++ b/daemon/gvfsbackendtest.c @@ -50,6 +50,14 @@ open_idle_cb (gpointer data) GVfsJobOpenForRead *job = data; int fd; + if (g_vfs_job_is_cancelled (G_VFS_JOB (job))) + { + g_vfs_job_failed (G_VFS_JOB (job), G_VFS_ERROR, + G_VFS_ERROR_CANCELLED, + _("Operation was cancelled")); + return FALSE; + } + fd = g_open (job->filename, O_RDONLY); if (fd == -1) { @@ -67,6 +75,19 @@ open_idle_cb (gpointer data) return FALSE; } +static void +open_read_cancelled_cb (GVfsJob *job, gpointer data) +{ + guint tag = GPOINTER_TO_INT (data); + + g_print ("open_read_cancelled_cb\n"); + + if (g_source_remove (tag)) + g_vfs_job_failed (job, G_VFS_ERROR, + G_VFS_ERROR_CANCELLED, + _("Operation was cancelled")); +} + static gboolean do_open_for_read (GVfsBackend *backend, GVfsJobOpenForRead *job, @@ -84,7 +105,8 @@ do_open_for_read (GVfsBackend *backend, } else { - g_idle_add (open_idle_cb, job); + guint tag = g_timeout_add (0, open_idle_cb, job); + g_signal_connect (job, "cancelled", (GCallback)open_read_cancelled_cb, GINT_TO_POINTER (tag)); return TRUE; } } diff --git a/daemon/gvfsdaemon.c b/daemon/gvfsdaemon.c index b5ba0ead..2faa364e 100644 --- a/daemon/gvfsdaemon.c +++ b/daemon/gvfsdaemon.c @@ -29,7 +29,7 @@ struct _GVfsDaemonPrivate GQueue *pending_jobs; GQueue *jobs; /* protected by lock */ guint queued_job_start; /* protected by lock */ - GList *read_streams; /* protected by lock */ + GList *read_channels; /* protected by lock */ }; typedef struct { @@ -236,7 +236,7 @@ queue_start_jobs_at_idle (GVfsDaemon *daemon) } static void -handle_new_job_callback (GVfsReadStream *stream, +handle_new_job_callback (GVfsReadChannel *channel, GVfsJob *job, GVfsDaemon *daemon) { @@ -246,14 +246,14 @@ handle_new_job_callback (GVfsReadStream *stream, } static void -handle_read_stream_closed_callback (GVfsReadStream *stream, - GVfsDaemon *daemon) +handle_read_channel_closed_callback (GVfsReadChannel *channel, + GVfsDaemon *daemon) { g_mutex_lock (daemon->priv->lock); - daemon->priv->read_streams = g_list_remove (daemon->priv->read_streams, stream); - g_signal_handlers_disconnect_by_func (stream, (GCallback)handle_new_job_callback, daemon); - g_object_unref (stream); + daemon->priv->read_channels = g_list_remove (daemon->priv->read_channels, channel); + g_signal_handlers_disconnect_by_func (channel, (GCallback)handle_new_job_callback, daemon); + g_object_unref (channel); g_mutex_unlock (daemon->priv->lock); } @@ -272,17 +272,17 @@ job_finished_callback (GVfsJob *job, if (G_IS_VFS_JOB_OPEN_FOR_READ (job)) { GVfsJobOpenForRead *open_job = G_VFS_JOB_OPEN_FOR_READ (job); - GVfsReadStream *stream; + GVfsReadChannel *channel; - stream = g_vfs_job_open_for_read_steal_stream (open_job); + channel = g_vfs_job_open_for_read_steal_channel (open_job); - if (stream) + if (channel) { - g_print ("Got new read stream %p for daemon %p\n", stream, daemon); - daemon->priv->read_streams = g_list_append (daemon->priv->read_streams, - stream); - g_signal_connect (stream, "new_job", (GCallback)handle_new_job_callback, daemon); - g_signal_connect (stream, "closed", (GCallback)handle_read_stream_closed_callback, daemon); + g_print ("Got new read channel %p for daemon %p\n", channel, daemon); + daemon->priv->read_channels = g_list_append (daemon->priv->read_channels, + channel); + g_signal_connect (channel, "new_job", (GCallback)handle_new_job_callback, daemon); + g_signal_connect (channel, "closed", (GCallback)handle_read_channel_closed_callback, daemon); } } @@ -741,6 +741,46 @@ daemon_message_func (DBusConnection *conn, return DBUS_HANDLER_RESULT_HANDLED; } + if (dbus_message_is_method_call (message, + G_VFS_DBUS_DAEMON_INTERFACE, + G_VFS_DBUS_OP_CANCEL)) + { + GList *l; + dbus_uint32_t serial; + GVfsJob *job_to_cancel = NULL; + + g_print ("Got cancel dbus call\n"); + + if (dbus_message_get_args (message, NULL, + DBUS_TYPE_UINT32, &serial, + DBUS_TYPE_INVALID)) + { + g_mutex_lock (daemon->priv->lock); + for (l = daemon->priv->jobs->head; l != NULL; l = l->next) + { + GVfsJob *job = l->data; + + if (G_IS_VFS_JOB_DBUS (job) && + g_vfs_job_dbus_is_serial (G_VFS_JOB_DBUS (job), + conn, serial)) + { + job_to_cancel = g_object_ref (job); + break; + } + } + g_mutex_unlock (daemon->priv->lock); + + + if (job_to_cancel) + { + g_vfs_job_cancel (job_to_cancel); + g_object_unref (job_to_cancel); + } + } + + return DBUS_HANDLER_RESULT_HANDLED; + } + backend = NULL; dest = dbus_message_get_destination (message); if (dest != NULL) diff --git a/daemon/gvfsjob.c b/daemon/gvfsjob.c index 00d7d07e..6207773b 100644 --- a/daemon/gvfsjob.c +++ b/daemon/gvfsjob.c @@ -9,9 +9,18 @@ #include <dbus/dbus.h> #include <glib/gi18n.h> #include "gvfsjob.h" +#include "gvfsbackend.h" G_DEFINE_TYPE (GVfsJob, g_vfs_job, G_TYPE_OBJECT); +/* TODO: Real P_() */ +#define P_(_x) (_x) + +enum { + PROP_0, + PROP_BACKEND, +}; + enum { CANCELLED, SEND_REPLY, @@ -21,6 +30,15 @@ enum { static guint signals[LAST_SIGNAL] = { 0 }; +static void g_vfs_job_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec); +static void g_vfs_job_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec); + static void g_vfs_job_finalize (GObject *object) { @@ -41,6 +59,8 @@ g_vfs_job_class_init (GVfsJobClass *klass) GObjectClass *gobject_class = G_OBJECT_CLASS (klass); gobject_class->finalize = g_vfs_job_finalize; + gobject_class->set_property = g_vfs_job_set_property; + gobject_class->get_property = g_vfs_job_get_property; signals[CANCELLED] = g_signal_new ("cancelled", @@ -67,6 +87,14 @@ g_vfs_job_class_init (GVfsJobClass *klass) g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); + g_object_class_install_property (gobject_class, + PROP_BACKEND, + g_param_spec_object ("backend", + P_("VFS Backend"), + P_("The implementation for this job operartion."), + G_TYPE_VFS_BACKEND, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); } static void @@ -74,11 +102,42 @@ g_vfs_job_init (GVfsJob *job) { } -void -g_vfs_job_set_backend (GVfsJob *job, - GVfsBackend *backend) +static void +g_vfs_job_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GVfsJob *job = G_VFS_JOB (object); + + switch (prop_id) + { + case PROP_BACKEND: + job->backend = g_value_get_object (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +g_vfs_job_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) { - job->backend = backend; + GVfsJob *job = G_VFS_JOB (object); + + switch (prop_id) + { + case PROP_BACKEND: + g_value_set_object (value, job->backend); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } } gboolean @@ -158,6 +217,12 @@ g_vfs_job_is_finished (GVfsJob *job) return job->finished; } +gboolean +g_vfs_job_is_cancelled (GVfsJob *job) +{ + return job->cancelled; +} + /* Might be called on an i/o thread */ void g_vfs_job_emit_finished (GVfsJob *job) diff --git a/daemon/gvfsjob.h b/daemon/gvfsjob.h index 8d63a95f..b9cc4702 100644 --- a/daemon/gvfsjob.h +++ b/daemon/gvfsjob.h @@ -52,6 +52,7 @@ struct _GVfsJobClass GType g_vfs_job_get_type (void) G_GNUC_CONST; gboolean g_vfs_job_is_finished (GVfsJob *job); +gboolean g_vfs_job_is_cancelled (GVfsJob *job); void g_vfs_job_cancel (GVfsJob *job); gboolean g_vfs_job_start (GVfsJob *job); void g_vfs_job_emit_finished (GVfsJob *job); @@ -63,8 +64,6 @@ void g_vfs_job_failed (GVfsJob *job, void g_vfs_job_failed_from_error (GVfsJob *job, GError *error); void g_vfs_job_succeeded (GVfsJob *job); -void g_vfs_job_set_backend (GVfsJob *job, - GVfsBackend *backend); G_END_DECLS diff --git a/daemon/gvfsjobcloseread.c b/daemon/gvfsjobcloseread.c index 19f600ef..28aeddc2 100644 --- a/daemon/gvfsjobcloseread.c +++ b/daemon/gvfsjobcloseread.c @@ -7,7 +7,7 @@ #include <glib.h> #include <glib/gi18n.h> -#include "gvfsreadstream.h" +#include "gvfsreadchannel.h" #include "gvfsjobcloseread.h" #include "gvfsdaemonutils.h" @@ -22,7 +22,7 @@ g_vfs_job_close_read_finalize (GObject *object) GVfsJobCloseRead *job; job = G_VFS_JOB_CLOSE_READ (object); - g_object_unref (job->stream); + g_object_unref (job->channel); if (G_OBJECT_CLASS (g_vfs_job_close_read_parent_class)->finalize) (*G_OBJECT_CLASS (g_vfs_job_close_read_parent_class)->finalize) (object); @@ -46,17 +46,17 @@ g_vfs_job_close_read_init (GVfsJobCloseRead *job) } GVfsJob * -g_vfs_job_close_read_new (GVfsReadStream *stream, +g_vfs_job_close_read_new (GVfsReadChannel *channel, GVfsBackendHandle handle, GVfsBackend *backend) { GVfsJobCloseRead *job; - job = g_object_new (G_TYPE_VFS_JOB_CLOSE_READ, NULL); + job = g_object_new (G_TYPE_VFS_JOB_CLOSE_READ, + "backend", backend, + NULL); - g_vfs_job_set_backend (G_VFS_JOB (job), backend); - - job->stream = g_object_ref (stream); + job->channel = g_object_ref (channel); job->handle = handle; return G_VFS_JOB (job); @@ -71,9 +71,9 @@ send_reply (GVfsJob *job) g_print ("job_close_read send reply\n"); if (job->failed) - g_vfs_read_stream_send_error (op_job->stream, job->error); + g_vfs_read_channel_send_error (op_job->channel, job->error); else - g_vfs_read_stream_send_closed (op_job->stream); + g_vfs_read_channel_send_closed (op_job->channel); } static gboolean diff --git a/daemon/gvfsjobcloseread.h b/daemon/gvfsjobcloseread.h index c64be03b..5333c006 100644 --- a/daemon/gvfsjobcloseread.h +++ b/daemon/gvfsjobcloseread.h @@ -3,7 +3,7 @@ #include <gvfsjob.h> #include <gvfsbackend.h> -#include <gvfsreadstream.h> +#include <gvfsreadchannel.h> G_BEGIN_DECLS @@ -21,7 +21,7 @@ struct _GVfsJobCloseRead { GVfsJob parent_instance; - GVfsReadStream *stream; + GVfsReadChannel *channel; GVfsBackendHandle handle; }; @@ -32,7 +32,7 @@ struct _GVfsJobCloseReadClass GType g_vfs_job_close_read_get_type (void) G_GNUC_CONST; -GVfsJob *g_vfs_job_close_read_new (GVfsReadStream *stream, +GVfsJob *g_vfs_job_close_read_new (GVfsReadChannel *channel, GVfsBackendHandle handle, GVfsBackend *backend); diff --git a/daemon/gvfsjobopenforread.c b/daemon/gvfsjobopenforread.c index 9a9c16ca..6eec8d9f 100644 --- a/daemon/gvfsjobopenforread.c +++ b/daemon/gvfsjobopenforread.c @@ -8,14 +8,16 @@ #include <glib.h> #include <dbus/dbus.h> #include <glib/gi18n.h> -#include "gvfsreadstream.h" +#include "gvfsreadchannel.h" #include "gvfsjobopenforread.h" #include "gvfsdaemonutils.h" -G_DEFINE_TYPE (GVfsJobOpenForRead, g_vfs_job_open_for_read, G_TYPE_VFS_JOB); +G_DEFINE_TYPE (GVfsJobOpenForRead, g_vfs_job_open_for_read, G_TYPE_VFS_JOB_DBUS); -static gboolean start (GVfsJob *job); -static void send_reply (GVfsJob *job); +static gboolean start (GVfsJob *job); +static DBusMessage *create_reply (GVfsJob *job, + DBusConnection *connection, + DBusMessage *message); static void g_vfs_job_open_for_read_finalize (GObject *object) @@ -24,16 +26,10 @@ g_vfs_job_open_for_read_finalize (GObject *object) job = G_VFS_JOB_OPEN_FOR_READ (object); - if (job->message) - dbus_message_unref (job->message); + /* TODO: manage backend_handle if not put in read channel */ - if (job->connection) - dbus_connection_unref (job->connection); - - /* TODO: manage backend_handle if not put in readstream */ - - if (job->read_stream) - g_object_unref (job->read_stream); + if (job->read_channel) + g_object_unref (job->read_channel); g_free (job->filename); @@ -46,11 +42,11 @@ g_vfs_job_open_for_read_class_init (GVfsJobOpenForReadClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GVfsJobClass *job_class = G_VFS_JOB_CLASS (klass); + GVfsJobDBusClass *job_dbus_class = G_VFS_JOB_DBUS_CLASS (klass); gobject_class->finalize = g_vfs_job_open_for_read_finalize; - job_class->start = start; - job_class->send_reply = send_reply; + job_dbus_class->create_reply = create_reply; } static void @@ -84,12 +80,12 @@ g_vfs_job_open_for_read_new (DBusConnection *connection, return NULL; } - job = g_object_new (G_TYPE_VFS_JOB_OPEN_FOR_READ, NULL); + job = g_object_new (G_TYPE_VFS_JOB_OPEN_FOR_READ, + "backend", backend, + "message", message, + "connection", connection, + NULL); - g_vfs_job_set_backend (G_VFS_JOB (job), backend); - - job->connection = dbus_connection_ref (connection); - job->message = dbus_message_ref (message); job->filename = g_strndup (path_data, path_len); return G_VFS_JOB (job); @@ -126,7 +122,7 @@ create_reply (GVfsJob *job, DBusMessage *message) { GVfsJobOpenForRead *open_job = G_VFS_JOB_OPEN_FOR_READ (job); - GVfsReadStream *stream; + GVfsReadChannel *channel; DBusMessage *reply; GError *error; int remote_fd; @@ -137,15 +133,15 @@ create_reply (GVfsJob *job, g_assert (open_job->backend_handle != NULL); error = NULL; - stream = g_vfs_read_stream_new (job->backend, &error); - if (stream == NULL) + channel = g_vfs_read_channel_new (job->backend, &error); + if (channel == NULL) { reply = dbus_message_new_error_from_gerror (message, error); g_error_free (error); return reply; } - remote_fd = g_vfs_read_stream_steal_remote_fd (stream); + remote_fd = g_vfs_read_channel_steal_remote_fd (channel); if (!dbus_connection_send_fd (connection, remote_fd, &fd_id, &error)) @@ -153,7 +149,7 @@ create_reply (GVfsJob *job, close (remote_fd); reply = dbus_message_new_error_from_gerror (message, error); g_error_free (error); - g_object_unref (stream); + g_object_unref (channel); return reply; } close (remote_fd); @@ -165,42 +161,21 @@ create_reply (GVfsJob *job, DBUS_TYPE_BOOLEAN, &can_seek, DBUS_TYPE_INVALID); - g_vfs_read_stream_set_backend_handle (stream, open_job->backend_handle); + g_vfs_read_channel_set_backend_handle (channel, open_job->backend_handle); open_job->backend_handle = NULL; - open_job->read_stream = stream; + open_job->read_channel = channel; return reply; } -/* Might be called on an i/o thread */ -static void -send_reply (GVfsJob *job) -{ - GVfsJobOpenForRead *open_job = G_VFS_JOB_OPEN_FOR_READ (job); - DBusMessage *reply; - - if (job->failed) - reply = dbus_message_new_error_from_gerror (open_job->message, job->error); - else - reply = create_reply (job, open_job->connection, open_job->message); - - g_assert (reply != NULL); - - /* Queues reply (threadsafely), actually sends it in mainloop */ - dbus_connection_send (open_job->connection, reply, NULL); - dbus_message_unref (reply); - - g_vfs_job_emit_finished (job); -} - -GVfsReadStream * -g_vfs_job_open_for_read_steal_stream (GVfsJobOpenForRead *job) +GVfsReadChannel * +g_vfs_job_open_for_read_steal_channel (GVfsJobOpenForRead *job) { - GVfsReadStream *stream; + GVfsReadChannel *channel; - stream = job->read_stream; - job->read_stream = NULL; + channel = job->read_channel; + job->read_channel = NULL; - return stream; + return channel; } diff --git a/daemon/gvfsjobopenforread.h b/daemon/gvfsjobopenforread.h index c000446e..b301bb5c 100644 --- a/daemon/gvfsjobopenforread.h +++ b/daemon/gvfsjobopenforread.h @@ -2,10 +2,9 @@ #define __G_VFS_JOB_OPEN_FOR_READ_H__ #include <dbus/dbus.h> -#include <gvfsjob.h> +#include <gvfsjobdbus.h> #include <gvfsbackend.h> -#include <gvfsreadstream.h> - +#include <gvfsreadchannel.h> G_BEGIN_DECLS @@ -21,31 +20,29 @@ typedef struct _GVfsJobOpenForReadClass GVfsJobOpenForReadClass; struct _GVfsJobOpenForRead { - GVfsJob parent_instance; + GVfsJobDBus parent_instance; - DBusConnection *connection; - DBusMessage *message; char *filename; GVfsBackendHandle backend_handle; gboolean can_seek; - GVfsReadStream *read_stream; + GVfsReadChannel *read_channel; }; struct _GVfsJobOpenForReadClass { - GVfsJobClass parent_class; + GVfsJobDBusClass parent_class; }; GType g_vfs_job_open_for_read_get_type (void) G_GNUC_CONST; -GVfsJob * g_vfs_job_open_for_read_new (DBusConnection *connection, - DBusMessage *message, - GVfsBackend *backend); -void g_vfs_job_open_for_read_set_handle (GVfsJobOpenForRead *job, - GVfsBackendHandle handle); -void g_vfs_job_open_for_read_set_can_seek (GVfsJobOpenForRead *job, - gboolean can_seek); -GVfsReadStream *g_vfs_job_open_for_read_steal_stream (GVfsJobOpenForRead *job); +GVfsJob * g_vfs_job_open_for_read_new (DBusConnection *connection, + DBusMessage *message, + GVfsBackend *backend); +void g_vfs_job_open_for_read_set_handle (GVfsJobOpenForRead *job, + GVfsBackendHandle handle); +void g_vfs_job_open_for_read_set_can_seek (GVfsJobOpenForRead *job, + gboolean can_seek); +GVfsReadChannel *g_vfs_job_open_for_read_steal_channel (GVfsJobOpenForRead *job); G_END_DECLS diff --git a/daemon/gvfsjobread.c b/daemon/gvfsjobread.c index 6b5fdcce..84cbe277 100644 --- a/daemon/gvfsjobread.c +++ b/daemon/gvfsjobread.c @@ -7,7 +7,7 @@ #include <glib.h> #include <glib/gi18n.h> -#include "gvfsreadstream.h" +#include "gvfsreadchannel.h" #include "gvfsjobread.h" #include "gvfsdaemonutils.h" @@ -23,7 +23,7 @@ g_vfs_job_read_finalize (GObject *object) job = G_VFS_JOB_READ (object); - g_object_unref (job->stream); + g_object_unref (job->channel); g_free (job->buffer); if (G_OBJECT_CLASS (g_vfs_job_read_parent_class)->finalize) @@ -48,18 +48,18 @@ g_vfs_job_read_init (GVfsJobRead *job) } GVfsJob * -g_vfs_job_read_new (GVfsReadStream *stream, +g_vfs_job_read_new (GVfsReadChannel *channel, GVfsBackendHandle handle, gsize bytes_requested, GVfsBackend *backend) { GVfsJobRead *job; - job = g_object_new (G_TYPE_VFS_JOB_READ, NULL); - - g_vfs_job_set_backend (G_VFS_JOB (job), backend); + job = g_object_new (G_TYPE_VFS_JOB_READ, + "backend", backend, + NULL); - job->stream = g_object_ref (stream); + job->channel = g_object_ref (channel); job->handle = handle; job->buffer = g_malloc (bytes_requested); job->bytes_requested = bytes_requested; @@ -75,12 +75,12 @@ send_reply (GVfsJob *job) g_print ("job_read send reply, %d bytes\n", op_job->data_count); if (job->failed) - g_vfs_read_stream_send_error (op_job->stream, job->error); + g_vfs_read_channel_send_error (op_job->channel, job->error); else { - g_vfs_read_stream_send_data (op_job->stream, - op_job->buffer, - op_job->data_count); + g_vfs_read_channel_send_data (op_job->channel, + op_job->buffer, + op_job->data_count); } } diff --git a/daemon/gvfsjobread.h b/daemon/gvfsjobread.h index 9e40b6b9..f7cda3e8 100644 --- a/daemon/gvfsjobread.h +++ b/daemon/gvfsjobread.h @@ -3,7 +3,7 @@ #include <gvfsjob.h> #include <gvfsbackend.h> -#include <gvfsreadstream.h> +#include <gvfsreadchannel.h> G_BEGIN_DECLS @@ -21,7 +21,7 @@ struct _GVfsJobRead { GVfsJob parent_instance; - GVfsReadStream *stream; + GVfsReadChannel *channel; GVfsBackendHandle handle; gsize bytes_requested; char *buffer; @@ -35,7 +35,7 @@ struct _GVfsJobReadClass GType g_vfs_job_read_get_type (void) G_GNUC_CONST; -GVfsJob *g_vfs_job_read_new (GVfsReadStream *stream, +GVfsJob *g_vfs_job_read_new (GVfsReadChannel *channel, GVfsBackendHandle handle, gsize bytes_requested, GVfsBackend *backend); diff --git a/daemon/gvfsjobseekread.c b/daemon/gvfsjobseekread.c index c0ee0876..d3678b31 100644 --- a/daemon/gvfsjobseekread.c +++ b/daemon/gvfsjobseekread.c @@ -7,7 +7,7 @@ #include <glib.h> #include <glib/gi18n.h> -#include "gvfsreadstream.h" +#include "gvfsreadchannel.h" #include "gvfsjobseekread.h" #include "gvfsdaemonutils.h" @@ -22,7 +22,7 @@ g_vfs_job_seek_read_finalize (GObject *object) GVfsJobSeekRead *job; job = G_VFS_JOB_SEEK_READ (object); - g_object_unref (job->stream); + g_object_unref (job->channel); if (G_OBJECT_CLASS (g_vfs_job_seek_read_parent_class)->finalize) (*G_OBJECT_CLASS (g_vfs_job_seek_read_parent_class)->finalize) (object); @@ -46,7 +46,7 @@ g_vfs_job_seek_read_init (GVfsJobSeekRead *job) } GVfsJob * -g_vfs_job_seek_read_new (GVfsReadStream *stream, +g_vfs_job_seek_read_new (GVfsReadChannel *channel, GVfsBackendHandle handle, GSeekType seek_type, goffset offset, @@ -54,11 +54,11 @@ g_vfs_job_seek_read_new (GVfsReadStream *stream, { GVfsJobSeekRead *job; - job = g_object_new (G_TYPE_VFS_JOB_SEEK_READ, NULL); - - g_vfs_job_set_backend (G_VFS_JOB (job), backend); + job = g_object_new (G_TYPE_VFS_JOB_SEEK_READ, + "backend", backend, + NULL); - job->stream = g_object_ref (stream); + job->channel = g_object_ref (channel); job->handle = handle; job->requested_offset = offset; job->seek_type = seek_type; @@ -75,11 +75,11 @@ send_reply (GVfsJob *job) g_print ("job_seek_read send reply, pos %d\n", (int)op_job->final_offset); if (job->failed) - g_vfs_read_stream_send_error (op_job->stream, job->error); + g_vfs_read_channel_send_error (op_job->channel, job->error); else { - g_vfs_read_stream_send_seek_offset (op_job->stream, - op_job->final_offset); + g_vfs_read_channel_send_seek_offset (op_job->channel, + op_job->final_offset); } } diff --git a/daemon/gvfsjobseekread.h b/daemon/gvfsjobseekread.h index 177c8769..be11be53 100644 --- a/daemon/gvfsjobseekread.h +++ b/daemon/gvfsjobseekread.h @@ -3,7 +3,7 @@ #include <gvfsjob.h> #include <gvfsbackend.h> -#include <gvfsreadstream.h> +#include <gvfsreadchannel.h> G_BEGIN_DECLS @@ -21,7 +21,7 @@ struct _GVfsJobSeekRead { GVfsJob parent_instance; - GVfsReadStream *stream; + GVfsReadChannel *channel; GVfsBackendHandle handle; GSeekType seek_type; goffset requested_offset; @@ -35,7 +35,7 @@ struct _GVfsJobSeekReadClass GType g_vfs_job_seek_read_get_type (void) G_GNUC_CONST; -GVfsJob *g_vfs_job_seek_read_new (GVfsReadStream *stream, +GVfsJob *g_vfs_job_seek_read_new (GVfsReadChannel *channel, GVfsBackendHandle handle, GSeekType seek_type, goffset offset, diff --git a/daemon/gvfsreadchannel.c b/daemon/gvfsreadchannel.c new file mode 100644 index 00000000..54a0d2c5 --- /dev/null +++ b/daemon/gvfsreadchannel.c @@ -0,0 +1,552 @@ +#include <config.h> + +#include <unistd.h> +#include <errno.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <unistd.h> +#include <fcntl.h> + +#include <glib.h> +#include <glib-object.h> +#include <glib/gi18n.h> +#include <dbus-gmain.h> +#include <gvfsreadchannel.h> +#include <gvfs/ginputstreamsocket.h> +#include <gvfs/goutputstreamsocket.h> +#include <gvfsdaemonprotocol.h> +#include <gvfsdaemonutils.h> +#include <gvfsjobread.h> +#include <gvfsjobseekread.h> +#include <gvfsjobcloseread.h> + +G_DEFINE_TYPE (GVfsReadChannel, g_vfs_read_channel, G_TYPE_OBJECT); + +enum { + PROP_0, +}; + +enum { + NEW_JOB, + CLOSED, + LAST_SIGNAL +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +typedef struct +{ + GVfsReadChannel *read_channel; + GInputStream *command_stream; + char buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE]; + int buffer_size; +} RequestReader; + +struct _GVfsReadChannelPrivate +{ + GVfsBackend *backend; + gboolean connection_closed; + GInputStream *command_stream; + GOutputStream *reply_stream; + int remote_fd; + int seek_generation; + + GVfsBackendHandle backend_handle; + GVfsJob *current_job; + guint32 current_job_seq_nr; + + RequestReader *request_reader; + + char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE]; + int reply_buffer_pos; + + char *output_data; /* Owned by job */ + gsize output_data_size; + gsize output_data_pos; +}; + +static void start_request_reader (GVfsReadChannel *channel); + +static void +g_vfs_read_channel_finalize (GObject *object) +{ + GVfsReadChannel *read_channel; + + read_channel = G_VFS_READ_CHANNEL (object); + + if (read_channel->priv->current_job) + g_object_unref (read_channel->priv->current_job); + read_channel->priv->current_job = NULL; + + if (read_channel->priv->reply_stream) + g_object_unref (read_channel->priv->reply_stream); + read_channel->priv->reply_stream = NULL; + + if (read_channel->priv->request_reader) + read_channel->priv->request_reader->read_channel = NULL; + read_channel->priv->request_reader = NULL; + + if (read_channel->priv->command_stream) + g_object_unref (read_channel->priv->command_stream); + read_channel->priv->command_stream = NULL; + + if (read_channel->priv->remote_fd != -1) + close (read_channel->priv->remote_fd); + + g_assert (read_channel->priv->backend_handle == NULL); + + if (G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize) + (*G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize) (object); +} + +static void +g_vfs_read_channel_class_init (GVfsReadChannelClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + g_type_class_add_private (klass, sizeof (GVfsReadChannelPrivate)); + + gobject_class->finalize = g_vfs_read_channel_finalize; + + signals[NEW_JOB] = + g_signal_new ("new_job", + G_TYPE_FROM_CLASS (gobject_class), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GVfsReadChannelClass, new_job), + NULL, NULL, + g_cclosure_marshal_VOID__POINTER, + G_TYPE_NONE, 1, G_TYPE_VFS_JOB); + + signals[CLOSED] = + g_signal_new ("closed", + G_TYPE_FROM_CLASS (gobject_class), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GVfsReadChannelClass, closed), + NULL, NULL, + g_cclosure_marshal_VOID__VOID, + G_TYPE_NONE, 0); + +} + +static void +g_vfs_read_channel_init (GVfsReadChannel *channel) +{ + channel->priv = G_TYPE_INSTANCE_GET_PRIVATE (channel, + G_TYPE_VFS_READ_CHANNEL, + GVfsReadChannelPrivate); + channel->priv->remote_fd = -1; +} + +static void +g_vfs_read_channel_connection_closed (GVfsReadChannel *channel) +{ + if (channel->priv->connection_closed) + return; + channel->priv->connection_closed = TRUE; + + if (channel->priv->current_job == NULL && + channel->priv->backend_handle != NULL) + { + channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle, channel->priv->backend); + channel->priv->current_job_seq_nr = 0; + g_signal_emit (channel, signals[NEW_JOB], 0, channel->priv->current_job); + } + /* Otherwise we'll close when current_job is finished */ +} + +static void +got_command (GVfsReadChannel *channel, + guint32 command, + guint32 seq_nr, + guint32 arg1, + guint32 arg2) +{ + GVfsJob *job; + GError *error; + GSeekType seek_type; + + g_print ("got_command %d %d %d %d\n", command, seq_nr, arg1, arg2); + + if (channel->priv->current_job != NULL) + { + if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL) + { + g_warning ("Ignored non-cancel request with outstanding request"); + return; + } + + if (arg1 == channel->priv->current_job_seq_nr) + g_vfs_job_cancel (channel->priv->current_job); + return; + } + + job = NULL; + switch (command) + { + case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ: + job = g_vfs_job_read_new (channel, + channel->priv->backend_handle, + arg1, + channel->priv->backend); + break; + case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE: + job = g_vfs_job_close_read_new (channel, + channel->priv->backend_handle, + channel->priv->backend); + break; + case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR: + case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END: + case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET: + seek_type = G_SEEK_SET; + if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END) + seek_type = G_SEEK_END; + else if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR) + seek_type = G_SEEK_CUR; + + channel->priv->seek_generation++; + job = g_vfs_job_seek_read_new (channel, + channel->priv->backend_handle, + seek_type, + ((goffset)arg1) | (((goffset)arg2) << 32), + channel->priv->backend); + break; + + case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL: + /* Ignore cancel with no outstanding job */ + break; + + default: + error = NULL; + g_set_error (&error, G_VFS_ERROR, + G_VFS_ERROR_INTERNAL_ERROR, + "Unknown stream command %d\n", command); + g_vfs_read_channel_send_error (channel, error); + g_error_free (error); + break; + } + + if (job) + { + channel->priv->current_job = job; + channel->priv->current_job_seq_nr = seq_nr; + g_signal_emit (channel, signals[NEW_JOB], 0, job); + } +} + +static void +command_read_cb (GInputStream *input_stream, + void *buffer, + gsize count_requested, + gssize count_read, + gpointer data, + GError *error) +{ + RequestReader *reader = data; + GVfsDaemonSocketProtocolRequest *cmd; + guint32 seq_nr; + guint32 command; + guint32 arg1, arg2; + + if (reader->read_channel == NULL) + { + /* ReadChannel was finalized */ + g_object_unref (reader->command_stream); + g_free (reader); + return; + } + + if (count_read <= 0) + { + reader->read_channel->priv->request_reader = NULL; + g_vfs_read_channel_connection_closed (reader->read_channel); + g_object_unref (reader->command_stream); + g_free (reader); + return; + } + + reader->buffer_size += count_read; + + if (reader->buffer_size < G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE) + { + g_input_stream_read_async (reader->command_stream, + reader->buffer + reader->buffer_size, + G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, + 0, + command_read_cb, + reader, + NULL); + return; + } + + cmd = (GVfsDaemonSocketProtocolRequest *)reader->buffer; + command = g_ntohl (cmd->command); + arg1 = g_ntohl (cmd->arg1); + arg2 = g_ntohl (cmd->arg2); + seq_nr = g_ntohl (cmd->seq_nr); + reader->buffer_size = 0; + + got_command (reader->read_channel, command, seq_nr, arg1, arg2); + + /* Request more commands, so can get cancel requests */ + + reader->buffer_size = 0; + g_input_stream_read_async (reader->command_stream, + reader->buffer + reader->buffer_size, + G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, + 0, + command_read_cb, + reader, + NULL); +} + +static void +start_request_reader (GVfsReadChannel *channel) +{ + RequestReader *reader; + + reader = g_new0 (RequestReader, 1); + reader->read_channel = channel; + reader->command_stream = g_object_ref (channel->priv->command_stream); + reader->buffer_size = 0; + + g_input_stream_read_async (reader->command_stream, + reader->buffer + reader->buffer_size, + G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, + 0, + command_read_cb, + reader, + NULL); + + channel->priv->request_reader = reader; +} + + +static void +send_reply_cb (GOutputStream *output_stream, + void *buffer, + gsize bytes_requested, + gssize bytes_written, + gpointer data, + GError *error) +{ + GVfsReadChannel *channel = data; + GVfsJob *job; + + g_print ("send_reply_cb: %d\n", bytes_written); + + if (bytes_written <= 0) + { + g_vfs_read_channel_connection_closed (channel); + goto error_out; + } + + if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) + { + channel->priv->reply_buffer_pos += bytes_written; + + /* Write more of reply header if needed */ + if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) + { + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->reply_buffer + channel->priv->reply_buffer_pos, + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - channel->priv->reply_buffer_pos, + 0, + send_reply_cb, channel, + NULL); + return; + } + bytes_written = 0; + } + + channel->priv->output_data_pos += bytes_written; + + /* Write more of output_data if needed */ + if (channel->priv->output_data != NULL && + channel->priv->output_data_pos < channel->priv->output_data_size) + { + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->output_data + channel->priv->output_data_pos, + channel->priv->output_data_size - channel->priv->output_data_pos, + 0, + send_reply_cb, channel, + NULL); + return; + } + + error_out: + + /* Sent full reply */ + channel->priv->output_data = NULL; + + job = channel->priv->current_job; + channel->priv->current_job = NULL; + g_vfs_job_emit_finished (job); + + if (G_IS_VFS_JOB_CLOSE_READ (job)) + { + g_signal_emit (channel, signals[CLOSED], 0); + channel->priv->backend_handle = NULL; + } + else if (channel->priv->connection_closed) + { + channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle, + channel->priv->backend); + channel->priv->current_job_seq_nr = 0; + g_signal_emit (channel, signals[NEW_JOB], 0, channel->priv->current_job); + } + + g_object_unref (job); + g_print ("Sent reply\n"); +} + +/* Might be called on an i/o thread */ +static void +send_reply (GVfsReadChannel *channel, + gboolean use_header, + char *data, + gsize data_len) +{ + + channel->priv->output_data = data; + channel->priv->output_data_size = data_len; + channel->priv->output_data_pos = 0; + + if (use_header) + { + channel->priv->reply_buffer_pos = 0; + + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->reply_buffer, + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE, + 0, + send_reply_cb, channel, + NULL); + } + else + { + channel->priv->reply_buffer_pos = G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE; + + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->output_data, + channel->priv->output_data_size, + 0, + send_reply_cb, channel, + NULL); + } +} + +/* Might be called on an i/o thread + */ +void +g_vfs_read_channel_send_error (GVfsReadChannel *read_channel, + GError *error) +{ + char *data; + gsize data_len; + + data = g_error_to_daemon_reply (error, read_channel->priv->current_job_seq_nr, &data_len); + send_reply (read_channel, FALSE, data, data_len); +} + + +/* Might be called on an i/o thread + */ +void +g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel, + goffset offset) +{ + GVfsDaemonSocketProtocolReply *reply; + + reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; + reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS); + reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); + reply->arg1 = g_htonl (offset & 0xffffffff); + reply->arg2 = g_htonl (offset >> 32); + + send_reply (read_channel, TRUE, NULL, 0); +} + +/* Might be called on an i/o thread + */ +void +g_vfs_read_channel_send_closed (GVfsReadChannel *read_channel) +{ + GVfsDaemonSocketProtocolReply *reply; + + reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; + reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED); + reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); + reply->arg1 = g_htonl (0); + reply->arg2 = g_htonl (0); + + send_reply (read_channel, TRUE, NULL, 0); +} + +/* Might be called on an i/o thread + */ +void +g_vfs_read_channel_send_data (GVfsReadChannel *read_channel, + char *buffer, + gsize count) +{ + GVfsDaemonSocketProtocolReply *reply; + + reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; + reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA); + reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); + reply->arg1 = g_htonl (count); + reply->arg2 = g_htonl (read_channel->priv->seek_generation); + + send_reply (read_channel, TRUE, buffer, count); +} + +GVfsReadChannel * +g_vfs_read_channel_new (GVfsBackend *backend, + GError **error) +{ + GVfsReadChannel *channel; + int socket_fds[2]; + int ret; + + ret = socketpair (AF_UNIX, SOCK_STREAM, 0, socket_fds); + if (ret == -1) + { + g_set_error (error, G_FILE_ERROR, + g_file_error_from_errno (errno), + _("Error creating socket pair")); + return NULL; + } + + channel = g_object_new (G_TYPE_VFS_READ_CHANNEL, NULL); + channel->priv->backend = backend; + channel->priv->command_stream = g_input_stream_socket_new (socket_fds[0], TRUE); + channel->priv->reply_stream = g_output_stream_socket_new (socket_fds[0], FALSE); + channel->priv->remote_fd = socket_fds[1]; + + start_request_reader (channel); + + return channel; +} + +int +g_vfs_read_channel_steal_remote_fd (GVfsReadChannel *channel) +{ + int fd; + fd = channel->priv->remote_fd; + channel->priv->remote_fd = -1; + return fd; +} + +GVfsBackend * +g_vfs_read_channel_get_backend (GVfsReadChannel *read_channel) +{ + return read_channel->priv->backend; +} + +void +g_vfs_read_channel_set_backend_handle (GVfsReadChannel *read_channel, + GVfsBackendHandle backend_handle) +{ + read_channel->priv->backend_handle = backend_handle; +} diff --git a/daemon/gvfsreadchannel.h b/daemon/gvfsreadchannel.h new file mode 100644 index 00000000..3ff59e1f --- /dev/null +++ b/daemon/gvfsreadchannel.h @@ -0,0 +1,63 @@ +#ifndef __G_VFS_READ_CHANNEL_H__ +#define __G_VFS_READ_CHANNEL_H__ + +#include <glib-object.h> +#include <gvfsjob.h> +#include <gvfsbackend.h> +#include <gvfs/gvfstypes.h> + +G_BEGIN_DECLS + +#define G_TYPE_VFS_READ_CHANNEL (g_vfs_read_channel_get_type ()) +#define G_VFS_READ_CHANNEL(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_VFS_READ_CHANNEL, GVfsReadChannel)) +#define G_VFS_READ_CHANNEL_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_VFS_READ_CHANNEL, GVfsReadChannelClass)) +#define G_IS_VFS_READ_CHANNEL(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_VFS_READ_CHANNEL)) +#define G_IS_VFS_READ_CHANNEL_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_VFS_READ_CHANNEL)) +#define G_VFS_READ_CHANNEL_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_VFS_READ_CHANNEL, GVfsReadChannelClass)) + +typedef struct _GVfsReadChannel GVfsReadChannel; +typedef struct _GVfsReadChannelClass GVfsReadChannelClass; +typedef struct _GVfsReadChannelPrivate GVfsReadChannelPrivate; + +struct _GVfsReadChannel +{ + GObject parent_instance; + + GVfsReadChannelPrivate *priv; +}; + +struct _GVfsReadChannelClass +{ + GObjectClass parent_class; + + /* signals */ + + void (*new_job) (GVfsReadChannel *stream, + GVfsJob *job); + void (*closed) (GVfsReadChannel *stream); +}; + +GType g_vfs_read_channel_get_type (void) G_GNUC_CONST; + +GVfsReadChannel *g_vfs_read_channel_new (GVfsBackend *backend, + GError **error); +int g_vfs_read_channel_steal_remote_fd (GVfsReadChannel *read_channel); +GVfsBackend *g_vfs_read_channel_get_backend (GVfsReadChannel *read_channel); +void g_vfs_read_channel_set_backend_handle (GVfsReadChannel *read_channel, + GVfsBackendHandle backend_handle); +gboolean g_vfs_read_channel_has_job (GVfsReadChannel *read_channel); +GVfsJob * g_vfs_read_channel_get_job (GVfsReadChannel *read_channel); +void g_vfs_read_channel_send_data (GVfsReadChannel *read_channel, + char *buffer, + gsize count); +void g_vfs_read_channel_send_error (GVfsReadChannel *read_channel, + GError *error); +void g_vfs_read_channel_send_closed (GVfsReadChannel *read_channel); +void g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel, + goffset offset); + +/* TODO: i/o priority? */ + +G_END_DECLS + +#endif /* __G_VFS_READ_CHANNEL_H__ */ diff --git a/daemon/gvfsreadstream.c b/daemon/gvfsreadstream.c deleted file mode 100644 index 41e9b3ea..00000000 --- a/daemon/gvfsreadstream.c +++ /dev/null @@ -1,552 +0,0 @@ -#include <config.h> - -#include <unistd.h> -#include <errno.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/un.h> -#include <unistd.h> -#include <fcntl.h> - -#include <glib.h> -#include <glib-object.h> -#include <glib/gi18n.h> -#include <dbus-gmain.h> -#include <gvfsreadstream.h> -#include <gvfs/ginputstreamsocket.h> -#include <gvfs/goutputstreamsocket.h> -#include <gvfsdaemonprotocol.h> -#include <gvfsdaemonutils.h> -#include <gvfsjobread.h> -#include <gvfsjobseekread.h> -#include <gvfsjobcloseread.h> - -G_DEFINE_TYPE (GVfsReadStream, g_vfs_read_stream, G_TYPE_OBJECT); - -enum { - PROP_0, -}; - -enum { - NEW_JOB, - CLOSED, - LAST_SIGNAL -}; - -static guint signals[LAST_SIGNAL] = { 0 }; - -typedef struct -{ - GVfsReadStream *read_stream; - GInputStream *command_stream; - char buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE]; - int buffer_size; -} RequestReader; - -struct _GVfsReadStreamPrivate -{ - GVfsBackend *backend; - gboolean connection_closed; - GInputStream *command_stream; - GOutputStream *reply_stream; - int remote_fd; - int seek_generation; - - GVfsBackendHandle backend_handle; - GVfsJob *current_job; - guint32 current_job_seq_nr; - - RequestReader *request_reader; - - char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE]; - int reply_buffer_pos; - - char *output_data; /* Owned by job */ - gsize output_data_size; - gsize output_data_pos; -}; - -static void start_request_reader (GVfsReadStream *stream); - -static void -g_vfs_read_stream_finalize (GObject *object) -{ - GVfsReadStream *read_stream; - - read_stream = G_VFS_READ_STREAM (object); - - if (read_stream->priv->current_job) - g_object_unref (read_stream->priv->current_job); - read_stream->priv->current_job = NULL; - - if (read_stream->priv->reply_stream) - g_object_unref (read_stream->priv->reply_stream); - read_stream->priv->reply_stream = NULL; - - if (read_stream->priv->request_reader) - read_stream->priv->request_reader->read_stream = NULL; - read_stream->priv->request_reader = NULL; - - if (read_stream->priv->command_stream) - g_object_unref (read_stream->priv->command_stream); - read_stream->priv->command_stream = NULL; - - if (read_stream->priv->remote_fd != -1) - close (read_stream->priv->remote_fd); - - g_assert (read_stream->priv->backend_handle == NULL); - - if (G_OBJECT_CLASS (g_vfs_read_stream_parent_class)->finalize) - (*G_OBJECT_CLASS (g_vfs_read_stream_parent_class)->finalize) (object); -} - -static void -g_vfs_read_stream_class_init (GVfsReadStreamClass *klass) -{ - GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - - g_type_class_add_private (klass, sizeof (GVfsReadStreamPrivate)); - - gobject_class->finalize = g_vfs_read_stream_finalize; - - signals[NEW_JOB] = - g_signal_new ("new_job", - G_TYPE_FROM_CLASS (gobject_class), - G_SIGNAL_RUN_LAST, - G_STRUCT_OFFSET (GVfsReadStreamClass, new_job), - NULL, NULL, - g_cclosure_marshal_VOID__POINTER, - G_TYPE_NONE, 1, G_TYPE_VFS_JOB); - - signals[CLOSED] = - g_signal_new ("closed", - G_TYPE_FROM_CLASS (gobject_class), - G_SIGNAL_RUN_LAST, - G_STRUCT_OFFSET (GVfsReadStreamClass, closed), - NULL, NULL, - g_cclosure_marshal_VOID__VOID, - G_TYPE_NONE, 0); - -} - -static void -g_vfs_read_stream_init (GVfsReadStream *stream) -{ - stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, - G_TYPE_VFS_READ_STREAM, - GVfsReadStreamPrivate); - stream->priv->remote_fd = -1; -} - -static void -g_vfs_read_stream_connection_closed (GVfsReadStream *stream) -{ - if (stream->priv->connection_closed) - return; - stream->priv->connection_closed = TRUE; - - if (stream->priv->current_job == NULL && - stream->priv->backend_handle != NULL) - { - stream->priv->current_job = g_vfs_job_close_read_new (stream, stream->priv->backend_handle, stream->priv->backend); - stream->priv->current_job_seq_nr = 0; - g_signal_emit (stream, signals[NEW_JOB], 0, stream->priv->current_job); - } - /* Otherwise we'll close when current_job is finished */ -} - -static void -got_command (GVfsReadStream *stream, - guint32 command, - guint32 seq_nr, - guint32 arg1, - guint32 arg2) -{ - GVfsJob *job; - GError *error; - GSeekType seek_type; - - g_print ("got_command %d %d %d %d\n", command, seq_nr, arg1, arg2); - - if (stream->priv->current_job != NULL) - { - if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL) - { - g_warning ("Ignored non-cancel request with outstanding request"); - return; - } - - if (arg1 == stream->priv->current_job_seq_nr) - g_vfs_job_cancel (stream->priv->current_job); - return; - } - - job = NULL; - switch (command) - { - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ: - job = g_vfs_job_read_new (stream, - stream->priv->backend_handle, - arg1, - stream->priv->backend); - break; - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE: - job = g_vfs_job_close_read_new (stream, - stream->priv->backend_handle, - stream->priv->backend); - break; - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR: - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END: - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET: - seek_type = G_SEEK_SET; - if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END) - seek_type = G_SEEK_END; - else if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR) - seek_type = G_SEEK_CUR; - - stream->priv->seek_generation++; - job = g_vfs_job_seek_read_new (stream, - stream->priv->backend_handle, - seek_type, - ((goffset)arg1) | (((goffset)arg2) << 32), - stream->priv->backend); - break; - - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL: - /* Ignore cancel with no outstanding job */ - break; - - default: - error = NULL; - g_set_error (&error, G_VFS_ERROR, - G_VFS_ERROR_INTERNAL_ERROR, - "Unknown stream command %d\n", command); - g_vfs_read_stream_send_error (stream, error); - g_error_free (error); - break; - } - - if (job) - { - stream->priv->current_job = job; - stream->priv->current_job_seq_nr = seq_nr; - g_signal_emit (stream, signals[NEW_JOB], 0, job); - } -} - -static void -command_read_cb (GInputStream *input_stream, - void *buffer, - gsize count_requested, - gssize count_read, - gpointer data, - GError *error) -{ - RequestReader *reader = data; - GVfsDaemonSocketProtocolRequest *cmd; - guint32 seq_nr; - guint32 command; - guint32 arg1, arg2; - - if (reader->read_stream == NULL) - { - /* ReadStream was finalized */ - g_object_unref (reader->command_stream); - g_free (reader); - return; - } - - if (count_read <= 0) - { - reader->read_stream->priv->request_reader = NULL; - g_vfs_read_stream_connection_closed (reader->read_stream); - g_object_unref (reader->command_stream); - g_free (reader); - return; - } - - reader->buffer_size += count_read; - - if (reader->buffer_size < G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE) - { - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); - return; - } - - cmd = (GVfsDaemonSocketProtocolRequest *)reader->buffer; - command = g_ntohl (cmd->command); - arg1 = g_ntohl (cmd->arg1); - arg2 = g_ntohl (cmd->arg2); - seq_nr = g_ntohl (cmd->seq_nr); - reader->buffer_size = 0; - - got_command (reader->read_stream, command, seq_nr, arg1, arg2); - - /* Request more commands, so can get cancel requests */ - - reader->buffer_size = 0; - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); -} - -static void -start_request_reader (GVfsReadStream *stream) -{ - RequestReader *reader; - - reader = g_new0 (RequestReader, 1); - reader->read_stream = stream; - reader->command_stream = g_object_ref (stream->priv->command_stream); - reader->buffer_size = 0; - - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); - - stream->priv->request_reader = reader; -} - - -static void -send_reply_cb (GOutputStream *output_stream, - void *buffer, - gsize bytes_requested, - gssize bytes_written, - gpointer data, - GError *error) -{ - GVfsReadStream *stream = data; - GVfsJob *job; - - g_print ("send_reply_cb: %d\n", bytes_written); - - if (bytes_written <= 0) - { - g_vfs_read_stream_connection_closed (stream); - goto error_out; - } - - if (stream->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) - { - stream->priv->reply_buffer_pos += bytes_written; - - /* Write more of reply header if needed */ - if (stream->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) - { - g_output_stream_write_async (stream->priv->reply_stream, - stream->priv->reply_buffer + stream->priv->reply_buffer_pos, - G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - stream->priv->reply_buffer_pos, - 0, - send_reply_cb, stream, - NULL); - return; - } - bytes_written = 0; - } - - stream->priv->output_data_pos += bytes_written; - - /* Write more of output_data if needed */ - if (stream->priv->output_data != NULL && - stream->priv->output_data_pos < stream->priv->output_data_size) - { - g_output_stream_write_async (stream->priv->reply_stream, - stream->priv->output_data + stream->priv->output_data_pos, - stream->priv->output_data_size - stream->priv->output_data_pos, - 0, - send_reply_cb, stream, - NULL); - return; - } - - error_out: - - /* Sent full reply */ - stream->priv->output_data = NULL; - - job = stream->priv->current_job; - stream->priv->current_job = NULL; - g_vfs_job_emit_finished (job); - - if (G_IS_VFS_JOB_CLOSE_READ (job)) - { - g_signal_emit (stream, signals[CLOSED], 0); - stream->priv->backend_handle = NULL; - } - else if (stream->priv->connection_closed) - { - stream->priv->current_job = g_vfs_job_close_read_new (stream, stream->priv->backend_handle, - stream->priv->backend); - stream->priv->current_job_seq_nr = 0; - g_signal_emit (stream, signals[NEW_JOB], 0, stream->priv->current_job); - } - - g_object_unref (job); - g_print ("Sent reply\n"); -} - -/* Might be called on an i/o thread */ -static void -send_reply (GVfsReadStream *stream, - gboolean use_header, - char *data, - gsize data_len) -{ - - stream->priv->output_data = data; - stream->priv->output_data_size = data_len; - stream->priv->output_data_pos = 0; - - if (use_header) - { - stream->priv->reply_buffer_pos = 0; - - g_output_stream_write_async (stream->priv->reply_stream, - stream->priv->reply_buffer, - G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE, - 0, - send_reply_cb, stream, - NULL); - } - else - { - stream->priv->reply_buffer_pos = G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE; - - g_output_stream_write_async (stream->priv->reply_stream, - stream->priv->output_data, - stream->priv->output_data_size, - 0, - send_reply_cb, stream, - NULL); - } -} - -/* Might be called on an i/o thread - */ -void -g_vfs_read_stream_send_error (GVfsReadStream *read_stream, - GError *error) -{ - char *data; - gsize data_len; - - data = g_error_to_daemon_reply (error, read_stream->priv->current_job_seq_nr, &data_len); - send_reply (read_stream, FALSE, data, data_len); -} - - -/* Might be called on an i/o thread - */ -void -g_vfs_read_stream_send_seek_offset (GVfsReadStream *read_stream, - goffset offset) -{ - GVfsDaemonSocketProtocolReply *reply; - - reply = (GVfsDaemonSocketProtocolReply *)read_stream->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS); - reply->seq_nr = g_htonl (read_stream->priv->current_job_seq_nr); - reply->arg1 = g_htonl (offset & 0xffffffff); - reply->arg2 = g_htonl (offset >> 32); - - send_reply (read_stream, TRUE, NULL, 0); -} - -/* Might be called on an i/o thread - */ -void -g_vfs_read_stream_send_closed (GVfsReadStream *read_stream) -{ - GVfsDaemonSocketProtocolReply *reply; - - reply = (GVfsDaemonSocketProtocolReply *)read_stream->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED); - reply->seq_nr = g_htonl (read_stream->priv->current_job_seq_nr); - reply->arg1 = g_htonl (0); - reply->arg2 = g_htonl (0); - - send_reply (read_stream, TRUE, NULL, 0); -} - -/* Might be called on an i/o thread - */ -void -g_vfs_read_stream_send_data (GVfsReadStream *read_stream, - char *buffer, - gsize count) -{ - GVfsDaemonSocketProtocolReply *reply; - - reply = (GVfsDaemonSocketProtocolReply *)read_stream->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA); - reply->seq_nr = g_htonl (read_stream->priv->current_job_seq_nr); - reply->arg1 = g_htonl (count); - reply->arg2 = g_htonl (read_stream->priv->seek_generation); - - send_reply (read_stream, TRUE, buffer, count); -} - -GVfsReadStream * -g_vfs_read_stream_new (GVfsBackend *backend, - GError **error) -{ - GVfsReadStream *stream; - int socket_fds[2]; - int ret; - - ret = socketpair (AF_UNIX, SOCK_STREAM, 0, socket_fds); - if (ret == -1) - { - g_set_error (error, G_FILE_ERROR, - g_file_error_from_errno (errno), - _("Error creating socket pair")); - return NULL; - } - - stream = g_object_new (G_TYPE_VFS_READ_STREAM, NULL); - stream->priv->backend = backend; - stream->priv->command_stream = g_input_stream_socket_new (socket_fds[0], TRUE); - stream->priv->reply_stream = g_output_stream_socket_new (socket_fds[0], FALSE); - stream->priv->remote_fd = socket_fds[1]; - - start_request_reader (stream); - - return stream; -} - -int -g_vfs_read_stream_steal_remote_fd (GVfsReadStream *stream) -{ - int fd; - fd = stream->priv->remote_fd; - stream->priv->remote_fd = -1; - return fd; -} - -GVfsBackend * -g_vfs_read_stream_get_backend (GVfsReadStream *read_stream) -{ - return read_stream->priv->backend; -} - -void -g_vfs_read_stream_set_backend_handle (GVfsReadStream *read_stream, - GVfsBackendHandle backend_handle) -{ - read_stream->priv->backend_handle = backend_handle; -} diff --git a/daemon/gvfsreadstream.h b/daemon/gvfsreadstream.h deleted file mode 100644 index 1a571093..00000000 --- a/daemon/gvfsreadstream.h +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef __G_VFS_READ_STREAM_H__ -#define __G_VFS_READ_STREAM_H__ - -#include <glib-object.h> -#include <gvfsjob.h> -#include <gvfsbackend.h> -#include <gvfs/gvfstypes.h> - -G_BEGIN_DECLS - -#define G_TYPE_VFS_READ_STREAM (g_vfs_read_stream_get_type ()) -#define G_VFS_READ_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_VFS_READ_STREAM, GVfsReadStream)) -#define G_VFS_READ_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_VFS_READ_STREAM, GVfsReadStreamClass)) -#define G_IS_VFS_READ_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_VFS_READ_STREAM)) -#define G_IS_VFS_READ_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_VFS_READ_STREAM)) -#define G_VFS_READ_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_VFS_READ_STREAM, GVfsReadStreamClass)) - -typedef struct _GVfsReadStream GVfsReadStream; -typedef struct _GVfsReadStreamClass GVfsReadStreamClass; -typedef struct _GVfsReadStreamPrivate GVfsReadStreamPrivate; - -struct _GVfsReadStream -{ - GObject parent_instance; - - GVfsReadStreamPrivate *priv; -}; - -struct _GVfsReadStreamClass -{ - GObjectClass parent_class; - - /* signals */ - - void (*new_job) (GVfsReadStream *stream, - GVfsJob *job); - void (*closed) (GVfsReadStream *stream); -}; - -GType g_vfs_read_stream_get_type (void) G_GNUC_CONST; - -GVfsReadStream *g_vfs_read_stream_new (GVfsBackend *backend, - GError **error); -int g_vfs_read_stream_steal_remote_fd (GVfsReadStream *read_stream); -GVfsBackend *g_vfs_read_stream_get_backend (GVfsReadStream *read_stream); -void g_vfs_read_stream_set_backend_handle (GVfsReadStream *read_stream, - GVfsBackendHandle backend_handle); -gboolean g_vfs_read_stream_has_job (GVfsReadStream *read_stream); -GVfsJob * g_vfs_read_stream_get_job (GVfsReadStream *read_stream); -void g_vfs_read_stream_send_data (GVfsReadStream *read_stream, - char *buffer, - gsize count); -void g_vfs_read_stream_send_error (GVfsReadStream *read_stream, - GError *error); -void g_vfs_read_stream_send_closed (GVfsReadStream *read_stream); -void g_vfs_read_stream_send_seek_offset (GVfsReadStream *read_stream, - goffset offset); - -/* TODO: i/o priority? */ - -G_END_DECLS - -#endif /* __G_VFS_READ_STREAM_H__ */ |