summaryrefslogtreecommitdiff
path: root/trunk/client/gdaemonfileoutputstream.c
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/client/gdaemonfileoutputstream.c')
-rw-r--r--trunk/client/gdaemonfileoutputstream.c1647
1 files changed, 1647 insertions, 0 deletions
diff --git a/trunk/client/gdaemonfileoutputstream.c b/trunk/client/gdaemonfileoutputstream.c
new file mode 100644
index 00000000..27992bf8
--- /dev/null
+++ b/trunk/client/gdaemonfileoutputstream.c
@@ -0,0 +1,1647 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2006-2007 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Alexander Larsson <alexl@redhat.com>
+ */
+
+#include <config.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <glib.h>
+#include <glib/gstdio.h>
+#include <glib/gi18n-lib.h>
+#include <gio/gio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#include "gdaemonfileoutputstream.h"
+#include "gvfsdaemondbus.h"
+#include <gvfsdaemonprotocol.h>
+#include <gvfsfileinfo.h>
+
+#define MAX_WRITE_SIZE (4*1024*1024)
+
+typedef enum {
+ STATE_OP_DONE,
+ STATE_OP_READ,
+ STATE_OP_WRITE,
+ STATE_OP_SKIP
+} StateOp;
+
+typedef enum {
+ WRITE_STATE_INIT = 0,
+ WRITE_STATE_WROTE_COMMAND,
+ WRITE_STATE_SEND_DATA,
+ WRITE_STATE_HANDLE_INPUT
+} WriteState;
+
+typedef struct {
+ WriteState state;
+
+ /* Output */
+ const char *buffer;
+ gsize buffer_size;
+ gsize buffer_pos;
+
+ /* Input */
+ gssize ret_val;
+ GError *ret_error;
+
+ gboolean sent_cancel;
+
+ guint32 seq_nr;
+} WriteOperation;
+
+typedef enum {
+ SEEK_STATE_INIT = 0,
+ SEEK_STATE_WROTE_REQUEST,
+ SEEK_STATE_HANDLE_INPUT
+} SeekState;
+
+typedef struct {
+ SeekState state;
+
+ /* Output */
+ goffset offset;
+ GSeekType seek_type;
+ /* Output */
+ gboolean ret_val;
+ GError *ret_error;
+ goffset ret_offset;
+
+ gboolean sent_cancel;
+
+ guint32 seq_nr;
+} SeekOperation;
+
+typedef enum {
+ CLOSE_STATE_INIT = 0,
+ CLOSE_STATE_WROTE_REQUEST,
+ CLOSE_STATE_HANDLE_INPUT
+} CloseState;
+
+typedef struct {
+ CloseState state;
+
+ /* Output */
+
+ /* Output */
+ gboolean ret_val;
+ GError *ret_error;
+
+ gboolean sent_cancel;
+
+ guint32 seq_nr;
+} CloseOperation;
+
+typedef enum {
+ QUERY_STATE_INIT = 0,
+ QUERY_STATE_WROTE_REQUEST,
+ QUERY_STATE_HANDLE_INPUT,
+} QueryState;
+
+typedef struct {
+ QueryState state;
+
+ /* Input */
+ char *attributes;
+
+ /* Output */
+ GFileInfo *info;
+ GError *ret_error;
+
+ gboolean sent_cancel;
+
+ guint32 seq_nr;
+} QueryOperation;
+
+typedef struct {
+ gboolean cancelled;
+
+ char *io_buffer;
+ gsize io_size;
+ gsize io_res;
+ /* The operation always succeeds, or gets cancelled.
+ If we get an error doing the i/o that is considered fatal */
+ gboolean io_allow_cancel;
+ gboolean io_cancelled;
+} IOOperationData;
+
+typedef StateOp (*state_machine_iterator) (GDaemonFileOutputStream *file, IOOperationData *io_op, gpointer data);
+
+struct _GDaemonFileOutputStream {
+ GFileOutputStream parent;
+
+ GOutputStream *command_stream;
+ GInputStream *data_stream;
+ guint can_seek : 1;
+
+ guint32 seq_nr;
+ goffset current_offset;
+
+ gsize input_block_size;
+ GString *input_buffer;
+
+ GString *output_buffer;
+
+ char *etag;
+
+};
+
+static gssize g_daemon_file_output_stream_write (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error);
+static gboolean g_daemon_file_output_stream_close (GOutputStream *stream,
+ GCancellable *cancellable,
+ GError **error);
+static GFileInfo *g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
+ const char *attributes,
+ GCancellable *cancellable,
+ GError **error);
+static char *g_daemon_file_output_stream_get_etag (GFileOutputStream *stream);
+static goffset g_daemon_file_output_stream_tell (GFileOutputStream *stream);
+static gboolean g_daemon_file_output_stream_can_seek (GFileOutputStream *stream);
+static gboolean g_daemon_file_output_stream_seek (GFileOutputStream *stream,
+ goffset offset,
+ GSeekType type,
+ GCancellable *cancellable,
+ GError **error);
+static void g_daemon_file_output_stream_write_async (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data);
+static gssize g_daemon_file_output_stream_write_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+static void g_daemon_file_output_stream_close_async (GOutputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data);
+static gboolean g_daemon_file_output_stream_close_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+static void g_daemon_file_output_stream_query_info_async (GFileOutputStream *stream,
+ const char *attributes,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+static GFileInfo *g_daemon_file_output_stream_query_info_finish (GFileOutputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+
+
+
+G_DEFINE_TYPE (GDaemonFileOutputStream, g_daemon_file_output_stream,
+ G_TYPE_FILE_OUTPUT_STREAM)
+
+static void
+g_string_remove_in_front (GString *string,
+ gsize bytes)
+{
+ memmove (string->str,
+ string->str + bytes,
+ string->len - bytes);
+ g_string_truncate (string,
+ string->len - bytes);
+}
+
+static void
+g_daemon_file_output_stream_finalize (GObject *object)
+{
+ GDaemonFileOutputStream *file;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (object);
+
+ if (file->command_stream)
+ g_object_unref (file->command_stream);
+ if (file->data_stream)
+ g_object_unref (file->data_stream);
+
+ g_string_free (file->input_buffer, TRUE);
+ g_string_free (file->output_buffer, TRUE);
+
+ g_free (file->etag);
+
+ if (G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize)
+ (*G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize) (object);
+}
+
+static void
+g_daemon_file_output_stream_class_init (GDaemonFileOutputStreamClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
+ GFileOutputStreamClass *file_stream_class = G_FILE_OUTPUT_STREAM_CLASS (klass);
+
+ gobject_class->finalize = g_daemon_file_output_stream_finalize;
+
+ stream_class->write_fn = g_daemon_file_output_stream_write;
+ stream_class->close_fn = g_daemon_file_output_stream_close;
+
+ stream_class->write_async = g_daemon_file_output_stream_write_async;
+ stream_class->write_finish = g_daemon_file_output_stream_write_finish;
+ stream_class->close_async = g_daemon_file_output_stream_close_async;
+ stream_class->close_finish = g_daemon_file_output_stream_close_finish;
+
+ file_stream_class->tell = g_daemon_file_output_stream_tell;
+ file_stream_class->can_seek = g_daemon_file_output_stream_can_seek;
+ file_stream_class->seek = g_daemon_file_output_stream_seek;
+ file_stream_class->query_info = g_daemon_file_output_stream_query_info;
+ file_stream_class->get_etag = g_daemon_file_output_stream_get_etag;
+ file_stream_class->query_info_async = g_daemon_file_output_stream_query_info_async;
+ file_stream_class->query_info_finish = g_daemon_file_output_stream_query_info_finish;
+}
+
+static void
+g_daemon_file_output_stream_init (GDaemonFileOutputStream *info)
+{
+ info->output_buffer = g_string_new ("");
+ info->input_buffer = g_string_new ("");
+ info->seq_nr = 1;
+}
+
+GFileOutputStream *
+g_daemon_file_output_stream_new (int fd,
+ gboolean can_seek,
+ goffset initial_offset)
+{
+ GDaemonFileOutputStream *stream;
+
+ stream = g_object_new (G_TYPE_DAEMON_FILE_OUTPUT_STREAM, NULL);
+
+ stream->command_stream = g_unix_output_stream_new (fd, FALSE);
+ stream->data_stream = g_unix_input_stream_new (fd, TRUE);
+ stream->can_seek = can_seek;
+ stream->current_offset = initial_offset;
+
+ return G_FILE_OUTPUT_STREAM (stream);
+}
+
+static gboolean
+error_is_cancel (GError *error)
+{
+ return error != NULL &&
+ error->domain == G_IO_ERROR &&
+ error->code == G_IO_ERROR_CANCELLED;
+}
+
+static void
+append_request (GDaemonFileOutputStream *stream, guint32 command,
+ guint32 arg1, guint32 arg2, guint32 data_len, guint32 *seq_nr)
+{
+ GVfsDaemonSocketProtocolRequest cmd;
+
+ g_assert (sizeof (cmd) == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
+
+ if (seq_nr)
+ *seq_nr = stream->seq_nr;
+
+ cmd.command = g_htonl (command);
+ cmd.seq_nr = g_htonl (stream->seq_nr);
+ cmd.arg1 = g_htonl (arg1);
+ cmd.arg2 = g_htonl (arg2);
+ cmd.data_len = g_htonl (data_len);
+
+ stream->seq_nr++;
+
+ g_string_append_len (stream->output_buffer,
+ (char *)&cmd, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
+}
+
+static gsize
+get_reply_header_missing_bytes (GString *buffer)
+{
+ GVfsDaemonSocketProtocolReply *reply;
+ guint32 type;
+ guint32 arg2;
+
+ if (buffer->len < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
+ return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - buffer->len;
+
+ reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
+
+ type = g_ntohl (reply->type);
+ arg2 = g_ntohl (reply->arg2);
+
+ /* ERROR, CLOSED and INFO has extra data w/ len in arg2 */
+ if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR ||
+ type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED ||
+ type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO)
+ return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len;
+ return 0;
+}
+
+static char *
+decode_reply (GString *buffer, GVfsDaemonSocketProtocolReply *reply_out)
+{
+ GVfsDaemonSocketProtocolReply *reply;
+ reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
+ reply_out->type = g_ntohl (reply->type);
+ reply_out->seq_nr = g_ntohl (reply->seq_nr);
+ reply_out->arg1 = g_ntohl (reply->arg1);
+ reply_out->arg2 = g_ntohl (reply->arg2);
+
+ return buffer->str + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE;
+}
+
+static void
+decode_error (GVfsDaemonSocketProtocolReply *reply, char *data, GError **error)
+{
+ g_set_error_literal (error,
+ g_quark_from_string (data),
+ reply->arg1,
+ data + strlen (data) + 1);
+}
+
+
+static gboolean
+run_sync_state_machine (GDaemonFileOutputStream *file,
+ state_machine_iterator iterator,
+ gpointer data,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gssize res;
+ StateOp io_op;
+ IOOperationData io_data;
+ GError *io_error;
+
+ memset (&io_data, 0, sizeof (io_data));
+
+ while (TRUE)
+ {
+ if (cancellable)
+ io_data.cancelled = g_cancellable_is_cancelled (cancellable);
+
+ io_op = iterator (file, &io_data, data);
+
+ if (io_op == STATE_OP_DONE)
+ return TRUE;
+
+ io_error = NULL;
+ if (io_op == STATE_OP_READ)
+ {
+ res = g_input_stream_read (file->data_stream,
+ io_data.io_buffer, io_data.io_size,
+ io_data.io_allow_cancel ? cancellable : NULL,
+ &io_error);
+ }
+ else if (io_op == STATE_OP_SKIP)
+ {
+ res = g_input_stream_skip (file->data_stream,
+ io_data.io_size,
+ io_data.io_allow_cancel ? cancellable : NULL,
+ &io_error);
+ }
+ else if (io_op == STATE_OP_WRITE)
+ {
+ res = g_output_stream_write (file->command_stream,
+ io_data.io_buffer, io_data.io_size,
+ io_data.io_allow_cancel ? cancellable : NULL,
+ &io_error);
+ }
+ else
+ {
+ res = 0;
+ g_assert_not_reached ();
+ }
+
+ if (res == -1)
+ {
+ if (error_is_cancel (io_error))
+ {
+ io_data.io_res = 0;
+ io_data.io_cancelled = TRUE;
+ g_error_free (io_error);
+ }
+ else
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ _("Error in stream protocol: %s"), io_error->message);
+ g_error_free (io_error);
+ return FALSE;
+ }
+ }
+ else if (res == 0 && io_data.io_size != 0)
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ _("Error in stream protocol: %s"), _("End of stream"));
+ return FALSE;
+ }
+ else
+ {
+ io_data.io_res = res;
+ io_data.io_cancelled = FALSE;
+ }
+ }
+}
+
+/* read cycle:
+
+ if we know of a (partially read) matching outstanding block, read from it
+ create packet, append to outgoing
+ flush outgoing
+ start processing output, looking for a data block with same seek gen,
+ or an error with same seq nr
+ on cancel, send cancel command and go back to loop
+ */
+
+static StateOp
+iterate_write_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, WriteOperation *op)
+{
+ gsize len;
+
+ while (TRUE)
+ {
+ switch (op->state)
+ {
+ /* Initial state for read op */
+ case WRITE_STATE_INIT:
+ append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_WRITE,
+ op->buffer_size, 0, op->buffer_size, &op->seq_nr);
+ op->state = WRITE_STATE_WROTE_COMMAND;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
+ return STATE_OP_WRITE;
+
+ /* wrote parts of output_buffer */
+ case WRITE_STATE_WROTE_COMMAND:
+ if (io_op->io_cancelled)
+ {
+ op->ret_val = -1;
+ g_set_error_literal (&op->ret_error,
+ G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return STATE_OP_DONE;
+ }
+
+ if (io_op->io_res < file->output_buffer->len)
+ {
+ g_string_remove_in_front (file->output_buffer,
+ io_op->io_res);
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+ g_string_truncate (file->output_buffer, 0);
+
+ op->buffer_pos = 0;
+ if (op->sent_cancel)
+ op->state = WRITE_STATE_HANDLE_INPUT;
+ else
+ op->state = WRITE_STATE_SEND_DATA;
+ break;
+
+ /* No op */
+ case WRITE_STATE_SEND_DATA:
+ op->buffer_pos += io_op->io_res;
+
+ if (op->buffer_pos < op->buffer_size)
+ {
+ io_op->io_buffer = (char *)(op->buffer + op->buffer_pos);
+ io_op->io_size = op->buffer_size - op->buffer_pos;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+
+ op->state = WRITE_STATE_HANDLE_INPUT;
+ break;
+
+ /* No op */
+ case WRITE_STATE_HANDLE_INPUT:
+ if (io_op->cancelled && !op->sent_cancel)
+ {
+ op->sent_cancel = TRUE;
+ append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
+ op->seq_nr, 0, 0, NULL);
+ op->state = WRITE_STATE_WROTE_COMMAND;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+
+ if (io_op->io_res > 0)
+ {
+ gsize unread_size = io_op->io_size - io_op->io_res;
+ g_string_set_size (file->input_buffer,
+ file->input_buffer->len - unread_size);
+ }
+
+ len = get_reply_header_missing_bytes (file->input_buffer);
+ if (len > 0)
+ {
+ gsize current_len = file->input_buffer->len;
+ g_string_set_size (file->input_buffer,
+ current_len + len);
+ io_op->io_buffer = file->input_buffer->str + current_len;
+ io_op->io_size = len;
+ io_op->io_allow_cancel = !op->sent_cancel;
+ return STATE_OP_READ;
+ }
+
+ /* Got full header */
+
+ {
+ GVfsDaemonSocketProtocolReply reply;
+ char *data;
+ data = decode_reply (file->input_buffer, &reply);
+
+ if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
+ reply.seq_nr == op->seq_nr)
+ {
+ op->ret_val = -1;
+ decode_error (&reply, data, &op->ret_error);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_WRITTEN)
+ {
+ op->ret_val = reply.arg1;
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ /* Ignore other reply types */
+ }
+
+ g_string_truncate (file->input_buffer, 0);
+
+ /* This wasn't interesting, read next reply */
+ op->state = WRITE_STATE_HANDLE_INPUT;
+ break;
+
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Clear io_op between non-op state switches */
+ io_op->io_size = 0;
+ io_op->io_res = 0;
+ io_op->io_cancelled = FALSE;
+
+ }
+}
+
+static gssize
+g_daemon_file_output_stream_write (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GDaemonFileOutputStream *file;
+ WriteOperation op;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
+
+ /* Limit for sanity and to avoid 32bit overflow */
+ if (count > MAX_WRITE_SIZE)
+ count = MAX_WRITE_SIZE;
+
+ memset (&op, 0, sizeof (op));
+ op.state = WRITE_STATE_INIT;
+ op.buffer = buffer;
+ op.buffer_size = count;
+
+ if (!run_sync_state_machine (file, (state_machine_iterator)iterate_write_state_machine,
+ &op, cancellable, error))
+ return -1; /* IO Error */
+
+ if (op.ret_val == -1)
+ g_propagate_error (error, op.ret_error);
+ else
+ file->current_offset += op.ret_val;
+
+ return op.ret_val;
+}
+
+static StateOp
+iterate_close_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, CloseOperation *op)
+{
+ gsize len;
+
+ while (TRUE)
+ {
+ switch (op->state)
+ {
+ /* Initial state for read op */
+ case CLOSE_STATE_INIT:
+ append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE,
+ 0, 0, 0, &op->seq_nr);
+ op->state = CLOSE_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
+ return STATE_OP_WRITE;
+
+ /* wrote parts of output_buffer */
+ case CLOSE_STATE_WROTE_REQUEST:
+ if (io_op->io_cancelled)
+ {
+ op->ret_val = FALSE;
+ g_set_error_literal (&op->ret_error,
+ G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return STATE_OP_DONE;
+ }
+
+ if (io_op->io_res < file->output_buffer->len)
+ {
+ g_string_remove_in_front (file->output_buffer,
+ io_op->io_res);
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+ g_string_truncate (file->output_buffer, 0);
+
+ op->state = CLOSE_STATE_HANDLE_INPUT;
+ break;
+
+ /* No op */
+ case CLOSE_STATE_HANDLE_INPUT:
+ if (io_op->cancelled && !op->sent_cancel)
+ {
+ op->sent_cancel = TRUE;
+ append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
+ op->seq_nr, 0, 0, NULL);
+ op->state = CLOSE_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+
+ if (io_op->io_res > 0)
+ {
+ gsize unread_size = io_op->io_size - io_op->io_res;
+ g_string_set_size (file->input_buffer,
+ file->input_buffer->len - unread_size);
+ }
+
+ len = get_reply_header_missing_bytes (file->input_buffer);
+ if (len > 0)
+ {
+ gsize current_len = file->input_buffer->len;
+ g_string_set_size (file->input_buffer,
+ current_len + len);
+ io_op->io_buffer = file->input_buffer->str + current_len;
+ io_op->io_size = len;
+ io_op->io_allow_cancel = !op->sent_cancel;
+ return STATE_OP_READ;
+ }
+
+ /* Got full header */
+
+ {
+ GVfsDaemonSocketProtocolReply reply;
+ char *data;
+ data = decode_reply (file->input_buffer, &reply);
+
+ if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
+ reply.seq_nr == op->seq_nr)
+ {
+ op->ret_val = FALSE;
+ decode_error (&reply, data, &op->ret_error);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED)
+ {
+ op->ret_val = TRUE;
+ if (reply.arg2 > 0)
+ file->etag = g_strndup (data, reply.arg2);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ /* Ignore other reply types */
+ }
+
+ g_string_truncate (file->input_buffer, 0);
+
+ /* This wasn't interesting, read next reply */
+ op->state = CLOSE_STATE_HANDLE_INPUT;
+ break;
+
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Clear io_op between non-op state switches */
+ io_op->io_size = 0;
+ io_op->io_res = 0;
+ io_op->io_cancelled = FALSE;
+ }
+}
+
+
+static gboolean
+g_daemon_file_output_stream_close (GOutputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GDaemonFileOutputStream *file;
+ CloseOperation op;
+ gboolean res;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ /* We need to do a full roundtrip to guarantee that the writes have
+ reached the disk. */
+
+ memset (&op, 0, sizeof (op));
+ op.state = CLOSE_STATE_INIT;
+
+ if (!run_sync_state_machine (file, (state_machine_iterator)iterate_close_state_machine,
+ &op, cancellable, error))
+ res = FALSE;
+ else
+ {
+ if (!op.ret_val)
+ g_propagate_error (error, op.ret_error);
+ res = op.ret_val;
+ }
+
+ /* Return the first error, but close all streams */
+ if (res)
+ res = g_output_stream_close (file->command_stream, cancellable, error);
+ else
+ g_output_stream_close (file->command_stream, cancellable, NULL);
+
+ if (res)
+ res = g_input_stream_close (file->data_stream, cancellable, error);
+ else
+ g_input_stream_close (file->data_stream, cancellable, NULL);
+
+ return res;
+}
+
+static goffset
+g_daemon_file_output_stream_tell (GFileOutputStream *stream)
+{
+ GDaemonFileOutputStream *file;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ return file->current_offset;
+}
+
+static gboolean
+g_daemon_file_output_stream_can_seek (GFileOutputStream *stream)
+{
+ GDaemonFileOutputStream *file;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ return file->can_seek;
+}
+
+static StateOp
+iterate_seek_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, SeekOperation *op)
+{
+ gsize len;
+ guint32 request;
+
+ while (TRUE)
+ {
+ switch (op->state)
+ {
+ /* Initial state for read op */
+ case SEEK_STATE_INIT:
+ request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET;
+ if (op->seek_type == G_SEEK_CUR)
+ op->offset = file->current_offset + op->offset;
+ else if (op->seek_type == G_SEEK_END)
+ request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END;
+ append_request (file, request,
+ op->offset & 0xffffffff,
+ op->offset >> 32,
+ 0,
+ &op->seq_nr);
+ op->state = SEEK_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
+ return STATE_OP_WRITE;
+
+ /* wrote parts of output_buffer */
+ case SEEK_STATE_WROTE_REQUEST:
+ if (io_op->io_cancelled)
+ {
+ op->ret_val = -1;
+ g_set_error_literal (&op->ret_error,
+ G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return STATE_OP_DONE;
+ }
+
+ if (io_op->io_res < file->output_buffer->len)
+ {
+ g_string_remove_in_front (file->output_buffer,
+ io_op->io_res);
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+ g_string_truncate (file->output_buffer, 0);
+
+ op->state = SEEK_STATE_HANDLE_INPUT;
+ break;
+
+ /* No op */
+ case SEEK_STATE_HANDLE_INPUT:
+ if (io_op->cancelled && !op->sent_cancel)
+ {
+ op->sent_cancel = TRUE;
+ append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
+ op->seq_nr, 0, 0, NULL);
+ op->state = SEEK_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+
+ if (io_op->io_res > 0)
+ {
+ gsize unread_size = io_op->io_size - io_op->io_res;
+ g_string_set_size (file->input_buffer,
+ file->input_buffer->len - unread_size);
+ }
+
+ len = get_reply_header_missing_bytes (file->input_buffer);
+ if (len > 0)
+ {
+ gsize current_len = file->input_buffer->len;
+ g_string_set_size (file->input_buffer,
+ current_len + len);
+ io_op->io_buffer = file->input_buffer->str + current_len;
+ io_op->io_size = len;
+ io_op->io_allow_cancel = !op->sent_cancel;
+ return STATE_OP_READ;
+ }
+
+ /* Got full header */
+
+ {
+ GVfsDaemonSocketProtocolReply reply;
+ char *data;
+ data = decode_reply (file->input_buffer, &reply);
+
+ if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
+ reply.seq_nr == op->seq_nr)
+ {
+ op->ret_val = FALSE;
+ decode_error (&reply, data, &op->ret_error);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS)
+ {
+ op->ret_val = TRUE;
+ op->ret_offset = ((goffset)reply.arg2) << 32 | (goffset)reply.arg1;
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ /* Ignore other reply types */
+ }
+
+ g_string_truncate (file->input_buffer, 0);
+
+ /* This wasn't interesting, read next reply */
+ op->state = SEEK_STATE_HANDLE_INPUT;
+ break;
+
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Clear io_op between non-op state switches */
+ io_op->io_size = 0;
+ io_op->io_res = 0;
+ io_op->io_cancelled = FALSE;
+ }
+}
+
+static gboolean
+g_daemon_file_output_stream_seek (GFileOutputStream *stream,
+ goffset offset,
+ GSeekType type,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GDaemonFileOutputStream *file;
+ SeekOperation op;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ if (!file->can_seek)
+ {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ _("Seek not supported on stream"));
+ return FALSE;
+ }
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
+
+ memset (&op, 0, sizeof (op));
+ op.state = SEEK_STATE_INIT;
+ op.offset = offset;
+ op.seek_type = type;
+
+ if (!run_sync_state_machine (file, (state_machine_iterator)iterate_seek_state_machine,
+ &op, cancellable, error))
+ return FALSE; /* IO Error */
+
+ if (!op.ret_val)
+ g_propagate_error (error, op.ret_error);
+ else
+ file->current_offset = op.ret_offset;
+
+ return op.ret_val;
+}
+
+static char *
+g_daemon_file_output_stream_get_etag (GFileOutputStream *stream)
+{
+ GDaemonFileOutputStream *file;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ return g_strdup (file->etag);
+}
+
+static StateOp
+iterate_query_state_machine (GDaemonFileOutputStream *file,
+ IOOperationData *io_op,
+ QueryOperation *op)
+{
+ gsize len;
+ guint32 request;
+
+ while (TRUE)
+ {
+ switch (op->state)
+ {
+ /* Initial state for read op */
+ case QUERY_STATE_INIT:
+ request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_QUERY_INFO;
+ append_request (file, request,
+ 0,
+ 0,
+ strlen (op->attributes),
+ &op->seq_nr);
+ g_string_append (file->output_buffer,
+ op->attributes);
+
+ op->state = QUERY_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
+ return STATE_OP_WRITE;
+
+ /* wrote parts of output_buffer */
+ case QUERY_STATE_WROTE_REQUEST:
+ if (io_op->io_cancelled)
+ {
+ op->info = NULL;
+ g_set_error_literal (&op->ret_error,
+ G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return STATE_OP_DONE;
+ }
+
+ if (io_op->io_res < file->output_buffer->len)
+ {
+ g_string_remove_in_front (file->output_buffer,
+ io_op->io_res);
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+ g_string_truncate (file->output_buffer, 0);
+
+ op->state = QUERY_STATE_HANDLE_INPUT;
+ break;
+
+ /* No op */
+ case QUERY_STATE_HANDLE_INPUT:
+ if (io_op->cancelled && !op->sent_cancel)
+ {
+ op->sent_cancel = TRUE;
+ append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
+ op->seq_nr, 0, 0, NULL);
+ op->state = QUERY_STATE_WROTE_REQUEST;
+ io_op->io_buffer = file->output_buffer->str;
+ io_op->io_size = file->output_buffer->len;
+ io_op->io_allow_cancel = FALSE;
+ return STATE_OP_WRITE;
+ }
+
+
+ if (io_op->io_res > 0)
+ {
+ gsize unread_size = io_op->io_size - io_op->io_res;
+ g_string_set_size (file->input_buffer,
+ file->input_buffer->len - unread_size);
+ }
+
+ len = get_reply_header_missing_bytes (file->input_buffer);
+ if (len > 0)
+ {
+ gsize current_len = file->input_buffer->len;
+ g_string_set_size (file->input_buffer,
+ current_len + len);
+ io_op->io_buffer = file->input_buffer->str + current_len;
+ io_op->io_size = len;
+ io_op->io_allow_cancel = !op->sent_cancel;
+ return STATE_OP_READ;
+ }
+
+ /* Got full header */
+
+ {
+ GVfsDaemonSocketProtocolReply reply;
+ char *data;
+ data = decode_reply (file->input_buffer, &reply);
+
+ if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
+ reply.seq_nr == op->seq_nr)
+ {
+ op->info = NULL;
+ decode_error (&reply, data, &op->ret_error);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_INFO)
+ {
+ op->info = gvfs_file_info_demarshal (data, reply.arg2);
+ g_string_truncate (file->input_buffer, 0);
+ return STATE_OP_DONE;
+ }
+ /* Ignore other reply types */
+ }
+
+ g_string_truncate (file->input_buffer, 0);
+
+ /* This wasn't interesting, read next reply */
+ op->state = SEEK_STATE_HANDLE_INPUT;
+ break;
+
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Clear io_op between non-op state switches */
+ io_op->io_size = 0;
+ io_op->io_res = 0;
+ io_op->io_cancelled = FALSE;
+ }
+}
+
+static GFileInfo *
+g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
+ const char *attributes,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GDaemonFileOutputStream *file;
+ QueryOperation op;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return NULL;
+
+ memset (&op, 0, sizeof (op));
+ op.state = QUERY_STATE_INIT;
+ if (attributes)
+ op.attributes = (char *)attributes;
+ else
+ op.attributes = "";
+
+ if (!run_sync_state_machine (file, (state_machine_iterator)iterate_query_state_machine,
+ &op, cancellable, error))
+ return NULL; /* IO Error */
+
+ if (op.info == NULL)
+ g_propagate_error (error, op.ret_error);
+
+ return op.info;
+}
+
+/************************************************************************
+ * Async I/O Code *
+ ************************************************************************/
+
+typedef struct AsyncIterator AsyncIterator;
+
+typedef void (*AsyncIteratorDone) (GOutputStream *stream,
+ gpointer op_data,
+ GAsyncReadyCallback callback,
+ gpointer callback_data,
+ GError *io_error);
+
+struct AsyncIterator {
+ AsyncIteratorDone done_cb;
+ GDaemonFileOutputStream *file;
+ GCancellable *cancellable;
+ IOOperationData io_data;
+ state_machine_iterator iterator;
+ gpointer iterator_data;
+ int io_priority;
+ GAsyncReadyCallback callback;
+ gpointer callback_data;
+};
+
+static void async_iterate (AsyncIterator *iterator);
+
+static void
+async_iterator_done (AsyncIterator *iterator, GError *io_error)
+{
+ iterator->done_cb (G_OUTPUT_STREAM (iterator->file),
+ iterator->iterator_data,
+ iterator->callback,
+ iterator->callback_data,
+ io_error);
+
+ g_free (iterator);
+
+}
+
+static void
+async_op_handle (AsyncIterator *iterator,
+ gssize res,
+ GError *io_error)
+{
+ IOOperationData *io_data = &iterator->io_data;
+ GError *error;
+
+ if (io_error != NULL)
+ {
+ if (error_is_cancel (io_error))
+ {
+ io_data->io_res = 0;
+ io_data->io_cancelled = TRUE;
+ }
+ else
+ {
+ error = NULL;
+ g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ _("Error in stream protocol: %s"), io_error->message);
+ async_iterator_done (iterator, error);
+ g_error_free (error);
+ return;
+ }
+ }
+ else if (res == 0 && io_data->io_size != 0)
+ {
+ error = NULL;
+ g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ _("Error in stream protocol: %s"), _("End of stream"));
+ async_iterator_done (iterator, error);
+ g_error_free (error);
+ return;
+ }
+ else
+ {
+ io_data->io_res = res;
+ io_data->io_cancelled = FALSE;
+ }
+
+ async_iterate (iterator);
+}
+
+static void
+async_read_op_callback (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GInputStream *stream = G_INPUT_STREAM (source_object);
+ gssize count_read;
+ GError *error = NULL;
+
+ count_read = g_input_stream_read_finish (stream, res, &error);
+
+ async_op_handle ((AsyncIterator *)user_data, count_read, error);
+ if (error)
+ g_error_free (error);
+}
+
+static void
+async_skip_op_callback (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GInputStream *stream = G_INPUT_STREAM (source_object);
+ gssize count_skipped;
+ GError *error = NULL;
+
+ count_skipped = g_input_stream_skip_finish (stream, res, &error);
+
+ async_op_handle ((AsyncIterator *)user_data, count_skipped, error);
+ if (error)
+ g_error_free (error);
+}
+
+static void
+async_write_op_callback (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GOutputStream *stream = G_OUTPUT_STREAM (source_object);
+ gssize bytes_written;
+ GError *error = NULL;
+
+ bytes_written = g_output_stream_write_finish (stream, res, &error);
+
+ async_op_handle ((AsyncIterator *)user_data, bytes_written, error);
+ if (error)
+ g_error_free (error);
+}
+
+static void
+async_iterate (AsyncIterator *iterator)
+{
+ IOOperationData *io_data = &iterator->io_data;
+ GDaemonFileOutputStream *file = iterator->file;
+ StateOp io_op;
+
+ io_data->cancelled =
+ g_cancellable_is_cancelled (iterator->cancellable);
+
+ io_op = iterator->iterator (file, io_data, iterator->iterator_data);
+
+ if (io_op == STATE_OP_DONE)
+ {
+ async_iterator_done (iterator, NULL);
+ return;
+ }
+
+ /* TODO: Handle allow_cancel... */
+
+ if (io_op == STATE_OP_READ)
+ {
+ g_input_stream_read_async (file->data_stream,
+ io_data->io_buffer, io_data->io_size,
+ iterator->io_priority,
+ io_data->io_allow_cancel ? iterator->cancellable : NULL,
+ async_read_op_callback, iterator);
+ }
+ else if (io_op == STATE_OP_SKIP)
+ {
+ g_input_stream_skip_async (file->data_stream,
+ io_data->io_size,
+ iterator->io_priority,
+ io_data->io_allow_cancel ? iterator->cancellable : NULL,
+ async_skip_op_callback, iterator);
+ }
+ else if (io_op == STATE_OP_WRITE)
+ {
+ g_output_stream_write_async (file->command_stream,
+ io_data->io_buffer, io_data->io_size,
+ iterator->io_priority,
+ io_data->io_allow_cancel ? iterator->cancellable : NULL,
+ async_write_op_callback, iterator);
+ }
+ else
+ g_assert_not_reached ();
+}
+
+static void
+run_async_state_machine (GDaemonFileOutputStream *file,
+ state_machine_iterator iterator_cb,
+ gpointer iterator_data,
+ int io_priority,
+ GAsyncReadyCallback callback,
+ gpointer data,
+ GCancellable *cancellable,
+ AsyncIteratorDone done_cb)
+{
+ AsyncIterator *iterator;
+
+ iterator = g_new0 (AsyncIterator, 1);
+ iterator->file = file;
+ iterator->iterator = iterator_cb;
+ iterator->iterator_data = iterator_data;
+ iterator->io_priority = io_priority;
+ iterator->cancellable = cancellable;
+ iterator->callback = callback;
+ iterator->callback_data = data;
+ iterator->done_cb = done_cb;
+
+ async_iterate (iterator);
+}
+
+static void
+async_write_done (GOutputStream *stream,
+ gpointer op_data,
+ GAsyncReadyCallback callback,
+ gpointer user_data,
+ GError *io_error)
+{
+ GSimpleAsyncResult *simple;
+ WriteOperation *op;
+ gssize count_written;
+ GError *error;
+
+ op = op_data;
+
+ if (io_error)
+ {
+ count_written = -1;
+ error = io_error;
+ }
+ else
+ {
+ count_written = op->ret_val;
+ error = op->ret_error;
+ }
+
+ simple = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ g_daemon_file_output_stream_write_async);
+
+ g_simple_async_result_set_op_res_gssize (simple, count_written);
+
+ if (count_written == -1)
+ g_simple_async_result_set_from_error (simple, error);
+
+ /* Complete immediately, not in idle, since we're already in a mainloop callout */
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+
+ if (op->ret_error)
+ g_error_free (op->ret_error);
+ g_free (op);
+}
+
+static void
+g_daemon_file_output_stream_write_async (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data)
+{
+ GDaemonFileOutputStream *file;
+ WriteOperation *op;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ /* Limit for sanity and to avoid 32bit overflow */
+ if (count > MAX_WRITE_SIZE)
+ count = MAX_WRITE_SIZE;
+
+ op = g_new0 (WriteOperation, 1);
+ op->state = WRITE_STATE_INIT;
+ op->buffer = buffer;
+ op->buffer_size = count;
+
+ run_async_state_machine (file,
+ (state_machine_iterator)iterate_write_state_machine,
+ op,
+ io_priority,
+ callback, data,
+ cancellable,
+ async_write_done);
+}
+
+static gssize
+g_daemon_file_output_stream_write_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+ gssize nwritten;
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ g_assert (g_simple_async_result_get_source_tag (simple) == g_daemon_file_output_stream_write_async);
+
+ nwritten = g_simple_async_result_get_op_res_gssize (simple);
+ return nwritten;
+}
+
+static void
+async_close_done (GOutputStream *stream,
+ gpointer op_data,
+ GAsyncReadyCallback callback,
+ gpointer user_data,
+ GError *io_error)
+{
+ GDaemonFileOutputStream *file;
+ GSimpleAsyncResult *simple;
+ CloseOperation *op;
+ gboolean result;
+ GError *error;
+ GCancellable *cancellable = NULL; /* TODO: get cancellable */
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ op = op_data;
+
+ if (io_error)
+ {
+ result = FALSE;
+ error = io_error;
+ }
+ else
+ {
+ result = op->ret_val;
+ error = op->ret_error;
+ }
+
+ if (result)
+ result = g_output_stream_close (file->command_stream, cancellable, &error);
+ else
+ g_output_stream_close (file->command_stream, cancellable, NULL);
+
+ if (result)
+ result = g_input_stream_close (file->data_stream, cancellable, &error);
+ else
+ g_input_stream_close (file->data_stream, cancellable, NULL);
+
+ simple = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ g_daemon_file_output_stream_close_async);
+
+ if (!result)
+ g_simple_async_result_set_from_error (simple, error);
+
+ /* Complete immediately, not in idle, since we're already in a mainloop callout */
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+
+ if (op->ret_error)
+ g_error_free (op->ret_error);
+ g_free (op);
+}
+
+static void
+g_daemon_file_output_stream_close_async (GOutputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data)
+{
+ GDaemonFileOutputStream *file;
+ CloseOperation *op;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ op = g_new0 (CloseOperation, 1);
+ op->state = CLOSE_STATE_INIT;
+
+ run_async_state_machine (file,
+ (state_machine_iterator)iterate_close_state_machine,
+ op, io_priority,
+ (GAsyncReadyCallback)callback, data,
+ cancellable,
+ (AsyncIteratorDone)async_close_done);
+}
+
+static gboolean
+g_daemon_file_output_stream_close_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ /* Failures handled in generic close_finish code */
+ return TRUE;
+}
+
+static void
+async_query_done (GOutputStream *stream,
+ gpointer op_data,
+ GAsyncReadyCallback callback,
+ gpointer user_data,
+ GError *io_error)
+{
+ GDaemonFileOutputStream *file;
+ GSimpleAsyncResult *simple;
+ QueryOperation *op;
+ GFileInfo *info;
+ GError *error;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ op = op_data;
+
+ if (io_error)
+ {
+ info = NULL;
+ error = io_error;
+ }
+ else
+ {
+ info = op->info;
+ error = op->ret_error;
+ }
+
+ simple = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ g_daemon_file_output_stream_query_info_async);
+
+ if (info == NULL)
+ g_simple_async_result_set_from_error (simple, error);
+ else
+ g_simple_async_result_set_op_res_gpointer (simple, info,
+ g_object_unref);
+
+ /* Complete immediately, not in idle, since we're already in a mainloop callout */
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+
+ if (op->ret_error)
+ g_error_free (op->ret_error);
+ g_free (op->attributes);
+ g_free (op);
+}
+
+static void
+g_daemon_file_output_stream_query_info_async (GFileOutputStream *stream,
+ const char *attributes,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GDaemonFileOutputStream *file;
+ QueryOperation *op;
+
+ file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
+
+ op = g_new0 (QueryOperation, 1);
+ op->state = QUERY_STATE_INIT;
+ if (attributes)
+ op->attributes = g_strdup (attributes);
+ else
+ op->attributes = g_strdup ("");
+
+ run_async_state_machine (file,
+ (state_machine_iterator)iterate_query_state_machine,
+ op, io_priority,
+ callback, user_data,
+ cancellable,
+ async_query_done);
+}
+
+static GFileInfo *
+g_daemon_file_output_stream_query_info_finish (GFileOutputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+ GFileInfo *info;
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ g_assert (g_simple_async_result_get_source_tag (simple) == g_daemon_file_output_stream_query_info_async);
+
+ info = g_simple_async_result_get_op_res_gpointer (simple);
+
+ return g_object_ref (info);
+
+}