diff options
author | Alexander Larsson <alexl@src.gnome.org> | 2007-09-13 11:57:40 +0000 |
---|---|---|
committer | Alexander Larsson <alexl@src.gnome.org> | 2007-09-13 11:57:40 +0000 |
commit | e8762e3d82c5cde9d548383bdb3df2d9222e2708 (patch) | |
tree | 99dc9529164c0e182f31f65644763b8aa8286748 | |
parent | fd9e5a6cd9eda6908eb48cbd433434f192b90922 (diff) | |
download | gvfs-e8762e3d82c5cde9d548383bdb3df2d9222e2708.tar.gz |
Move generic code from GVfsReadChannel to GVfsChannel base class
Original git commit by Alexander Larsson <alex@greebo.(none)> at 1171634935 +0100
svn path=/trunk/; revision=354
-rw-r--r-- | client/gfileinputstreamdaemon.c | 1 | ||||
-rw-r--r-- | common/gvfsdaemonprotocol.h | 3 | ||||
-rw-r--r-- | daemon/Makefile.am | 1 | ||||
-rw-r--r-- | daemon/gvfschannel.c | 610 | ||||
-rw-r--r-- | daemon/gvfschannel.h | 66 | ||||
-rw-r--r-- | daemon/gvfsjobcloseread.c | 2 | ||||
-rw-r--r-- | daemon/gvfsjobopenforread.c | 12 | ||||
-rw-r--r-- | daemon/gvfsjobread.c | 2 | ||||
-rw-r--r-- | daemon/gvfsjobseekread.c | 2 | ||||
-rw-r--r-- | daemon/gvfsreadchannel.c | 494 | ||||
-rw-r--r-- | daemon/gvfsreadchannel.h | 24 |
11 files changed, 771 insertions, 446 deletions
diff --git a/client/gfileinputstreamdaemon.c b/client/gfileinputstreamdaemon.c index 3e041631..c7c21fca 100644 --- a/client/gfileinputstreamdaemon.c +++ b/client/gfileinputstreamdaemon.c @@ -278,6 +278,7 @@ append_request (GFileInputStreamDaemon *stream, guint32 command, cmd.seq_nr = g_htonl (stream->priv->seq_nr++); cmd.arg1 = g_htonl (arg1); cmd.arg2 = g_htonl (arg2); + cmd.data_len = 0; g_string_append_len (stream->priv->output_buffer, (char *)&cmd, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE); diff --git a/common/gvfsdaemonprotocol.h b/common/gvfsdaemonprotocol.h index c28bba52..7caa7b34 100644 --- a/common/gvfsdaemonprotocol.h +++ b/common/gvfsdaemonprotocol.h @@ -44,9 +44,10 @@ typedef struct { guint32 seq_nr; guint32 arg1; guint32 arg2; + guint32 data_len; } GVfsDaemonSocketProtocolRequest; -#define G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE 16 +#define G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE sizeof(GVfsDaemonSocketProtocolRequest) #define G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ 0 #define G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE 1 diff --git a/daemon/Makefile.am b/daemon/Makefile.am index 6427d031..7e8185fb 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -47,6 +47,7 @@ noinst_PROGRAMS = \ libdaemon_la_SOURCES = \ gvfsdaemon.c gvfsdaemon.h \ gvfsbackend.c gvfsbackend.h \ + gvfschannel.c gvfschannel.h \ gvfsreadchannel.c gvfsreadchannel.h \ gvfsdaemonutils.c gvfsdaemonutils.h \ gvfsjob.c gvfsjob.h \ diff --git a/daemon/gvfschannel.c b/daemon/gvfschannel.c new file mode 100644 index 00000000..088c5f6a --- /dev/null +++ b/daemon/gvfschannel.c @@ -0,0 +1,610 @@ +#include <config.h> + +#include <unistd.h> +#include <errno.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <unistd.h> +#include <fcntl.h> + +#include <glib.h> +#include <glib-object.h> +#include <glib/gi18n.h> +#include <dbus-gmain.h> +#include <gvfsreadchannel.h> +#include <gio/ginputstreamsocket.h> +#include <gio/goutputstreamsocket.h> +#include <gvfsdaemonprotocol.h> +#include <gvfsdaemonutils.h> +#include <gvfsjobread.h> +#include <gvfsjobseekread.h> +#include <gvfsjobcloseread.h> + +static void g_vfs_channel_job_source_iface_init (GVfsJobSourceIface *iface); + +G_DEFINE_TYPE_WITH_CODE (GVfsChannel, g_vfs_channel, G_TYPE_OBJECT, + G_IMPLEMENT_INTERFACE (G_TYPE_VFS_JOB_SOURCE, + g_vfs_channel_job_source_iface_init)) + +/* TODO: Real P_() */ +#define P_(_x) (_x) + +enum { + PROP_0, + PROP_BACKEND +}; + +typedef struct +{ + GVfsChannel *channel; + GInputStream *command_stream; + char buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE]; + int buffer_size; + char *data; + gsize data_len; + gsize data_pos; +} RequestReader; + +struct _GVfsChannelPrivate +{ + GVfsBackend *backend; + gboolean connection_closed; + GInputStream *command_stream; + GOutputStream *reply_stream; + int remote_fd; + + GVfsBackendHandle backend_handle; + GVfsJob *current_job; + guint32 current_job_seq_nr; + + RequestReader *request_reader; + + char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE]; + int reply_buffer_pos; + + char *output_data; /* Owned by job */ + gsize output_data_size; + gsize output_data_pos; +}; + +static void start_request_reader (GVfsChannel *channel); +static void g_vfs_channel_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec); +static void g_vfs_channel_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec); + + +static void +g_vfs_channel_finalize (GObject *object) +{ + GVfsChannel *channel; + + channel = G_VFS_CHANNEL (object); + + if (channel->priv->current_job) + g_object_unref (channel->priv->current_job); + channel->priv->current_job = NULL; + + if (channel->priv->reply_stream) + g_object_unref (channel->priv->reply_stream); + channel->priv->reply_stream = NULL; + + if (channel->priv->request_reader) + channel->priv->request_reader->channel = NULL; + channel->priv->request_reader = NULL; + + if (channel->priv->command_stream) + g_object_unref (channel->priv->command_stream); + channel->priv->command_stream = NULL; + + if (channel->priv->remote_fd != -1) + close (channel->priv->remote_fd); + + if (channel->priv->backend) + g_object_unref (channel->priv->backend); + + g_assert (channel->priv->backend_handle == NULL); + + if (G_OBJECT_CLASS (g_vfs_channel_parent_class)->finalize) + (*G_OBJECT_CLASS (g_vfs_channel_parent_class)->finalize) (object); +} + +static void +g_vfs_channel_job_source_iface_init (GVfsJobSourceIface *iface) +{ +} + +static void +g_vfs_channel_class_init (GVfsChannelClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + g_type_class_add_private (klass, sizeof (GVfsChannelPrivate)); + + gobject_class->finalize = g_vfs_channel_finalize; + gobject_class->set_property = g_vfs_channel_set_property; + gobject_class->get_property = g_vfs_channel_get_property; + + g_object_class_install_property (gobject_class, + PROP_BACKEND, + g_param_spec_object ("backend", + P_("Backend"), + P_("Backend implementation to use"), + G_TYPE_VFS_BACKEND, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); +} + +static void +g_vfs_channel_init (GVfsChannel *channel) +{ + int socket_fds[2]; + int ret; + + channel->priv = G_TYPE_INSTANCE_GET_PRIVATE (channel, + G_TYPE_VFS_CHANNEL, + GVfsChannelPrivate); + channel->priv->remote_fd = -1; + + ret = socketpair (AF_UNIX, SOCK_STREAM, 0, socket_fds); + if (ret == -1) + g_warning ("Error creating socket pair: %d\n", errno); + else + { + channel->priv->command_stream = g_input_stream_socket_new (socket_fds[0], TRUE); + channel->priv->reply_stream = g_output_stream_socket_new (socket_fds[0], FALSE); + channel->priv->remote_fd = socket_fds[1]; + + start_request_reader (channel); + } +} + +static void +g_vfs_channel_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GVfsChannel *channel = G_VFS_CHANNEL (object); + + switch (prop_id) + { + case PROP_BACKEND: + if (channel->priv->backend) + g_object_unref (channel->priv->backend); + channel->priv->backend = G_VFS_BACKEND (g_value_dup_object (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +g_vfs_channel_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + GVfsChannel *channel = G_VFS_CHANNEL (object); + + switch (prop_id) + { + case PROP_BACKEND: + g_value_set_object (value, channel->priv->backend); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +g_vfs_channel_connection_closed (GVfsChannel *channel) +{ + GVfsChannelClass *class; + + if (channel->priv->connection_closed) + return; + channel->priv->connection_closed = TRUE; + + if (channel->priv->current_job == NULL && + channel->priv->backend_handle != NULL) + { + class = G_VFS_CHANNEL_GET_CLASS (channel); + + channel->priv->current_job = class->close (channel); + channel->priv->current_job_seq_nr = 0; + g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); + } + /* Otherwise we'll close when current_job is finished */ +} + +static void +request_reader_free (RequestReader *reader) +{ + g_object_unref (reader->command_stream); + g_free (reader->data); + g_free (reader); +} + +static void +got_request (GVfsChannel *channel, + GVfsDaemonSocketProtocolRequest *request, + gpointer data, gsize data_len) +{ + GVfsChannelClass *class; + GVfsJob *job; + GError *error; + guint32 seq_nr, command, arg1, arg2; + + command = g_ntohl (request->command); + arg1 = g_ntohl (request->arg1); + arg2 = g_ntohl (request->arg2); + seq_nr = g_ntohl (request->seq_nr); + + g_print ("got_command %d %d %d %d\n", command, seq_nr, arg1, arg2); + + job = NULL; + + if (channel->priv->current_job != NULL) + { + if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL) + { + g_warning ("Ignored non-cancel request with outstanding request"); + /* Can't send an error reply now, that would confuse the reply + to the outstanding request */ + return; + } + + if (arg1 == channel->priv->current_job_seq_nr) + g_vfs_job_cancel (channel->priv->current_job); + return; + } + /* Ignore cancel with no outstanding job */ + else if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL) + { + class = G_VFS_CHANNEL_GET_CLASS (channel); + + error = NULL; + job = class->handle_request (channel, + command, seq_nr, + arg1, arg2, + data, data_len, + &error); + if (job) + { + channel->priv->current_job = job; + channel->priv->current_job_seq_nr = seq_nr; + g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); + } + else + { + g_vfs_channel_send_error (channel, error); + g_error_free (error); + } + } +} + +static void command_read_cb (GInputStream *input_stream, + void *buffer, + gsize count_requested, + gssize count_read, + gpointer data, + GError *error); + + +static void +finish_request (RequestReader *reader) +{ + got_request (reader->channel, (GVfsDaemonSocketProtocolRequest *)reader->buffer, + reader->data, reader->data_len); + + /* Request more commands immediately, so can get cancel requests */ + + reader->buffer_size = 0; + g_free (reader->data); + reader->data = NULL; + reader->data_len = 0; + g_input_stream_read_async (reader->command_stream, + reader->buffer + reader->buffer_size, + G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, + 0, + command_read_cb, + reader, + NULL); +} + +static void +data_read_cb (GInputStream *input_stream, + void *buffer, + gsize count_requested, + gssize count_read, + gpointer data, + GError *error) +{ + RequestReader *reader = data; + + if (reader->channel == NULL) + { + /* Channel was finalized */ + request_reader_free (reader); + return; + } + + if (count_read <= 0) + { + reader->channel->priv->request_reader = NULL; + g_vfs_channel_connection_closed (reader->channel); + request_reader_free (reader); + return; + } + + reader->data_pos += count_read; + + if (reader->data_pos < reader->data_len) + { + g_input_stream_read_async (reader->command_stream, + reader->data + reader->data_pos, + reader->data_len - reader->data_pos, + 0, + data_read_cb, + reader, + NULL); + return; + } + + finish_request (reader); +} + + +static void +command_read_cb (GInputStream *input_stream, + void *buffer, + gsize count_requested, + gssize count_read, + gpointer data, + GError *error) +{ + RequestReader *reader = data; + GVfsDaemonSocketProtocolRequest *request; + guint32 data_len; + + if (reader->channel == NULL) + { + /* Channel was finalized */ + request_reader_free (reader); + return; + } + + if (count_read <= 0) + { + reader->channel->priv->request_reader = NULL; + g_vfs_channel_connection_closed (reader->channel); + request_reader_free (reader); + return; + } + + reader->buffer_size += count_read; + + if (reader->buffer_size < G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE) + { + g_input_stream_read_async (reader->command_stream, + reader->buffer + reader->buffer_size, + G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, + 0, + command_read_cb, + reader, + NULL); + return; + } + + request = (GVfsDaemonSocketProtocolRequest *)reader->buffer; + data_len = g_ntohl (request->data_len); + + if (data_len > 0) + { + reader->data = g_malloc (data_len); + reader->data_len = data_len; + reader->data_pos = 0; + + g_input_stream_read_async (reader->command_stream, + reader->data + reader->data_pos, + reader->data_len - reader->data_pos, + 0, + data_read_cb, + reader, + NULL); + return; + } + + finish_request (reader); +} + +static void +start_request_reader (GVfsChannel *channel) +{ + RequestReader *reader; + + reader = g_new0 (RequestReader, 1); + reader->channel = channel; + reader->command_stream = g_object_ref (channel->priv->command_stream); + + g_input_stream_read_async (reader->command_stream, + reader->buffer + reader->buffer_size, + G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, + 0, + command_read_cb, + reader, + NULL); + + channel->priv->request_reader = reader; +} + +static void +send_reply_cb (GOutputStream *output_stream, + void *buffer, + gsize bytes_requested, + gssize bytes_written, + gpointer data, + GError *error) +{ + GVfsChannel *channel = data; + GVfsChannelClass *class; + GVfsJob *job; + + g_print ("send_reply_cb: %d\n", bytes_written); + + if (bytes_written <= 0) + { + g_vfs_channel_connection_closed (channel); + goto error_out; + } + + if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) + { + channel->priv->reply_buffer_pos += bytes_written; + + /* Write more of reply header if needed */ + if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) + { + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->reply_buffer + channel->priv->reply_buffer_pos, + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - channel->priv->reply_buffer_pos, + 0, + send_reply_cb, channel, + NULL); + return; + } + bytes_written = 0; + } + + channel->priv->output_data_pos += bytes_written; + + /* Write more of output_data if needed */ + if (channel->priv->output_data != NULL && + channel->priv->output_data_pos < channel->priv->output_data_size) + { + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->output_data + channel->priv->output_data_pos, + channel->priv->output_data_size - channel->priv->output_data_pos, + 0, + send_reply_cb, channel, + NULL); + return; + } + + error_out: + + /* Sent full reply */ + channel->priv->output_data = NULL; + + job = channel->priv->current_job; + channel->priv->current_job = NULL; + g_vfs_job_emit_finished (job); + + if (G_IS_VFS_JOB_CLOSE_READ (job)) + { + g_vfs_job_source_closed (G_VFS_JOB_SOURCE (channel)); + channel->priv->backend_handle = NULL; + } + else if (channel->priv->connection_closed) + { + class = G_VFS_CHANNEL_GET_CLASS (channel); + + channel->priv->current_job = class->close (channel); + channel->priv->current_job_seq_nr = 0; + g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); + } + + g_object_unref (job); + g_print ("Sent reply\n"); +} + +/* Might be called on an i/o thread */ +void +g_vfs_channel_send_reply (GVfsChannel *channel, + GVfsDaemonSocketProtocolReply *reply, + void *data, + gsize data_len) +{ + + channel->priv->output_data = data; + channel->priv->output_data_size = data_len; + channel->priv->output_data_pos = 0; + + if (reply != NULL) + { + memcpy (channel->priv->reply_buffer, reply, sizeof (GVfsDaemonSocketProtocolReply)); + channel->priv->reply_buffer_pos = 0; + + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->reply_buffer, + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE, + 0, + send_reply_cb, channel, + NULL); + } + else + { + channel->priv->reply_buffer_pos = G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE; + g_output_stream_write_async (channel->priv->reply_stream, + channel->priv->output_data, + channel->priv->output_data_size, + 0, + send_reply_cb, channel, + NULL); + } +} + +/* Might be called on an i/o thread + */ +void +g_vfs_channel_send_error (GVfsChannel *channel, + GError *error) +{ + char *data; + gsize data_len; + + data = g_error_to_daemon_reply (error, channel->priv->current_job_seq_nr, &data_len); + g_vfs_channel_send_reply (channel, NULL, data, data_len); +} + +int +g_vfs_channel_steal_remote_fd (GVfsChannel *channel) +{ + int fd; + fd = channel->priv->remote_fd; + channel->priv->remote_fd = -1; + return fd; +} + +GVfsBackend * +g_vfs_channel_get_backend (GVfsChannel *channel) +{ + return channel->priv->backend; +} + +void +g_vfs_channel_set_backend_handle (GVfsChannel *channel, + GVfsBackendHandle backend_handle) +{ + channel->priv->backend_handle = backend_handle; +} + +GVfsBackendHandle +g_vfs_channel_get_backend_handle (GVfsChannel *channel) +{ + return channel->priv->backend_handle; +} + +guint32 +g_vfs_channel_get_current_seq_nr (GVfsChannel *channel) +{ + return channel->priv->current_job_seq_nr; +} diff --git a/daemon/gvfschannel.h b/daemon/gvfschannel.h new file mode 100644 index 00000000..61a76fe0 --- /dev/null +++ b/daemon/gvfschannel.h @@ -0,0 +1,66 @@ +#ifndef __G_VFS_CHANNEL_H__ +#define __G_VFS_CHANNEL_H__ + +#include <glib-object.h> +#include <gio/gvfstypes.h> +#include <gvfsjob.h> +#include <gvfsbackend.h> +#include <gvfsdaemonprotocol.h> + +G_BEGIN_DECLS + +#define G_TYPE_VFS_CHANNEL (g_vfs_channel_get_type ()) +#define G_VFS_CHANNEL(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_VFS_CHANNEL, GVfsChannel)) +#define G_VFS_CHANNEL_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_VFS_CHANNEL, GVfsChannelClass)) +#define G_IS_VFS_CHANNEL(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_VFS_CHANNEL)) +#define G_IS_VFS_CHANNEL_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_VFS_CHANNEL)) +#define G_VFS_CHANNEL_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_VFS_CHANNEL, GVfsChannelClass)) + +typedef struct _GVfsChannel GVfsChannel; +typedef struct _GVfsChannelClass GVfsChannelClass; +typedef struct _GVfsChannelPrivate GVfsChannelPrivate; + +struct _GVfsChannel +{ + GObject parent_instance; + + GVfsChannelPrivate *priv; +}; + +struct _GVfsChannelClass +{ + GObjectClass parent_class; + + GVfsJob *(*close) (GVfsChannel *channel); + GVfsJob *(*handle_request) (GVfsChannel *channel, + guint32 command, + guint32 seq_nr, + guint32 arg1, + guint32 arg2, + gpointer data, + gsize data_len, + GError **error); +}; + +GType g_vfs_channel_get_type (void) G_GNUC_CONST; + +int g_vfs_channel_steal_remote_fd (GVfsChannel *channel); +GVfsBackend * g_vfs_channel_get_backend (GVfsChannel *channel); +GVfsBackendHandle g_vfs_channel_get_backend_handle (GVfsChannel *channel); +void g_vfs_channel_set_backend_handle (GVfsChannel *channel, + GVfsBackendHandle backend_handle); +gboolean g_vfs_channel_has_job (GVfsChannel *channel); +GVfsJob * g_vfs_channel_get_job (GVfsChannel *channel); +void g_vfs_channel_send_error (GVfsChannel *channel, + GError *error); +void g_vfs_channel_send_reply (GVfsChannel *channel, + GVfsDaemonSocketProtocolReply *reply, + void *data, + gsize data_len); +guint32 g_vfs_channel_get_current_seq_nr (GVfsChannel *channel); + +/* TODO: i/o priority? */ + +G_END_DECLS + +#endif /* __G_VFS_CHANNEL_H__ */ diff --git a/daemon/gvfsjobcloseread.c b/daemon/gvfsjobcloseread.c index e39846ae..eb2c81a2 100644 --- a/daemon/gvfsjobcloseread.c +++ b/daemon/gvfsjobcloseread.c @@ -73,7 +73,7 @@ send_reply (GVfsJob *job) g_print ("job_close_read send reply\n"); if (job->failed) - g_vfs_read_channel_send_error (op_job->channel, job->error); + g_vfs_channel_send_error (G_VFS_CHANNEL (op_job->channel), job->error); else g_vfs_read_channel_send_closed (op_job->channel); } diff --git a/daemon/gvfsjobopenforread.c b/daemon/gvfsjobopenforread.c index 8450fe54..ce045b22 100644 --- a/daemon/gvfsjobopenforread.c +++ b/daemon/gvfsjobopenforread.c @@ -154,15 +154,9 @@ create_reply (GVfsJob *job, g_assert (open_job->backend_handle != NULL); error = NULL; - channel = g_vfs_read_channel_new (open_job->backend, &error); - if (channel == NULL) - { - reply = _dbus_message_new_error_from_gerror (message, error); - g_error_free (error); - return reply; - } + channel = g_vfs_read_channel_new (open_job->backend); - remote_fd = g_vfs_read_channel_steal_remote_fd (channel); + remote_fd = g_vfs_channel_steal_remote_fd (G_VFS_CHANNEL (channel)); if (!dbus_connection_send_fd (connection, remote_fd, &fd_id, &error)) @@ -182,7 +176,7 @@ create_reply (GVfsJob *job, DBUS_TYPE_BOOLEAN, &can_seek, DBUS_TYPE_INVALID); - g_vfs_read_channel_set_backend_handle (channel, open_job->backend_handle); + g_vfs_channel_set_backend_handle (G_VFS_CHANNEL (channel), open_job->backend_handle); open_job->backend_handle = NULL; open_job->read_channel = channel; diff --git a/daemon/gvfsjobread.c b/daemon/gvfsjobread.c index 1cdb920b..26c87159 100644 --- a/daemon/gvfsjobread.c +++ b/daemon/gvfsjobread.c @@ -77,7 +77,7 @@ send_reply (GVfsJob *job) g_print ("job_read send reply, %d bytes\n", op_job->data_count); if (job->failed) - g_vfs_read_channel_send_error (op_job->channel, job->error); + g_vfs_channel_send_error (G_VFS_CHANNEL (op_job->channel), job->error); else { g_vfs_read_channel_send_data (op_job->channel, diff --git a/daemon/gvfsjobseekread.c b/daemon/gvfsjobseekread.c index 2314e264..de197ec3 100644 --- a/daemon/gvfsjobseekread.c +++ b/daemon/gvfsjobseekread.c @@ -77,7 +77,7 @@ send_reply (GVfsJob *job) g_print ("job_seek_read send reply, pos %d\n", (int)op_job->final_offset); if (job->failed) - g_vfs_read_channel_send_error (op_job->channel, job->error); + g_vfs_channel_send_error (G_VFS_CHANNEL (op_job->channel), job->error); else { g_vfs_read_channel_send_seek_offset (op_job->channel, diff --git a/daemon/gvfsreadchannel.c b/daemon/gvfsreadchannel.c index 4c6362e4..b3b2931f 100644 --- a/daemon/gvfsreadchannel.c +++ b/daemon/gvfsreadchannel.c @@ -21,161 +21,94 @@ #include <gvfsjobseekread.h> #include <gvfsjobcloseread.h> -static void g_vfs_read_channel_job_source_iface_init (GVfsJobSourceIface *iface); - -G_DEFINE_TYPE_WITH_CODE (GVfsReadChannel, g_vfs_read_channel, G_TYPE_OBJECT, - G_IMPLEMENT_INTERFACE (G_TYPE_VFS_JOB_SOURCE, - g_vfs_read_channel_job_source_iface_init)) - -enum { - PROP_0, -}; - -typedef struct +struct _GVfsReadChannel { - GVfsReadChannel *read_channel; - GInputStream *command_stream; - char buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE]; - int buffer_size; -} RequestReader; + GVfsChannel parent_instance; -struct _GVfsReadChannelPrivate -{ - GVfsBackend *backend; - gboolean connection_closed; - GInputStream *command_stream; - GOutputStream *reply_stream; - int remote_fd; int seek_generation; - - GVfsBackendHandle backend_handle; - GVfsJob *current_job; - guint32 current_job_seq_nr; - - RequestReader *request_reader; - - char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE]; - int reply_buffer_pos; - - char *output_data; /* Owned by job */ - gsize output_data_size; - gsize output_data_pos; }; -static void start_request_reader (GVfsReadChannel *channel); +G_DEFINE_TYPE (GVfsReadChannel, g_vfs_read_channel, G_TYPE_VFS_CHANNEL) +static GVfsJob *read_channel_close (GVfsChannel *channel); +static GVfsJob *read_channel_handle_request (GVfsChannel *channel, + guint32 command, + guint32 seq_nr, + guint32 arg1, + guint32 arg2, + gpointer data, + gsize data_len, + GError **error); + static void g_vfs_read_channel_finalize (GObject *object) { GVfsReadChannel *read_channel; read_channel = G_VFS_READ_CHANNEL (object); - - if (read_channel->priv->current_job) - g_object_unref (read_channel->priv->current_job); - read_channel->priv->current_job = NULL; - - if (read_channel->priv->reply_stream) - g_object_unref (read_channel->priv->reply_stream); - read_channel->priv->reply_stream = NULL; - - if (read_channel->priv->request_reader) - read_channel->priv->request_reader->read_channel = NULL; - read_channel->priv->request_reader = NULL; - - if (read_channel->priv->command_stream) - g_object_unref (read_channel->priv->command_stream); - read_channel->priv->command_stream = NULL; - - if (read_channel->priv->remote_fd != -1) - close (read_channel->priv->remote_fd); - - g_assert (read_channel->priv->backend_handle == NULL); if (G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize) (*G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize) (object); } static void -g_vfs_read_channel_job_source_iface_init (GVfsJobSourceIface *iface) -{ -} - -static void g_vfs_read_channel_class_init (GVfsReadChannelClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GVfsChannelClass *channel_class = G_VFS_CHANNEL_CLASS (klass); - g_type_class_add_private (klass, sizeof (GVfsReadChannelPrivate)); - gobject_class->finalize = g_vfs_read_channel_finalize; + channel_class->close = read_channel_close; + channel_class->handle_request = read_channel_handle_request; + } static void g_vfs_read_channel_init (GVfsReadChannel *channel) { - channel->priv = G_TYPE_INSTANCE_GET_PRIVATE (channel, - G_TYPE_VFS_READ_CHANNEL, - GVfsReadChannelPrivate); - channel->priv->remote_fd = -1; } -static void -g_vfs_read_channel_connection_closed (GVfsReadChannel *channel) +static GVfsJob * +read_channel_close (GVfsChannel *channel) { - if (channel->priv->connection_closed) - return; - channel->priv->connection_closed = TRUE; - - if (channel->priv->current_job == NULL && - channel->priv->backend_handle != NULL) - { - channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle, channel->priv->backend); - channel->priv->current_job_seq_nr = 0; - g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); - } - /* Otherwise we'll close when current_job is finished */ -} + return g_vfs_job_close_read_new (G_VFS_READ_CHANNEL (channel), + g_vfs_channel_get_backend_handle (channel), + g_vfs_channel_get_backend (channel)); +} -static void -got_command (GVfsReadChannel *channel, - guint32 command, - guint32 seq_nr, - guint32 arg1, - guint32 arg2) +static GVfsJob * +read_channel_handle_request (GVfsChannel *channel, + guint32 command, + guint32 seq_nr, + guint32 arg1, + guint32 arg2, + gpointer data, + gsize data_len, + GError **error) { GVfsJob *job; - GError *error; GSeekType seek_type; + GVfsBackendHandle backend_handle; + GVfsBackend *backend; + GVfsReadChannel *read_channel; - g_print ("got_command %d %d %d %d\n", command, seq_nr, arg1, arg2); - - if (channel->priv->current_job != NULL) - { - if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL) - { - g_warning ("Ignored non-cancel request with outstanding request"); - return; - } - - if (arg1 == channel->priv->current_job_seq_nr) - g_vfs_job_cancel (channel->priv->current_job); - return; - } + read_channel = G_VFS_READ_CHANNEL (channel); + backend_handle = g_vfs_channel_get_backend_handle (channel); + backend = g_vfs_channel_get_backend (channel); job = NULL; switch (command) { case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ: - job = g_vfs_job_read_new (channel, - channel->priv->backend_handle, + job = g_vfs_job_read_new (read_channel, + backend_handle, arg1, - channel->priv->backend); + backend); break; case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE: - job = g_vfs_job_close_read_new (channel, - channel->priv->backend_handle, - channel->priv->backend); + job = g_vfs_job_close_read_new (read_channel, + backend_handle, + backend); break; case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR: case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END: @@ -186,267 +119,41 @@ got_command (GVfsReadChannel *channel, else if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR) seek_type = G_SEEK_CUR; - channel->priv->seek_generation++; - job = g_vfs_job_seek_read_new (channel, - channel->priv->backend_handle, + read_channel->seek_generation++; + job = g_vfs_job_seek_read_new (read_channel, + backend_handle, seek_type, ((goffset)arg1) | (((goffset)arg2) << 32), - channel->priv->backend); - break; - - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL: - /* Ignore cancel with no outstanding job */ + backend); break; default: - error = NULL; - g_set_error (&error, G_VFS_ERROR, + g_set_error (error, G_VFS_ERROR, G_VFS_ERROR_INTERNAL_ERROR, "Unknown stream command %d\n", command); - g_vfs_read_channel_send_error (channel, error); - g_error_free (error); break; } - if (job) - { - channel->priv->current_job = job; - channel->priv->current_job_seq_nr = seq_nr; - g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); - } -} - -static void -command_read_cb (GInputStream *input_stream, - void *buffer, - gsize count_requested, - gssize count_read, - gpointer data, - GError *error) -{ - RequestReader *reader = data; - GVfsDaemonSocketProtocolRequest *cmd; - guint32 seq_nr; - guint32 command; - guint32 arg1, arg2; - - if (reader->read_channel == NULL) - { - /* ReadChannel was finalized */ - g_object_unref (reader->command_stream); - g_free (reader); - return; - } - - if (count_read <= 0) - { - reader->read_channel->priv->request_reader = NULL; - g_vfs_read_channel_connection_closed (reader->read_channel); - g_object_unref (reader->command_stream); - g_free (reader); - return; - } - - reader->buffer_size += count_read; - - if (reader->buffer_size < G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE) - { - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); - return; - } - - cmd = (GVfsDaemonSocketProtocolRequest *)reader->buffer; - command = g_ntohl (cmd->command); - arg1 = g_ntohl (cmd->arg1); - arg2 = g_ntohl (cmd->arg2); - seq_nr = g_ntohl (cmd->seq_nr); - reader->buffer_size = 0; - - got_command (reader->read_channel, command, seq_nr, arg1, arg2); - - /* Request more commands, so can get cancel requests */ - - reader->buffer_size = 0; - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); -} - -static void -start_request_reader (GVfsReadChannel *channel) -{ - RequestReader *reader; - - reader = g_new0 (RequestReader, 1); - reader->read_channel = channel; - reader->command_stream = g_object_ref (channel->priv->command_stream); - reader->buffer_size = 0; - - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); - - channel->priv->request_reader = reader; -} - - -static void -send_reply_cb (GOutputStream *output_stream, - void *buffer, - gsize bytes_requested, - gssize bytes_written, - gpointer data, - GError *error) -{ - GVfsReadChannel *channel = data; - GVfsJob *job; - - g_print ("send_reply_cb: %d\n", bytes_written); - - if (bytes_written <= 0) - { - g_vfs_read_channel_connection_closed (channel); - goto error_out; - } - - if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) - { - channel->priv->reply_buffer_pos += bytes_written; - - /* Write more of reply header if needed */ - if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) - { - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->reply_buffer + channel->priv->reply_buffer_pos, - G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - channel->priv->reply_buffer_pos, - 0, - send_reply_cb, channel, - NULL); - return; - } - bytes_written = 0; - } - - channel->priv->output_data_pos += bytes_written; - - /* Write more of output_data if needed */ - if (channel->priv->output_data != NULL && - channel->priv->output_data_pos < channel->priv->output_data_size) - { - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->output_data + channel->priv->output_data_pos, - channel->priv->output_data_size - channel->priv->output_data_pos, - 0, - send_reply_cb, channel, - NULL); - return; - } - - error_out: - - /* Sent full reply */ - channel->priv->output_data = NULL; - - job = channel->priv->current_job; - channel->priv->current_job = NULL; - g_vfs_job_emit_finished (job); - - if (G_IS_VFS_JOB_CLOSE_READ (job)) - { - g_vfs_job_source_closed (G_VFS_JOB_SOURCE (channel)); - channel->priv->backend_handle = NULL; - } - else if (channel->priv->connection_closed) - { - channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle, - channel->priv->backend); - channel->priv->current_job_seq_nr = 0; - g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); - } - - g_object_unref (job); - g_print ("Sent reply\n"); -} - -/* Might be called on an i/o thread */ -static void -send_reply (GVfsReadChannel *channel, - gboolean use_header, - char *data, - gsize data_len) -{ - - channel->priv->output_data = data; - channel->priv->output_data_size = data_len; - channel->priv->output_data_pos = 0; - - if (use_header) - { - channel->priv->reply_buffer_pos = 0; - - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->reply_buffer, - G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE, - 0, - send_reply_cb, channel, - NULL); - } - else - { - channel->priv->reply_buffer_pos = G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE; - - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->output_data, - channel->priv->output_data_size, - 0, - send_reply_cb, channel, - NULL); - } + return job; } /* Might be called on an i/o thread */ void -g_vfs_read_channel_send_error (GVfsReadChannel *read_channel, - GError *error) -{ - char *data; - gsize data_len; - - data = g_error_to_daemon_reply (error, read_channel->priv->current_job_seq_nr, &data_len); - send_reply (read_channel, FALSE, data, data_len); -} - - -/* Might be called on an i/o thread - */ -void g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel, - goffset offset) + goffset offset) { - GVfsDaemonSocketProtocolReply *reply; + GVfsDaemonSocketProtocolReply reply; + GVfsChannel *channel; + + channel = G_VFS_CHANNEL (read_channel); - reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS); - reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); - reply->arg1 = g_htonl (offset & 0xffffffff); - reply->arg2 = g_htonl (offset >> 32); + reply.type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS); + reply.seq_nr = g_htonl (g_vfs_channel_get_current_seq_nr (channel)); + reply.arg1 = g_htonl (offset & 0xffffffff); + reply.arg2 = g_htonl (offset >> 32); - send_reply (read_channel, TRUE, NULL, 0); + g_vfs_channel_send_reply (channel, &reply, NULL, 0); } /* Might be called on an i/o thread @@ -454,15 +161,17 @@ g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel, void g_vfs_read_channel_send_closed (GVfsReadChannel *read_channel) { - GVfsDaemonSocketProtocolReply *reply; + GVfsDaemonSocketProtocolReply reply; + GVfsChannel *channel; + + channel = G_VFS_CHANNEL (read_channel); - reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED); - reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); - reply->arg1 = g_htonl (0); - reply->arg2 = g_htonl (0); + reply.type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED); + reply.seq_nr = g_htonl (g_vfs_channel_get_current_seq_nr (channel)); + reply.arg1 = g_htonl (0); + reply.arg2 = g_htonl (0); - send_reply (read_channel, TRUE, NULL, 0); + g_vfs_channel_send_reply (channel, &reply, NULL, 0); } /* Might be called on an i/o thread @@ -472,63 +181,24 @@ g_vfs_read_channel_send_data (GVfsReadChannel *read_channel, char *buffer, gsize count) { - GVfsDaemonSocketProtocolReply *reply; + GVfsDaemonSocketProtocolReply reply; + GVfsChannel *channel; - reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA); - reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); - reply->arg1 = g_htonl (count); - reply->arg2 = g_htonl (read_channel->priv->seek_generation); + channel = G_VFS_CHANNEL (read_channel); - send_reply (read_channel, TRUE, buffer, count); -} - -GVfsReadChannel * -g_vfs_read_channel_new (GVfsBackend *backend, - GError **error) -{ - GVfsReadChannel *channel; - int socket_fds[2]; - int ret; - - ret = socketpair (AF_UNIX, SOCK_STREAM, 0, socket_fds); - if (ret == -1) - { - g_set_error (error, G_FILE_ERROR, - g_file_error_from_errno (errno), - _("Error creating socket pair")); - return NULL; - } - - channel = g_object_new (G_TYPE_VFS_READ_CHANNEL, NULL); - channel->priv->backend = backend; - channel->priv->command_stream = g_input_stream_socket_new (socket_fds[0], TRUE); - channel->priv->reply_stream = g_output_stream_socket_new (socket_fds[0], FALSE); - channel->priv->remote_fd = socket_fds[1]; + reply.type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA); + reply.seq_nr = g_htonl (g_vfs_channel_get_current_seq_nr (channel)); + reply.arg1 = g_htonl (count); + reply.arg2 = g_htonl (read_channel->seek_generation); - start_request_reader (channel); - - return channel; -} - -int -g_vfs_read_channel_steal_remote_fd (GVfsReadChannel *channel) -{ - int fd; - fd = channel->priv->remote_fd; - channel->priv->remote_fd = -1; - return fd; + g_vfs_channel_send_reply (channel, &reply, buffer, count); } -GVfsBackend * -g_vfs_read_channel_get_backend (GVfsReadChannel *read_channel) -{ - return read_channel->priv->backend; -} -void -g_vfs_read_channel_set_backend_handle (GVfsReadChannel *read_channel, - GVfsBackendHandle backend_handle) +GVfsReadChannel * +g_vfs_read_channel_new (GVfsBackend *backend) { - read_channel->priv->backend_handle = backend_handle; + return g_object_new (G_TYPE_VFS_READ_CHANNEL, + "backend", backend, + NULL); } diff --git a/daemon/gvfsreadchannel.h b/daemon/gvfsreadchannel.h index 11e845bf..b4e74a2b 100644 --- a/daemon/gvfsreadchannel.h +++ b/daemon/gvfsreadchannel.h @@ -4,7 +4,7 @@ #include <glib-object.h> #include <gio/gvfstypes.h> #include <gvfsjob.h> -#include <gvfsbackend.h> +#include <gvfschannel.h> G_BEGIN_DECLS @@ -19,39 +19,21 @@ typedef struct _GVfsReadChannel GVfsReadChannel; typedef struct _GVfsReadChannelClass GVfsReadChannelClass; typedef struct _GVfsReadChannelPrivate GVfsReadChannelPrivate; -struct _GVfsReadChannel -{ - GObject parent_instance; - - GVfsReadChannelPrivate *priv; -}; - struct _GVfsReadChannelClass { - GObjectClass parent_class; + GVfsChannelClass parent_class; }; GType g_vfs_read_channel_get_type (void) G_GNUC_CONST; -GVfsReadChannel *g_vfs_read_channel_new (GVfsBackend *backend, - GError **error); -int g_vfs_read_channel_steal_remote_fd (GVfsReadChannel *read_channel); -GVfsBackend *g_vfs_read_channel_get_backend (GVfsReadChannel *read_channel); -void g_vfs_read_channel_set_backend_handle (GVfsReadChannel *read_channel, - GVfsBackendHandle backend_handle); -gboolean g_vfs_read_channel_has_job (GVfsReadChannel *read_channel); -GVfsJob * g_vfs_read_channel_get_job (GVfsReadChannel *read_channel); +GVfsReadChannel *g_vfs_read_channel_new (GVfsBackend *backend); void g_vfs_read_channel_send_data (GVfsReadChannel *read_channel, char *buffer, gsize count); -void g_vfs_read_channel_send_error (GVfsReadChannel *read_channel, - GError *error); void g_vfs_read_channel_send_closed (GVfsReadChannel *read_channel); void g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel, goffset offset); -/* TODO: i/o priority? */ - G_END_DECLS #endif /* __G_VFS_READ_CHANNEL_H__ */ |