diff options
Diffstat (limited to 'client/gdaemonfileoutputstream.c')
-rw-r--r-- | client/gdaemonfileoutputstream.c | 1374 |
1 files changed, 0 insertions, 1374 deletions
diff --git a/client/gdaemonfileoutputstream.c b/client/gdaemonfileoutputstream.c deleted file mode 100644 index 525475c8..00000000 --- a/client/gdaemonfileoutputstream.c +++ /dev/null @@ -1,1374 +0,0 @@ -/* GIO - GLib Input, Output and Streaming Library - * - * Copyright (C) 2006-2007 Red Hat, Inc. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General - * Public License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place, Suite 330, - * Boston, MA 02111-1307, USA. - * - * Author: Alexander Larsson <alexl@redhat.com> - */ - -#include <config.h> - -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> -#include <unistd.h> -#include <errno.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/un.h> - -#include <glib.h> -#include <glib/gstdio.h> -#include <glib/gi18n-lib.h> -#include <gio/gio.h> -#include <gio/gunixinputstream.h> -#include <gio/gunixoutputstream.h> -#include "gdaemonfileoutputstream.h" -#include "gvfsdaemondbus.h" -#include <gvfsdaemonprotocol.h> - -#define MAX_WRITE_SIZE (4*1024*1024) - -typedef enum { - STATE_OP_DONE, - STATE_OP_READ, - STATE_OP_WRITE, - STATE_OP_SKIP -} StateOp; - -typedef enum { - WRITE_STATE_INIT = 0, - WRITE_STATE_WROTE_COMMAND, - WRITE_STATE_SEND_DATA, - WRITE_STATE_HANDLE_INPUT -} WriteState; - -typedef struct { - WriteState state; - - /* Output */ - const char *buffer; - gsize buffer_size; - gsize buffer_pos; - - /* Input */ - gssize ret_val; - GError *ret_error; - - gboolean sent_cancel; - - guint32 seq_nr; -} WriteOperation; - -typedef enum { - SEEK_STATE_INIT = 0, - SEEK_STATE_WROTE_REQUEST, - SEEK_STATE_HANDLE_INPUT -} SeekState; - -typedef struct { - SeekState state; - - /* Output */ - goffset offset; - GSeekType seek_type; - /* Output */ - gboolean ret_val; - GError *ret_error; - goffset ret_offset; - - gboolean sent_cancel; - - guint32 seq_nr; -} SeekOperation; - -typedef enum { - CLOSE_STATE_INIT = 0, - CLOSE_STATE_WROTE_REQUEST, - CLOSE_STATE_HANDLE_INPUT -} CloseState; - -typedef struct { - CloseState state; - - /* Output */ - - /* Output */ - gboolean ret_val; - GError *ret_error; - - gboolean sent_cancel; - - guint32 seq_nr; -} CloseOperation; - - -typedef struct { - gboolean cancelled; - - char *io_buffer; - gsize io_size; - gsize io_res; - /* The operation always succeeds, or gets cancelled. - If we get an error doing the i/o that is considered fatal */ - gboolean io_allow_cancel; - gboolean io_cancelled; -} IOOperationData; - -typedef StateOp (*state_machine_iterator) (GDaemonFileOutputStream *file, IOOperationData *io_op, gpointer data); - -struct _GDaemonFileOutputStream { - GFileOutputStream parent; - - GOutputStream *command_stream; - GInputStream *data_stream; - guint can_seek : 1; - - guint32 seq_nr; - goffset current_offset; - - gsize input_block_size; - GString *input_buffer; - - GString *output_buffer; - - char *etag; - -}; - -static gssize g_daemon_file_output_stream_write (GOutputStream *stream, - const void *buffer, - gsize count, - GCancellable *cancellable, - GError **error); -static gboolean g_daemon_file_output_stream_close (GOutputStream *stream, - GCancellable *cancellable, - GError **error); -static GFileInfo *g_daemon_file_output_stream_query_info (GFileOutputStream *stream, - char *attributes, - GCancellable *cancellable, - GError **error); -static char *g_daemon_file_output_stream_get_etag (GFileOutputStream *stream); -static goffset g_daemon_file_output_stream_tell (GFileOutputStream *stream); -static gboolean g_daemon_file_output_stream_can_seek (GFileOutputStream *stream); -static gboolean g_daemon_file_output_stream_seek (GFileOutputStream *stream, - goffset offset, - GSeekType type, - GCancellable *cancellable, - GError **error); -static void g_daemon_file_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_daemon_file_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error); -static void g_daemon_file_output_stream_close_async (GOutputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gboolean g_daemon_file_output_stream_close_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error); - -G_DEFINE_TYPE (GDaemonFileOutputStream, g_daemon_file_output_stream, - G_TYPE_FILE_OUTPUT_STREAM) - -static void -g_daemon_file_output_stream_finalize (GObject *object) -{ - GDaemonFileOutputStream *file; - - file = G_DAEMON_FILE_OUTPUT_STREAM (object); - - if (file->command_stream) - g_object_unref (file->command_stream); - if (file->data_stream) - g_object_unref (file->data_stream); - - g_string_free (file->input_buffer, TRUE); - g_string_free (file->output_buffer, TRUE); - - g_free (file->etag); - - if (G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize) - (*G_OBJECT_CLASS (g_daemon_file_output_stream_parent_class)->finalize) (object); -} - -static void -g_daemon_file_output_stream_class_init (GDaemonFileOutputStreamClass *klass) -{ - GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass); - GFileOutputStreamClass *file_stream_class = G_FILE_OUTPUT_STREAM_CLASS (klass); - - gobject_class->finalize = g_daemon_file_output_stream_finalize; - - stream_class->write_fn = g_daemon_file_output_stream_write; - stream_class->close_fn = g_daemon_file_output_stream_close; - - stream_class->write_async = g_daemon_file_output_stream_write_async; - stream_class->write_finish = g_daemon_file_output_stream_write_finish; - stream_class->close_async = g_daemon_file_output_stream_close_async; - stream_class->close_finish = g_daemon_file_output_stream_close_finish; - - file_stream_class->tell = g_daemon_file_output_stream_tell; - file_stream_class->can_seek = g_daemon_file_output_stream_can_seek; - file_stream_class->seek = g_daemon_file_output_stream_seek; - file_stream_class->query_info = g_daemon_file_output_stream_query_info; - file_stream_class->get_etag = g_daemon_file_output_stream_get_etag; -} - -static void -g_daemon_file_output_stream_init (GDaemonFileOutputStream *info) -{ - info->output_buffer = g_string_new (""); - info->input_buffer = g_string_new (""); - info->seq_nr = 1; -} - -GFileOutputStream * -g_daemon_file_output_stream_new (int fd, - gboolean can_seek, - goffset initial_offset) -{ - GDaemonFileOutputStream *stream; - - stream = g_object_new (G_TYPE_DAEMON_FILE_OUTPUT_STREAM, NULL); - - stream->command_stream = g_unix_output_stream_new (fd, FALSE); - stream->data_stream = g_unix_input_stream_new (fd, TRUE); - stream->can_seek = can_seek; - stream->current_offset = initial_offset; - - return G_FILE_OUTPUT_STREAM (stream); -} - -static gboolean -error_is_cancel (GError *error) -{ - return error != NULL && - error->domain == G_IO_ERROR && - error->code == G_IO_ERROR_CANCELLED; -} - -static void -append_request (GDaemonFileOutputStream *stream, guint32 command, - guint32 arg1, guint32 arg2, guint32 data_len, guint32 *seq_nr) -{ - GVfsDaemonSocketProtocolRequest cmd; - - g_assert (sizeof (cmd) == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE); - - if (seq_nr) - *seq_nr = stream->seq_nr; - - cmd.command = g_htonl (command); - cmd.seq_nr = g_htonl (stream->seq_nr++); - cmd.arg1 = g_htonl (arg1); - cmd.arg2 = g_htonl (arg2); - cmd.data_len = g_htonl (data_len); - - g_string_append_len (stream->output_buffer, - (char *)&cmd, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE); -} - -static gsize -get_reply_header_missing_bytes (GString *buffer) -{ - GVfsDaemonSocketProtocolReply *reply; - guint32 type; - guint32 arg2; - - if (buffer->len < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) - return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - buffer->len; - - reply = (GVfsDaemonSocketProtocolReply *)buffer->str; - - type = g_ntohl (reply->type); - arg2 = g_ntohl (reply->arg2); - - /* ERROR and CLOSED has extra data w/ len in arg2 */ - if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR || - type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED) - return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len; - return 0; -} - -static char * -decode_reply (GString *buffer, GVfsDaemonSocketProtocolReply *reply_out) -{ - GVfsDaemonSocketProtocolReply *reply; - reply = (GVfsDaemonSocketProtocolReply *)buffer->str; - reply_out->type = g_ntohl (reply->type); - reply_out->seq_nr = g_ntohl (reply->seq_nr); - reply_out->arg1 = g_ntohl (reply->arg1); - reply_out->arg2 = g_ntohl (reply->arg2); - - return buffer->str + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE; -} - -static void -decode_error (GVfsDaemonSocketProtocolReply *reply, char *data, GError **error) -{ - g_set_error_literal (error, - g_quark_from_string (data), - reply->arg1, - data + strlen (data) + 1); -} - - -static gboolean -run_sync_state_machine (GDaemonFileOutputStream *file, - state_machine_iterator iterator, - gpointer data, - GCancellable *cancellable, - GError **error) -{ - gssize res; - StateOp io_op; - IOOperationData io_data; - GError *io_error; - - memset (&io_data, 0, sizeof (io_data)); - - while (TRUE) - { - if (cancellable) - io_data.cancelled = g_cancellable_is_cancelled (cancellable); - - io_op = iterator (file, &io_data, data); - - if (io_op == STATE_OP_DONE) - return TRUE; - - io_error = NULL; - if (io_op == STATE_OP_READ) - { - res = g_input_stream_read (file->data_stream, - io_data.io_buffer, io_data.io_size, - io_data.io_allow_cancel ? cancellable : NULL, - &io_error); - } - else if (io_op == STATE_OP_SKIP) - { - res = g_input_stream_skip (file->data_stream, - io_data.io_size, - io_data.io_allow_cancel ? cancellable : NULL, - &io_error); - } - else if (io_op == STATE_OP_WRITE) - { - res = g_output_stream_write (file->command_stream, - io_data.io_buffer, io_data.io_size, - io_data.io_allow_cancel ? cancellable : NULL, - &io_error); - } - else - { - res = 0; - g_assert_not_reached (); - } - - if (res == -1) - { - if (error_is_cancel (io_error)) - { - io_data.io_res = 0; - io_data.io_cancelled = TRUE; - g_error_free (io_error); - } - else - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - _("Error in stream protocol: %s"), io_error->message); - g_error_free (io_error); - return FALSE; - } - } - else if (res == 0 && io_data.io_size != 0) - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - _("Error in stream protocol: %s"), _("End of stream")); - return FALSE; - } - else - { - io_data.io_res = res; - io_data.io_cancelled = FALSE; - } - } -} - -/* read cycle: - - if we know of a (partially read) matching outstanding block, read from it - create packet, append to outgoing - flush outgoing - start processing output, looking for a data block with same seek gen, - or an error with same seq nr - on cancel, send cancel command and go back to loop - */ - -static StateOp -iterate_write_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, WriteOperation *op) -{ - gsize len; - - while (TRUE) - { - switch (op->state) - { - /* Initial state for read op */ - case WRITE_STATE_INIT: - append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_WRITE, - op->buffer_size, 0, op->buffer_size, &op->seq_nr); - op->state = WRITE_STATE_WROTE_COMMAND; - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */ - return STATE_OP_WRITE; - - /* wrote parts of output_buffer */ - case WRITE_STATE_WROTE_COMMAND: - if (io_op->io_cancelled) - { - op->ret_val = -1; - g_set_error_literal (&op->ret_error, - G_IO_ERROR, - G_IO_ERROR_CANCELLED, - _("Operation was cancelled")); - return STATE_OP_DONE; - } - - if (io_op->io_res < file->output_buffer->len) - { - memcpy (file->output_buffer->str, - file->output_buffer->str + io_op->io_res, - file->output_buffer->len - io_op->io_res); - g_string_truncate (file->output_buffer, - file->output_buffer->len - io_op->io_res); - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = FALSE; - return STATE_OP_WRITE; - } - g_string_truncate (file->output_buffer, 0); - - op->buffer_pos = 0; - if (op->sent_cancel) - op->state = WRITE_STATE_HANDLE_INPUT; - else - op->state = WRITE_STATE_SEND_DATA; - break; - - /* No op */ - case WRITE_STATE_SEND_DATA: - op->buffer_pos += io_op->io_res; - - if (op->buffer_pos < op->buffer_size) - { - io_op->io_buffer = (char *)(op->buffer + op->buffer_pos); - io_op->io_size = op->buffer_size - op->buffer_pos; - io_op->io_allow_cancel = FALSE; - return STATE_OP_WRITE; - } - - op->state = WRITE_STATE_HANDLE_INPUT; - break; - - /* No op */ - case WRITE_STATE_HANDLE_INPUT: - if (io_op->cancelled && !op->sent_cancel) - { - op->sent_cancel = TRUE; - append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL, - op->seq_nr, 0, 0, NULL); - op->state = WRITE_STATE_WROTE_COMMAND; - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = FALSE; - return STATE_OP_WRITE; - } - - if (io_op->io_res > 0) - { - gsize unread_size = io_op->io_size - io_op->io_res; - g_string_set_size (file->input_buffer, - file->input_buffer->len - unread_size); - } - - len = get_reply_header_missing_bytes (file->input_buffer); - if (len > 0) - { - gsize current_len = file->input_buffer->len; - g_string_set_size (file->input_buffer, - current_len + len); - io_op->io_buffer = file->input_buffer->str + current_len; - io_op->io_size = len; - io_op->io_allow_cancel = !op->sent_cancel; - return STATE_OP_READ; - } - - /* Got full header */ - - { - GVfsDaemonSocketProtocolReply reply; - char *data; - data = decode_reply (file->input_buffer, &reply); - - if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR && - reply.seq_nr == op->seq_nr) - { - op->ret_val = -1; - decode_error (&reply, data, &op->ret_error); - g_string_truncate (file->input_buffer, 0); - return STATE_OP_DONE; - } - else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_WRITTEN) - { - op->ret_val = reply.arg1; - g_string_truncate (file->input_buffer, 0); - return STATE_OP_DONE; - } - /* Ignore other reply types */ - } - - g_string_truncate (file->input_buffer, 0); - - /* This wasn't interesting, read next reply */ - op->state = WRITE_STATE_HANDLE_INPUT; - break; - - default: - g_assert_not_reached (); - } - - /* Clear io_op between non-op state switches */ - io_op->io_size = 0; - io_op->io_res = 0; - io_op->io_cancelled = FALSE; - - } -} - -static gssize -g_daemon_file_output_stream_write (GOutputStream *stream, - const void *buffer, - gsize count, - GCancellable *cancellable, - GError **error) -{ - GDaemonFileOutputStream *file; - WriteOperation op; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - if (g_cancellable_set_error_if_cancelled (cancellable, error)) - return -1; - - /* Limit for sanity and to avoid 32bit overflow */ - if (count > MAX_WRITE_SIZE) - count = MAX_WRITE_SIZE; - - memset (&op, 0, sizeof (op)); - op.state = WRITE_STATE_INIT; - op.buffer = buffer; - op.buffer_size = count; - - if (!run_sync_state_machine (file, (state_machine_iterator)iterate_write_state_machine, - &op, cancellable, error)) - return -1; /* IO Error */ - - if (op.ret_val == -1) - g_propagate_error (error, op.ret_error); - else - file->current_offset += op.ret_val; - - return op.ret_val; -} - -static StateOp -iterate_close_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, CloseOperation *op) -{ - gsize len; - - while (TRUE) - { - switch (op->state) - { - /* Initial state for read op */ - case CLOSE_STATE_INIT: - append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE, - 0, 0, 0, &op->seq_nr); - op->state = CLOSE_STATE_WROTE_REQUEST; - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */ - return STATE_OP_WRITE; - - /* wrote parts of output_buffer */ - case CLOSE_STATE_WROTE_REQUEST: - if (io_op->io_cancelled) - { - op->ret_val = FALSE; - g_set_error_literal (&op->ret_error, - G_IO_ERROR, - G_IO_ERROR_CANCELLED, - _("Operation was cancelled")); - return STATE_OP_DONE; - } - - if (io_op->io_res < file->output_buffer->len) - { - memcpy (file->output_buffer->str, - file->output_buffer->str + io_op->io_res, - file->output_buffer->len - io_op->io_res); - g_string_truncate (file->output_buffer, - file->output_buffer->len - io_op->io_res); - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = FALSE; - return STATE_OP_WRITE; - } - g_string_truncate (file->output_buffer, 0); - - op->state = CLOSE_STATE_HANDLE_INPUT; - break; - - /* No op */ - case CLOSE_STATE_HANDLE_INPUT: - if (io_op->cancelled && !op->sent_cancel) - { - op->sent_cancel = TRUE; - append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL, - op->seq_nr, 0, 0, NULL); - op->state = CLOSE_STATE_WROTE_REQUEST; - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = FALSE; - return STATE_OP_WRITE; - } - - if (io_op->io_res > 0) - { - gsize unread_size = io_op->io_size - io_op->io_res; - g_string_set_size (file->input_buffer, - file->input_buffer->len - unread_size); - } - - len = get_reply_header_missing_bytes (file->input_buffer); - if (len > 0) - { - gsize current_len = file->input_buffer->len; - g_string_set_size (file->input_buffer, - current_len + len); - io_op->io_buffer = file->input_buffer->str + current_len; - io_op->io_size = len; - io_op->io_allow_cancel = !op->sent_cancel; - return STATE_OP_READ; - } - - /* Got full header */ - - { - GVfsDaemonSocketProtocolReply reply; - char *data; - data = decode_reply (file->input_buffer, &reply); - - if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR && - reply.seq_nr == op->seq_nr) - { - op->ret_val = FALSE; - decode_error (&reply, data, &op->ret_error); - g_string_truncate (file->input_buffer, 0); - return STATE_OP_DONE; - } - else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED) - { - op->ret_val = TRUE; - if (reply.arg2 > 0) - file->etag = g_strndup (data, reply.arg2); - g_string_truncate (file->input_buffer, 0); - return STATE_OP_DONE; - } - /* Ignore other reply types */ - } - - g_string_truncate (file->input_buffer, 0); - - /* This wasn't interesting, read next reply */ - op->state = CLOSE_STATE_HANDLE_INPUT; - break; - - default: - g_assert_not_reached (); - } - - /* Clear io_op between non-op state switches */ - io_op->io_size = 0; - io_op->io_res = 0; - io_op->io_cancelled = FALSE; - } -} - - -static gboolean -g_daemon_file_output_stream_close (GOutputStream *stream, - GCancellable *cancellable, - GError **error) -{ - GDaemonFileOutputStream *file; - CloseOperation op; - gboolean res; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - /* We need to do a full roundtrip to guarantee that the writes have - reached the disk. */ - - memset (&op, 0, sizeof (op)); - op.state = CLOSE_STATE_INIT; - - if (!run_sync_state_machine (file, (state_machine_iterator)iterate_close_state_machine, - &op, cancellable, error)) - res = FALSE; - else - { - if (!op.ret_val) - g_propagate_error (error, op.ret_error); - res = op.ret_val; - } - - /* Return the first error, but close all streams */ - if (res) - res = g_output_stream_close (file->command_stream, cancellable, error); - else - g_output_stream_close (file->command_stream, cancellable, NULL); - - if (res) - res = g_input_stream_close (file->data_stream, cancellable, error); - else - g_input_stream_close (file->data_stream, cancellable, NULL); - - return res; -} - -static goffset -g_daemon_file_output_stream_tell (GFileOutputStream *stream) -{ - GDaemonFileOutputStream *file; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - return file->current_offset; -} - -static gboolean -g_daemon_file_output_stream_can_seek (GFileOutputStream *stream) -{ - GDaemonFileOutputStream *file; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - return file->can_seek; -} - -static StateOp -iterate_seek_state_machine (GDaemonFileOutputStream *file, IOOperationData *io_op, SeekOperation *op) -{ - gsize len; - guint32 request; - - while (TRUE) - { - switch (op->state) - { - /* Initial state for read op */ - case SEEK_STATE_INIT: - request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET; - if (op->seek_type == G_SEEK_CUR) - op->offset = file->current_offset + op->offset; - else if (op->seek_type == G_SEEK_END) - request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END; - append_request (file, request, - op->offset & 0xffffffff, - op->offset >> 32, - 0, - &op->seq_nr); - op->state = SEEK_STATE_WROTE_REQUEST; - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */ - return STATE_OP_WRITE; - - /* wrote parts of output_buffer */ - case SEEK_STATE_WROTE_REQUEST: - if (io_op->io_cancelled) - { - op->ret_val = -1; - g_set_error_literal (&op->ret_error, - G_IO_ERROR, - G_IO_ERROR_CANCELLED, - _("Operation was cancelled")); - return STATE_OP_DONE; - } - - if (io_op->io_res < file->output_buffer->len) - { - memcpy (file->output_buffer->str, - file->output_buffer->str + io_op->io_res, - file->output_buffer->len - io_op->io_res); - g_string_truncate (file->output_buffer, - file->output_buffer->len - io_op->io_res); - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = FALSE; - return STATE_OP_WRITE; - } - g_string_truncate (file->output_buffer, 0); - - op->state = SEEK_STATE_HANDLE_INPUT; - break; - - /* No op */ - case SEEK_STATE_HANDLE_INPUT: - if (io_op->cancelled && !op->sent_cancel) - { - op->sent_cancel = TRUE; - append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL, - op->seq_nr, 0, 0, NULL); - op->state = SEEK_STATE_WROTE_REQUEST; - io_op->io_buffer = file->output_buffer->str; - io_op->io_size = file->output_buffer->len; - io_op->io_allow_cancel = FALSE; - return STATE_OP_WRITE; - } - - if (io_op->io_res > 0) - { - gsize unread_size = io_op->io_size - io_op->io_res; - g_string_set_size (file->input_buffer, - file->input_buffer->len - unread_size); - } - - len = get_reply_header_missing_bytes (file->input_buffer); - if (len > 0) - { - gsize current_len = file->input_buffer->len; - g_string_set_size (file->input_buffer, - current_len + len); - io_op->io_buffer = file->input_buffer->str + current_len; - io_op->io_size = len; - io_op->io_allow_cancel = !op->sent_cancel; - return STATE_OP_READ; - } - - /* Got full header */ - - { - GVfsDaemonSocketProtocolReply reply; - char *data; - data = decode_reply (file->input_buffer, &reply); - - if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR && - reply.seq_nr == op->seq_nr) - { - op->ret_val = FALSE; - decode_error (&reply, data, &op->ret_error); - g_string_truncate (file->input_buffer, 0); - return STATE_OP_DONE; - } - else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS) - { - op->ret_val = TRUE; - op->ret_offset = ((goffset)reply.arg2) << 32 | (goffset)reply.arg1; - g_string_truncate (file->input_buffer, 0); - return STATE_OP_DONE; - } - /* Ignore other reply types */ - } - - g_string_truncate (file->input_buffer, 0); - - /* This wasn't interesting, read next reply */ - op->state = SEEK_STATE_HANDLE_INPUT; - break; - - default: - g_assert_not_reached (); - } - - /* Clear io_op between non-op state switches */ - io_op->io_size = 0; - io_op->io_res = 0; - io_op->io_cancelled = FALSE; - } -} - -static gboolean -g_daemon_file_output_stream_seek (GFileOutputStream *stream, - goffset offset, - GSeekType type, - GCancellable *cancellable, - GError **error) -{ - GDaemonFileOutputStream *file; - SeekOperation op; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - if (!file->can_seek) - { - g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, - _("Seek not supported on stream")); - return FALSE; - } - - if (g_cancellable_set_error_if_cancelled (cancellable, error)) - return FALSE; - - memset (&op, 0, sizeof (op)); - op.state = SEEK_STATE_INIT; - op.offset = offset; - op.seek_type = type; - - if (!run_sync_state_machine (file, (state_machine_iterator)iterate_seek_state_machine, - &op, cancellable, error)) - return FALSE; /* IO Error */ - - if (!op.ret_val) - g_propagate_error (error, op.ret_error); - else - file->current_offset = op.ret_offset; - - return op.ret_val; -} - -static char * -g_daemon_file_output_stream_get_etag (GFileOutputStream *stream) -{ - GDaemonFileOutputStream *file; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - return g_strdup (file->etag); -} - -static GFileInfo * -g_daemon_file_output_stream_query_info (GFileOutputStream *stream, - char *attributes, - GCancellable *cancellable, - GError **error) -{ -#if 0 - GDaemonFileOutputStream *file; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); -#endif - - g_set_error (error, - G_IO_ERROR, - G_IO_ERROR_NOT_SUPPORTED, - _("Query info not supported on stream")); - - return NULL; -} - -/************************************************************************ - * Async I/O Code * - ************************************************************************/ - -typedef struct AsyncIterator AsyncIterator; - -typedef void (*AsyncIteratorDone) (GOutputStream *stream, - gpointer op_data, - GAsyncReadyCallback callback, - gpointer callback_data, - GError *io_error); - -struct AsyncIterator { - AsyncIteratorDone done_cb; - GDaemonFileOutputStream *file; - GCancellable *cancellable; - IOOperationData io_data; - state_machine_iterator iterator; - gpointer iterator_data; - int io_priority; - GAsyncReadyCallback callback; - gpointer callback_data; -}; - -static void async_iterate (AsyncIterator *iterator); - -static void -async_iterator_done (AsyncIterator *iterator, GError *io_error) -{ - iterator->done_cb (G_OUTPUT_STREAM (iterator->file), - iterator->iterator_data, - iterator->callback, - iterator->callback_data, - io_error); - - g_free (iterator); - -} - -static void -async_op_handle (AsyncIterator *iterator, - gssize res, - GError *io_error) -{ - IOOperationData *io_data = &iterator->io_data; - GError *error; - - if (io_error != NULL) - { - if (error_is_cancel (io_error)) - { - io_data->io_res = 0; - io_data->io_cancelled = TRUE; - } - else - { - error = NULL; - g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED, - _("Error in stream protocol: %s"), io_error->message); - async_iterator_done (iterator, error); - g_error_free (error); - return; - } - } - else if (res == 0 && io_data->io_size != 0) - { - error = NULL; - g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED, - _("Error in stream protocol: %s"), _("End of stream")); - async_iterator_done (iterator, error); - g_error_free (error); - return; - } - else - { - io_data->io_res = res; - io_data->io_cancelled = FALSE; - } - - async_iterate (iterator); -} - -static void -async_read_op_callback (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GInputStream *stream = G_INPUT_STREAM (source_object); - gssize count_read; - GError *error = NULL; - - count_read = g_input_stream_read_finish (stream, res, &error); - - async_op_handle ((AsyncIterator *)user_data, count_read, error); - if (error) - g_error_free (error); -} - -static void -async_skip_op_callback (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GInputStream *stream = G_INPUT_STREAM (source_object); - gssize count_skipped; - GError *error = NULL; - - count_skipped = g_input_stream_skip_finish (stream, res, &error); - - async_op_handle ((AsyncIterator *)user_data, count_skipped, error); - if (error) - g_error_free (error); -} - -static void -async_write_op_callback (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GOutputStream *stream = G_OUTPUT_STREAM (source_object); - gssize bytes_written; - GError *error = NULL; - - bytes_written = g_output_stream_write_finish (stream, res, &error); - - async_op_handle ((AsyncIterator *)user_data, bytes_written, error); - if (error) - g_error_free (error); -} - -static void -async_iterate (AsyncIterator *iterator) -{ - IOOperationData *io_data = &iterator->io_data; - GDaemonFileOutputStream *file = iterator->file; - StateOp io_op; - - io_data->cancelled = - g_cancellable_is_cancelled (iterator->cancellable); - - io_op = iterator->iterator (file, io_data, iterator->iterator_data); - - if (io_op == STATE_OP_DONE) - { - async_iterator_done (iterator, NULL); - return; - } - - /* TODO: Handle allow_cancel... */ - - if (io_op == STATE_OP_READ) - { - g_input_stream_read_async (file->data_stream, - io_data->io_buffer, io_data->io_size, - iterator->io_priority, - io_data->io_allow_cancel ? iterator->cancellable : NULL, - async_read_op_callback, iterator); - } - else if (io_op == STATE_OP_SKIP) - { - g_input_stream_skip_async (file->data_stream, - io_data->io_size, - iterator->io_priority, - io_data->io_allow_cancel ? iterator->cancellable : NULL, - async_skip_op_callback, iterator); - } - else if (io_op == STATE_OP_WRITE) - { - g_output_stream_write_async (file->command_stream, - io_data->io_buffer, io_data->io_size, - iterator->io_priority, - io_data->io_allow_cancel ? iterator->cancellable : NULL, - async_write_op_callback, iterator); - } - else - g_assert_not_reached (); -} - -static void -run_async_state_machine (GDaemonFileOutputStream *file, - state_machine_iterator iterator_cb, - gpointer iterator_data, - int io_priority, - GAsyncReadyCallback callback, - gpointer data, - GCancellable *cancellable, - AsyncIteratorDone done_cb) -{ - AsyncIterator *iterator; - - iterator = g_new0 (AsyncIterator, 1); - iterator->file = file; - iterator->iterator = iterator_cb; - iterator->iterator_data = iterator_data; - iterator->io_priority = io_priority; - iterator->cancellable = cancellable; - iterator->callback = callback; - iterator->callback_data = data; - iterator->done_cb = done_cb; - - async_iterate (iterator); -} - -static void -async_write_done (GOutputStream *stream, - gpointer op_data, - GAsyncReadyCallback callback, - gpointer user_data, - GError *io_error) -{ - GSimpleAsyncResult *simple; - WriteOperation *op; - gssize count_written; - GError *error; - - op = op_data; - - if (io_error) - { - count_written = -1; - error = io_error; - } - else - { - count_written = op->ret_val; - error = op->ret_error; - } - - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, user_data, - g_daemon_file_output_stream_write_async); - - g_simple_async_result_set_op_res_gssize (simple, count_written); - - if (count_written == -1) - g_simple_async_result_set_from_error (simple, error); - - /* Complete immediately, not in idle, since we're already in a mainloop callout */ - g_simple_async_result_complete (simple); - g_object_unref (simple); - - if (op->ret_error) - g_error_free (op->ret_error); - g_free (op); -} - -static void -g_daemon_file_output_stream_write_async (GOutputStream *stream, - const void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data) -{ - GDaemonFileOutputStream *file; - WriteOperation *op; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - /* Limit for sanity and to avoid 32bit overflow */ - if (count > MAX_WRITE_SIZE) - count = MAX_WRITE_SIZE; - - op = g_new0 (WriteOperation, 1); - op->state = WRITE_STATE_INIT; - op->buffer = buffer; - op->buffer_size = count; - - run_async_state_machine (file, - (state_machine_iterator)iterate_write_state_machine, - op, - io_priority, - callback, data, - cancellable, - async_write_done); -} - -static gssize -g_daemon_file_output_stream_write_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error) -{ - GSimpleAsyncResult *simple; - gssize nwritten; - - simple = G_SIMPLE_ASYNC_RESULT (result); - g_assert (g_simple_async_result_get_source_tag (simple) == g_daemon_file_output_stream_write_async); - - nwritten = g_simple_async_result_get_op_res_gssize (simple); - return nwritten; -} - -static void -async_close_done (GOutputStream *stream, - gpointer op_data, - GAsyncReadyCallback callback, - gpointer user_data, - GError *io_error) -{ - GDaemonFileOutputStream *file; - GSimpleAsyncResult *simple; - CloseOperation *op; - gboolean result; - GError *error; - GCancellable *cancellable = NULL; /* TODO: get cancellable */ - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - op = op_data; - - if (io_error) - { - result = FALSE; - error = io_error; - } - else - { - result = op->ret_val; - error = op->ret_error; - } - - if (result) - result = g_output_stream_close (file->command_stream, cancellable, &error); - else - g_output_stream_close (file->command_stream, cancellable, NULL); - - if (result) - result = g_input_stream_close (file->data_stream, cancellable, &error); - else - g_input_stream_close (file->data_stream, cancellable, NULL); - - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, user_data, - g_daemon_file_output_stream_close_async); - - if (!result) - g_simple_async_result_set_from_error (simple, error); - - /* Complete immediately, not in idle, since we're already in a mainloop callout */ - g_simple_async_result_complete (simple); - g_object_unref (simple); - - if (op->ret_error) - g_error_free (op->ret_error); - g_free (op); -} - -static void -g_daemon_file_output_stream_close_async (GOutputStream *stream, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data) -{ - GDaemonFileOutputStream *file; - CloseOperation *op; - - file = G_DAEMON_FILE_OUTPUT_STREAM (stream); - - op = g_new0 (CloseOperation, 1); - op->state = CLOSE_STATE_INIT; - - run_async_state_machine (file, - (state_machine_iterator)iterate_close_state_machine, - op, io_priority, - (GAsyncReadyCallback)callback, data, - cancellable, - (AsyncIteratorDone)async_close_done); -} - -static gboolean -g_daemon_file_output_stream_close_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error) -{ - /* Failures handled in generic close_finish code */ - return TRUE; -} |