summaryrefslogtreecommitdiff
path: root/client/gdaemonfileoutputstream.c
diff options
context:
space:
mode:
authorAlexander Larsson <alexl@redhat.com>2009-02-27 15:50:51 +0000
committerAlexander Larsson <alexl@src.gnome.org>2009-02-27 15:50:51 +0000
commitafc01fa5e7f0d4626f578f9d92e17f9ba05003f0 (patch)
tree6af7428db5e0d6877c8d9c30946a162f53a76703 /client/gdaemonfileoutputstream.c
parent415187aeda32a168c2270f93b9ebb7b07a785ba7 (diff)
downloadgvfs-afc01fa5e7f0d4626f578f9d92e17f9ba05003f0.tar.gz
Support query info on output streams
2009-02-27 Alexander Larsson <alexl@redhat.com> * client/gdaemonfileoutputstream.c: Support query info on output streams * daemon/Makefile.am: * daemon/gvfsbackend.h: * daemon/gvfsjobqueryinfowrite.[ch]: * daemon/gvfswritechannel.c: Add query info write support. * daemon/gvfsbackendtest.c: Implement writing to files in test backend. Implement query info on write * test/test-query-info-stream.c: Test g_file_output_stream_query_info(). svn path=/trunk/; revision=2260
Diffstat (limited to 'client/gdaemonfileoutputstream.c')
-rw-r--r--client/gdaemonfileoutputstream.c367
1 files changed, 318 insertions, 49 deletions
diff --git a/client/gdaemonfileoutputstream.c b/client/gdaemonfileoutputstream.c
index 558acd56..fc89d0d3 100644
--- a/client/gdaemonfileoutputstream.c
+++ b/client/gdaemonfileoutputstream.c
@@ -40,6 +40,7 @@
#include "gdaemonfileoutputstream.h"
#include "gvfsdaemondbus.h"
#include <gvfsdaemonprotocol.h>
+#include <gvfsfileinfo.h>
#define MAX_WRITE_SIZE (4*1024*1024)
@@ -116,6 +117,26 @@ typedef struct {
guint32 seq_nr;
} CloseOperation;
+typedef enum {
+ QUERY_STATE_INIT = 0,
+ QUERY_STATE_WROTE_REQUEST,
+ QUERY_STATE_HANDLE_INPUT,
+} QueryState;
+
+typedef struct {
+ QueryState state;
+
+ /* Input */
+ char *attributes;
+
+ /* Output */
+ GFileInfo *info;
+ GError *ret_error;
+
+ gboolean sent_cancel;
+
+ guint32 seq_nr;
+} QueryOperation;
typedef struct {
gboolean cancelled;
@@ -150,44 +171,55 @@ struct _GDaemonFileOutputStream {
};
-static gssize g_daemon_file_output_stream_write (GOutputStream *stream,
- const void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error);
-static gboolean g_daemon_file_output_stream_close (GOutputStream *stream,
- GCancellable *cancellable,
- GError **error);
-static GFileInfo *g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
- char *attributes,
- GCancellable *cancellable,
- GError **error);
-static char *g_daemon_file_output_stream_get_etag (GFileOutputStream *stream);
-static goffset g_daemon_file_output_stream_tell (GFileOutputStream *stream);
-static gboolean g_daemon_file_output_stream_can_seek (GFileOutputStream *stream);
-static gboolean g_daemon_file_output_stream_seek (GFileOutputStream *stream,
- goffset offset,
- GSeekType type,
- GCancellable *cancellable,
- GError **error);
-static void g_daemon_file_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_daemon_file_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
-static void g_daemon_file_output_stream_close_async (GOutputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gboolean g_daemon_file_output_stream_close_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
+static gssize g_daemon_file_output_stream_write (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error);
+static gboolean g_daemon_file_output_stream_close (GOutputStream *stream,
+ GCancellable *cancellable,
+ GError **error);
+static GFileInfo *g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
+ char *attributes,
+ GCancellable *cancellable,
+ GError **error);
+static char *g_daemon_file_output_stream_get_etag (GFileOutputStream *stream);
+static goffset g_daemon_file_output_stream_tell (GFileOutputStream *stream);
+static gboolean g_daemon_file_output_stream_can_seek (GFileOutputStream *stream);
+static gboolean g_daemon_file_output_stream_seek (GFileOutputStream *stream,
+ goffset offset,
+ GSeekType type,
+ GCancellable *cancellable,
+ GError **error);
+static void g_daemon_file_output_stream_write_async (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data);
+static gssize g_daemon_file_output_stream_write_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+static void g_daemon_file_output_stream_close_async (GOutputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data);
+static gboolean g_daemon_file_output_stream_close_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+static void g_daemon_file_output_stream_query_info_async (GFileOutputStream *stream,
+ char *attributes,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+static GFileInfo *g_daemon_file_output_stream_query_info_finish (GFileOutputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+
+
G_DEFINE_TYPE (GDaemonFileOutputStream, g_daemon_file_output_stream,
G_TYPE_FILE_OUTPUT_STREAM)
@@ -246,6 +278,8 @@ g_daemon_file_output_stream_class_init (GDaemonFileOutputStreamClass *klass)
file_stream_class->seek = g_daemon_file_output_stream_seek;
file_stream_class->query_info = g_daemon_file_output_stream_query_info;
file_stream_class->get_etag = g_daemon_file_output_stream_get_etag;
+ file_stream_class->query_info_async = g_daemon_file_output_stream_query_info_async;
+ file_stream_class->query_info_finish = g_daemon_file_output_stream_query_info_finish;
}
static void
@@ -317,9 +351,10 @@ get_reply_header_missing_bytes (GString *buffer)
type = g_ntohl (reply->type);
arg2 = g_ntohl (reply->arg2);
- /* ERROR and CLOSED has extra data w/ len in arg2 */
+ /* ERROR, CLOSED and INFO has extra data w/ len in arg2 */
if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR ||
- type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED)
+ type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED ||
+ type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO)
return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len;
return 0;
}
@@ -885,7 +920,7 @@ iterate_seek_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_o
}
/* Got full header */
-
+
{
GVfsDaemonSocketProtocolReply reply;
char *data;
@@ -975,24 +1010,165 @@ g_daemon_file_output_stream_get_etag (GFileOutputStream *stream)
return g_strdup (file->etag);
}
+static StateOp
+iterate_query_state_machine (GDaemonFileOutputStream *file,
+ IOOperationData *io_op,
+ QueryOperation *op)
+{
+ gsize len;
+ guint32 request;
+
+ while (TRUE)
+ {
+ switch (op->state)
+ {
+ /* Initial state for read op */
+ case QUERY_STATE_INIT:
+ request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_QUERY_INFO;
+ append_request (file, request,
+ 0,
+ 0,
+ strlen (op->attributes),
+ &op->seq_nr);
+ g_string_append (file->output_buffer,
+ op->attributes);
+
+ op->state = QUERY_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
+ return STATE_OP_WRITE;
+
+ /* wrote parts of output_buffer */
+ case QUERY_STATE_WROTE_REQUEST:
+ if (io_op->io_cancelled)
+ {
+ op->info = NULL;
+ g_set_error_literal (&op->ret_error,
+ G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return STATE_OP_DONE;
+ }
+
+ if (io_op->io_res < file->output_buffer->len)
+ {
+ g_string_remove_in_front (file->output_buffer,
+ io_op->io_res);
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+ g_string_truncate (file->output_buffer, 0);
+
+ op->state = QUERY_STATE_HANDLE_INPUT;
+ break;
+
+ /* No op */
+ case QUERY_STATE_HANDLE_INPUT:
+ if (io_op->cancelled && !op->sent_cancel)
+ {
+ op->sent_cancel = TRUE;
+ append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
+ op->seq_nr, 0, 0, NULL);
+ op->state = QUERY_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+
+
+ if (io_op->io_res > 0)
+ {
+ gsize unread_size = io_op->io_size - io_op->io_res;
+ g_string_set_size (file->input_buffer,
+ file->input_buffer->len - unread_size);
+ }
+
+ len = get_reply_header_missing_bytes (file->input_buffer);
+ if (len > 0)
+ {
+ gsize current_len = file->input_buffer->len;
+ g_string_set_size (file->input_buffer,
+ current_len + len);
+ io_op->io_buffer = file->input_buffer->str + current_len;
+ io_op->io_size = len;
+ io_op->io_allow_cancel = !op->sent_cancel;
+ return STATE_OP_READ;
+ }
+
+ /* Got full header */
+
+ {
+ GVfsDaemonSocketProtocolReply reply;
+ char *data;
+ data = decode_reply (file->input_buffer, &reply);
+
+ if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
+ reply.seq_nr == op->seq_nr)
+ {
+ op->info = NULL;
+ decode_error (&reply, data, &op->ret_error);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO)
+ {
+ op->info = gvfs_file_info_demarshal (data, reply.arg2);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ /* Ignore other reply types */
+ }
+
+ g_string_truncate (file->input_buffer, 0);
+
+ /* This wasn't interesting, read next reply */
+ op->state = SEEK_STATE_HANDLE_INPUT;
+ break;
+
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Clear io_op between non-op state switches */
+ io_op->io_size = 0;
+ io_op->io_res = 0;
+ io_op->io_cancelled = FALSE;
+ }
+}
+
static GFileInfo *
g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
char *attributes,
GCancellable *cancellable,
GError **error)
{
-#if 0
- GDaemonFileOutputStream *file;
+ GDaemonFileOutputStream *file;
+ QueryOperation op;
file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-#endif
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return NULL;
- g_set_error (error,
- G_IO_ERROR,
- G_IO_ERROR_NOT_SUPPORTED,
- _("Query info not supported on stream"));
+ memset (&op, 0, sizeof (op));
+ op.state = QUERY_STATE_INIT;
+ if (attributes)
+ op.attributes = attributes;
+ else
+ op.attributes = "";
+
+ if (!run_sync_state_machine (file, (state_machine_iterator)iterate_query_state_machine,
+ &op, cancellable, error))
+ return NULL; /* IO Error */
- return NULL;
+ if (op.info == NULL)
+ g_propagate_error (error, op.ret_error);
+
+ return op.info;
}
/************************************************************************
@@ -1374,3 +1550,96 @@ g_daemon_file_output_stream_close_finish (GOutputStream *stream,
/* Failures handled in generic close_finish code */
return TRUE;
}
+
+static void
+async_query_done (GOutputStream *stream,
+ gpointer op_data,
+ GAsyncReadyCallback callback,
+ gpointer user_data,
+ GError *io_error)
+{
+ GDaemonFileOutputStream *file;
+ GSimpleAsyncResult *simple;
+ QueryOperation *op;
+ GFileInfo *info;
+ GError *error;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ op = op_data;
+
+ if (io_error)
+ {
+ info = NULL;
+ error = io_error;
+ }
+ else
+ {
+ info = op->info;
+ error = op->ret_error;
+ }
+
+ simple = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ g_daemon_file_output_stream_query_info_async);
+
+ if (info == NULL)
+ g_simple_async_result_set_from_error (simple, error);
+ else
+ g_simple_async_result_set_op_res_gpointer (simple, info,
+ g_object_unref);
+
+ /* Complete immediately, not in idle, since we're already in a mainloop callout */
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+
+ if (op->ret_error)
+ g_error_free (op->ret_error);
+ g_free (op->attributes);
+ g_free (op);
+}
+
+static void
+g_daemon_file_output_stream_query_info_async (GFileOutputStream *stream,
+ char *attributes,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GDaemonFileOutputStream *file;
+ QueryOperation *op;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ op = g_new0 (QueryOperation, 1);
+ op->state = QUERY_STATE_INIT;
+ if (attributes)
+ op->attributes = g_strdup (attributes);
+ else
+ op->attributes = g_strdup ("");
+
+ run_async_state_machine (file,
+ (state_machine_iterator)iterate_query_state_machine,
+ op, io_priority,
+ callback, user_data,
+ cancellable,
+ async_query_done);
+}
+
+static GFileInfo *
+g_daemon_file_output_stream_query_info_finish (GFileOutputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+ GFileInfo *info;
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ g_assert (g_simple_async_result_get_source_tag (simple) == g_daemon_file_output_stream_query_info_async);
+
+ info = g_simple_async_result_get_op_res_gpointer (simple);
+
+ return g_object_ref (info);
+
+}