diff options
author | Alexander Larsson <alexl@redhat.com> | 2007-09-13 07:50:02 +0000 |
---|---|---|
committer | Alexander Larsson <alexl@src.gnome.org> | 2007-09-13 07:50:02 +0000 |
commit | c59fb83d3586adb1be032817baf140162d02329e (patch) | |
tree | 157300331681cc86f48613ae9d02fb627d759fee | |
parent | aebfb5d93e510a9f127acf1dbf34b75d14b944f4 (diff) | |
download | gvfs-c59fb83d3586adb1be032817baf140162d02329e.tar.gz |
Update of close and pending handling.
2006-09-27 Alexander Larsson <alexl@redhat.com>
* ginputstream.[ch]:
Update of close and pending handling.
* goutputstream.[ch]:
Implement default async version
Original git commit by alex <alex> at 1159363478 +0000
svn path=/trunk/; revision=6
-rw-r--r-- | ginputstream.c | 29 | ||||
-rw-r--r-- | ginputstream.h | 80 | ||||
-rw-r--r-- | goutputstream.c | 761 | ||||
-rw-r--r-- | goutputstream.h | 112 | ||||
-rw-r--r-- | txt/ops.txt | 6 |
5 files changed, 830 insertions, 158 deletions
diff --git a/ginputstream.c b/ginputstream.c index 91e21782..95bd9a41 100644 --- a/ginputstream.c +++ b/ginputstream.c @@ -35,7 +35,7 @@ static void g_input_stream_real_skip_async (GInputStream *stream, GDestroyNotify notify); static void g_input_stream_real_close_async (GInputStream *stream, int io_priority, - GAsyncCloseCallback callback, + GAsyncCloseInputCallback callback, gpointer data, GDestroyNotify notify); static void g_input_stream_real_cancel (GInputStream *stream); @@ -253,6 +253,10 @@ g_input_stream_real_skip (GInputStream *stream, * Some streams might keep the backing store of the stream (e.g. a file descriptor) * open after the stream is closed. See the documentation for the individual * stream for details. + * + * On failure the first error that happened will be reported, but the close + * operation will finish as much as possible. A stream that failed to + * close will still return %G_VFS_ERROR_CLOSED all operations. * * Return value: %TRUE on success, %FALSE on failure **/ @@ -285,8 +289,7 @@ g_input_stream_close (GInputStream *stream, if (class->close) res = class->close (stream, error); - if (res) - stream->priv->closed = TRUE; + stream->priv->closed = TRUE; stream->priv->pending = FALSE; @@ -509,6 +512,7 @@ g_input_stream_read_async (GInputStream *stream, class = G_INPUT_STREAM_GET_CLASS (stream); + stream->priv->pending = TRUE; class->read_async (stream, buffer, count, io_priority, callback, data, notify); } @@ -668,6 +672,7 @@ g_input_stream_skip_async (GInputStream *stream, } class = G_INPUT_STREAM_GET_CLASS (stream); + stream->priv->pending = TRUE; class->skip_async (stream, count, io_priority, callback, data, notify); } @@ -676,7 +681,7 @@ typedef struct { GInputStream *stream; gboolean result; GError *error; - GAsyncCloseCallback callback; + GAsyncCloseInputCallback callback; gpointer data; GDestroyNotify notify; } CloseAsyncResult; @@ -715,7 +720,7 @@ static void queue_close_async_result (GInputStream *stream, gboolean result, GError *error, - GAsyncCloseCallback callback, + GAsyncCloseInputCallback callback, gpointer data, GDestroyNotify notify) { @@ -758,7 +763,7 @@ queue_close_async_result (GInputStream *stream, void g_input_stream_close_async (GInputStream *stream, int io_priority, - GAsyncCloseCallback callback, + GAsyncCloseInputCallback callback, gpointer data, GDestroyNotify notify) { @@ -788,6 +793,7 @@ g_input_stream_close_async (GInputStream *stream, } class = G_INPUT_STREAM_GET_CLASS (stream); + stream->priv->pending = TRUE; class->close_async (stream, io_priority, callback, data, notify); } @@ -835,6 +841,10 @@ g_input_stream_is_cancelled (GInputStream *stream) } +/******************************************** + * Default implementation of async ops * + ********************************************/ + typedef struct { GInputStream *stream; @@ -938,7 +948,6 @@ g_input_stream_real_read_async (GInputStream *stream, op->data = data; op->notify = notify; - stream->priv->pending = TRUE; stream->priv->io_job_id = g_schedule_io_job (read_op_func, read_op_cancel, op, @@ -1045,7 +1054,6 @@ g_input_stream_real_skip_async (GInputStream *stream, op->data = data; op->notify = notify; - stream->priv->pending = TRUE; stream->priv->io_job_id = g_schedule_io_job (skip_op_func, skip_op_cancel, op, @@ -1059,7 +1067,7 @@ typedef struct { GInputStream *stream; gboolean res; GError *error; - GAsyncCloseCallback callback; + GAsyncCloseInputCallback callback; gpointer data; GDestroyNotify notify; } CloseAsyncOp; @@ -1135,7 +1143,7 @@ close_op_cancel (gpointer data) static void g_input_stream_real_close_async (GInputStream *stream, int io_priority, - GAsyncCloseCallback callback, + GAsyncCloseInputCallback callback, gpointer data, GDestroyNotify notify) { @@ -1148,7 +1156,6 @@ g_input_stream_real_close_async (GInputStream *stream, op->data = data; op->notify = notify; - stream->priv->pending = TRUE; stream->priv->io_job_id = g_schedule_io_job (close_op_func, close_op_cancel, op, diff --git a/ginputstream.h b/ginputstream.h index d954a71a..b11d807e 100644 --- a/ginputstream.h +++ b/ginputstream.h @@ -56,7 +56,7 @@ typedef void (*GAsyncSkipCallback) (GInputStream *stream, GError *error); /** - * GAsyncCloseCallback: + * GAsyncCloseInputCallback: * @stream: a #GInputStream * @result: %TRUE on success, %FALSE otherwis * @error: the error, if result is %FALSE, otherwise %NULL @@ -68,10 +68,10 @@ typedef void (*GAsyncSkipCallback) (GInputStream *stream, * If the operation was cancelled @result will be %FALSE, and @error * will be %G_VFS_ERROR_CANCELLED. **/ -typedef void (*GAsyncCloseCallback) (GInputStream *stream, - gboolean result, - gpointer data, - GError *error); +typedef void (*GAsyncCloseInputCallback) (GInputStream *stream, + gboolean result, + gpointer data, + GError *error); struct _GInputStream { @@ -113,7 +113,7 @@ struct _GInputStreamClass GDestroyNotify notify); void (* close_async) (GInputStream *stream, int io_priority, - GAsyncCloseCallback callback, + GAsyncCloseInputCallback callback, gpointer data, GDestroyNotify notify); void (* cancel) (GInputStream *stream); @@ -131,41 +131,39 @@ struct _GInputStreamClass GType g_input_stream_get_type (void) G_GNUC_CONST; -gssize g_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GError **error); -gssize g_input_stream_skip (GInputStream *stream, - gsize count, - GError **error); -gboolean g_input_stream_close (GInputStream *stream, - GError **error); - -void g_input_stream_set_async_context (GInputStream *stream, - GMainContext *context); -GMainContext *g_input_stream_get_async_context (GInputStream *stream); -void g_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GAsyncReadCallback callback, - gpointer data, - GDestroyNotify notify); -void g_input_stream_skip_async (GInputStream *stream, - gsize count, - int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GDestroyNotify notify); -void g_input_stream_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseCallback callback, - gpointer data, - GDestroyNotify notify); -void g_input_stream_cancel (GInputStream *stream); - - -gboolean g_input_stream_is_cancelled (GInputStream *stream); +gssize g_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GError **error); +gssize g_input_stream_skip (GInputStream *stream, + gsize count, + GError **error); +gboolean g_input_stream_close (GInputStream *stream, + GError **error); +void g_input_stream_set_async_context (GInputStream *stream, + GMainContext *context); +GMainContext *g_input_stream_get_async_context (GInputStream *stream); +void g_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GAsyncReadCallback callback, + gpointer data, + GDestroyNotify notify); +void g_input_stream_skip_async (GInputStream *stream, + gsize count, + int io_priority, + GAsyncSkipCallback callback, + gpointer data, + GDestroyNotify notify); +void g_input_stream_close_async (GInputStream *stream, + int io_priority, + GAsyncCloseInputCallback callback, + gpointer data, + GDestroyNotify notify); +void g_input_stream_cancel (GInputStream *stream); +gboolean g_input_stream_is_cancelled (GInputStream *stream); + G_END_DECLS diff --git a/goutputstream.c b/goutputstream.c index 48efcadc..dd31c637 100644 --- a/goutputstream.c +++ b/goutputstream.c @@ -1,33 +1,39 @@ #include <config.h> -#include "goutputstream.h" #include <glib/gi18n-lib.h> +#include "goutputstream.h" +#include "gioscheduler.h" + G_DEFINE_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT); static GObjectClass *parent_class = NULL; struct _GOutputStreamPrivate { - /* TODO: Should be public for subclasses? */ guint closed : 1; guint pending : 1; guint cancelled : 1; GMainContext *context; + gint io_job_id; }; -static guint g_output_stream_real_write_async (GOutputStream *stream, - void *buffer, - gsize count, - int io_priority, - GAsyncWriteCallback callback, - gpointer data, - GDestroyNotify notify); -static guint g_output_stream_real_close_async (GOutputStream *stream, - GAsyncCloseCallback callback, - gpointer data, - GDestroyNotify notify); -static void g_output_stream_real_cancel (GOutputStream *stream, - guint tag); - +static void g_output_stream_real_write_async (GOutputStream *stream, + void *buffer, + gsize count, + int io_priority, + GAsyncWriteCallback callback, + gpointer data, + GDestroyNotify notify); +static void g_output_stream_real_flush_async (GOutputStream *stream, + int io_priority, + GAsyncFlushCallback callback, + gpointer data, + GDestroyNotify notify); +static void g_output_stream_real_close_async (GOutputStream *stream, + int io_priority, + GAsyncCloseOutputCallback callback, + gpointer data, + GDestroyNotify notify); +static void g_output_stream_real_cancel (GOutputStream *stream); static void g_output_stream_finalize (GObject *object) @@ -39,6 +45,12 @@ g_output_stream_finalize (GObject *object) if (!stream->priv->closed) g_output_stream_close (stream, NULL); + if (stream->priv->context) + { + g_main_context_unref (stream->priv->context); + stream->priv->context = NULL; + } + if (G_OBJECT_CLASS (parent_class)->finalize) (*G_OBJECT_CLASS (parent_class)->finalize) (object); } @@ -55,6 +67,7 @@ g_output_stream_class_init (GOutputStreamClass *klass) gobject_class->finalize = g_output_stream_finalize; klass->write_async = g_output_stream_real_write_async; + klass->flush_async = g_output_stream_real_flush_async; klass->close_async = g_output_stream_real_close_async; klass->cancel = g_output_stream_real_cancel; } @@ -96,6 +109,7 @@ g_output_stream_write (GOutputStream *stream, GError **error) { GOutputStreamClass *class; + gssize res; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); g_return_val_if_fail (stream != NULL, -1); @@ -134,7 +148,11 @@ g_output_stream_write (GOutputStream *stream, return -1; } - return class->write (stream, buffer, count, error); + stream->priv->pending = TRUE; + res = class->write (stream, buffer, count, error); + stream->priv->pending = FALSE; + + return res; } /** @@ -143,6 +161,7 @@ g_output_stream_write (GOutputStream *stream, * @error: location to store the error occuring, or %NULL to ignore * * Flushed any outstanding buffers in the stream. Will block during the operation. + * Closing the stream will implicitly cause a flush. * * This function is optional for inherited classes. * @@ -153,6 +172,7 @@ g_output_stream_flush (GOutputStream *stream, GError **error) { GOutputStreamClass *class; + gboolean res; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); g_return_val_if_fail (stream != NULL, FALSE); @@ -172,11 +192,14 @@ g_output_stream_flush (GOutputStream *stream, } class = G_OUTPUT_STREAM_GET_CLASS (stream); - + + stream->priv->pending = TRUE; + res = TRUE; if (class->flush) - return class->flush (stream, error); - else - return TRUE; + res = class->flush (stream, error); + stream->priv->pending = FALSE; + + return res; } /** @@ -199,6 +222,10 @@ g_output_stream_flush (GOutputStream *stream, * Some streams might keep the backing store of the stream (e.g. a file descriptor) * open after the stream is closed. See the documentation for the individual * stream for details. + * + * On failure the first error that happened will be reported, but the close + * operation will finish as much as possible. A stream that failed to + * close will still return %G_VFS_ERROR_CLOSED all operations. * * Return value: %TRUE on success, %FALSE on failure **/ @@ -223,14 +250,28 @@ g_output_stream_close (GOutputStream *stream, _("Stream has outstanding operation")); return FALSE; } + + res = g_output_stream_flush (stream, error); + + stream->priv->pending = TRUE; - res = TRUE; - - if (class->close) - res = class->close (stream, error); + if (!res) + { + /* flushing caused the error that we want to return, + * but we still want to close the underlying stream if possible + */ + if (class->close) + class->close (stream, NULL); + } + else + { + res = TRUE; + if (class->close) + res = class->close (stream, error); + } - if (res) - stream->priv->closed = TRUE; + stream->priv->closed = TRUE; + stream->priv->pending = FALSE; return res; } @@ -286,6 +327,80 @@ g_output_stream_get_async_context (GOutputStream *stream) return stream->priv->context; } +typedef struct { + GOutputStream *stream; + void *buffer; + gsize bytes_requested; + gssize bytes_written; + GError *error; + GAsyncWriteCallback callback; + gpointer data; + GDestroyNotify notify; +} WriteAsyncResult; + +static gboolean +call_write_async_result (gpointer data) +{ + WriteAsyncResult *res = data; + + if (res->callback) + res->callback (res->stream, + res->buffer, + res->bytes_requested, + res->bytes_written, + res->data, + res->error); + + return FALSE; +} + +static void +write_async_result_free (gpointer data) +{ + WriteAsyncResult *res = data; + + if (res->notify) + res->notify (res->data); + + if (res->error) + g_error_free (res->error); + + g_object_unref (res->stream); + + g_free (res); +} + +static void +queue_write_async_result (GOutputStream *stream, + void *buffer, + gsize bytes_requested, + gssize bytes_written, + GError *error, + GAsyncWriteCallback callback, + gpointer data, + GDestroyNotify notify) +{ + GSource *source; + WriteAsyncResult *res; + + res = g_new0 (WriteAsyncResult, 1); + + res->stream = g_object_ref (stream); + res->buffer = buffer; + res->bytes_requested = bytes_requested; + res->bytes_written = bytes_written; + res->error = error; + res->callback = callback; + res->data = data; + res->notify = notify; + + source = g_idle_source_new (); + g_source_set_priority (source, G_PRIORITY_DEFAULT); + g_source_set_callback (source, call_write_async_result, res, write_async_result_free); + g_source_attach (source, g_output_stream_get_async_context (stream)); + g_source_unref (source); +} + /** * g_output_stream_write_async: * @stream: A #GOutputStream. @@ -316,10 +431,8 @@ g_output_stream_get_async_context (GOutputStream *stream) * The asyncronous methods have a default fallback that uses threads to implement * asynchronicity, so they are optional for inheriting classes. However, if you * override one you must override all. - * - * Return value: A tag that can be passed to g_output_stream_cancel() **/ -guint +void g_output_stream_write_async (GOutputStream *stream, void *buffer, gsize count, @@ -329,16 +442,230 @@ g_output_stream_write_async (GOutputStream *stream, GDestroyNotify notify) { GOutputStreamClass *class; + GError *error; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), 0); - g_return_val_if_fail (stream != NULL, 0); - g_return_val_if_fail (buffer != NULL, 0); + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (stream != NULL); + g_return_if_fail (buffer != NULL); + + stream->priv->cancelled = FALSE; + if (count == 0) + { + queue_write_async_result (stream, buffer, count, 0, NULL, + callback, data, notify); + return; + } + + if (((gssize) count) < 0) + { + error = NULL; + g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_INVALID_ARGUMENT, + _("Too large count value passed to g_input_stream_read_async")); + queue_write_async_result (stream, buffer, count, -1, error, + callback, data, notify); + return; + } + + if (stream->priv->closed) + { + error = NULL; + g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_CLOSED, + _("Stream is already closed")); + queue_write_async_result (stream, buffer, count, -1, error, + callback, data, notify); + return; + } + + if (stream->priv->pending) + { + error = NULL; + g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_PENDING, + _("Stream has outstanding operation")); + queue_write_async_result (stream, buffer, count, -1, error, + callback, data, notify); + return; + } + class = G_OUTPUT_STREAM_GET_CLASS (stream); + stream->priv->pending = TRUE; return class->write_async (stream, buffer, count, io_priority, callback, data, notify); } +typedef struct { + GOutputStream *stream; + gboolean result; + GError *error; + GAsyncFlushCallback callback; + gpointer data; + GDestroyNotify notify; +} FlushAsyncResult; + +static gboolean +call_flush_async_result (gpointer data) +{ + FlushAsyncResult *res = data; + + if (res->callback) + res->callback (res->stream, + res->result, + res->data, + res->error); + + return FALSE; +} + +static void +flush_async_result_free (gpointer data) +{ + FlushAsyncResult *res = data; + + if (res->notify) + res->notify (res->data); + + if (res->error) + g_error_free (res->error); + + g_object_unref (res->stream); + + g_free (res); +} + +static void +queue_flush_async_result (GOutputStream *stream, + gboolean result, + GError *error, + GAsyncFlushCallback callback, + gpointer data, + GDestroyNotify notify) +{ + GSource *source; + FlushAsyncResult *res; + + res = g_new0 (FlushAsyncResult, 1); + + res->stream = g_object_ref (stream); + res->result = result; + res->error = error; + res->callback = callback; + res->data = data; + res->notify = notify; + + source = g_idle_source_new (); + g_source_set_priority (source, G_PRIORITY_DEFAULT); + g_source_set_callback (source, call_flush_async_result, res, flush_async_result_free); + g_source_attach (source, g_output_stream_get_async_context (stream)); + g_source_unref (source); +} + +void +g_output_stream_flush_async (GOutputStream *stream, + int io_priority, + GAsyncFlushCallback callback, + gpointer data, + GDestroyNotify notify) +{ + GOutputStreamClass *class; + GError *error; + + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (stream != NULL); + + stream->priv->cancelled = FALSE; + + if (stream->priv->closed) + { + error = NULL; + g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_CLOSED, + _("Stream is already closed")); + queue_flush_async_result (stream, FALSE, error, + callback, data, notify); + return; + } + + if (stream->priv->pending) + { + error = NULL; + g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_PENDING, + _("Stream has outstanding operation")); + queue_flush_async_result (stream, FALSE, error, + callback, data, notify); + return; + } + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + stream->priv->pending = TRUE; + return class->flush_async (stream, io_priority, callback, data, notify); +} + +typedef struct { + GOutputStream *stream; + gboolean result; + GError *error; + GAsyncCloseOutputCallback callback; + gpointer data; + GDestroyNotify notify; +} CloseAsyncResult; + +static gboolean +call_close_async_result (gpointer data) +{ + CloseAsyncResult *res = data; + + if (res->callback) + res->callback (res->stream, + res->result, + res->data, + res->error); + + return FALSE; +} + +static void +close_async_result_free (gpointer data) +{ + CloseAsyncResult *res = data; + + if (res->notify) + res->notify (res->data); + + if (res->error) + g_error_free (res->error); + + g_object_unref (res->stream); + + g_free (res); +} + +static void +queue_close_async_result (GOutputStream *stream, + gboolean result, + GError *error, + GAsyncCloseOutputCallback callback, + gpointer data, + GDestroyNotify notify) +{ + GSource *source; + CloseAsyncResult *res; + + res = g_new0 (CloseAsyncResult, 1); + + res->stream = g_object_ref (stream); + res->result = result; + res->error = error; + res->callback = callback; + res->data = data; + res->notify = notify; + + source = g_idle_source_new (); + g_source_set_priority (source, G_PRIORITY_DEFAULT); + g_source_set_callback (source, call_close_async_result, res, close_async_result_free); + g_source_attach (source, g_output_stream_get_async_context (stream)); + g_source_unref (source); +} + /** * g_output_stream_close_async: * @stream: A #GOutputStream. @@ -354,30 +681,47 @@ g_output_stream_write_async (GOutputStream *stream, * The asyncronous methods have a default fallback that uses threads to implement * asynchronicity, so they are optional for inheriting classes. However, if you * override one you must override all. - * - * Return value: A tag that can be passed to g_output_stream_cancel() **/ -guint +void g_output_stream_close_async (GOutputStream *stream, - GAsyncCloseCallback callback, - gpointer data, - GDestroyNotify notify) + int io_priority, + GAsyncCloseOutputCallback callback, + gpointer data, + GDestroyNotify notify) { GOutputStreamClass *class; + GError *error; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), 0); - g_return_val_if_fail (stream != NULL, 0); + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (stream != NULL); - class = G_OUTPUT_STREAM_GET_CLASS (stream); + stream->priv->cancelled = FALSE; + + if (stream->priv->closed) + { + queue_close_async_result (stream, TRUE, NULL, + callback, data, notify); + return; + } - return class->close_async (stream, callback, data, notify); + if (stream->priv->pending) + { + error = NULL; + g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_PENDING, + _("Stream has outstanding operation")); + queue_close_async_result (stream, FALSE, error, + callback, data, notify); + return; + } + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + stream->priv->pending = TRUE; + return class->close_async (stream, io_priority, callback, data, notify); } - /** * g_output_stream_cancel: * @stream: A #GOutputStream. - * @tag: a value returned from an async request * * Tries to cancel an outstanding request for the stream. If it * succeeds the outstanding request callback will be called with @@ -394,8 +738,7 @@ g_output_stream_close_async (GOutputStream *stream, * override one you must override all. **/ void -g_output_stream_cancel (GOutputStream *stream, - guint tag) +g_output_stream_cancel (GOutputStream *stream) { GOutputStreamClass *class; @@ -404,7 +747,9 @@ g_output_stream_cancel (GOutputStream *stream, class = G_OUTPUT_STREAM_GET_CLASS (stream); - class->cancel (stream, tag); + stream->priv->cancelled = TRUE; + + class->cancel (stream); } @@ -416,33 +761,329 @@ g_output_stream_is_cancelled (GOutputStream *stream) return stream->priv->cancelled; } -static guint -g_output_stream_real_write_async (GOutputStream *stream, + +/******************************************** + * Default implementation of async ops * + ********************************************/ + +typedef struct { + GOutputStream *stream; + void *buffer; + gsize count_requested; + gssize count_written; + GError *error; + GAsyncWriteCallback callback; + gpointer data; + GDestroyNotify notify; +} WriteAsyncOp; + +static void +write_op_report (gpointer data) +{ + WriteAsyncOp *op = data; + + op->stream->priv->pending = FALSE; + + op->callback (op->stream, + op->buffer, + op->count_requested, + op->count_written, + op->data, + op->error); + +} + +static void +write_op_free (gpointer data) +{ + WriteAsyncOp *op = data; + + g_object_unref (op->stream); + + if (op->error) + g_error_free (op->error); + + if (op->notify) + op->notify (op->data); + + g_free (op); +} + + +static void +write_op_func (GIOJob *job, + gpointer data) +{ + WriteAsyncOp *op = data; + GOutputStreamClass *class; + + if (g_io_job_is_cancelled (job)) + { + op->count_written = -1; + g_set_error (&op->error, + G_VFS_ERROR, + G_VFS_ERROR_CANCELLED, + _("Operation was cancelled")); + } + else + { + class = G_OUTPUT_STREAM_GET_CLASS (op->stream); + op->count_written = class->write (op->stream, op->buffer, op->count_requested, &op->error); + } + + g_io_job_mark_done (job); + g_io_job_send_to_mainloop (job, write_op_report, + op, write_op_free, + FALSE); +} + +static void +write_op_cancel (gpointer data) +{ + WriteAsyncOp *op = data; + GOutputStreamClass *class; + + class = G_OUTPUT_STREAM_GET_CLASS (op->stream); + if (class->cancel_sync) + class->cancel_sync (op->stream); +} + +static void +g_output_stream_real_write_async (GOutputStream *stream, void *buffer, gsize count, int io_priority, - GAsyncWriteCallback callback, + GAsyncWriteCallback callback, gpointer data, GDestroyNotify notify) { - g_error ("TODO"); - return 0; + WriteAsyncOp *op; + + op = g_new0 (WriteAsyncOp, 1); + + op->stream = g_object_ref (stream); + op->buffer = buffer; + op->count_requested = count; + op->callback = callback; + op->data = data; + op->notify = notify; + + stream->priv->io_job_id = g_schedule_io_job (write_op_func, + write_op_cancel, + op, + NULL, + io_priority, + g_output_stream_get_async_context (stream)); } -static guint +typedef struct { + GOutputStream *stream; + gboolean result; + GError *error; + GAsyncFlushCallback callback; + gpointer data; + GDestroyNotify notify; +} FlushAsyncOp; + +static void +flush_op_report (gpointer data) +{ + FlushAsyncOp *op = data; + + op->stream->priv->pending = FALSE; + + op->callback (op->stream, + op->result, + op->data, + op->error); + +} + +static void +flush_op_free (gpointer data) +{ + FlushAsyncOp *op = data; + + g_object_unref (op->stream); + + if (op->error) + g_error_free (op->error); + + if (op->notify) + op->notify (op->data); + + g_free (op); +} + + +static void +flush_op_func (GIOJob *job, + gpointer data) +{ + FlushAsyncOp *op = data; + GOutputStreamClass *class; + + if (g_io_job_is_cancelled (job)) + { + op->result = FALSE; + g_set_error (&op->error, + G_VFS_ERROR, + G_VFS_ERROR_CANCELLED, + _("Operation was cancelled")); + } + else + { + class = G_OUTPUT_STREAM_GET_CLASS (op->stream); + op->result = TRUE; + if (class->flush) + op->result = class->flush (op->stream, &op->error); + } + + g_io_job_mark_done (job); + g_io_job_send_to_mainloop (job, flush_op_report, + op, flush_op_free, + FALSE); +} + +static void +flush_op_cancel (gpointer data) +{ + FlushAsyncOp *op = data; + GOutputStreamClass *class; + + class = G_OUTPUT_STREAM_GET_CLASS (op->stream); + if (class->cancel_sync) + class->cancel_sync (op->stream); +} + +static void +g_output_stream_real_flush_async (GOutputStream *stream, + int io_priority, + GAsyncFlushCallback callback, + gpointer data, + GDestroyNotify notify) +{ + FlushAsyncOp *op; + + op = g_new0 (FlushAsyncOp, 1); + + op->stream = g_object_ref (stream); + op->callback = callback; + op->data = data; + op->notify = notify; + + stream->priv->io_job_id = g_schedule_io_job (flush_op_func, + flush_op_cancel, + op, + NULL, + io_priority, + g_output_stream_get_async_context (stream)); +} + +typedef struct { + GOutputStream *stream; + gboolean res; + GError *error; + GAsyncCloseOutputCallback callback; + gpointer data; + GDestroyNotify notify; +} CloseAsyncOp; + +static void +close_op_report (gpointer data) +{ + CloseAsyncOp *op = data; + + op->stream->priv->pending = FALSE; + + op->callback (op->stream, + op->res, + op->data, + op->error); +} + +static void +close_op_free (gpointer data) +{ + CloseAsyncOp *op = data; + + g_object_unref (op->stream); + + if (op->error) + g_error_free (op->error); + + if (op->notify) + op->notify (op->data); + + g_free (op); +} + + +static void +close_op_func (GIOJob *job, + gpointer data) +{ + CloseAsyncOp *op = data; + GOutputStreamClass *class; + + if (g_io_job_is_cancelled (job)) + { + op->res = FALSE; + g_set_error (&op->error, + G_VFS_ERROR, + G_VFS_ERROR_CANCELLED, + _("Operation was cancelled")); + } + else + { + class = G_OUTPUT_STREAM_GET_CLASS (op->stream); + op->res = class->close (op->stream, &op->error); + } + + g_io_job_mark_done (job); + g_io_job_send_to_mainloop (job, close_op_report, + op, close_op_free, + FALSE); +} + +static void +close_op_cancel (gpointer data) +{ + CloseAsyncOp *op = data; + GOutputStreamClass *class; + + class = G_OUTPUT_STREAM_GET_CLASS (op->stream); + if (class->cancel_sync) + class->cancel_sync (op->stream); +} + +static void g_output_stream_real_close_async (GOutputStream *stream, - GAsyncCloseCallback callback, + int io_priority, + GAsyncCloseOutputCallback callback, gpointer data, GDestroyNotify notify) { - g_error ("TODO"); - return 0; + CloseAsyncOp *op; + + op = g_new0 (CloseAsyncOp, 1); + + op->stream = g_object_ref (stream); + op->callback = callback; + op->data = data; + op->notify = notify; + + stream->priv->io_job_id = g_schedule_io_job (close_op_func, + close_op_cancel, + op, + NULL, + io_priority, + g_output_stream_get_async_context (stream)); } static void -g_output_stream_real_cancel (GOutputStream *stream, - guint tag) +g_output_stream_real_cancel (GOutputStream *stream) { - g_error ("TODO"); + g_cancel_io_job (stream->priv->io_job_id); } diff --git a/goutputstream.h b/goutputstream.h index f0385a00..21559761 100644 --- a/goutputstream.h +++ b/goutputstream.h @@ -3,7 +3,7 @@ #include <glib-object.h> #include <gvfstypes.h> -#include <ginputstream.h> +#include <gvfserror.h> G_BEGIN_DECLS @@ -19,11 +19,36 @@ typedef struct _GOutputStreamClass GOutputStreamClass; typedef struct _GOutputStreamPrivate GOutputStreamPrivate; typedef void (*GAsyncWriteCallback) (GOutputStream *stream, - void *buffer, - gssize bytes_requested, - gssize bytes_writen, - gpointer data, - GError *error); + void *buffer, + gsize bytes_requested, + gssize bytes_written, + gpointer data, + GError *error); + +typedef void (*GAsyncFlushCallback) (GOutputStream *stream, + gboolean result, + gpointer data, + GError *error); + + +/** + * GAsyncCloseOutputCallback: + * @stream: a #GOutputStream + * @result: %TRUE on success, %FALSE otherwis + * @error: the error, if result is %FALSE, otherwise %NULL + * + * This callback is called when an asychronous close operation + * is finished. + * + * The callback is always called, even if the operation was cancelled. + * If the operation was cancelled @result will be %FALSE, and @error + * will be %G_VFS_ERROR_CANCELLED. + **/ +typedef void (*GAsyncCloseOutputCallback) (GOutputStream *stream, + gboolean result, + gpointer data, + GError *error); + struct _GOutputStream { @@ -51,22 +76,27 @@ struct _GOutputStreamClass /* Async ops: (optional in derived classes) */ - guint (* write_async) (GOutputStream *stream, + void (* write_async) (GOutputStream *stream, void *buffer, gsize count, int io_priority, GAsyncWriteCallback callback, gpointer data, GDestroyNotify notify); - guint (* close_async) (GOutputStream *stream, - GAsyncCloseCallback callback, + void (* flush_async) (GOutputStream *stream, + int io_priority, + GAsyncFlushCallback callback, gpointer data, GDestroyNotify notify); - void (* cancel) (GOutputStream *stream, - guint tag); + void (* close_async) (GOutputStream *stream, + int io_priority, + GAsyncCloseOutputCallback callback, + gpointer data, + GDestroyNotify notify); + void (* cancel) (GOutputStream *stream); /* Optional cancel wakeup if using default async ops */ - void (* cancel_sync) (GInputStream *stream); + void (* cancel_sync) (GOutputStream *stream); /* Padding for future expansion */ void (*_g_reserved1) (void); @@ -78,34 +108,36 @@ struct _GOutputStreamClass GType g_output_stream_get_type (void) G_GNUC_CONST; - -gssize g_output_stream_write (GOutputStream *stream, - void *buffer, - gsize count, - GError **error); -gboolean g_output_stream_flush (GOutputStream *stream, - GError **error); -gboolean g_output_stream_close (GOutputStream *stream, - GError **error); -void g_output_stream_set_async_context (GOutputStream *stream, - GMainContext *context); -GMainContext *g_output_stream_get_async_context (GOutputStream *stream); -guint g_output_stream_write_async (GOutputStream *stream, - void *buffer, - gsize count, - int io_priority, - GAsyncWriteCallback callback, - gpointer data, - GDestroyNotify notify); -guint g_output_stream_close_async (GOutputStream *stream, - GAsyncCloseCallback callback, - gpointer data, - GDestroyNotify notify); -void g_output_stream_cancel (GOutputStream *stream, - guint tag); - -gboolean g_output_stream_is_cancelled (GOutputStream *stream); - +gssize g_output_stream_write (GOutputStream *stream, + void *buffer, + gsize count, + GError **error); +gboolean g_output_stream_flush (GOutputStream *stream, + GError **error); +gboolean g_output_stream_close (GOutputStream *stream, + GError **error); +void g_output_stream_set_async_context (GOutputStream *stream, + GMainContext *context); +GMainContext *g_output_stream_get_async_context (GOutputStream *stream); +void g_output_stream_write_async (GOutputStream *stream, + void *buffer, + gsize count, + int io_priority, + GAsyncWriteCallback callback, + gpointer data, + GDestroyNotify notify); +void g_output_stream_flush_async (GOutputStream *stream, + int io_priority, + GAsyncFlushCallback callback, + gpointer data, + GDestroyNotify notify); +void g_output_stream_close_async (GOutputStream *stream, + int io_priority, + GAsyncCloseOutputCallback callback, + gpointer data, + GDestroyNotify notify); +void g_output_stream_cancel (GOutputStream *stream); +gboolean g_output_stream_is_cancelled (GOutputStream *stream); G_END_DECLS diff --git a/txt/ops.txt b/txt/ops.txt index 25222813..cd1bb274 100644 --- a/txt/ops.txt +++ b/txt/ops.txt @@ -1,9 +1,3 @@ - -streams: - -InputStream <- FileInputStream (adds fstat + last_mtime) <- LocalFileInputStream - - type: File, Folder, Symlink, Shortcut, Mountable, special (fifo, socket, chardec, blockdev) flags: hidden, GFileInfo { |