summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Larsson <alexl@redhat.com>2007-09-13 07:50:02 +0000
committerAlexander Larsson <alexl@src.gnome.org>2007-09-13 07:50:02 +0000
commitc59fb83d3586adb1be032817baf140162d02329e (patch)
tree157300331681cc86f48613ae9d02fb627d759fee
parentaebfb5d93e510a9f127acf1dbf34b75d14b944f4 (diff)
downloadgvfs-c59fb83d3586adb1be032817baf140162d02329e.tar.gz
Update of close and pending handling.
2006-09-27 Alexander Larsson <alexl@redhat.com> * ginputstream.[ch]: Update of close and pending handling. * goutputstream.[ch]: Implement default async version Original git commit by alex <alex> at 1159363478 +0000 svn path=/trunk/; revision=6
-rw-r--r--ginputstream.c29
-rw-r--r--ginputstream.h80
-rw-r--r--goutputstream.c761
-rw-r--r--goutputstream.h112
-rw-r--r--txt/ops.txt6
5 files changed, 830 insertions, 158 deletions
diff --git a/ginputstream.c b/ginputstream.c
index 91e21782..95bd9a41 100644
--- a/ginputstream.c
+++ b/ginputstream.c
@@ -35,7 +35,7 @@ static void g_input_stream_real_skip_async (GInputStream *stream,
GDestroyNotify notify);
static void g_input_stream_real_close_async (GInputStream *stream,
int io_priority,
- GAsyncCloseCallback callback,
+ GAsyncCloseInputCallback callback,
gpointer data,
GDestroyNotify notify);
static void g_input_stream_real_cancel (GInputStream *stream);
@@ -253,6 +253,10 @@ g_input_stream_real_skip (GInputStream *stream,
* Some streams might keep the backing store of the stream (e.g. a file descriptor)
* open after the stream is closed. See the documentation for the individual
* stream for details.
+ *
+ * On failure the first error that happened will be reported, but the close
+ * operation will finish as much as possible. A stream that failed to
+ * close will still return %G_VFS_ERROR_CLOSED all operations.
*
* Return value: %TRUE on success, %FALSE on failure
**/
@@ -285,8 +289,7 @@ g_input_stream_close (GInputStream *stream,
if (class->close)
res = class->close (stream, error);
- if (res)
- stream->priv->closed = TRUE;
+ stream->priv->closed = TRUE;
stream->priv->pending = FALSE;
@@ -509,6 +512,7 @@ g_input_stream_read_async (GInputStream *stream,
class = G_INPUT_STREAM_GET_CLASS (stream);
+ stream->priv->pending = TRUE;
class->read_async (stream, buffer, count, io_priority, callback, data, notify);
}
@@ -668,6 +672,7 @@ g_input_stream_skip_async (GInputStream *stream,
}
class = G_INPUT_STREAM_GET_CLASS (stream);
+ stream->priv->pending = TRUE;
class->skip_async (stream, count, io_priority, callback, data, notify);
}
@@ -676,7 +681,7 @@ typedef struct {
GInputStream *stream;
gboolean result;
GError *error;
- GAsyncCloseCallback callback;
+ GAsyncCloseInputCallback callback;
gpointer data;
GDestroyNotify notify;
} CloseAsyncResult;
@@ -715,7 +720,7 @@ static void
queue_close_async_result (GInputStream *stream,
gboolean result,
GError *error,
- GAsyncCloseCallback callback,
+ GAsyncCloseInputCallback callback,
gpointer data,
GDestroyNotify notify)
{
@@ -758,7 +763,7 @@ queue_close_async_result (GInputStream *stream,
void
g_input_stream_close_async (GInputStream *stream,
int io_priority,
- GAsyncCloseCallback callback,
+ GAsyncCloseInputCallback callback,
gpointer data,
GDestroyNotify notify)
{
@@ -788,6 +793,7 @@ g_input_stream_close_async (GInputStream *stream,
}
class = G_INPUT_STREAM_GET_CLASS (stream);
+ stream->priv->pending = TRUE;
class->close_async (stream, io_priority, callback, data, notify);
}
@@ -835,6 +841,10 @@ g_input_stream_is_cancelled (GInputStream *stream)
}
+/********************************************
+ * Default implementation of async ops *
+ ********************************************/
+
typedef struct {
GInputStream *stream;
@@ -938,7 +948,6 @@ g_input_stream_real_read_async (GInputStream *stream,
op->data = data;
op->notify = notify;
- stream->priv->pending = TRUE;
stream->priv->io_job_id = g_schedule_io_job (read_op_func,
read_op_cancel,
op,
@@ -1045,7 +1054,6 @@ g_input_stream_real_skip_async (GInputStream *stream,
op->data = data;
op->notify = notify;
- stream->priv->pending = TRUE;
stream->priv->io_job_id = g_schedule_io_job (skip_op_func,
skip_op_cancel,
op,
@@ -1059,7 +1067,7 @@ typedef struct {
GInputStream *stream;
gboolean res;
GError *error;
- GAsyncCloseCallback callback;
+ GAsyncCloseInputCallback callback;
gpointer data;
GDestroyNotify notify;
} CloseAsyncOp;
@@ -1135,7 +1143,7 @@ close_op_cancel (gpointer data)
static void
g_input_stream_real_close_async (GInputStream *stream,
int io_priority,
- GAsyncCloseCallback callback,
+ GAsyncCloseInputCallback callback,
gpointer data,
GDestroyNotify notify)
{
@@ -1148,7 +1156,6 @@ g_input_stream_real_close_async (GInputStream *stream,
op->data = data;
op->notify = notify;
- stream->priv->pending = TRUE;
stream->priv->io_job_id = g_schedule_io_job (close_op_func,
close_op_cancel,
op,
diff --git a/ginputstream.h b/ginputstream.h
index d954a71a..b11d807e 100644
--- a/ginputstream.h
+++ b/ginputstream.h
@@ -56,7 +56,7 @@ typedef void (*GAsyncSkipCallback) (GInputStream *stream,
GError *error);
/**
- * GAsyncCloseCallback:
+ * GAsyncCloseInputCallback:
* @stream: a #GInputStream
* @result: %TRUE on success, %FALSE otherwis
* @error: the error, if result is %FALSE, otherwise %NULL
@@ -68,10 +68,10 @@ typedef void (*GAsyncSkipCallback) (GInputStream *stream,
* If the operation was cancelled @result will be %FALSE, and @error
* will be %G_VFS_ERROR_CANCELLED.
**/
-typedef void (*GAsyncCloseCallback) (GInputStream *stream,
- gboolean result,
- gpointer data,
- GError *error);
+typedef void (*GAsyncCloseInputCallback) (GInputStream *stream,
+ gboolean result,
+ gpointer data,
+ GError *error);
struct _GInputStream
{
@@ -113,7 +113,7 @@ struct _GInputStreamClass
GDestroyNotify notify);
void (* close_async) (GInputStream *stream,
int io_priority,
- GAsyncCloseCallback callback,
+ GAsyncCloseInputCallback callback,
gpointer data,
GDestroyNotify notify);
void (* cancel) (GInputStream *stream);
@@ -131,41 +131,39 @@ struct _GInputStreamClass
GType g_input_stream_get_type (void) G_GNUC_CONST;
-gssize g_input_stream_read (GInputStream *stream,
- void *buffer,
- gsize count,
- GError **error);
-gssize g_input_stream_skip (GInputStream *stream,
- gsize count,
- GError **error);
-gboolean g_input_stream_close (GInputStream *stream,
- GError **error);
-
-void g_input_stream_set_async_context (GInputStream *stream,
- GMainContext *context);
-GMainContext *g_input_stream_get_async_context (GInputStream *stream);
-void g_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GAsyncReadCallback callback,
- gpointer data,
- GDestroyNotify notify);
-void g_input_stream_skip_async (GInputStream *stream,
- gsize count,
- int io_priority,
- GAsyncSkipCallback callback,
- gpointer data,
- GDestroyNotify notify);
-void g_input_stream_close_async (GInputStream *stream,
- int io_priority,
- GAsyncCloseCallback callback,
- gpointer data,
- GDestroyNotify notify);
-void g_input_stream_cancel (GInputStream *stream);
-
-
-gboolean g_input_stream_is_cancelled (GInputStream *stream);
+gssize g_input_stream_read (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error);
+gssize g_input_stream_skip (GInputStream *stream,
+ gsize count,
+ GError **error);
+gboolean g_input_stream_close (GInputStream *stream,
+ GError **error);
+void g_input_stream_set_async_context (GInputStream *stream,
+ GMainContext *context);
+GMainContext *g_input_stream_get_async_context (GInputStream *stream);
+void g_input_stream_read_async (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ int io_priority,
+ GAsyncReadCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+void g_input_stream_skip_async (GInputStream *stream,
+ gsize count,
+ int io_priority,
+ GAsyncSkipCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+void g_input_stream_close_async (GInputStream *stream,
+ int io_priority,
+ GAsyncCloseInputCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+void g_input_stream_cancel (GInputStream *stream);
+gboolean g_input_stream_is_cancelled (GInputStream *stream);
+
G_END_DECLS
diff --git a/goutputstream.c b/goutputstream.c
index 48efcadc..dd31c637 100644
--- a/goutputstream.c
+++ b/goutputstream.c
@@ -1,33 +1,39 @@
#include <config.h>
-#include "goutputstream.h"
#include <glib/gi18n-lib.h>
+#include "goutputstream.h"
+#include "gioscheduler.h"
+
G_DEFINE_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);
static GObjectClass *parent_class = NULL;
struct _GOutputStreamPrivate {
- /* TODO: Should be public for subclasses? */
guint closed : 1;
guint pending : 1;
guint cancelled : 1;
GMainContext *context;
+ gint io_job_id;
};
-static guint g_output_stream_real_write_async (GOutputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GAsyncWriteCallback callback,
- gpointer data,
- GDestroyNotify notify);
-static guint g_output_stream_real_close_async (GOutputStream *stream,
- GAsyncCloseCallback callback,
- gpointer data,
- GDestroyNotify notify);
-static void g_output_stream_real_cancel (GOutputStream *stream,
- guint tag);
-
+static void g_output_stream_real_write_async (GOutputStream *stream,
+ void *buffer,
+ gsize count,
+ int io_priority,
+ GAsyncWriteCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+static void g_output_stream_real_flush_async (GOutputStream *stream,
+ int io_priority,
+ GAsyncFlushCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+static void g_output_stream_real_close_async (GOutputStream *stream,
+ int io_priority,
+ GAsyncCloseOutputCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+static void g_output_stream_real_cancel (GOutputStream *stream);
static void
g_output_stream_finalize (GObject *object)
@@ -39,6 +45,12 @@ g_output_stream_finalize (GObject *object)
if (!stream->priv->closed)
g_output_stream_close (stream, NULL);
+ if (stream->priv->context)
+ {
+ g_main_context_unref (stream->priv->context);
+ stream->priv->context = NULL;
+ }
+
if (G_OBJECT_CLASS (parent_class)->finalize)
(*G_OBJECT_CLASS (parent_class)->finalize) (object);
}
@@ -55,6 +67,7 @@ g_output_stream_class_init (GOutputStreamClass *klass)
gobject_class->finalize = g_output_stream_finalize;
klass->write_async = g_output_stream_real_write_async;
+ klass->flush_async = g_output_stream_real_flush_async;
klass->close_async = g_output_stream_real_close_async;
klass->cancel = g_output_stream_real_cancel;
}
@@ -96,6 +109,7 @@ g_output_stream_write (GOutputStream *stream,
GError **error)
{
GOutputStreamClass *class;
+ gssize res;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
g_return_val_if_fail (stream != NULL, -1);
@@ -134,7 +148,11 @@ g_output_stream_write (GOutputStream *stream,
return -1;
}
- return class->write (stream, buffer, count, error);
+ stream->priv->pending = TRUE;
+ res = class->write (stream, buffer, count, error);
+ stream->priv->pending = FALSE;
+
+ return res;
}
/**
@@ -143,6 +161,7 @@ g_output_stream_write (GOutputStream *stream,
* @error: location to store the error occuring, or %NULL to ignore
*
* Flushed any outstanding buffers in the stream. Will block during the operation.
+ * Closing the stream will implicitly cause a flush.
*
* This function is optional for inherited classes.
*
@@ -153,6 +172,7 @@ g_output_stream_flush (GOutputStream *stream,
GError **error)
{
GOutputStreamClass *class;
+ gboolean res;
g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
g_return_val_if_fail (stream != NULL, FALSE);
@@ -172,11 +192,14 @@ g_output_stream_flush (GOutputStream *stream,
}
class = G_OUTPUT_STREAM_GET_CLASS (stream);
-
+
+ stream->priv->pending = TRUE;
+ res = TRUE;
if (class->flush)
- return class->flush (stream, error);
- else
- return TRUE;
+ res = class->flush (stream, error);
+ stream->priv->pending = FALSE;
+
+ return res;
}
/**
@@ -199,6 +222,10 @@ g_output_stream_flush (GOutputStream *stream,
* Some streams might keep the backing store of the stream (e.g. a file descriptor)
* open after the stream is closed. See the documentation for the individual
* stream for details.
+ *
+ * On failure the first error that happened will be reported, but the close
+ * operation will finish as much as possible. A stream that failed to
+ * close will still return %G_VFS_ERROR_CLOSED all operations.
*
* Return value: %TRUE on success, %FALSE on failure
**/
@@ -223,14 +250,28 @@ g_output_stream_close (GOutputStream *stream,
_("Stream has outstanding operation"));
return FALSE;
}
+
+ res = g_output_stream_flush (stream, error);
+
+ stream->priv->pending = TRUE;
- res = TRUE;
-
- if (class->close)
- res = class->close (stream, error);
+ if (!res)
+ {
+ /* flushing caused the error that we want to return,
+ * but we still want to close the underlying stream if possible
+ */
+ if (class->close)
+ class->close (stream, NULL);
+ }
+ else
+ {
+ res = TRUE;
+ if (class->close)
+ res = class->close (stream, error);
+ }
- if (res)
- stream->priv->closed = TRUE;
+ stream->priv->closed = TRUE;
+ stream->priv->pending = FALSE;
return res;
}
@@ -286,6 +327,80 @@ g_output_stream_get_async_context (GOutputStream *stream)
return stream->priv->context;
}
+typedef struct {
+ GOutputStream *stream;
+ void *buffer;
+ gsize bytes_requested;
+ gssize bytes_written;
+ GError *error;
+ GAsyncWriteCallback callback;
+ gpointer data;
+ GDestroyNotify notify;
+} WriteAsyncResult;
+
+static gboolean
+call_write_async_result (gpointer data)
+{
+ WriteAsyncResult *res = data;
+
+ if (res->callback)
+ res->callback (res->stream,
+ res->buffer,
+ res->bytes_requested,
+ res->bytes_written,
+ res->data,
+ res->error);
+
+ return FALSE;
+}
+
+static void
+write_async_result_free (gpointer data)
+{
+ WriteAsyncResult *res = data;
+
+ if (res->notify)
+ res->notify (res->data);
+
+ if (res->error)
+ g_error_free (res->error);
+
+ g_object_unref (res->stream);
+
+ g_free (res);
+}
+
+static void
+queue_write_async_result (GOutputStream *stream,
+ void *buffer,
+ gsize bytes_requested,
+ gssize bytes_written,
+ GError *error,
+ GAsyncWriteCallback callback,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ GSource *source;
+ WriteAsyncResult *res;
+
+ res = g_new0 (WriteAsyncResult, 1);
+
+ res->stream = g_object_ref (stream);
+ res->buffer = buffer;
+ res->bytes_requested = bytes_requested;
+ res->bytes_written = bytes_written;
+ res->error = error;
+ res->callback = callback;
+ res->data = data;
+ res->notify = notify;
+
+ source = g_idle_source_new ();
+ g_source_set_priority (source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (source, call_write_async_result, res, write_async_result_free);
+ g_source_attach (source, g_output_stream_get_async_context (stream));
+ g_source_unref (source);
+}
+
/**
* g_output_stream_write_async:
* @stream: A #GOutputStream.
@@ -316,10 +431,8 @@ g_output_stream_get_async_context (GOutputStream *stream)
* The asyncronous methods have a default fallback that uses threads to implement
* asynchronicity, so they are optional for inheriting classes. However, if you
* override one you must override all.
- *
- * Return value: A tag that can be passed to g_output_stream_cancel()
**/
-guint
+void
g_output_stream_write_async (GOutputStream *stream,
void *buffer,
gsize count,
@@ -329,16 +442,230 @@ g_output_stream_write_async (GOutputStream *stream,
GDestroyNotify notify)
{
GOutputStreamClass *class;
+ GError *error;
- g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), 0);
- g_return_val_if_fail (stream != NULL, 0);
- g_return_val_if_fail (buffer != NULL, 0);
+ g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+ g_return_if_fail (stream != NULL);
+ g_return_if_fail (buffer != NULL);
+
+ stream->priv->cancelled = FALSE;
+ if (count == 0)
+ {
+ queue_write_async_result (stream, buffer, count, 0, NULL,
+ callback, data, notify);
+ return;
+ }
+
+ if (((gssize) count) < 0)
+ {
+ error = NULL;
+ g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_INVALID_ARGUMENT,
+ _("Too large count value passed to g_input_stream_read_async"));
+ queue_write_async_result (stream, buffer, count, -1, error,
+ callback, data, notify);
+ return;
+ }
+
+ if (stream->priv->closed)
+ {
+ error = NULL;
+ g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_CLOSED,
+ _("Stream is already closed"));
+ queue_write_async_result (stream, buffer, count, -1, error,
+ callback, data, notify);
+ return;
+ }
+
+ if (stream->priv->pending)
+ {
+ error = NULL;
+ g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_PENDING,
+ _("Stream has outstanding operation"));
+ queue_write_async_result (stream, buffer, count, -1, error,
+ callback, data, notify);
+ return;
+ }
+
class = G_OUTPUT_STREAM_GET_CLASS (stream);
+ stream->priv->pending = TRUE;
return class->write_async (stream, buffer, count, io_priority, callback, data, notify);
}
+typedef struct {
+ GOutputStream *stream;
+ gboolean result;
+ GError *error;
+ GAsyncFlushCallback callback;
+ gpointer data;
+ GDestroyNotify notify;
+} FlushAsyncResult;
+
+static gboolean
+call_flush_async_result (gpointer data)
+{
+ FlushAsyncResult *res = data;
+
+ if (res->callback)
+ res->callback (res->stream,
+ res->result,
+ res->data,
+ res->error);
+
+ return FALSE;
+}
+
+static void
+flush_async_result_free (gpointer data)
+{
+ FlushAsyncResult *res = data;
+
+ if (res->notify)
+ res->notify (res->data);
+
+ if (res->error)
+ g_error_free (res->error);
+
+ g_object_unref (res->stream);
+
+ g_free (res);
+}
+
+static void
+queue_flush_async_result (GOutputStream *stream,
+ gboolean result,
+ GError *error,
+ GAsyncFlushCallback callback,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ GSource *source;
+ FlushAsyncResult *res;
+
+ res = g_new0 (FlushAsyncResult, 1);
+
+ res->stream = g_object_ref (stream);
+ res->result = result;
+ res->error = error;
+ res->callback = callback;
+ res->data = data;
+ res->notify = notify;
+
+ source = g_idle_source_new ();
+ g_source_set_priority (source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (source, call_flush_async_result, res, flush_async_result_free);
+ g_source_attach (source, g_output_stream_get_async_context (stream));
+ g_source_unref (source);
+}
+
+void
+g_output_stream_flush_async (GOutputStream *stream,
+ int io_priority,
+ GAsyncFlushCallback callback,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ GOutputStreamClass *class;
+ GError *error;
+
+ g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+ g_return_if_fail (stream != NULL);
+
+ stream->priv->cancelled = FALSE;
+
+ if (stream->priv->closed)
+ {
+ error = NULL;
+ g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_CLOSED,
+ _("Stream is already closed"));
+ queue_flush_async_result (stream, FALSE, error,
+ callback, data, notify);
+ return;
+ }
+
+ if (stream->priv->pending)
+ {
+ error = NULL;
+ g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_PENDING,
+ _("Stream has outstanding operation"));
+ queue_flush_async_result (stream, FALSE, error,
+ callback, data, notify);
+ return;
+ }
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+
+ stream->priv->pending = TRUE;
+ return class->flush_async (stream, io_priority, callback, data, notify);
+}
+
+typedef struct {
+ GOutputStream *stream;
+ gboolean result;
+ GError *error;
+ GAsyncCloseOutputCallback callback;
+ gpointer data;
+ GDestroyNotify notify;
+} CloseAsyncResult;
+
+static gboolean
+call_close_async_result (gpointer data)
+{
+ CloseAsyncResult *res = data;
+
+ if (res->callback)
+ res->callback (res->stream,
+ res->result,
+ res->data,
+ res->error);
+
+ return FALSE;
+}
+
+static void
+close_async_result_free (gpointer data)
+{
+ CloseAsyncResult *res = data;
+
+ if (res->notify)
+ res->notify (res->data);
+
+ if (res->error)
+ g_error_free (res->error);
+
+ g_object_unref (res->stream);
+
+ g_free (res);
+}
+
+static void
+queue_close_async_result (GOutputStream *stream,
+ gboolean result,
+ GError *error,
+ GAsyncCloseOutputCallback callback,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ GSource *source;
+ CloseAsyncResult *res;
+
+ res = g_new0 (CloseAsyncResult, 1);
+
+ res->stream = g_object_ref (stream);
+ res->result = result;
+ res->error = error;
+ res->callback = callback;
+ res->data = data;
+ res->notify = notify;
+
+ source = g_idle_source_new ();
+ g_source_set_priority (source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (source, call_close_async_result, res, close_async_result_free);
+ g_source_attach (source, g_output_stream_get_async_context (stream));
+ g_source_unref (source);
+}
+
/**
* g_output_stream_close_async:
* @stream: A #GOutputStream.
@@ -354,30 +681,47 @@ g_output_stream_write_async (GOutputStream *stream,
* The asyncronous methods have a default fallback that uses threads to implement
* asynchronicity, so they are optional for inheriting classes. However, if you
* override one you must override all.
- *
- * Return value: A tag that can be passed to g_output_stream_cancel()
**/
-guint
+void
g_output_stream_close_async (GOutputStream *stream,
- GAsyncCloseCallback callback,
- gpointer data,
- GDestroyNotify notify)
+ int io_priority,
+ GAsyncCloseOutputCallback callback,
+ gpointer data,
+ GDestroyNotify notify)
{
GOutputStreamClass *class;
+ GError *error;
- g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), 0);
- g_return_val_if_fail (stream != NULL, 0);
+ g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+ g_return_if_fail (stream != NULL);
- class = G_OUTPUT_STREAM_GET_CLASS (stream);
+ stream->priv->cancelled = FALSE;
+
+ if (stream->priv->closed)
+ {
+ queue_close_async_result (stream, TRUE, NULL,
+ callback, data, notify);
+ return;
+ }
- return class->close_async (stream, callback, data, notify);
+ if (stream->priv->pending)
+ {
+ error = NULL;
+ g_set_error (&error, G_VFS_ERROR, G_VFS_ERROR_PENDING,
+ _("Stream has outstanding operation"));
+ queue_close_async_result (stream, FALSE, error,
+ callback, data, notify);
+ return;
+ }
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+ stream->priv->pending = TRUE;
+ return class->close_async (stream, io_priority, callback, data, notify);
}
-
/**
* g_output_stream_cancel:
* @stream: A #GOutputStream.
- * @tag: a value returned from an async request
*
* Tries to cancel an outstanding request for the stream. If it
* succeeds the outstanding request callback will be called with
@@ -394,8 +738,7 @@ g_output_stream_close_async (GOutputStream *stream,
* override one you must override all.
**/
void
-g_output_stream_cancel (GOutputStream *stream,
- guint tag)
+g_output_stream_cancel (GOutputStream *stream)
{
GOutputStreamClass *class;
@@ -404,7 +747,9 @@ g_output_stream_cancel (GOutputStream *stream,
class = G_OUTPUT_STREAM_GET_CLASS (stream);
- class->cancel (stream, tag);
+ stream->priv->cancelled = TRUE;
+
+ class->cancel (stream);
}
@@ -416,33 +761,329 @@ g_output_stream_is_cancelled (GOutputStream *stream)
return stream->priv->cancelled;
}
-static guint
-g_output_stream_real_write_async (GOutputStream *stream,
+
+/********************************************
+ * Default implementation of async ops *
+ ********************************************/
+
+typedef struct {
+ GOutputStream *stream;
+ void *buffer;
+ gsize count_requested;
+ gssize count_written;
+ GError *error;
+ GAsyncWriteCallback callback;
+ gpointer data;
+ GDestroyNotify notify;
+} WriteAsyncOp;
+
+static void
+write_op_report (gpointer data)
+{
+ WriteAsyncOp *op = data;
+
+ op->stream->priv->pending = FALSE;
+
+ op->callback (op->stream,
+ op->buffer,
+ op->count_requested,
+ op->count_written,
+ op->data,
+ op->error);
+
+}
+
+static void
+write_op_free (gpointer data)
+{
+ WriteAsyncOp *op = data;
+
+ g_object_unref (op->stream);
+
+ if (op->error)
+ g_error_free (op->error);
+
+ if (op->notify)
+ op->notify (op->data);
+
+ g_free (op);
+}
+
+
+static void
+write_op_func (GIOJob *job,
+ gpointer data)
+{
+ WriteAsyncOp *op = data;
+ GOutputStreamClass *class;
+
+ if (g_io_job_is_cancelled (job))
+ {
+ op->count_written = -1;
+ g_set_error (&op->error,
+ G_VFS_ERROR,
+ G_VFS_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ }
+ else
+ {
+ class = G_OUTPUT_STREAM_GET_CLASS (op->stream);
+ op->count_written = class->write (op->stream, op->buffer, op->count_requested, &op->error);
+ }
+
+ g_io_job_mark_done (job);
+ g_io_job_send_to_mainloop (job, write_op_report,
+ op, write_op_free,
+ FALSE);
+}
+
+static void
+write_op_cancel (gpointer data)
+{
+ WriteAsyncOp *op = data;
+ GOutputStreamClass *class;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (op->stream);
+ if (class->cancel_sync)
+ class->cancel_sync (op->stream);
+}
+
+static void
+g_output_stream_real_write_async (GOutputStream *stream,
void *buffer,
gsize count,
int io_priority,
- GAsyncWriteCallback callback,
+ GAsyncWriteCallback callback,
gpointer data,
GDestroyNotify notify)
{
- g_error ("TODO");
- return 0;
+ WriteAsyncOp *op;
+
+ op = g_new0 (WriteAsyncOp, 1);
+
+ op->stream = g_object_ref (stream);
+ op->buffer = buffer;
+ op->count_requested = count;
+ op->callback = callback;
+ op->data = data;
+ op->notify = notify;
+
+ stream->priv->io_job_id = g_schedule_io_job (write_op_func,
+ write_op_cancel,
+ op,
+ NULL,
+ io_priority,
+ g_output_stream_get_async_context (stream));
}
-static guint
+typedef struct {
+ GOutputStream *stream;
+ gboolean result;
+ GError *error;
+ GAsyncFlushCallback callback;
+ gpointer data;
+ GDestroyNotify notify;
+} FlushAsyncOp;
+
+static void
+flush_op_report (gpointer data)
+{
+ FlushAsyncOp *op = data;
+
+ op->stream->priv->pending = FALSE;
+
+ op->callback (op->stream,
+ op->result,
+ op->data,
+ op->error);
+
+}
+
+static void
+flush_op_free (gpointer data)
+{
+ FlushAsyncOp *op = data;
+
+ g_object_unref (op->stream);
+
+ if (op->error)
+ g_error_free (op->error);
+
+ if (op->notify)
+ op->notify (op->data);
+
+ g_free (op);
+}
+
+
+static void
+flush_op_func (GIOJob *job,
+ gpointer data)
+{
+ FlushAsyncOp *op = data;
+ GOutputStreamClass *class;
+
+ if (g_io_job_is_cancelled (job))
+ {
+ op->result = FALSE;
+ g_set_error (&op->error,
+ G_VFS_ERROR,
+ G_VFS_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ }
+ else
+ {
+ class = G_OUTPUT_STREAM_GET_CLASS (op->stream);
+ op->result = TRUE;
+ if (class->flush)
+ op->result = class->flush (op->stream, &op->error);
+ }
+
+ g_io_job_mark_done (job);
+ g_io_job_send_to_mainloop (job, flush_op_report,
+ op, flush_op_free,
+ FALSE);
+}
+
+static void
+flush_op_cancel (gpointer data)
+{
+ FlushAsyncOp *op = data;
+ GOutputStreamClass *class;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (op->stream);
+ if (class->cancel_sync)
+ class->cancel_sync (op->stream);
+}
+
+static void
+g_output_stream_real_flush_async (GOutputStream *stream,
+ int io_priority,
+ GAsyncFlushCallback callback,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ FlushAsyncOp *op;
+
+ op = g_new0 (FlushAsyncOp, 1);
+
+ op->stream = g_object_ref (stream);
+ op->callback = callback;
+ op->data = data;
+ op->notify = notify;
+
+ stream->priv->io_job_id = g_schedule_io_job (flush_op_func,
+ flush_op_cancel,
+ op,
+ NULL,
+ io_priority,
+ g_output_stream_get_async_context (stream));
+}
+
+typedef struct {
+ GOutputStream *stream;
+ gboolean res;
+ GError *error;
+ GAsyncCloseOutputCallback callback;
+ gpointer data;
+ GDestroyNotify notify;
+} CloseAsyncOp;
+
+static void
+close_op_report (gpointer data)
+{
+ CloseAsyncOp *op = data;
+
+ op->stream->priv->pending = FALSE;
+
+ op->callback (op->stream,
+ op->res,
+ op->data,
+ op->error);
+}
+
+static void
+close_op_free (gpointer data)
+{
+ CloseAsyncOp *op = data;
+
+ g_object_unref (op->stream);
+
+ if (op->error)
+ g_error_free (op->error);
+
+ if (op->notify)
+ op->notify (op->data);
+
+ g_free (op);
+}
+
+
+static void
+close_op_func (GIOJob *job,
+ gpointer data)
+{
+ CloseAsyncOp *op = data;
+ GOutputStreamClass *class;
+
+ if (g_io_job_is_cancelled (job))
+ {
+ op->res = FALSE;
+ g_set_error (&op->error,
+ G_VFS_ERROR,
+ G_VFS_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ }
+ else
+ {
+ class = G_OUTPUT_STREAM_GET_CLASS (op->stream);
+ op->res = class->close (op->stream, &op->error);
+ }
+
+ g_io_job_mark_done (job);
+ g_io_job_send_to_mainloop (job, close_op_report,
+ op, close_op_free,
+ FALSE);
+}
+
+static void
+close_op_cancel (gpointer data)
+{
+ CloseAsyncOp *op = data;
+ GOutputStreamClass *class;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (op->stream);
+ if (class->cancel_sync)
+ class->cancel_sync (op->stream);
+}
+
+static void
g_output_stream_real_close_async (GOutputStream *stream,
- GAsyncCloseCallback callback,
+ int io_priority,
+ GAsyncCloseOutputCallback callback,
gpointer data,
GDestroyNotify notify)
{
- g_error ("TODO");
- return 0;
+ CloseAsyncOp *op;
+
+ op = g_new0 (CloseAsyncOp, 1);
+
+ op->stream = g_object_ref (stream);
+ op->callback = callback;
+ op->data = data;
+ op->notify = notify;
+
+ stream->priv->io_job_id = g_schedule_io_job (close_op_func,
+ close_op_cancel,
+ op,
+ NULL,
+ io_priority,
+ g_output_stream_get_async_context (stream));
}
static void
-g_output_stream_real_cancel (GOutputStream *stream,
- guint tag)
+g_output_stream_real_cancel (GOutputStream *stream)
{
- g_error ("TODO");
+ g_cancel_io_job (stream->priv->io_job_id);
}
diff --git a/goutputstream.h b/goutputstream.h
index f0385a00..21559761 100644
--- a/goutputstream.h
+++ b/goutputstream.h
@@ -3,7 +3,7 @@
#include <glib-object.h>
#include <gvfstypes.h>
-#include <ginputstream.h>
+#include <gvfserror.h>
G_BEGIN_DECLS
@@ -19,11 +19,36 @@ typedef struct _GOutputStreamClass GOutputStreamClass;
typedef struct _GOutputStreamPrivate GOutputStreamPrivate;
typedef void (*GAsyncWriteCallback) (GOutputStream *stream,
- void *buffer,
- gssize bytes_requested,
- gssize bytes_writen,
- gpointer data,
- GError *error);
+ void *buffer,
+ gsize bytes_requested,
+ gssize bytes_written,
+ gpointer data,
+ GError *error);
+
+typedef void (*GAsyncFlushCallback) (GOutputStream *stream,
+ gboolean result,
+ gpointer data,
+ GError *error);
+
+
+/**
+ * GAsyncCloseOutputCallback:
+ * @stream: a #GOutputStream
+ * @result: %TRUE on success, %FALSE otherwis
+ * @error: the error, if result is %FALSE, otherwise %NULL
+ *
+ * This callback is called when an asychronous close operation
+ * is finished.
+ *
+ * The callback is always called, even if the operation was cancelled.
+ * If the operation was cancelled @result will be %FALSE, and @error
+ * will be %G_VFS_ERROR_CANCELLED.
+ **/
+typedef void (*GAsyncCloseOutputCallback) (GOutputStream *stream,
+ gboolean result,
+ gpointer data,
+ GError *error);
+
struct _GOutputStream
{
@@ -51,22 +76,27 @@ struct _GOutputStreamClass
/* Async ops: (optional in derived classes) */
- guint (* write_async) (GOutputStream *stream,
+ void (* write_async) (GOutputStream *stream,
void *buffer,
gsize count,
int io_priority,
GAsyncWriteCallback callback,
gpointer data,
GDestroyNotify notify);
- guint (* close_async) (GOutputStream *stream,
- GAsyncCloseCallback callback,
+ void (* flush_async) (GOutputStream *stream,
+ int io_priority,
+ GAsyncFlushCallback callback,
gpointer data,
GDestroyNotify notify);
- void (* cancel) (GOutputStream *stream,
- guint tag);
+ void (* close_async) (GOutputStream *stream,
+ int io_priority,
+ GAsyncCloseOutputCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+ void (* cancel) (GOutputStream *stream);
/* Optional cancel wakeup if using default async ops */
- void (* cancel_sync) (GInputStream *stream);
+ void (* cancel_sync) (GOutputStream *stream);
/* Padding for future expansion */
void (*_g_reserved1) (void);
@@ -78,34 +108,36 @@ struct _GOutputStreamClass
GType g_output_stream_get_type (void) G_GNUC_CONST;
-
-gssize g_output_stream_write (GOutputStream *stream,
- void *buffer,
- gsize count,
- GError **error);
-gboolean g_output_stream_flush (GOutputStream *stream,
- GError **error);
-gboolean g_output_stream_close (GOutputStream *stream,
- GError **error);
-void g_output_stream_set_async_context (GOutputStream *stream,
- GMainContext *context);
-GMainContext *g_output_stream_get_async_context (GOutputStream *stream);
-guint g_output_stream_write_async (GOutputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GAsyncWriteCallback callback,
- gpointer data,
- GDestroyNotify notify);
-guint g_output_stream_close_async (GOutputStream *stream,
- GAsyncCloseCallback callback,
- gpointer data,
- GDestroyNotify notify);
-void g_output_stream_cancel (GOutputStream *stream,
- guint tag);
-
-gboolean g_output_stream_is_cancelled (GOutputStream *stream);
-
+gssize g_output_stream_write (GOutputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error);
+gboolean g_output_stream_flush (GOutputStream *stream,
+ GError **error);
+gboolean g_output_stream_close (GOutputStream *stream,
+ GError **error);
+void g_output_stream_set_async_context (GOutputStream *stream,
+ GMainContext *context);
+GMainContext *g_output_stream_get_async_context (GOutputStream *stream);
+void g_output_stream_write_async (GOutputStream *stream,
+ void *buffer,
+ gsize count,
+ int io_priority,
+ GAsyncWriteCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+void g_output_stream_flush_async (GOutputStream *stream,
+ int io_priority,
+ GAsyncFlushCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+void g_output_stream_close_async (GOutputStream *stream,
+ int io_priority,
+ GAsyncCloseOutputCallback callback,
+ gpointer data,
+ GDestroyNotify notify);
+void g_output_stream_cancel (GOutputStream *stream);
+gboolean g_output_stream_is_cancelled (GOutputStream *stream);
G_END_DECLS
diff --git a/txt/ops.txt b/txt/ops.txt
index 25222813..cd1bb274 100644
--- a/txt/ops.txt
+++ b/txt/ops.txt
@@ -1,9 +1,3 @@
-
-streams:
-
-InputStream <- FileInputStream (adds fstat + last_mtime) <- LocalFileInputStream
-
-
type: File, Folder, Symlink, Shortcut, Mountable, special (fifo, socket, chardec, blockdev)
flags: hidden,
GFileInfo {