summaryrefslogtreecommitdiff
path: root/client/gdaemonfileoutputstream.c
diff options
context:
space:
mode:
Diffstat (limited to 'client/gdaemonfileoutputstream.c')
-rw-r--r--client/gdaemonfileoutputstream.c1374
1 files changed, 0 insertions, 1374 deletions
diff --git a/client/gdaemonfileoutputstream.c b/client/gdaemonfileoutputstream.c
deleted file mode 100644
index 525475c8..00000000
--- a/client/gdaemonfileoutputstream.c
+++ /dev/null
@@ -1,1374 +0,0 @@
-/* GIO - GLib Input, Output and Streaming Library
- *
- * Copyright (C) 2006-2007 Red Hat, Inc.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
- *
- * Author: Alexander Larsson <alexl@redhat.com>
- */
-
-#include <config.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <errno.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-
-#include <glib.h>
-#include <glib/gstdio.h>
-#include <glib/gi18n-lib.h>
-#include <gio/gio.h>
-#include <gio/gunixinputstream.h>
-#include <gio/gunixoutputstream.h>
-#include "gdaemonfileoutputstream.h"
-#include "gvfsdaemondbus.h"
-#include <gvfsdaemonprotocol.h>
-
-#define MAX_WRITE_SIZE (4*1024*1024)
-
-typedef enum {
- STATE_OP_DONE,
- STATE_OP_READ,
- STATE_OP_WRITE,
- STATE_OP_SKIP
-} StateOp;
-
-typedef enum {
- WRITE_STATE_INIT = 0,
- WRITE_STATE_WROTE_COMMAND,
- WRITE_STATE_SEND_DATA,
- WRITE_STATE_HANDLE_INPUT
-} WriteState;
-
-typedef struct {
- WriteState state;
-
- /* Output */
- const char *buffer;
- gsize buffer_size;
- gsize buffer_pos;
-
- /* Input */
- gssize ret_val;
- GError *ret_error;
-
- gboolean sent_cancel;
-
- guint32 seq_nr;
-} WriteOperation;
-
-typedef enum {
- SEEK_STATE_INIT = 0,
- SEEK_STATE_WROTE_REQUEST,
- SEEK_STATE_HANDLE_INPUT
-} SeekState;
-
-typedef struct {
- SeekState state;
-
- /* Output */
- goffset offset;
- GSeekType seek_type;
- /* Output */
- gboolean ret_val;
- GError *ret_error;
- goffset ret_offset;
-
- gboolean sent_cancel;
-
- guint32 seq_nr;
-} SeekOperation;
-
-typedef enum {
- CLOSE_STATE_INIT = 0,
- CLOSE_STATE_WROTE_REQUEST,
- CLOSE_STATE_HANDLE_INPUT
-} CloseState;
-
-typedef struct {
- CloseState state;
-
- /* Output */
-
- /* Output */
- gboolean ret_val;
- GError *ret_error;
-
- gboolean sent_cancel;
-
- guint32 seq_nr;
-} CloseOperation;
-
-
-typedef struct {
- gboolean cancelled;
-
- char *io_buffer;
- gsize io_size;
- gsize io_res;
- /* The operation always succeeds, or gets cancelled.
- If we get an error doing the i/o that is considered fatal */
- gboolean io_allow_cancel;
- gboolean io_cancelled;
-} IOOperationData;
-
-typedef StateOp (*state_machine_iterator) (GDaemonFileOutputStream *file, IOOperationData *io_op, gpointer data);
-
-struct _GDaemonFileOutputStream {
- GFileOutputStream parent;
-
- GOutputStream *command_stream;
- GInputStream *data_stream;
- guint can_seek : 1;
-
- guint32 seq_nr;
- goffset current_offset;
-
- gsize input_block_size;
- GString *input_buffer;
-
- GString *output_buffer;
-
- char *etag;
-
-};
-
-static gssize g_daemon_file_output_stream_write (GOutputStream *stream,
- const void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error);
-static gboolean g_daemon_file_output_stream_close (GOutputStream *stream,
- GCancellable *cancellable,
- GError **error);
-static GFileInfo *g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
- char *attributes,
- GCancellable *cancellable,
- GError **error);
-static char *g_daemon_file_output_stream_get_etag (GFileOutputStream *stream);
-static goffset g_daemon_file_output_stream_tell (GFileOutputStream *stream);
-static gboolean g_daemon_file_output_stream_can_seek (GFileOutputStream *stream);
-static gboolean g_daemon_file_output_stream_seek (GFileOutputStream *stream,
- goffset offset,
- GSeekType type,
- GCancellable *cancellable,
- GError **error);
-static void g_daemon_file_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_daemon_file_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
-static void g_daemon_file_output_stream_close_async (GOutputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gboolean g_daemon_file_output_stream_close_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
-
-G_DEFINE_TYPE (GDaemonFileOutputStream, g_daemon_file_output_stream,
- G_TYPE_FILE_OUTPUT_STREAM)
-
-static void
-g_daemon_file_output_stream_finalize (GObject *object)
-{
- GDaemonFileOutputStream *file;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (object);
-
- if (file->command_stream)
- g_object_unref (file->command_stream);
- if (file->data_stream)
- g_object_unref (file->data_stream);
-
- g_string_free (file->input_buffer, TRUE);
- g_string_free (file->output_buffer, TRUE);
-
- g_free (file->etag);
-
- if (G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize)
- (*G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize) (object);
-}
-
-static void
-g_daemon_file_output_stream_class_init (GDaemonFileOutputStreamClass *klass)
-{
- GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
- GFileOutputStreamClass *file_stream_class = G_FILE_OUTPUT_STREAM_CLASS (klass);
-
- gobject_class->finalize = g_daemon_file_output_stream_finalize;
-
- stream_class->write_fn = g_daemon_file_output_stream_write;
- stream_class->close_fn = g_daemon_file_output_stream_close;
-
- stream_class->write_async = g_daemon_file_output_stream_write_async;
- stream_class->write_finish = g_daemon_file_output_stream_write_finish;
- stream_class->close_async = g_daemon_file_output_stream_close_async;
- stream_class->close_finish = g_daemon_file_output_stream_close_finish;
-
- file_stream_class->tell = g_daemon_file_output_stream_tell;
- file_stream_class->can_seek = g_daemon_file_output_stream_can_seek;
- file_stream_class->seek = g_daemon_file_output_stream_seek;
- file_stream_class->query_info = g_daemon_file_output_stream_query_info;
- file_stream_class->get_etag = g_daemon_file_output_stream_get_etag;
-}
-
-static void
-g_daemon_file_output_stream_init (GDaemonFileOutputStream *info)
-{
- info->output_buffer = g_string_new ("");
- info->input_buffer = g_string_new ("");
- info->seq_nr = 1;
-}
-
-GFileOutputStream *
-g_daemon_file_output_stream_new (int fd,
- gboolean can_seek,
- goffset initial_offset)
-{
- GDaemonFileOutputStream *stream;
-
- stream = g_object_new (G_TYPE_DAEMON_FILE_OUTPUT_STREAM, NULL);
-
- stream->command_stream = g_unix_output_stream_new (fd, FALSE);
- stream->data_stream = g_unix_input_stream_new (fd, TRUE);
- stream->can_seek = can_seek;
- stream->current_offset = initial_offset;
-
- return G_FILE_OUTPUT_STREAM (stream);
-}
-
-static gboolean
-error_is_cancel (GError *error)
-{
- return error != NULL &&
- error->domain == G_IO_ERROR &&
- error->code == G_IO_ERROR_CANCELLED;
-}
-
-static void
-append_request (GDaemonFileOutputStream *stream, guint32 command,
- guint32 arg1, guint32 arg2, guint32 data_len, guint32 *seq_nr)
-{
- GVfsDaemonSocketProtocolRequest cmd;
-
- g_assert (sizeof (cmd) == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
-
- if (seq_nr)
- *seq_nr = stream->seq_nr;
-
- cmd.command = g_htonl (command);
- cmd.seq_nr = g_htonl (stream->seq_nr++);
- cmd.arg1 = g_htonl (arg1);
- cmd.arg2 = g_htonl (arg2);
- cmd.data_len = g_htonl (data_len);
-
- g_string_append_len (stream->output_buffer,
- (char *)&cmd, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE);
-}
-
-static gsize
-get_reply_header_missing_bytes (GString *buffer)
-{
- GVfsDaemonSocketProtocolReply *reply;
- guint32 type;
- guint32 arg2;
-
- if (buffer->len < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
- return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - buffer->len;
-
- reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
-
- type = g_ntohl (reply->type);
- arg2 = g_ntohl (reply->arg2);
-
- /* ERROR and CLOSED has extra data w/ len in arg2 */
- if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR ||
- type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED)
- return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len;
- return 0;
-}
-
-static char *
-decode_reply (GString *buffer, GVfsDaemonSocketProtocolReply *reply_out)
-{
- GVfsDaemonSocketProtocolReply *reply;
- reply = (GVfsDaemonSocketProtocolReply *)buffer->str;
- reply_out->type = g_ntohl (reply->type);
- reply_out->seq_nr = g_ntohl (reply->seq_nr);
- reply_out->arg1 = g_ntohl (reply->arg1);
- reply_out->arg2 = g_ntohl (reply->arg2);
-
- return buffer->str + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE;
-}
-
-static void
-decode_error (GVfsDaemonSocketProtocolReply *reply, char *data, GError **error)
-{
- g_set_error_literal (error,
- g_quark_from_string (data),
- reply->arg1,
- data + strlen (data) + 1);
-}
-
-
-static gboolean
-run_sync_state_machine (GDaemonFileOutputStream *file,
- state_machine_iterator iterator,
- gpointer data,
- GCancellable *cancellable,
- GError **error)
-{
- gssize res;
- StateOp io_op;
- IOOperationData io_data;
- GError *io_error;
-
- memset (&io_data, 0, sizeof (io_data));
-
- while (TRUE)
- {
- if (cancellable)
- io_data.cancelled = g_cancellable_is_cancelled (cancellable);
-
- io_op = iterator (file, &io_data, data);
-
- if (io_op == STATE_OP_DONE)
- return TRUE;
-
- io_error = NULL;
- if (io_op == STATE_OP_READ)
- {
- res = g_input_stream_read (file->data_stream,
- io_data.io_buffer, io_data.io_size,
- io_data.io_allow_cancel ? cancellable : NULL,
- &io_error);
- }
- else if (io_op == STATE_OP_SKIP)
- {
- res = g_input_stream_skip (file->data_stream,
- io_data.io_size,
- io_data.io_allow_cancel ? cancellable : NULL,
- &io_error);
- }
- else if (io_op == STATE_OP_WRITE)
- {
- res = g_output_stream_write (file->command_stream,
- io_data.io_buffer, io_data.io_size,
- io_data.io_allow_cancel ? cancellable : NULL,
- &io_error);
- }
- else
- {
- res = 0;
- g_assert_not_reached ();
- }
-
- if (res == -1)
- {
- if (error_is_cancel (io_error))
- {
- io_data.io_res = 0;
- io_data.io_cancelled = TRUE;
- g_error_free (io_error);
- }
- else
- {
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- _("Error in stream protocol: %s"), io_error->message);
- g_error_free (io_error);
- return FALSE;
- }
- }
- else if (res == 0 && io_data.io_size != 0)
- {
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- _("Error in stream protocol: %s"), _("End of stream"));
- return FALSE;
- }
- else
- {
- io_data.io_res = res;
- io_data.io_cancelled = FALSE;
- }
- }
-}
-
-/* read cycle:
-
- if we know of a (partially read) matching outstanding block, read from it
- create packet, append to outgoing
- flush outgoing
- start processing output, looking for a data block with same seek gen,
- or an error with same seq nr
- on cancel, send cancel command and go back to loop
- */
-
-static StateOp
-iterate_write_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, WriteOperation *op)
-{
- gsize len;
-
- while (TRUE)
- {
- switch (op->state)
- {
- /* Initial state for read op */
- case WRITE_STATE_INIT:
- append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_WRITE,
- op->buffer_size, 0, op->buffer_size, &op->seq_nr);
- op->state = WRITE_STATE_WROTE_COMMAND;
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
- return STATE_OP_WRITE;
-
- /* wrote parts of output_buffer */
- case WRITE_STATE_WROTE_COMMAND:
- if (io_op->io_cancelled)
- {
- op->ret_val = -1;
- g_set_error_literal (&op->ret_error,
- G_IO_ERROR,
- G_IO_ERROR_CANCELLED,
- _("Operation was cancelled"));
- return STATE_OP_DONE;
- }
-
- if (io_op->io_res < file->output_buffer->len)
- {
- memcpy (file->output_buffer->str,
- file->output_buffer->str + io_op->io_res,
- file->output_buffer->len - io_op->io_res);
- g_string_truncate (file->output_buffer,
- file->output_buffer->len - io_op->io_res);
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = FALSE;
- return STATE_OP_WRITE;
- }
- g_string_truncate (file->output_buffer, 0);
-
- op->buffer_pos = 0;
- if (op->sent_cancel)
- op->state = WRITE_STATE_HANDLE_INPUT;
- else
- op->state = WRITE_STATE_SEND_DATA;
- break;
-
- /* No op */
- case WRITE_STATE_SEND_DATA:
- op->buffer_pos += io_op->io_res;
-
- if (op->buffer_pos < op->buffer_size)
- {
- io_op->io_buffer = (char *)(op->buffer + op->buffer_pos);
- io_op->io_size = op->buffer_size - op->buffer_pos;
- io_op->io_allow_cancel = FALSE;
- return STATE_OP_WRITE;
- }
-
- op->state = WRITE_STATE_HANDLE_INPUT;
- break;
-
- /* No op */
- case WRITE_STATE_HANDLE_INPUT:
- if (io_op->cancelled && !op->sent_cancel)
- {
- op->sent_cancel = TRUE;
- append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
- op->seq_nr, 0, 0, NULL);
- op->state = WRITE_STATE_WROTE_COMMAND;
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = FALSE;
- return STATE_OP_WRITE;
- }
-
- if (io_op->io_res > 0)
- {
- gsize unread_size = io_op->io_size - io_op->io_res;
- g_string_set_size (file->input_buffer,
- file->input_buffer->len - unread_size);
- }
-
- len = get_reply_header_missing_bytes (file->input_buffer);
- if (len > 0)
- {
- gsize current_len = file->input_buffer->len;
- g_string_set_size (file->input_buffer,
- current_len + len);
- io_op->io_buffer = file->input_buffer->str + current_len;
- io_op->io_size = len;
- io_op->io_allow_cancel = !op->sent_cancel;
- return STATE_OP_READ;
- }
-
- /* Got full header */
-
- {
- GVfsDaemonSocketProtocolReply reply;
- char *data;
- data = decode_reply (file->input_buffer, &reply);
-
- if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
- reply.seq_nr == op->seq_nr)
- {
- op->ret_val = -1;
- decode_error (&reply, data, &op->ret_error);
- g_string_truncate (file->input_buffer, 0);
- return STATE_OP_DONE;
- }
- else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_WRITTEN)
- {
- op->ret_val = reply.arg1;
- g_string_truncate (file->input_buffer, 0);
- return STATE_OP_DONE;
- }
- /* Ignore other reply types */
- }
-
- g_string_truncate (file->input_buffer, 0);
-
- /* This wasn't interesting, read next reply */
- op->state = WRITE_STATE_HANDLE_INPUT;
- break;
-
- default:
- g_assert_not_reached ();
- }
-
- /* Clear io_op between non-op state switches */
- io_op->io_size = 0;
- io_op->io_res = 0;
- io_op->io_cancelled = FALSE;
-
- }
-}
-
-static gssize
-g_daemon_file_output_stream_write (GOutputStream *stream,
- const void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error)
-{
- GDaemonFileOutputStream *file;
- WriteOperation op;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- if (g_cancellable_set_error_if_cancelled (cancellable, error))
- return -1;
-
- /* Limit for sanity and to avoid 32bit overflow */
- if (count > MAX_WRITE_SIZE)
- count = MAX_WRITE_SIZE;
-
- memset (&op, 0, sizeof (op));
- op.state = WRITE_STATE_INIT;
- op.buffer = buffer;
- op.buffer_size = count;
-
- if (!run_sync_state_machine (file, (state_machine_iterator)iterate_write_state_machine,
- &op, cancellable, error))
- return -1; /* IO Error */
-
- if (op.ret_val == -1)
- g_propagate_error (error, op.ret_error);
- else
- file->current_offset += op.ret_val;
-
- return op.ret_val;
-}
-
-static StateOp
-iterate_close_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, CloseOperation *op)
-{
- gsize len;
-
- while (TRUE)
- {
- switch (op->state)
- {
- /* Initial state for read op */
- case CLOSE_STATE_INIT:
- append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE,
- 0, 0, 0, &op->seq_nr);
- op->state = CLOSE_STATE_WROTE_REQUEST;
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
- return STATE_OP_WRITE;
-
- /* wrote parts of output_buffer */
- case CLOSE_STATE_WROTE_REQUEST:
- if (io_op->io_cancelled)
- {
- op->ret_val = FALSE;
- g_set_error_literal (&op->ret_error,
- G_IO_ERROR,
- G_IO_ERROR_CANCELLED,
- _("Operation was cancelled"));
- return STATE_OP_DONE;
- }
-
- if (io_op->io_res < file->output_buffer->len)
- {
- memcpy (file->output_buffer->str,
- file->output_buffer->str + io_op->io_res,
- file->output_buffer->len - io_op->io_res);
- g_string_truncate (file->output_buffer,
- file->output_buffer->len - io_op->io_res);
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = FALSE;
- return STATE_OP_WRITE;
- }
- g_string_truncate (file->output_buffer, 0);
-
- op->state = CLOSE_STATE_HANDLE_INPUT;
- break;
-
- /* No op */
- case CLOSE_STATE_HANDLE_INPUT:
- if (io_op->cancelled && !op->sent_cancel)
- {
- op->sent_cancel = TRUE;
- append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
- op->seq_nr, 0, 0, NULL);
- op->state = CLOSE_STATE_WROTE_REQUEST;
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = FALSE;
- return STATE_OP_WRITE;
- }
-
- if (io_op->io_res > 0)
- {
- gsize unread_size = io_op->io_size - io_op->io_res;
- g_string_set_size (file->input_buffer,
- file->input_buffer->len - unread_size);
- }
-
- len = get_reply_header_missing_bytes (file->input_buffer);
- if (len > 0)
- {
- gsize current_len = file->input_buffer->len;
- g_string_set_size (file->input_buffer,
- current_len + len);
- io_op->io_buffer = file->input_buffer->str + current_len;
- io_op->io_size = len;
- io_op->io_allow_cancel = !op->sent_cancel;
- return STATE_OP_READ;
- }
-
- /* Got full header */
-
- {
- GVfsDaemonSocketProtocolReply reply;
- char *data;
- data = decode_reply (file->input_buffer, &reply);
-
- if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
- reply.seq_nr == op->seq_nr)
- {
- op->ret_val = FALSE;
- decode_error (&reply, data, &op->ret_error);
- g_string_truncate (file->input_buffer, 0);
- return STATE_OP_DONE;
- }
- else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED)
- {
- op->ret_val = TRUE;
- if (reply.arg2 > 0)
- file->etag = g_strndup (data, reply.arg2);
- g_string_truncate (file->input_buffer, 0);
- return STATE_OP_DONE;
- }
- /* Ignore other reply types */
- }
-
- g_string_truncate (file->input_buffer, 0);
-
- /* This wasn't interesting, read next reply */
- op->state = CLOSE_STATE_HANDLE_INPUT;
- break;
-
- default:
- g_assert_not_reached ();
- }
-
- /* Clear io_op between non-op state switches */
- io_op->io_size = 0;
- io_op->io_res = 0;
- io_op->io_cancelled = FALSE;
- }
-}
-
-
-static gboolean
-g_daemon_file_output_stream_close (GOutputStream *stream,
- GCancellable *cancellable,
- GError **error)
-{
- GDaemonFileOutputStream *file;
- CloseOperation op;
- gboolean res;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- /* We need to do a full roundtrip to guarantee that the writes have
- reached the disk. */
-
- memset (&op, 0, sizeof (op));
- op.state = CLOSE_STATE_INIT;
-
- if (!run_sync_state_machine (file, (state_machine_iterator)iterate_close_state_machine,
- &op, cancellable, error))
- res = FALSE;
- else
- {
- if (!op.ret_val)
- g_propagate_error (error, op.ret_error);
- res = op.ret_val;
- }
-
- /* Return the first error, but close all streams */
- if (res)
- res = g_output_stream_close (file->command_stream, cancellable, error);
- else
- g_output_stream_close (file->command_stream, cancellable, NULL);
-
- if (res)
- res = g_input_stream_close (file->data_stream, cancellable, error);
- else
- g_input_stream_close (file->data_stream, cancellable, NULL);
-
- return res;
-}
-
-static goffset
-g_daemon_file_output_stream_tell (GFileOutputStream *stream)
-{
- GDaemonFileOutputStream *file;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- return file->current_offset;
-}
-
-static gboolean
-g_daemon_file_output_stream_can_seek (GFileOutputStream *stream)
-{
- GDaemonFileOutputStream *file;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- return file->can_seek;
-}
-
-static StateOp
-iterate_seek_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, SeekOperation *op)
-{
- gsize len;
- guint32 request;
-
- while (TRUE)
- {
- switch (op->state)
- {
- /* Initial state for read op */
- case SEEK_STATE_INIT:
- request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET;
- if (op->seek_type == G_SEEK_CUR)
- op->offset = file->current_offset + op->offset;
- else if (op->seek_type == G_SEEK_END)
- request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END;
- append_request (file, request,
- op->offset & 0xffffffff,
- op->offset >> 32,
- 0,
- &op->seq_nr);
- op->state = SEEK_STATE_WROTE_REQUEST;
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */
- return STATE_OP_WRITE;
-
- /* wrote parts of output_buffer */
- case SEEK_STATE_WROTE_REQUEST:
- if (io_op->io_cancelled)
- {
- op->ret_val = -1;
- g_set_error_literal (&op->ret_error,
- G_IO_ERROR,
- G_IO_ERROR_CANCELLED,
- _("Operation was cancelled"));
- return STATE_OP_DONE;
- }
-
- if (io_op->io_res < file->output_buffer->len)
- {
- memcpy (file->output_buffer->str,
- file->output_buffer->str + io_op->io_res,
- file->output_buffer->len - io_op->io_res);
- g_string_truncate (file->output_buffer,
- file->output_buffer->len - io_op->io_res);
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = FALSE;
- return STATE_OP_WRITE;
- }
- g_string_truncate (file->output_buffer, 0);
-
- op->state = SEEK_STATE_HANDLE_INPUT;
- break;
-
- /* No op */
- case SEEK_STATE_HANDLE_INPUT:
- if (io_op->cancelled && !op->sent_cancel)
- {
- op->sent_cancel = TRUE;
- append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL,
- op->seq_nr, 0, 0, NULL);
- op->state = SEEK_STATE_WROTE_REQUEST;
- io_op->io_buffer = file->output_buffer->str;
- io_op->io_size = file->output_buffer->len;
- io_op->io_allow_cancel = FALSE;
- return STATE_OP_WRITE;
- }
-
- if (io_op->io_res > 0)
- {
- gsize unread_size = io_op->io_size - io_op->io_res;
- g_string_set_size (file->input_buffer,
- file->input_buffer->len - unread_size);
- }
-
- len = get_reply_header_missing_bytes (file->input_buffer);
- if (len > 0)
- {
- gsize current_len = file->input_buffer->len;
- g_string_set_size (file->input_buffer,
- current_len + len);
- io_op->io_buffer = file->input_buffer->str + current_len;
- io_op->io_size = len;
- io_op->io_allow_cancel = !op->sent_cancel;
- return STATE_OP_READ;
- }
-
- /* Got full header */
-
- {
- GVfsDaemonSocketProtocolReply reply;
- char *data;
- data = decode_reply (file->input_buffer, &reply);
-
- if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR &&
- reply.seq_nr == op->seq_nr)
- {
- op->ret_val = FALSE;
- decode_error (&reply, data, &op->ret_error);
- g_string_truncate (file->input_buffer, 0);
- return STATE_OP_DONE;
- }
- else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS)
- {
- op->ret_val = TRUE;
- op->ret_offset = ((goffset)reply.arg2) << 32 | (goffset)reply.arg1;
- g_string_truncate (file->input_buffer, 0);
- return STATE_OP_DONE;
- }
- /* Ignore other reply types */
- }
-
- g_string_truncate (file->input_buffer, 0);
-
- /* This wasn't interesting, read next reply */
- op->state = SEEK_STATE_HANDLE_INPUT;
- break;
-
- default:
- g_assert_not_reached ();
- }
-
- /* Clear io_op between non-op state switches */
- io_op->io_size = 0;
- io_op->io_res = 0;
- io_op->io_cancelled = FALSE;
- }
-}
-
-static gboolean
-g_daemon_file_output_stream_seek (GFileOutputStream *stream,
- goffset offset,
- GSeekType type,
- GCancellable *cancellable,
- GError **error)
-{
- GDaemonFileOutputStream *file;
- SeekOperation op;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- if (!file->can_seek)
- {
- g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
- _("Seek not supported on stream"));
- return FALSE;
- }
-
- if (g_cancellable_set_error_if_cancelled (cancellable, error))
- return FALSE;
-
- memset (&op, 0, sizeof (op));
- op.state = SEEK_STATE_INIT;
- op.offset = offset;
- op.seek_type = type;
-
- if (!run_sync_state_machine (file, (state_machine_iterator)iterate_seek_state_machine,
- &op, cancellable, error))
- return FALSE; /* IO Error */
-
- if (!op.ret_val)
- g_propagate_error (error, op.ret_error);
- else
- file->current_offset = op.ret_offset;
-
- return op.ret_val;
-}
-
-static char *
-g_daemon_file_output_stream_get_etag (GFileOutputStream *stream)
-{
- GDaemonFileOutputStream *file;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- return g_strdup (file->etag);
-}
-
-static GFileInfo *
-g_daemon_file_output_stream_query_info (GFileOutputStream *stream,
- char *attributes,
- GCancellable *cancellable,
- GError **error)
-{
-#if 0
- GDaemonFileOutputStream *file;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-#endif
-
- g_set_error (error,
- G_IO_ERROR,
- G_IO_ERROR_NOT_SUPPORTED,
- _("Query info not supported on stream"));
-
- return NULL;
-}
-
-/************************************************************************
- * Async I/O Code *
- ************************************************************************/
-
-typedef struct AsyncIterator AsyncIterator;
-
-typedef void (*AsyncIteratorDone) (GOutputStream *stream,
- gpointer op_data,
- GAsyncReadyCallback callback,
- gpointer callback_data,
- GError *io_error);
-
-struct AsyncIterator {
- AsyncIteratorDone done_cb;
- GDaemonFileOutputStream *file;
- GCancellable *cancellable;
- IOOperationData io_data;
- state_machine_iterator iterator;
- gpointer iterator_data;
- int io_priority;
- GAsyncReadyCallback callback;
- gpointer callback_data;
-};
-
-static void async_iterate (AsyncIterator *iterator);
-
-static void
-async_iterator_done (AsyncIterator *iterator, GError *io_error)
-{
- iterator->done_cb (G_OUTPUT_STREAM (iterator->file),
- iterator->iterator_data,
- iterator->callback,
- iterator->callback_data,
- io_error);
-
- g_free (iterator);
-
-}
-
-static void
-async_op_handle (AsyncIterator *iterator,
- gssize res,
- GError *io_error)
-{
- IOOperationData *io_data = &iterator->io_data;
- GError *error;
-
- if (io_error != NULL)
- {
- if (error_is_cancel (io_error))
- {
- io_data->io_res = 0;
- io_data->io_cancelled = TRUE;
- }
- else
- {
- error = NULL;
- g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED,
- _("Error in stream protocol: %s"), io_error->message);
- async_iterator_done (iterator, error);
- g_error_free (error);
- return;
- }
- }
- else if (res == 0 && io_data->io_size != 0)
- {
- error = NULL;
- g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED,
- _("Error in stream protocol: %s"), _("End of stream"));
- async_iterator_done (iterator, error);
- g_error_free (error);
- return;
- }
- else
- {
- io_data->io_res = res;
- io_data->io_cancelled = FALSE;
- }
-
- async_iterate (iterator);
-}
-
-static void
-async_read_op_callback (GObject *source_object,
- GAsyncResult *res,
- gpointer user_data)
-{
- GInputStream *stream = G_INPUT_STREAM (source_object);
- gssize count_read;
- GError *error = NULL;
-
- count_read = g_input_stream_read_finish (stream, res, &error);
-
- async_op_handle ((AsyncIterator *)user_data, count_read, error);
- if (error)
- g_error_free (error);
-}
-
-static void
-async_skip_op_callback (GObject *source_object,
- GAsyncResult *res,
- gpointer user_data)
-{
- GInputStream *stream = G_INPUT_STREAM (source_object);
- gssize count_skipped;
- GError *error = NULL;
-
- count_skipped = g_input_stream_skip_finish (stream, res, &error);
-
- async_op_handle ((AsyncIterator *)user_data, count_skipped, error);
- if (error)
- g_error_free (error);
-}
-
-static void
-async_write_op_callback (GObject *source_object,
- GAsyncResult *res,
- gpointer user_data)
-{
- GOutputStream *stream = G_OUTPUT_STREAM (source_object);
- gssize bytes_written;
- GError *error = NULL;
-
- bytes_written = g_output_stream_write_finish (stream, res, &error);
-
- async_op_handle ((AsyncIterator *)user_data, bytes_written, error);
- if (error)
- g_error_free (error);
-}
-
-static void
-async_iterate (AsyncIterator *iterator)
-{
- IOOperationData *io_data = &iterator->io_data;
- GDaemonFileOutputStream *file = iterator->file;
- StateOp io_op;
-
- io_data->cancelled =
- g_cancellable_is_cancelled (iterator->cancellable);
-
- io_op = iterator->iterator (file, io_data, iterator->iterator_data);
-
- if (io_op == STATE_OP_DONE)
- {
- async_iterator_done (iterator, NULL);
- return;
- }
-
- /* TODO: Handle allow_cancel... */
-
- if (io_op == STATE_OP_READ)
- {
- g_input_stream_read_async (file->data_stream,
- io_data->io_buffer, io_data->io_size,
- iterator->io_priority,
- io_data->io_allow_cancel ? iterator->cancellable : NULL,
- async_read_op_callback, iterator);
- }
- else if (io_op == STATE_OP_SKIP)
- {
- g_input_stream_skip_async (file->data_stream,
- io_data->io_size,
- iterator->io_priority,
- io_data->io_allow_cancel ? iterator->cancellable : NULL,
- async_skip_op_callback, iterator);
- }
- else if (io_op == STATE_OP_WRITE)
- {
- g_output_stream_write_async (file->command_stream,
- io_data->io_buffer, io_data->io_size,
- iterator->io_priority,
- io_data->io_allow_cancel ? iterator->cancellable : NULL,
- async_write_op_callback, iterator);
- }
- else
- g_assert_not_reached ();
-}
-
-static void
-run_async_state_machine (GDaemonFileOutputStream *file,
- state_machine_iterator iterator_cb,
- gpointer iterator_data,
- int io_priority,
- GAsyncReadyCallback callback,
- gpointer data,
- GCancellable *cancellable,
- AsyncIteratorDone done_cb)
-{
- AsyncIterator *iterator;
-
- iterator = g_new0 (AsyncIterator, 1);
- iterator->file = file;
- iterator->iterator = iterator_cb;
- iterator->iterator_data = iterator_data;
- iterator->io_priority = io_priority;
- iterator->cancellable = cancellable;
- iterator->callback = callback;
- iterator->callback_data = data;
- iterator->done_cb = done_cb;
-
- async_iterate (iterator);
-}
-
-static void
-async_write_done (GOutputStream *stream,
- gpointer op_data,
- GAsyncReadyCallback callback,
- gpointer user_data,
- GError *io_error)
-{
- GSimpleAsyncResult *simple;
- WriteOperation *op;
- gssize count_written;
- GError *error;
-
- op = op_data;
-
- if (io_error)
- {
- count_written = -1;
- error = io_error;
- }
- else
- {
- count_written = op->ret_val;
- error = op->ret_error;
- }
-
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- g_daemon_file_output_stream_write_async);
-
- g_simple_async_result_set_op_res_gssize (simple, count_written);
-
- if (count_written == -1)
- g_simple_async_result_set_from_error (simple, error);
-
- /* Complete immediately, not in idle, since we're already in a mainloop callout */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- if (op->ret_error)
- g_error_free (op->ret_error);
- g_free (op);
-}
-
-static void
-g_daemon_file_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data)
-{
- GDaemonFileOutputStream *file;
- WriteOperation *op;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- /* Limit for sanity and to avoid 32bit overflow */
- if (count > MAX_WRITE_SIZE)
- count = MAX_WRITE_SIZE;
-
- op = g_new0 (WriteOperation, 1);
- op->state = WRITE_STATE_INIT;
- op->buffer = buffer;
- op->buffer_size = count;
-
- run_async_state_machine (file,
- (state_machine_iterator)iterate_write_state_machine,
- op,
- io_priority,
- callback, data,
- cancellable,
- async_write_done);
-}
-
-static gssize
-g_daemon_file_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize nwritten;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_assert (g_simple_async_result_get_source_tag (simple) == g_daemon_file_output_stream_write_async);
-
- nwritten = g_simple_async_result_get_op_res_gssize (simple);
- return nwritten;
-}
-
-static void
-async_close_done (GOutputStream *stream,
- gpointer op_data,
- GAsyncReadyCallback callback,
- gpointer user_data,
- GError *io_error)
-{
- GDaemonFileOutputStream *file;
- GSimpleAsyncResult *simple;
- CloseOperation *op;
- gboolean result;
- GError *error;
- GCancellable *cancellable = NULL; /* TODO: get cancellable */
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- op = op_data;
-
- if (io_error)
- {
- result = FALSE;
- error = io_error;
- }
- else
- {
- result = op->ret_val;
- error = op->ret_error;
- }
-
- if (result)
- result = g_output_stream_close (file->command_stream, cancellable, &error);
- else
- g_output_stream_close (file->command_stream, cancellable, NULL);
-
- if (result)
- result = g_input_stream_close (file->data_stream, cancellable, &error);
- else
- g_input_stream_close (file->data_stream, cancellable, NULL);
-
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- g_daemon_file_output_stream_close_async);
-
- if (!result)
- g_simple_async_result_set_from_error (simple, error);
-
- /* Complete immediately, not in idle, since we're already in a mainloop callout */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- if (op->ret_error)
- g_error_free (op->ret_error);
- g_free (op);
-}
-
-static void
-g_daemon_file_output_stream_close_async (GOutputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data)
-{
- GDaemonFileOutputStream *file;
- CloseOperation *op;
-
- file = G_DAEMON_FILE_OUTPUT_STREAM (stream);
-
- op = g_new0 (CloseOperation, 1);
- op->state = CLOSE_STATE_INIT;
-
- run_async_state_machine (file,
- (state_machine_iterator)iterate_close_state_machine,
- op, io_priority,
- (GAsyncReadyCallback)callback, data,
- cancellable,
- (AsyncIteratorDone)async_close_done);
-}
-
-static gboolean
-g_daemon_file_output_stream_close_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- /* Failures handled in generic close_finish code */
- return TRUE;
-}