/* GIO - GLib Input, Output and Streaming Library * * Copyright (C) 2006-2007 Red Hat, Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General * Public License along with this library; if not, write to the * Free Software Foundation, Inc., 59 Temple Place, Suite 330, * Boston, MA 02111-1307, USA. * * Author: Alexander Larsson */ /* GIO - GLib Input, Output and Streaming Library * * Copyright (C) 2006-2007 Red Hat, Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General * Public License along with this library; if not, write to the * Free Software Foundation, Inc., 59 Temple Place, Suite 330, * Boston, MA 02111-1307, USA. 
* * Author: Alexander Larsson */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "gdaemonfileinputstream.h" #include "gvfsdaemondbus.h" #include #define MAX_READ_SIZE (4*1024*1024) typedef enum { INPUT_STATE_IN_REPLY_HEADER, INPUT_STATE_IN_BLOCK } InputState; typedef enum { STATE_OP_DONE, STATE_OP_READ, STATE_OP_WRITE, STATE_OP_SKIP } StateOp; typedef enum { READ_STATE_INIT = 0, READ_STATE_WROTE_COMMAND, READ_STATE_HANDLE_INPUT, READ_STATE_HANDLE_INPUT_BLOCK, READ_STATE_SKIP_BLOCK, READ_STATE_HANDLE_HEADER, READ_STATE_READ_BLOCK } ReadState; typedef struct { ReadState state; /* Input */ char *buffer; gsize buffer_size; /* Output */ gssize ret_val; GError *ret_error; gboolean sent_cancel; guint32 seq_nr; } ReadOperation; typedef enum { SEEK_STATE_INIT = 0, SEEK_STATE_WROTE_REQUEST, SEEK_STATE_HANDLE_INPUT, SEEK_STATE_HANDLE_INPUT_BLOCK, SEEK_STATE_SKIP_BLOCK, SEEK_STATE_HANDLE_HEADER } SeekState; typedef struct { SeekState state; /* Input */ goffset offset; GSeekType seek_type; /* Output */ gboolean ret_val; GError *ret_error; goffset ret_offset; gboolean sent_cancel; gboolean sent_seek; guint32 seq_nr; } SeekOperation; typedef enum { CLOSE_STATE_INIT = 0, CLOSE_STATE_WROTE_REQUEST, CLOSE_STATE_HANDLE_INPUT, CLOSE_STATE_HANDLE_INPUT_BLOCK, CLOSE_STATE_SKIP_BLOCK, CLOSE_STATE_HANDLE_HEADER } CloseState; typedef struct { CloseState state; /* Input */ /* Output */ gboolean ret_val; GError *ret_error; gboolean sent_cancel; guint32 seq_nr; } CloseOperation; typedef struct { gboolean cancelled; char *io_buffer; gsize io_size; gsize io_res; /* The operation always succeeds, or gets cancelled. 
If we get an error doing the i/o that is considered fatal */
  gboolean io_allow_cancel; /* Set by the state machine: may the next i/o be given the cancellable? */
  gboolean io_cancelled;    /* Set by the driver: the last i/o was aborted by cancellation */
} IOOperationData;

/* One step of an operation's state machine.  The iterator inspects the
   result of the previous i/o in io_op, fills in the next i/o request and
   returns which kind of i/o the driver should perform, or STATE_OP_DONE. */
typedef StateOp (*state_machine_iterator) (GDaemonFileInputStream *file, IOOperationData *io_op, gpointer data);

struct _GDaemonFileInputStream {
  GFileInputStream parent;

  GOutputStream *command_stream; /* Requests are written here */
  GInputStream *data_stream;     /* Replies and data blocks are read from here */
  guint can_seek : 1;            /* Value reported by can_seek(); seek() fails when unset */

  int seek_generation;           /* Incremented each time a seek request starts being sent */
  guint32 seq_nr;                /* Sequence number used for the next request */
  goffset current_offset;        /* Position reported by tell(); updated by read and seek */

  InputState input_state;        /* Whether the next incoming bytes are a reply header or block data */
  gsize input_block_size;        /* Bytes of the current data block not yet read or skipped */
  int input_block_seek_generation; /* arg2 of the DATA reply that started the current block */
  GString *input_buffer;         /* Accumulates the reply header (and error payload) being read */

  GString *output_buffer;        /* Encoded requests not yet fully written to command_stream */
};

static gssize     g_daemon_file_input_stream_read          (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error);
static gssize     g_daemon_file_input_stream_skip          (GInputStream *stream, gsize count, GCancellable *cancellable, GError **error);
static gboolean   g_daemon_file_input_stream_close         (GInputStream *stream, GCancellable *cancellable, GError **error);
static GFileInfo *g_daemon_file_input_stream_query_info (GFileInputStream *stream, char *attributes, GCancellable *cancellable, GError **error);
static goffset    g_daemon_file_input_stream_tell          (GFileInputStream *stream);
static gboolean   g_daemon_file_input_stream_can_seek      (GFileInputStream *stream);
static gboolean   g_daemon_file_input_stream_seek          (GFileInputStream *stream, goffset offset, GSeekType type, GCancellable *cancellable, GError **error);
static void       g_daemon_file_input_stream_read_async    (GInputStream *stream, void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer data);
static gssize     g_daemon_file_input_stream_read_finish   (GInputStream *stream, GAsyncResult *result, GError **error);
static void       g_daemon_file_input_stream_skip_async    (GInputStream *stream, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer data);
static gssize     g_daemon_file_input_stream_skip_finish   (GInputStream *stream, GAsyncResult *result, GError **error);
static void g_daemon_file_input_stream_close_async (GInputStream *stream, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer data); static gboolean g_daemon_file_input_stream_close_finish (GInputStream *stream, GAsyncResult *result, GError **error); G_DEFINE_TYPE (GDaemonFileInputStream, g_daemon_file_input_stream, G_TYPE_FILE_INPUT_STREAM) static void g_daemon_file_input_stream_finalize (GObject *object) { GDaemonFileInputStream *file; file = G_DAEMON_FILE_INPUT_STREAM (object); if (file->command_stream) g_object_unref (file->command_stream); if (file->data_stream) g_object_unref (file->data_stream); g_string_free (file->input_buffer, TRUE); g_string_free (file->output_buffer, TRUE); if (G_OBJECT_CLASS (g_daemon_file_input_stream_parent_class)->finalize) (*G_OBJECT_CLASS (g_daemon_file_input_stream_parent_class)->finalize) (object); } static void g_daemon_file_input_stream_class_init (GDaemonFileInputStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass); GFileInputStreamClass *file_stream_class = G_FILE_INPUT_STREAM_CLASS (klass); gobject_class->finalize = g_daemon_file_input_stream_finalize; stream_class->read_fn = g_daemon_file_input_stream_read; if (0) stream_class->skip = g_daemon_file_input_stream_skip; stream_class->close_fn = g_daemon_file_input_stream_close; stream_class->read_async = g_daemon_file_input_stream_read_async; stream_class->read_finish = g_daemon_file_input_stream_read_finish; if (0) { stream_class->skip_async = g_daemon_file_input_stream_skip_async; stream_class->skip_finish = g_daemon_file_input_stream_skip_finish; } stream_class->close_async = g_daemon_file_input_stream_close_async; stream_class->close_finish = g_daemon_file_input_stream_close_finish; file_stream_class->tell = g_daemon_file_input_stream_tell; file_stream_class->can_seek = g_daemon_file_input_stream_can_seek; file_stream_class->seek = 
g_daemon_file_input_stream_seek; file_stream_class->query_info = g_daemon_file_input_stream_query_info; } static void g_daemon_file_input_stream_init (GDaemonFileInputStream *info) { info->output_buffer = g_string_new (""); info->input_buffer = g_string_new (""); info->seq_nr = 1; } GFileInputStream * g_daemon_file_input_stream_new (int fd, gboolean can_seek) { GDaemonFileInputStream *stream; stream = g_object_new (G_TYPE_DAEMON_FILE_INPUT_STREAM, NULL); stream->command_stream = g_unix_output_stream_new (fd, FALSE); stream->data_stream = g_unix_input_stream_new (fd, TRUE); stream->can_seek = can_seek; return G_FILE_INPUT_STREAM (stream); } static gboolean error_is_cancel (GError *error) { return error != NULL && error->domain == G_IO_ERROR && error->code == G_IO_ERROR_CANCELLED; } static void append_request (GDaemonFileInputStream *stream, guint32 command, guint32 arg1, guint32 arg2, guint32 *seq_nr) { GVfsDaemonSocketProtocolRequest cmd; g_assert (sizeof (cmd) == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE); if (seq_nr) *seq_nr = stream->seq_nr; cmd.command = g_htonl (command); cmd.seq_nr = g_htonl (stream->seq_nr++); cmd.arg1 = g_htonl (arg1); cmd.arg2 = g_htonl (arg2); cmd.data_len = 0; g_string_append_len (stream->output_buffer, (char *)&cmd, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE); } static gsize get_reply_header_missing_bytes (GString *buffer) { GVfsDaemonSocketProtocolReply *reply; guint32 type; guint32 arg2; if (buffer->len < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE) return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - buffer->len; reply = (GVfsDaemonSocketProtocolReply *)buffer->str; type = g_ntohl (reply->type); arg2 = g_ntohl (reply->arg2); if (type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR) return G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE + arg2 - buffer->len; return 0; } static char * decode_reply (GString *buffer, GVfsDaemonSocketProtocolReply *reply_out) { GVfsDaemonSocketProtocolReply *reply; reply = (GVfsDaemonSocketProtocolReply *)buffer->str; 
reply_out->type = g_ntohl (reply->type); reply_out->seq_nr = g_ntohl (reply->seq_nr); reply_out->arg1 = g_ntohl (reply->arg1); reply_out->arg2 = g_ntohl (reply->arg2); return buffer->str + G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE; } static void decode_error (GVfsDaemonSocketProtocolReply *reply, char *data, GError **error) { g_set_error (error, g_quark_from_string (data), reply->arg1, "%s", data + strlen (data) + 1); } static gboolean run_sync_state_machine (GDaemonFileInputStream *file, state_machine_iterator iterator, gpointer data, GCancellable *cancellable, GError **error) { gssize res; StateOp io_op; IOOperationData io_data; GError *io_error; memset (&io_data, 0, sizeof (io_data)); while (TRUE) { if (cancellable) io_data.cancelled = g_cancellable_is_cancelled (cancellable); io_op = iterator (file, &io_data, data); if (io_op == STATE_OP_DONE) return TRUE; io_error = NULL; if (io_op == STATE_OP_READ) { res = g_input_stream_read (file->data_stream, io_data.io_buffer, io_data.io_size, io_data.io_allow_cancel ? cancellable : NULL, &io_error); } else if (io_op == STATE_OP_SKIP) { res = g_input_stream_skip (file->data_stream, io_data.io_size, io_data.io_allow_cancel ? cancellable : NULL, &io_error); } else if (io_op == STATE_OP_WRITE) { res = g_output_stream_write (file->command_stream, io_data.io_buffer, io_data.io_size, io_data.io_allow_cancel ? 
cancellable : NULL, &io_error); } else { res = 0; g_assert_not_reached (); } if (res == -1) { if (error_is_cancel (io_error)) { io_data.io_res = 0; io_data.io_cancelled = TRUE; g_error_free (io_error); } else { g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error in stream protocol: %s"), io_error->message); g_error_free (io_error); return FALSE; } } else if (res == 0 && io_data.io_size != 0) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error in stream protocol: %s"), _("End of stream")); return FALSE; } else { io_data.io_res = res; io_data.io_cancelled = FALSE; } } } /* read cycle: if we know of a (partially read) matching outstanding block, read from it create packet, append to outgoing flush outgoing start processing input, looking for a data block with same seek gen, or an error with same seq nr on cancel, send cancel command and go back to loop */ static StateOp iterate_read_state_machine (GDaemonFileInputStream *file, IOOperationData *io_op, ReadOperation *op) { gsize len; while (TRUE) { switch (op->state) { /* Initial state for read op */ case READ_STATE_INIT: /* If we're already reading some data, but we didn't read all, just use that and don't even send a request */ if (file->input_state == INPUT_STATE_IN_BLOCK && file->seek_generation == file->input_block_seek_generation) { op->state = READ_STATE_READ_BLOCK; io_op->io_buffer = op->buffer; io_op->io_size = MIN (op->buffer_size, file->input_block_size); io_op->io_allow_cancel = TRUE; /* Allow cancel before we sent request */ return STATE_OP_READ; } append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ, op->buffer_size, 0, &op->seq_nr); op->state = READ_STATE_WROTE_COMMAND; io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */ return STATE_OP_WRITE; /* wrote parts of output_buffer */ case READ_STATE_WROTE_COMMAND: if (io_op->io_cancelled) { op->ret_val = -1; 
g_set_error (&op->ret_error, G_IO_ERROR, G_IO_ERROR_CANCELLED, "%s", _("Operation was cancelled")); return STATE_OP_DONE; } if (io_op->io_res < file->output_buffer->len) { memcpy (file->output_buffer->str, file->output_buffer->str + io_op->io_res, file->output_buffer->len - io_op->io_res); g_string_truncate (file->output_buffer, file->output_buffer->len - io_op->io_res); io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = FALSE; return STATE_OP_WRITE; } g_string_truncate (file->output_buffer, 0); op->state = READ_STATE_HANDLE_INPUT; break; /* No op */ case READ_STATE_HANDLE_INPUT: if (io_op->cancelled && !op->sent_cancel) { op->sent_cancel = TRUE; append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL, op->seq_nr, 0, NULL); op->state = READ_STATE_WROTE_COMMAND; io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = FALSE; return STATE_OP_WRITE; } if (file->input_state == INPUT_STATE_IN_BLOCK) { op->state = READ_STATE_HANDLE_INPUT_BLOCK; break; } else if (file->input_state == INPUT_STATE_IN_REPLY_HEADER) { op->state = READ_STATE_HANDLE_HEADER; break; } g_assert_not_reached (); break; /* No op */ case READ_STATE_HANDLE_INPUT_BLOCK: g_assert (file->input_state == INPUT_STATE_IN_BLOCK); if (file->seek_generation == file->input_block_seek_generation) { op->state = READ_STATE_READ_BLOCK; io_op->io_buffer = op->buffer; io_op->io_size = MIN (op->buffer_size, file->input_block_size); io_op->io_allow_cancel = FALSE; return STATE_OP_READ; } else { op->state = READ_STATE_SKIP_BLOCK; io_op->io_buffer = NULL; io_op->io_size = file->input_block_size; io_op->io_allow_cancel = !op->sent_cancel; return STATE_OP_SKIP; } break; /* Read block data */ case READ_STATE_SKIP_BLOCK: if (io_op->io_cancelled) { op->state = READ_STATE_HANDLE_INPUT; break; } g_assert (io_op->io_res <= file->input_block_size); file->input_block_size -= io_op->io_res; if 
(file->input_block_size == 0) file->input_state = INPUT_STATE_IN_REPLY_HEADER; op->state = READ_STATE_HANDLE_INPUT; break; /* read header data, (or manual io_len/res = 0) */ case READ_STATE_HANDLE_HEADER: if (io_op->io_cancelled) { op->state = READ_STATE_HANDLE_INPUT; break; } if (io_op->io_res > 0) { gsize unread_size = io_op->io_size - io_op->io_res; g_string_set_size (file->input_buffer, file->input_buffer->len - unread_size); } len = get_reply_header_missing_bytes (file->input_buffer); if (len > 0) { gsize current_len = file->input_buffer->len; g_string_set_size (file->input_buffer, current_len + len); io_op->io_buffer = file->input_buffer->str + current_len; io_op->io_size = len; io_op->io_allow_cancel = !op->sent_cancel; return STATE_OP_READ; } /* Got full header */ { GVfsDaemonSocketProtocolReply reply; char *data; data = decode_reply (file->input_buffer, &reply); if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR && reply.seq_nr == op->seq_nr) { op->ret_val = -1; decode_error (&reply, data, &op->ret_error); g_string_truncate (file->input_buffer, 0); return STATE_OP_DONE; } else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA) { g_string_truncate (file->input_buffer, 0); file->input_state = INPUT_STATE_IN_BLOCK; file->input_block_size = reply.arg1; file->input_block_seek_generation = reply.arg2; op->state = READ_STATE_HANDLE_INPUT_BLOCK; break; } /* Ignore other reply types */ } g_string_truncate (file->input_buffer, 0); /* This wasn't interesting, read next reply */ op->state = READ_STATE_HANDLE_HEADER; break; /* Read block data */ case READ_STATE_READ_BLOCK: if (io_op->io_cancelled) { op->ret_val = -1; g_set_error (&op->ret_error, G_IO_ERROR, G_IO_ERROR_CANCELLED, "%s", _("Operation was cancelled")); return STATE_OP_DONE; } if (io_op->io_res > 0) { g_assert (io_op->io_res <= file->input_block_size); file->input_block_size -= io_op->io_res; if (file->input_block_size == 0) file->input_state = INPUT_STATE_IN_REPLY_HEADER; } op->ret_val = 
io_op->io_res; op->ret_error = NULL; return STATE_OP_DONE; default: g_assert_not_reached (); } /* Clear io_op between non-op state switches */ io_op->io_size = 0; io_op->io_res = 0; io_op->io_cancelled = FALSE; } } static gssize g_daemon_file_input_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error) { GDaemonFileInputStream *file; ReadOperation op; file = G_DAEMON_FILE_INPUT_STREAM (stream); if (g_cancellable_set_error_if_cancelled (cancellable, error)) return -1; /* Limit for sanity and to avoid 32bit overflow */ if (count > MAX_READ_SIZE) count = MAX_READ_SIZE; memset (&op, 0, sizeof (op)); op.state = READ_STATE_INIT; op.buffer = buffer; op.buffer_size = count; if (!run_sync_state_machine (file, (state_machine_iterator)iterate_read_state_machine, &op, cancellable, error)) return -1; /* IO Error */ if (op.ret_val == -1) g_propagate_error (error, op.ret_error); else file->current_offset += op.ret_val; return op.ret_val; } static gssize g_daemon_file_input_stream_skip (GInputStream *stream, gsize count, GCancellable *cancellable, GError **error) { #if 0 GDaemonFileInputStream *file; file = G_DAEMON_FILE_INPUT_STREAM (stream); #endif /* TODO: implement skip */ g_assert_not_reached (); return 0; } static StateOp iterate_close_state_machine (GDaemonFileInputStream *file, IOOperationData *io_op, CloseOperation *op) { gsize len; while (TRUE) { switch (op->state) { /* Initial state for read op */ case CLOSE_STATE_INIT: append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE, 0, 0, &op->seq_nr); op->state = CLOSE_STATE_WROTE_REQUEST; io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */ return STATE_OP_WRITE; /* wrote parts of output_buffer */ case CLOSE_STATE_WROTE_REQUEST: if (io_op->io_cancelled) { op->ret_val = FALSE; g_set_error (&op->ret_error, G_IO_ERROR, G_IO_ERROR_CANCELLED, "%s", 
_("Operation was cancelled")); return STATE_OP_DONE; } if (io_op->io_res < file->output_buffer->len) { memcpy (file->output_buffer->str, file->output_buffer->str + io_op->io_res, file->output_buffer->len - io_op->io_res); g_string_truncate (file->output_buffer, file->output_buffer->len - io_op->io_res); io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = FALSE; return STATE_OP_WRITE; } g_string_truncate (file->output_buffer, 0); op->state = CLOSE_STATE_HANDLE_INPUT; break; /* No op */ case CLOSE_STATE_HANDLE_INPUT: if (io_op->cancelled && !op->sent_cancel) { op->sent_cancel = TRUE; append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL, op->seq_nr, 0, NULL); op->state = CLOSE_STATE_WROTE_REQUEST; io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = FALSE; return STATE_OP_WRITE; } if (file->input_state == INPUT_STATE_IN_BLOCK) { op->state = CLOSE_STATE_HANDLE_INPUT_BLOCK; break; } else if (file->input_state == INPUT_STATE_IN_REPLY_HEADER) { op->state = CLOSE_STATE_HANDLE_HEADER; break; } g_assert_not_reached (); break; /* No op */ case CLOSE_STATE_HANDLE_INPUT_BLOCK: g_assert (file->input_state == INPUT_STATE_IN_BLOCK); op->state = CLOSE_STATE_SKIP_BLOCK; io_op->io_buffer = NULL; io_op->io_size = file->input_block_size; io_op->io_allow_cancel = !op->sent_cancel; return STATE_OP_SKIP; /* Read block data */ case CLOSE_STATE_SKIP_BLOCK: if (io_op->io_cancelled) { op->state = CLOSE_STATE_HANDLE_INPUT; break; } g_assert (io_op->io_res <= file->input_block_size); file->input_block_size -= io_op->io_res; if (file->input_block_size == 0) file->input_state = INPUT_STATE_IN_REPLY_HEADER; op->state = CLOSE_STATE_HANDLE_INPUT; break; /* read header data, (or manual io_len/res = 0) */ case CLOSE_STATE_HANDLE_HEADER: if (io_op->io_cancelled) { op->state = CLOSE_STATE_HANDLE_INPUT; break; } if (io_op->io_res > 0) { gsize unread_size = io_op->io_size - 
io_op->io_res; g_string_set_size (file->input_buffer, file->input_buffer->len - unread_size); } len = get_reply_header_missing_bytes (file->input_buffer); if (len > 0) { gsize current_len = file->input_buffer->len; g_string_set_size (file->input_buffer, current_len + len); io_op->io_buffer = file->input_buffer->str + current_len; io_op->io_size = len; io_op->io_allow_cancel = !op->sent_cancel; return STATE_OP_READ; } /* Got full header */ { GVfsDaemonSocketProtocolReply reply; char *data; data = decode_reply (file->input_buffer, &reply); if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR && reply.seq_nr == op->seq_nr) { op->ret_val = FALSE; decode_error (&reply, data, &op->ret_error); g_string_truncate (file->input_buffer, 0); return STATE_OP_DONE; } else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA) { g_string_truncate (file->input_buffer, 0); file->input_state = INPUT_STATE_IN_BLOCK; file->input_block_size = reply.arg1; file->input_block_seek_generation = reply.arg2; op->state = CLOSE_STATE_HANDLE_INPUT_BLOCK; break; } else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED) { op->ret_val = TRUE; g_string_truncate (file->input_buffer, 0); return STATE_OP_DONE; } /* Ignore other reply types */ } g_string_truncate (file->input_buffer, 0); /* This wasn't interesting, read next reply */ op->state = CLOSE_STATE_HANDLE_HEADER; break; default: g_assert_not_reached (); } /* Clear io_op between non-op state switches */ io_op->io_size = 0; io_op->io_res = 0; io_op->io_cancelled = FALSE; } } static gboolean g_daemon_file_input_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error) { GDaemonFileInputStream *file; CloseOperation op; gboolean res; file = G_DAEMON_FILE_INPUT_STREAM (stream); /* We need to do a full roundtrip to guarantee that the writes have reached the disk. 
*/ memset (&op, 0, sizeof (op)); op.state = CLOSE_STATE_INIT; if (!run_sync_state_machine (file, (state_machine_iterator)iterate_close_state_machine, &op, cancellable, error)) res = FALSE; else { if (!op.ret_val) g_propagate_error (error, op.ret_error); res = op.ret_val; } /* Return the first error, but close all streams */ if (res) res = g_output_stream_close (file->command_stream, cancellable, error); else g_output_stream_close (file->command_stream, cancellable, NULL); if (res) res = g_input_stream_close (file->data_stream, cancellable, error); else g_input_stream_close (file->data_stream, cancellable, NULL); return res; } static goffset g_daemon_file_input_stream_tell (GFileInputStream *stream) { GDaemonFileInputStream *file; file = G_DAEMON_FILE_INPUT_STREAM (stream); return file->current_offset; } static gboolean g_daemon_file_input_stream_can_seek (GFileInputStream *stream) { GDaemonFileInputStream *file; file = G_DAEMON_FILE_INPUT_STREAM (stream); return file->can_seek; } static StateOp iterate_seek_state_machine (GDaemonFileInputStream *file, IOOperationData *io_op, SeekOperation *op) { gsize len; guint32 request; while (TRUE) { switch (op->state) { /* Initial state for read op */ case SEEK_STATE_INIT: request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET; if (op->seek_type == G_SEEK_CUR) op->offset = file->current_offset + op->offset; else if (op->seek_type == G_SEEK_END) request = G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END; append_request (file, request, op->offset & 0xffffffff, op->offset >> 32, &op->seq_nr); op->state = SEEK_STATE_WROTE_REQUEST; op->sent_seek = FALSE; io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = TRUE; /* Allow cancel before first byte of request sent */ return STATE_OP_WRITE; /* wrote parts of output_buffer */ case SEEK_STATE_WROTE_REQUEST: if (io_op->io_cancelled) { op->ret_val = -1; g_set_error (&op->ret_error, G_IO_ERROR, G_IO_ERROR_CANCELLED, "%s", 
_("Operation was cancelled")); return STATE_OP_DONE; } /* We weren't cancelled before first byte sent, so now we will send * the seek request. Increase the seek generation now. */ if (!op->sent_seek) file->seek_generation++; op->sent_seek = TRUE; if (io_op->io_res < file->output_buffer->len) { memcpy (file->output_buffer->str, file->output_buffer->str + io_op->io_res, file->output_buffer->len - io_op->io_res); g_string_truncate (file->output_buffer, file->output_buffer->len - io_op->io_res); io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = FALSE; return STATE_OP_WRITE; } g_string_truncate (file->output_buffer, 0); op->state = SEEK_STATE_HANDLE_INPUT; break; /* No op */ case SEEK_STATE_HANDLE_INPUT: if (io_op->cancelled && !op->sent_cancel) { op->sent_cancel = TRUE; append_request (file, G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL, op->seq_nr, 0, NULL); op->state = SEEK_STATE_WROTE_REQUEST; io_op->io_buffer = file->output_buffer->str; io_op->io_size = file->output_buffer->len; io_op->io_allow_cancel = FALSE; return STATE_OP_WRITE; } if (file->input_state == INPUT_STATE_IN_BLOCK) { op->state = SEEK_STATE_HANDLE_INPUT_BLOCK; break; } else if (file->input_state == INPUT_STATE_IN_REPLY_HEADER) { op->state = SEEK_STATE_HANDLE_HEADER; break; } g_assert_not_reached (); break; /* No op */ case SEEK_STATE_HANDLE_INPUT_BLOCK: g_assert (file->input_state == INPUT_STATE_IN_BLOCK); op->state = SEEK_STATE_SKIP_BLOCK; /* Reuse client buffer for skipping */ io_op->io_buffer = NULL; io_op->io_size = file->input_block_size; io_op->io_allow_cancel = !op->sent_cancel; return STATE_OP_SKIP; /* Read block data */ case SEEK_STATE_SKIP_BLOCK: if (io_op->io_cancelled) { op->state = SEEK_STATE_HANDLE_INPUT; break; } g_assert (io_op->io_res <= file->input_block_size); file->input_block_size -= io_op->io_res; if (file->input_block_size == 0) file->input_state = INPUT_STATE_IN_REPLY_HEADER; op->state = SEEK_STATE_HANDLE_INPUT; 
break; /* read header data, (or manual io_len/res = 0) */ case SEEK_STATE_HANDLE_HEADER: if (io_op->io_cancelled) { op->state = SEEK_STATE_HANDLE_INPUT; break; } if (io_op->io_res > 0) { gsize unread_size = io_op->io_size - io_op->io_res; g_string_set_size (file->input_buffer, file->input_buffer->len - unread_size); } len = get_reply_header_missing_bytes (file->input_buffer); if (len > 0) { gsize current_len = file->input_buffer->len; g_string_set_size (file->input_buffer, current_len + len); io_op->io_buffer = file->input_buffer->str + current_len; io_op->io_size = len; io_op->io_allow_cancel = !op->sent_cancel; return STATE_OP_READ; } /* Got full header */ { GVfsDaemonSocketProtocolReply reply; char *data; data = decode_reply (file->input_buffer, &reply); if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_ERROR && reply.seq_nr == op->seq_nr) { op->ret_val = FALSE; decode_error (&reply, data, &op->ret_error); g_string_truncate (file->input_buffer, 0); return STATE_OP_DONE; } else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA) { g_string_truncate (file->input_buffer, 0); file->input_state = INPUT_STATE_IN_BLOCK; file->input_block_size = reply.arg1; file->input_block_seek_generation = reply.arg2; op->state = SEEK_STATE_HANDLE_INPUT_BLOCK; break; } else if (reply.type == G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS) { op->ret_val = TRUE; op->ret_offset = ((goffset)reply.arg2) << 32 | (goffset)reply.arg1; g_string_truncate (file->input_buffer, 0); return STATE_OP_DONE; } /* Ignore other reply types */ } g_string_truncate (file->input_buffer, 0); /* This wasn't interesting, read next reply */ op->state = SEEK_STATE_HANDLE_HEADER; break; default: g_assert_not_reached (); } /* Clear io_op between non-op state switches */ io_op->io_size = 0; io_op->io_res = 0; io_op->io_cancelled = FALSE; } } static gboolean g_daemon_file_input_stream_seek (GFileInputStream *stream, goffset offset, GSeekType type, GCancellable *cancellable, GError **error) { 
GDaemonFileInputStream *file; SeekOperation op; file = G_DAEMON_FILE_INPUT_STREAM (stream); if (!file->can_seek) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, "%s", _("Seek not supported on stream")); return FALSE; } if (g_cancellable_set_error_if_cancelled (cancellable, error)) return FALSE; memset (&op, 0, sizeof (op)); op.state = SEEK_STATE_INIT; op.offset = offset; op.seek_type = type; if (!run_sync_state_machine (file, (state_machine_iterator)iterate_seek_state_machine, &op, cancellable, error)) return FALSE; /* IO Error */ if (!op.ret_val) g_propagate_error (error, op.ret_error); else file->current_offset = op.ret_offset; return op.ret_val; } static GFileInfo * g_daemon_file_input_stream_query_info (GFileInputStream *stream, char *attributes, GCancellable *cancellable, GError **error) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, "%s", _("The query info operation is not supported")); return NULL; } /************************************************************************ * Async I/O Code * ************************************************************************/ typedef struct AsyncIterator AsyncIterator; typedef void (*AsyncIteratorDone) (GInputStream *stream, gpointer op_data, GAsyncReadyCallback callback, gpointer callback_data, GError *io_error); struct AsyncIterator { AsyncIteratorDone done_cb; GDaemonFileInputStream *file; GCancellable *cancellable; IOOperationData io_data; state_machine_iterator iterator; gpointer iterator_data; int io_priority; GAsyncReadyCallback callback; gpointer callback_data; }; static void async_iterate (AsyncIterator *iterator); static void async_iterator_done (AsyncIterator *iterator, GError *io_error) { iterator->done_cb (G_INPUT_STREAM (iterator->file), iterator->iterator_data, iterator->callback, iterator->callback_data, io_error); g_free (iterator); } static void async_op_handle (AsyncIterator *iterator, gssize res, GError *io_error) { IOOperationData *io_data = &iterator->io_data; GError 
*error; if (io_error != NULL) { if (error_is_cancel (io_error)) { io_data->io_res = 0; io_data->io_cancelled = TRUE; } else { error = NULL; g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error in stream protocol: %s"), io_error->message); async_iterator_done (iterator, error); g_error_free (error); return; } } else if (res == 0 && io_data->io_size != 0) { error = NULL; g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error in stream protocol: %s"), _("End of stream")); async_iterator_done (iterator, error); g_error_free (error); return; } else { io_data->io_res = res; io_data->io_cancelled = FALSE; } async_iterate (iterator); } static void async_read_op_callback (GObject *source_object, GAsyncResult *res, gpointer user_data) { GInputStream *stream = G_INPUT_STREAM (source_object); gssize count_read; GError *error = NULL; count_read = g_input_stream_read_finish (stream, res, &error); async_op_handle ((AsyncIterator *)user_data, count_read, error); if (error) g_error_free (error); } static void async_skip_op_callback (GObject *source_object, GAsyncResult *res, gpointer user_data) { GInputStream *stream = G_INPUT_STREAM (source_object); gssize count_skipped; GError *error = NULL; count_skipped = g_input_stream_skip_finish (stream, res, &error); async_op_handle ((AsyncIterator *)user_data, count_skipped, error); if (error) g_error_free (error); } static void async_write_op_callback (GObject *source_object, GAsyncResult *res, gpointer user_data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); gssize bytes_written; GError *error = NULL; bytes_written = g_output_stream_write_finish (stream, res, &error); async_op_handle ((AsyncIterator *)user_data, bytes_written, error); if (error) g_error_free (error); } static void async_iterate (AsyncIterator *iterator) { IOOperationData *io_data = &iterator->io_data; GDaemonFileInputStream *file = iterator->file; StateOp io_op; io_data->cancelled = g_cancellable_is_cancelled (iterator->cancellable); io_op = 
iterator->iterator (file, io_data, iterator->iterator_data); if (io_op == STATE_OP_DONE) { async_iterator_done (iterator, NULL); return; } /* TODO: Handle allow_cancel... */ if (io_op == STATE_OP_READ) { g_input_stream_read_async (file->data_stream, io_data->io_buffer, io_data->io_size, iterator->io_priority, io_data->io_allow_cancel ? iterator->cancellable : NULL, async_read_op_callback, iterator); } else if (io_op == STATE_OP_SKIP) { g_input_stream_skip_async (file->data_stream, io_data->io_size, iterator->io_priority, io_data->io_allow_cancel ? iterator->cancellable : NULL, async_skip_op_callback, iterator); } else if (io_op == STATE_OP_WRITE) { g_output_stream_write_async (file->command_stream, io_data->io_buffer, io_data->io_size, iterator->io_priority, io_data->io_allow_cancel ? iterator->cancellable : NULL, async_write_op_callback, iterator); } else g_assert_not_reached (); } static void run_async_state_machine (GDaemonFileInputStream *file, state_machine_iterator iterator_cb, gpointer iterator_data, int io_priority, GAsyncReadyCallback callback, gpointer data, GCancellable *cancellable, AsyncIteratorDone done_cb) { AsyncIterator *iterator; iterator = g_new0 (AsyncIterator, 1); iterator->file = file; iterator->iterator = iterator_cb; iterator->iterator_data = iterator_data; iterator->io_priority = io_priority; iterator->cancellable = cancellable; iterator->callback = callback; iterator->callback_data = data; iterator->done_cb = done_cb; async_iterate (iterator); } static void async_read_done (GInputStream *stream, gpointer op_data, GAsyncReadyCallback callback, gpointer user_data, GError *io_error) { ReadOperation *op; gssize count_read; GError *error; GSimpleAsyncResult *simple; op = op_data; if (io_error) { count_read = -1; error = io_error; } else { count_read = op->ret_val; error = op->ret_error; } simple = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_daemon_file_input_stream_read_async); g_simple_async_result_set_op_res_gssize 
(simple, count_read); if (count_read == -1) g_simple_async_result_set_from_error (simple, error); /* Complete immediately, not in idle, since we're already in a mainloop callout */ g_simple_async_result_complete (simple); g_object_unref (simple); if (op->ret_error) g_error_free (op->ret_error); g_free (op); } static void g_daemon_file_input_stream_read_async (GInputStream *stream, void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { GDaemonFileInputStream *file; ReadOperation *op; file = G_DAEMON_FILE_INPUT_STREAM (stream); /* Limit for sanity and to avoid 32bit overflow */ if (count > MAX_READ_SIZE) count = MAX_READ_SIZE; op = g_new0 (ReadOperation, 1); op->state = READ_STATE_INIT; op->buffer = buffer; op->buffer_size = count; run_async_state_machine (file, (state_machine_iterator)iterate_read_state_machine, op, io_priority, callback, user_data, cancellable, async_read_done); } static gssize g_daemon_file_input_stream_read_finish (GInputStream *stream, GAsyncResult *result, GError **error) { GSimpleAsyncResult *simple; gssize nread; simple = G_SIMPLE_ASYNC_RESULT (result); g_assert (g_simple_async_result_get_source_tag (simple) == g_daemon_file_input_stream_read_async); nread = g_simple_async_result_get_op_res_gssize (simple); return nread; } static void g_daemon_file_input_stream_skip_async (GInputStream *stream, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer data) { g_assert_not_reached (); /* TODO: Not implemented */ } static gssize g_daemon_file_input_stream_skip_finish (GInputStream *stream, GAsyncResult *result, GError **error) { g_assert_not_reached (); /* TODO: Not implemented */ } static void async_close_done (GInputStream *stream, gpointer op_data, GAsyncReadyCallback callback, gpointer user_data, GError *io_error) { GDaemonFileInputStream *file; GSimpleAsyncResult *simple; CloseOperation *op; gboolean result; GError *error; GCancellable 
*cancellable = NULL; /* TODO: get cancellable */ file = G_DAEMON_FILE_INPUT_STREAM (stream); op = op_data; if (io_error) { result = FALSE; error = io_error; } else { result = op->ret_val; error = op->ret_error; } if (result) result = g_output_stream_close (file->command_stream, cancellable, &error); else g_output_stream_close (file->command_stream, cancellable, NULL); if (result) result = g_input_stream_close (file->data_stream, cancellable, &error); else g_input_stream_close (file->data_stream, cancellable, NULL); simple = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_daemon_file_input_stream_read_async); if (!result) g_simple_async_result_set_from_error (simple, error); /* Complete immediately, not in idle, since we're already in a mainloop callout */ g_simple_async_result_complete (simple); g_object_unref (simple); if (op->ret_error) g_error_free (op->ret_error); g_free (op); } static void g_daemon_file_input_stream_close_async (GInputStream *stream, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer data) { GDaemonFileInputStream *file; CloseOperation *op; file = G_DAEMON_FILE_INPUT_STREAM (stream); op = g_new0 (CloseOperation, 1); op->state = CLOSE_STATE_INIT; run_async_state_machine (file, (state_machine_iterator)iterate_close_state_machine, op, io_priority, callback, data, cancellable, async_close_done); } static gboolean g_daemon_file_input_stream_close_finish (GInputStream *stream, GAsyncResult *result, GError **error) { /* Failures handled in generic close_finish code */ return TRUE; }