diff options
author | Alexander Larsson <alexl@src.gnome.org> | 2007-09-13 11:57:40 +0000 |
---|---|---|
committer | Alexander Larsson <alexl@src.gnome.org> | 2007-09-13 11:57:40 +0000 |
commit | e8762e3d82c5cde9d548383bdb3df2d9222e2708 (patch) | |
tree | 99dc9529164c0e182f31f65644763b8aa8286748 /daemon/gvfsreadchannel.c | |
parent | fd9e5a6cd9eda6908eb48cbd433434f192b90922 (diff) | |
download | gvfs-e8762e3d82c5cde9d548383bdb3df2d9222e2708.tar.gz |
Move generic code from GVfsReadChannel to GVfsChannel base class
Original git commit by Alexander Larsson <alex@greebo.(none)> at 1171634935 +0100
svn path=/trunk/; revision=354
Diffstat (limited to 'daemon/gvfsreadchannel.c')
-rw-r--r-- | daemon/gvfsreadchannel.c | 494 |
1 files changed, 82 insertions, 412 deletions
diff --git a/daemon/gvfsreadchannel.c b/daemon/gvfsreadchannel.c index 4c6362e4..b3b2931f 100644 --- a/daemon/gvfsreadchannel.c +++ b/daemon/gvfsreadchannel.c @@ -21,161 +21,94 @@ #include <gvfsjobseekread.h> #include <gvfsjobcloseread.h> -static void g_vfs_read_channel_job_source_iface_init (GVfsJobSourceIface *iface); - -G_DEFINE_TYPE_WITH_CODE (GVfsReadChannel, g_vfs_read_channel, G_TYPE_OBJECT, - G_IMPLEMENT_INTERFACE (G_TYPE_VFS_JOB_SOURCE, - g_vfs_read_channel_job_source_iface_init)) - -enum { - PROP_0, -}; - -typedef struct +struct _GVfsReadChannel { - GVfsReadChannel *read_channel; - GInputStream *command_stream; - char buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE]; - int buffer_size; -} RequestReader; + GVfsChannel parent_instance; -struct _GVfsReadChannelPrivate -{ - GVfsBackend *backend; - gboolean connection_closed; - GInputStream *command_stream; - GOutputStream *reply_stream; - int remote_fd; int seek_generation; - - GVfsBackendHandle backend_handle; - GVfsJob *current_job; - guint32 current_job_seq_nr; - - RequestReader *request_reader; - - char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE]; - int reply_buffer_pos; - - char *output_data; /* Owned by job */ - gsize output_data_size; - gsize output_data_pos; }; -static void start_request_reader (GVfsReadChannel *channel); +G_DEFINE_TYPE (GVfsReadChannel, g_vfs_read_channel, G_TYPE_VFS_CHANNEL) +static GVfsJob *read_channel_close (GVfsChannel *channel); +static GVfsJob *read_channel_handle_request (GVfsChannel *channel, + guint32 command, + guint32 seq_nr, + guint32 arg1, + guint32 arg2, + gpointer data, + gsize data_len, + GError **error); + static void g_vfs_read_channel_finalize (GObject *object) { GVfsReadChannel *read_channel; read_channel = G_VFS_READ_CHANNEL (object); - - if (read_channel->priv->current_job) - g_object_unref (read_channel->priv->current_job); - read_channel->priv->current_job = NULL; - - if (read_channel->priv->reply_stream) - g_object_unref (read_channel->priv->reply_stream); - read_channel->priv->reply_stream = NULL; - - if (read_channel->priv->request_reader) - read_channel->priv->request_reader->read_channel = NULL; - read_channel->priv->request_reader = NULL; - - if (read_channel->priv->command_stream) - g_object_unref (read_channel->priv->command_stream); - read_channel->priv->command_stream = NULL; - - if (read_channel->priv->remote_fd != -1) - close (read_channel->priv->remote_fd); - - g_assert (read_channel->priv->backend_handle == NULL); if (G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize) (*G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize) (object); } static void -g_vfs_read_channel_job_source_iface_init (GVfsJobSourceIface *iface) -{ -} - -static void g_vfs_read_channel_class_init (GVfsReadChannelClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GVfsChannelClass *channel_class = G_VFS_CHANNEL_CLASS (klass); - g_type_class_add_private (klass, sizeof (GVfsReadChannelPrivate)); - gobject_class->finalize = g_vfs_read_channel_finalize; + channel_class->close = read_channel_close; + channel_class->handle_request = read_channel_handle_request; + } static void g_vfs_read_channel_init (GVfsReadChannel *channel) { - channel->priv = G_TYPE_INSTANCE_GET_PRIVATE (channel, - G_TYPE_VFS_READ_CHANNEL, - GVfsReadChannelPrivate); - channel->priv->remote_fd = -1; } -static void -g_vfs_read_channel_connection_closed (GVfsReadChannel *channel) +static GVfsJob * +read_channel_close (GVfsChannel *channel) { - if (channel->priv->connection_closed) - return; - channel->priv->connection_closed = TRUE; - - if (channel->priv->current_job == NULL && - channel->priv->backend_handle != NULL) - { - channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle, channel->priv->backend); - channel->priv->current_job_seq_nr = 0; - g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); - } - /* Otherwise we'll close when current_job is finished */ -} + return g_vfs_job_close_read_new (G_VFS_READ_CHANNEL (channel), + g_vfs_channel_get_backend_handle (channel), + g_vfs_channel_get_backend (channel)); +} -static void -got_command (GVfsReadChannel *channel, - guint32 command, - guint32 seq_nr, - guint32 arg1, - guint32 arg2) +static GVfsJob * +read_channel_handle_request (GVfsChannel *channel, + guint32 command, + guint32 seq_nr, + guint32 arg1, + guint32 arg2, + gpointer data, + gsize data_len, + GError **error) { GVfsJob *job; - GError *error; GSeekType seek_type; + GVfsBackendHandle backend_handle; + GVfsBackend *backend; + GVfsReadChannel *read_channel; - g_print ("got_command %d %d %d %d\n", command, seq_nr, arg1, arg2); - - if (channel->priv->current_job != NULL) - { - if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL) - { - g_warning ("Ignored non-cancel request with outstanding request"); - return; - } - - if (arg1 == channel->priv->current_job_seq_nr) - g_vfs_job_cancel (channel->priv->current_job); - return; - } + read_channel = G_VFS_READ_CHANNEL (channel); + backend_handle = g_vfs_channel_get_backend_handle (channel); + backend = g_vfs_channel_get_backend (channel); job = NULL; switch (command) { case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ: - job = g_vfs_job_read_new (channel, - channel->priv->backend_handle, + job = g_vfs_job_read_new (read_channel, + backend_handle, arg1, - channel->priv->backend); + backend); break; case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE: - job = g_vfs_job_close_read_new (channel, - channel->priv->backend_handle, - channel->priv->backend); + job = g_vfs_job_close_read_new (read_channel, + backend_handle, + backend); break; case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR: case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END: @@ -186,267 +119,41 @@ got_command (GVfsReadChannel *channel, else if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR) seek_type = G_SEEK_CUR; - channel->priv->seek_generation++; - job = g_vfs_job_seek_read_new (channel, - channel->priv->backend_handle, + read_channel->seek_generation++; + job = g_vfs_job_seek_read_new (read_channel, + backend_handle, seek_type, ((goffset)arg1) | (((goffset)arg2) << 32), - channel->priv->backend); - break; - - case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL: - /* Ignore cancel with no outstanding job */ + backend); break; default: - error = NULL; - g_set_error (&error, G_VFS_ERROR, + g_set_error (error, G_VFS_ERROR, G_VFS_ERROR_INTERNAL_ERROR, "Unknown stream command %d\n", command); - g_vfs_read_channel_send_error (channel, error); - g_error_free (error); break; } - if (job) - { - channel->priv->current_job = job; - channel->priv->current_job_seq_nr = seq_nr; - g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); - } -} - -static void -command_read_cb (GInputStream *input_stream, - void *buffer, - gsize count_requested, - gssize count_read, - gpointer data, - GError *error) -{ - RequestReader *reader = data; - GVfsDaemonSocketProtocolRequest *cmd; - guint32 seq_nr; - guint32 command; - guint32 arg1, arg2; - - if (reader->read_channel == NULL) - { - /* ReadChannel was finalized */ - g_object_unref (reader->command_stream); - g_free (reader); - return; - } - - if (count_read <= 0) - { - reader->read_channel->priv->request_reader = NULL; - g_vfs_read_channel_connection_closed (reader->read_channel); - g_object_unref (reader->command_stream); - g_free (reader); - return; - } - - reader->buffer_size += count_read; - - if (reader->buffer_size < G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE) - { - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); - return; - } - - cmd = (GVfsDaemonSocketProtocolRequest *)reader->buffer; - command = g_ntohl (cmd->command); - arg1 = g_ntohl (cmd->arg1); - arg2 = g_ntohl (cmd->arg2); - seq_nr = g_ntohl (cmd->seq_nr); - reader->buffer_size = 0; - - got_command (reader->read_channel, command, seq_nr, arg1, arg2); - - /* Request more commands, so can get cancel requests */ - - reader->buffer_size = 0; - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); -} - -static void -start_request_reader (GVfsReadChannel *channel) -{ - RequestReader *reader; - - reader = g_new0 (RequestReader, 1); - reader->read_channel = channel; - reader->command_stream = g_object_ref (channel->priv->command_stream); - reader->buffer_size = 0; - - g_input_stream_read_async (reader->command_stream, - reader->buffer + reader->buffer_size, - G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size, - 0, - command_read_cb, - reader, - NULL); - - channel->priv->request_reader = reader; -} - - -static void -send_reply_cb (GOutputStream *output_stream, - void *buffer, - gsize bytes_requested, - gssize bytes_written, - gpointer data, - GError *error) -{ - GVfsReadChannel *channel = data; - GVfsJob *job; - - g_print ("send_reply_cb: %d\n", bytes_written); - - if (bytes_written <= 0) - { - g_vfs_read_channel_connection_closed (channel); - goto error_out; - } - - if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) - { - channel->priv->reply_buffer_pos += bytes_written; - - /* Write more of reply header if needed */ - if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) - { - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->reply_buffer + channel->priv->reply_buffer_pos, - G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - channel->priv->reply_buffer_pos, - 0, - send_reply_cb, channel, - NULL); - return; - } - bytes_written = 0; - } - - channel->priv->output_data_pos += bytes_written; - - /* Write more of output_data if needed */ - if (channel->priv->output_data != NULL && - channel->priv->output_data_pos < channel->priv->output_data_size) - { - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->output_data + channel->priv->output_data_pos, - channel->priv->output_data_size - channel->priv->output_data_pos, - 0, - send_reply_cb, channel, - NULL); - return; - } - - error_out: - - /* Sent full reply */ - channel->priv->output_data = NULL; - - job = channel->priv->current_job; - channel->priv->current_job = NULL; - g_vfs_job_emit_finished (job); - - if (G_IS_VFS_JOB_CLOSE_READ (job)) - { - g_vfs_job_source_closed (G_VFS_JOB_SOURCE (channel)); - channel->priv->backend_handle = NULL; - } - else if (channel->priv->connection_closed) - { - channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle, - channel->priv->backend); - channel->priv->current_job_seq_nr = 0; - g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job); - } - - g_object_unref (job); - g_print ("Sent reply\n"); -} - -/* Might be called on an i/o thread */ -static void -send_reply (GVfsReadChannel *channel, - gboolean use_header, - char *data, - gsize data_len) -{ - - channel->priv->output_data = data; - channel->priv->output_data_size = data_len; - channel->priv->output_data_pos = 0; - - if (use_header) - { - channel->priv->reply_buffer_pos = 0; - - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->reply_buffer, - G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE, - 0, - send_reply_cb, channel, - NULL); - } - else - { - channel->priv->reply_buffer_pos = G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE; - - g_output_stream_write_async (channel->priv->reply_stream, - channel->priv->output_data, - channel->priv->output_data_size, - 0, - send_reply_cb, channel, - NULL); - } + return job; } /* Might be called on an i/o thread */ void -g_vfs_read_channel_send_error (GVfsReadChannel *read_channel, - GError *error) -{ - char *data; - gsize data_len; - - data = g_error_to_daemon_reply (error, read_channel->priv->current_job_seq_nr, &data_len); - send_reply (read_channel, FALSE, data, data_len); -} - - -/* Might be called on an i/o thread - */ -void g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel, - goffset offset) + goffset offset) { - GVfsDaemonSocketProtocolReply *reply; + GVfsDaemonSocketProtocolReply reply; + GVfsChannel *channel; + + channel = G_VFS_CHANNEL (read_channel); - reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS); - reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); - reply->arg1 = g_htonl (offset & 0xffffffff); - reply->arg2 = g_htonl (offset >> 32); + reply.type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS); + reply.seq_nr = g_htonl (g_vfs_channel_get_current_seq_nr (channel)); + reply.arg1 = g_htonl (offset & 0xffffffff); + reply.arg2 = g_htonl (offset >> 32); - send_reply (read_channel, TRUE, NULL, 0); + g_vfs_channel_send_reply (channel, &reply, NULL, 0); } /* Might be called on an i/o thread @@ -454,15 +161,17 @@ g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel, void g_vfs_read_channel_send_closed (GVfsReadChannel *read_channel) { - GVfsDaemonSocketProtocolReply *reply; + GVfsDaemonSocketProtocolReply reply; + GVfsChannel *channel; + + channel = G_VFS_CHANNEL (read_channel); - reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED); - reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); - reply->arg1 = g_htonl (0); - reply->arg2 = g_htonl (0); + reply.type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED); + reply.seq_nr = g_htonl (g_vfs_channel_get_current_seq_nr (channel)); + reply.arg1 = g_htonl (0); + reply.arg2 = g_htonl (0); - send_reply (read_channel, TRUE, NULL, 0); + g_vfs_channel_send_reply (channel, &reply, NULL, 0); } /* Might be called on an i/o thread @@ -472,63 +181,24 @@ g_vfs_read_channel_send_data (GVfsReadChannel *read_channel, char *buffer, gsize count) { - GVfsDaemonSocketProtocolReply *reply; + GVfsDaemonSocketProtocolReply reply; + GVfsChannel *channel; - reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer; - reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA); - reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr); - reply->arg1 = g_htonl (count); - reply->arg2 = g_htonl (read_channel->priv->seek_generation); + channel = G_VFS_CHANNEL (read_channel); - send_reply (read_channel, TRUE, buffer, count); -} - -GVfsReadChannel * -g_vfs_read_channel_new (GVfsBackend *backend, - GError **error) -{ - GVfsReadChannel *channel; - int socket_fds[2]; - int ret; - - ret = socketpair (AF_UNIX, SOCK_STREAM, 0, socket_fds); - if (ret == -1) - { - g_set_error (error, G_FILE_ERROR, - g_file_error_from_errno (errno), - _("Error creating socket pair")); - return NULL; - } - - channel = g_object_new (G_TYPE_VFS_READ_CHANNEL, NULL); - channel->priv->backend = backend; - channel->priv->command_stream = g_input_stream_socket_new (socket_fds[0], TRUE); - channel->priv->reply_stream = g_output_stream_socket_new (socket_fds[0], FALSE); - channel->priv->remote_fd = socket_fds[1]; + reply.type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA); + reply.seq_nr = g_htonl (g_vfs_channel_get_current_seq_nr (channel)); + reply.arg1 = g_htonl (count); + reply.arg2 = g_htonl (read_channel->seek_generation); - start_request_reader (channel); - - return channel; -} - -int -g_vfs_read_channel_steal_remote_fd (GVfsReadChannel *channel) -{ - int fd; - fd = channel->priv->remote_fd; - channel->priv->remote_fd = -1; - return fd; + g_vfs_channel_send_reply (channel, &reply, buffer, count); } -GVfsBackend * -g_vfs_read_channel_get_backend (GVfsReadChannel *read_channel) -{ - return read_channel->priv->backend; -} -void -g_vfs_read_channel_set_backend_handle (GVfsReadChannel *read_channel, - GVfsBackendHandle backend_handle) +GVfsReadChannel * +g_vfs_read_channel_new (GVfsBackend *backend) { - read_channel->priv->backend_handle = backend_handle; + return g_object_new (G_TYPE_VFS_READ_CHANNEL, + "backend", backend, + NULL); } |