diff options
author | Alexander Larsson <alexl@redhat.com> | 2009-02-27 15:50:51 +0000 |
---|---|---|
committer | Alexander Larsson <alexl@src.gnome.org> | 2009-02-27 15:50:51 +0000 |
commit | afc01fa5e7f0d4626f578f9d92e17f9ba05003f0 (patch) | |
tree | 6af7428db5e0d6877c8d9c30946a162f53a76703 /client | |
parent | 415187aeda32a168c2270f93b9ebb7b07a785ba7 (diff) | |
download | gvfs-afc01fa5e7f0d4626f578f9d92e17f9ba05003f0.tar.gz |
Support query info on output streams
2009-02-27 Alexander Larsson <alexl@redhat.com>
* client/gdaemonfileoutputstream.c:
Support query info on output streams
* daemon/Makefile.am:
* daemon/gvfsbackend.h:
* daemon/gvfsjobqueryinfowrite.[ch]:
* daemon/gvfswritechannel.c:
Add query info write support.
* daemon/gvfsbackendtest.c:
Implement writing to files in test backend.
Implement query info on write
* test/test-query-info-stream.c:
Test g_file_output_stream_query_info().
svn path=/trunk/; revision=2260
Diffstat (limited to 'client')
-rw-r--r-- | client/gdaemonfileoutputstream.c | 367 |
1 files changed, 318 insertions, 49 deletions
diff --git a/client/gdaemonfileoutputstream.c b/client/gdaemonfileoutputstream.c index 558acd56..fc89d0d3 100644 --- a/client/gdaemonfileoutputstream.c +++ b/client/gdaemonfileoutputstream.c @@ -40,6 +40,7 @@ #include "gdaemonfileoutputstream.h" #include "gvfsdaemondbus.h" #include <gvfsdaemonprotocol.h> +#include <gvfsfileinfo.h> #define MAX_WRITE_SIZE (4*1024*1024) @@ -116,6 +117,26 @@ typedef struct { guint32 seq_nr; } CloseOperation; +typedef enum { + QUERY_STATE_INIT = 0, + QUERY_STATE_WROTE_REQUEST, + QUERY_STATE_HANDLE_INPUT, +} QueryState; + +typedef struct { + QueryState state; + + /* Input */ + char *attributes; + + /* Output */ + GFileInfo *info; + GError *ret_error; + + gboolean sent_cancel; + + guint32 seq_nr; +} QueryOperation; typedef struct { gboolean cancelled; @@ -150,44 +171,55 @@ struct _GDaemonFileOutputStream { }; -static gssize g_daemon_file_output_stream_write (GOutputStream *stream, - const void *buffer, - gsize count, - GCancellable *cancellable, - GError **error); -static gboolean g_daemon_file_output_stream_close (GOutputStream *stream, - GCancellable *cancellable, - GError **error); -static GFileInfo *g_daemon_file_output_stream_query_info (GFileOutputStream *stream, - char *attributes, - GCancellable *cancellable, - GError **error); -static char *g_daemon_file_output_stream_get_etag (GFileOutputStream *stream); -static goffset g_daemon_file_output_stream_tell (GFileOutputStream *stream); -static gboolean g_daemon_file_output_stream_can_seek (GFileOutputStream *stream); -static gboolean g_daemon_file_output_stream_seek (GFileOutputStream *stream, - goffset offset, - GSeekType type, - GCancellable *cancellable, - GError **error); -static void g_daemon_file_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_daemon_file_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error); -static void g_daemon_file_output_stream_close_async (GOutputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gboolean g_daemon_file_output_stream_close_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error); +static gssize g_daemon_file_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean g_daemon_file_output_stream_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error); +static GFileInfo *g_daemon_file_output_stream_query_info (GFileOutputStream *stream, + char *attributes, + GCancellable *cancellable, + GError **error); +static char *g_daemon_file_output_stream_get_etag (GFileOutputStream *stream); +static goffset g_daemon_file_output_stream_tell (GFileOutputStream *stream); +static gboolean g_daemon_file_output_stream_can_seek (GFileOutputStream *stream); +static gboolean g_daemon_file_output_stream_seek (GFileOutputStream *stream, + goffset offset, + GSeekType type, + GCancellable *cancellable, + GError **error); +static void g_daemon_file_output_stream_write_async (GOutputStream *stream, + const void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize g_daemon_file_output_stream_write_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); +static void g_daemon_file_output_stream_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_daemon_file_output_stream_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); +static void g_daemon_file_output_stream_query_info_async (GFileOutputStream *stream, + char *attributes, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +static GFileInfo *g_daemon_file_output_stream_query_info_finish (GFileOutputStream *stream, + GAsyncResult *result, + GError **error); + + G_DEFINE_TYPE (GDaemonFileOutputStream, g_daemon_file_output_stream, G_TYPE_FILE_OUTPUT_STREAM) @@ -246,6 +278,8 @@ g_daemon_file_output_stream_class_init (GDaemonFileOutputStreamClass *klass) file_stream_class->seek = g_daemon_file_output_stream_seek; file_stream_class->query_info = g_daemon_file_output_stream_query_info; file_stream_class->get_etag = g_daemon_file_output_stream_get_etag; + file_stream_class->query_info_async = g_daemon_file_output_stream_query_info_async; + file_stream_class->query_info_finish = g_daemon_file_output_stream_query_info_finish; } static void @@ -317,9 +351,10 @@ get_reply_header_missing_bytes (GString *buffer) type = g_ntohl (reply->type); arg2 = g_ntohl (reply->arg2); - /* ERROR and CLOSED has extra data w/ len in arg2 */ + /* ERROR, CLOSED and INFO has extra data w/ len in arg2 */ if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR || - type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED) + type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED || + type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO) return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len; return 0; } @@ -885,7 +920,7 @@ iterate_seek_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_o } /* Got full header */ - + { GVfsDaemonSocketProtocolReply reply; char *data; @@ -975,24 +1010,165 @@ g_daemon_file_output_stream_get_etag (GFileOutputStream *stream) return g_strdup (file->etag); } +static StateOp +iterate_query_state_machine (GDaemonFileOutputStream *file, + IOOperationData *io_op, + QueryOperation *op) +{ + gsize len; + guint32 request; + + while (TRUE) + { + switch (op->state) + { + /* Initial state for read op */ + case QUERY_STATE_INIT: + request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_QUERY_INFO; + append_request (file, request, + 0, + 0, + strlen (op->attributes), + &op->seq_nr); + g_string_append (file->output_buffer, + op->attributes); + + op->state = QUERY_STATE_WROTE_REQUEST; + io_op->io_buffer = file->output_buffer->str; + io_op->io_size = file->output_buffer->len; + io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */ + return STATE_OP_WRITE; + + /* wrote parts of output_buffer */ + case QUERY_STATE_WROTE_REQUEST: + if (io_op->io_cancelled) + { + op->info = NULL; + g_set_error_literal (&op->ret_error, + G_IO_ERROR, + G_IO_ERROR_CANCELLED, + _("Operation was cancelled")); + return STATE_OP_DONE; + } + + if (io_op->io_res < file->output_buffer->len) + { + g_string_remove_in_front (file->output_buffer, + io_op->io_res); + io_op->io_buffer = file->output_buffer->str; + io_op->io_size = file->output_buffer->len; + io_op->io_allow_cancel = FALSE; + return STATE_OP_WRITE; + } + g_string_truncate (file->output_buffer, 0); + + op->state = QUERY_STATE_HANDLE_INPUT; + break; + + /* No op */ + case QUERY_STATE_HANDLE_INPUT: + if (io_op->cancelled && !op->sent_cancel) + { + op->sent_cancel = TRUE; + append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL, + op->seq_nr, 0, 0, NULL); + op->state = QUERY_STATE_WROTE_REQUEST; + io_op->io_buffer = file->output_buffer->str; + io_op->io_size = file->output_buffer->len; + io_op->io_allow_cancel = FALSE; + return STATE_OP_WRITE; + } + + + if (io_op->io_res > 0) + { + gsize unread_size = io_op->io_size - io_op->io_res; + g_string_set_size (file->input_buffer, + file->input_buffer->len - unread_size); + } + + len = get_reply_header_missing_bytes (file->input_buffer); + if (len > 0) + { + gsize current_len = file->input_buffer->len; + g_string_set_size (file->input_buffer, + current_len + len); + io_op->io_buffer = file->input_buffer->str + current_len; + io_op->io_size = len; + io_op->io_allow_cancel = !op->sent_cancel; + return STATE_OP_READ; + } + + /* Got full header */ + + { + GVfsDaemonSocketProtocolReply reply; + char *data; + data = decode_reply (file->input_buffer, &reply); + + if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR && + reply.seq_nr == op->seq_nr) + { + op->info = NULL; + decode_error (&reply, data, &op->ret_error); + g_string_truncate (file->input_buffer, 0); + return STATE_OP_DONE; + } + else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO) + { + op->info = gvfs_file_info_demarshal (data, reply.arg2); + g_string_truncate (file->input_buffer, 0); + return STATE_OP_DONE; + } + /* Ignore other reply types */ + } + + g_string_truncate (file->input_buffer, 0); + + /* This wasn't interesting, read next reply */ + op->state = SEEK_STATE_HANDLE_INPUT; + break; + + default: + g_assert_not_reached (); + } + + /* Clear io_op between non-op state switches */ + io_op->io_size = 0; + io_op->io_res = 0; + io_op->io_cancelled = FALSE; + } +} + static GFileInfo * g_daemon_file_output_stream_query_info (GFileOutputStream *stream, char *attributes, GCancellable *cancellable, GError **error) { -#if 0 - GDaemonFileOutputStream *file; + GDaemonFileOutputStream *file; + QueryOperation op; file = G_DAEMON_FILE_OUTPUT_STREAM (stream); -#endif + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return NULL; - g_set_error (error, - G_IO_ERROR, - G_IO_ERROR_NOT_SUPPORTED, - _("Query info not supported on stream")); + memset (&op, 0, sizeof (op)); + op.state = QUERY_STATE_INIT; + if (attributes) + op.attributes = attributes; + else + op.attributes = ""; + + if (!run_sync_state_machine (file, (state_machine_iterator)iterate_query_state_machine, + &op, cancellable, error)) + return NULL; /* IO Error */ - return NULL; + if (op.info == NULL) + g_propagate_error (error, op.ret_error); + + return op.info; } /************************************************************************ @@ -1374,3 +1550,96 @@ g_daemon_file_output_stream_close_finish (GOutputStream *stream, /* Failures handled in generic close_finish code */ return TRUE; } + +static void +async_query_done (GOutputStream *stream, + gpointer op_data, + GAsyncReadyCallback callback, + gpointer user_data, + GError *io_error) +{ + GDaemonFileOutputStream *file; + GSimpleAsyncResult *simple; + QueryOperation *op; + GFileInfo *info; + GError *error; + + file = G_DAEMON_FILE_OUTPUT_STREAM (stream); + + op = op_data; + + if (io_error) + { + info = NULL; + error = io_error; + } + else + { + info = op->info; + error = op->ret_error; + } + + simple = g_simple_async_result_new (G_OBJECT (stream), + callback, user_data, + g_daemon_file_output_stream_query_info_async); + + if (info == NULL) + g_simple_async_result_set_from_error (simple, error); + else + g_simple_async_result_set_op_res_gpointer (simple, info, + g_object_unref); + + /* Complete immediately, not in idle, since we're already in a mainloop callout */ + g_simple_async_result_complete (simple); + g_object_unref (simple); + + if (op->ret_error) + g_error_free (op->ret_error); + g_free (op->attributes); + g_free (op); +} + +static void +g_daemon_file_output_stream_query_info_async (GFileOutputStream *stream, + char *attributes, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GDaemonFileOutputStream *file; + QueryOperation *op; + + file = G_DAEMON_FILE_OUTPUT_STREAM (stream); + + op = g_new0 (QueryOperation, 1); + op->state = QUERY_STATE_INIT; + if (attributes) + op->attributes = g_strdup (attributes); + else + op->attributes = g_strdup (""); + + run_async_state_machine (file, + (state_machine_iterator)iterate_query_state_machine, + op, io_priority, + callback, user_data, + cancellable, + async_query_done); +} + +static GFileInfo * +g_daemon_file_output_stream_query_info_finish (GFileOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + GFileInfo *info; + + simple = G_SIMPLE_ASYNC_RESULT (result); + g_assert (g_simple_async_result_get_source_tag (simple) == g_daemon_file_output_stream_query_info_async); + + info = g_simple_async_result_get_op_res_gpointer (simple); + + return g_object_ref (info); + +} |