From fcfd94ae7b7580811dc307502efa3460d1a1446f Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Thu, 13 Sep 2007 13:05:15 +0000 Subject: Convert the rest of GInputStream to the new async API model Original git commit by Alexander Larsson at 1177595433 +0200 svn path=/trunk/; revision=482 --- client/gdaemonfileinputstream.c | 194 ++++++++------ client/gdaemonfileoutputstream.c | 36 +-- gio/gasyncresult.h | 3 +- gio/ginputstream.c | 547 +++++++++++++++++---------------------- gio/ginputstream.h | 165 ++++++------ gio/gmemoryinputstream.c | 139 ++++++---- gio/gsimpleasyncresult.c | 12 +- gio/gsimpleasyncresult.h | 2 + gio/gsocketinputstream.c | 130 ++++++---- gio/test-gio.c | 31 ++- 10 files changed, 668 insertions(+), 591 deletions(-) diff --git a/client/gdaemonfileinputstream.c b/client/gdaemonfileinputstream.c index 95b9d722..84c73093 100644 --- a/client/gdaemonfileinputstream.c +++ b/client/gdaemonfileinputstream.c @@ -143,50 +143,57 @@ struct _GDaemonFileInputStream { GString *output_buffer; }; -static gssize g_daemon_file_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GCancellable *cancellable, - GError **error); -static gssize g_daemon_file_input_stream_skip (GInputStream *stream, - gsize count, - GCancellable *cancellable, - GError **error); -static gboolean g_daemon_file_input_stream_close (GInputStream *stream, - GCancellable *cancellable, - GError **error); -static GFileInfo *g_daemon_file_input_stream_get_file_info (GFileInputStream *stream, - char *attributes, - GCancellable *cancellable, - GError **error); -static goffset g_daemon_file_input_stream_tell (GFileInputStream *stream); -static gboolean g_daemon_file_input_stream_can_seek (GFileInputStream *stream); -static gboolean g_daemon_file_input_stream_seek (GFileInputStream *stream, - goffset offset, - GSeekType type, - GCancellable *cancellable, - GError **error); -static void g_daemon_file_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_daemon_file_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); -static void g_daemon_file_input_stream_skip_async (GInputStream *stream, - gsize count, - int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GCancellable *cancellable); -static void g_daemon_file_input_stream_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer data, - GCancellable *cancellable); +static gssize g_daemon_file_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static gssize g_daemon_file_input_stream_skip (GInputStream *stream, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean g_daemon_file_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error); +static GFileInfo *g_daemon_file_input_stream_get_file_info (GFileInputStream *stream, + char *attributes, + GCancellable *cancellable, + GError **error); +static goffset g_daemon_file_input_stream_tell (GFileInputStream *stream); +static gboolean g_daemon_file_input_stream_can_seek (GFileInputStream *stream); +static gboolean g_daemon_file_input_stream_seek (GFileInputStream *stream, + goffset offset, + GSeekType type, + GCancellable *cancellable, + GError **error); +static void g_daemon_file_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize g_daemon_file_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_daemon_file_input_stream_skip_async (GInputStream *stream, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize g_daemon_file_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_daemon_file_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_daemon_file_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); + G_DEFINE_TYPE (GDaemonFileInputStream, g_daemon_file_input_stream, G_TYPE_FILE_INPUT_STREAM) @@ -225,8 +232,13 @@ g_daemon_file_input_stream_class_init (GDaemonFileInputStreamClass *klass) stream_class->read_async = g_daemon_file_input_stream_read_async; stream_class->read_finish = g_daemon_file_input_stream_read_finish; - if (0) stream_class->skip_async = g_daemon_file_input_stream_skip_async; + if (0) + { + stream_class->skip_async = g_daemon_file_input_stream_skip_async; + stream_class->skip_finish = g_daemon_file_input_stream_skip_finish; + } stream_class->close_async = g_daemon_file_input_stream_close_async; + stream_class->close_finish = g_daemon_file_input_stream_close_finish; file_stream_class->tell = g_daemon_file_input_stream_tell; file_stream_class->can_seek = g_daemon_file_input_stream_can_seek; @@ -1202,7 +1214,7 @@ typedef struct AsyncIterator AsyncIterator; typedef void (*AsyncIteratorDone) (GInputStream *stream, gpointer op_data, - gpointer callback, + GAsyncReadyCallback callback, gpointer callback_data, GError *io_error); @@ -1214,7 +1226,7 @@ struct AsyncIterator { state_machine_iterator iterator; gpointer iterator_data; int io_priority; - gpointer callback; + GAsyncReadyCallback callback; gpointer callback_data; }; @@ -1279,7 +1291,7 @@ async_op_handle (AsyncIterator *iterator, static void async_read_op_callback (GObject *source_object, GAsyncResult *res, - gpointer user_data) + gpointer user_data) { GInputStream *stream = G_INPUT_STREAM (source_object); gssize count_read; @@ -1293,13 +1305,19 @@ async_read_op_callback (GObject *source_object, } static void -async_skip_op_callback (GInputStream *stream, - gsize count_requested, - gssize count_skipped, - gpointer data, - GError *error) +async_skip_op_callback (GObject *source_object, + GAsyncResult *res, + gpointer user_data) { - async_op_handle ((AsyncIterator *)data, count_skipped, error); + GInputStream *stream = G_INPUT_STREAM (source_object); + gssize count_skipped; + GError *error = NULL; + + count_skipped = g_input_stream_skip_finish (stream, res, &error); + + async_op_handle ((AsyncIterator *)user_data, count_skipped, error); + if (error) + g_error_free (error); } static void @@ -1346,8 +1364,8 @@ async_iterate (AsyncIterator *iterator) g_input_stream_skip_async (file->data_stream, io_data->io_size, iterator->io_priority, - async_skip_op_callback, iterator, - io_data->io_allow_cancel ? iterator->cancellable : NULL); + io_data->io_allow_cancel ? iterator->cancellable : NULL, + async_skip_op_callback, iterator); } else if (io_op == STATE_OP_WRITE) { @@ -1366,7 +1384,7 @@ run_async_state_machine (GDaemonFileInputStream *file, state_machine_iterator iterator_cb, gpointer iterator_data, int io_priority, - gpointer callback, + GAsyncReadyCallback callback, gpointer data, GCancellable *cancellable, AsyncIteratorDone done_cb) @@ -1389,7 +1407,7 @@ run_async_state_machine (GDaemonFileInputStream *file, static void async_read_done (GInputStream *stream, gpointer op_data, - gpointer callback, + GAsyncReadyCallback callback, gpointer user_data, GError *io_error) { @@ -1486,20 +1504,32 @@ static void g_daemon_file_input_stream_skip_async (GInputStream *stream, gsize count, int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data) +{ + g_assert_not_reached (); + /* TODO: Not implemented */ +} + +static gssize +g_daemon_file_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) { + g_assert_not_reached (); + /* TODO: Not implemented */ } static void async_close_done (GInputStream *stream, gpointer op_data, - gpointer callback, - gpointer callback_data, + GAsyncReadyCallback callback, + gpointer user_data, GError *io_error) { GDaemonFileInputStream *file; + GSimpleAsyncResult *simple; CloseOperation *op; gboolean result; GError *error; @@ -1529,12 +1559,19 @@ async_close_done (GInputStream *stream, result = g_input_stream_close (file->data_stream, cancellable, &error); else g_input_stream_close (file->data_stream, cancellable, NULL); - - if (callback) - ((GAsyncCloseInputCallback)callback) (stream, - result, - callback_data, - error); + + + simple = g_simple_async_result_new (G_OBJECT (stream), + callback, user_data, + g_daemon_file_input_stream_read_async, + NULL, NULL); + + if (!result == -1) + g_simple_async_result_set_from_error (simple, error); + + /* Complete immediately, not in idle, since we're already in a mainloop callout */ + g_simple_async_result_complete (simple); + g_object_unref (simple); if (op->ret_error) g_error_free (op->ret_error); @@ -1542,11 +1579,11 @@ async_close_done (GInputStream *stream, } static void -g_daemon_file_input_stream_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer data, - GCancellable *cancellable) +g_daemon_file_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data) { GDaemonFileInputStream *file; AsyncIterator *iterator; @@ -1566,3 +1603,12 @@ g_daemon_file_input_stream_close_async (GInputStream *stream, cancellable, async_close_done); } + +static gboolean +g_daemon_file_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + /* Failures handled in generic close_finish code */ + return TRUE; +} diff --git a/client/gdaemonfileoutputstream.c b/client/gdaemonfileoutputstream.c index f3256d2e..9933e139 100644 --- a/client/gdaemonfileoutputstream.c +++ b/client/gdaemonfileoutputstream.c @@ -953,7 +953,7 @@ typedef struct AsyncIterator AsyncIterator; typedef void (*AsyncIteratorDone) (GOutputStream *stream, gpointer op_data, - gpointer callback, + GAsyncReadyCallback callback, gpointer callback_data, GError *io_error); @@ -965,7 +965,7 @@ struct AsyncIterator { state_machine_iterator iterator; gpointer iterator_data; int io_priority; - gpointer callback; + GAsyncReadyCallback callback; gpointer callback_data; }; @@ -1044,13 +1044,19 @@ async_read_op_callback (GObject *source_object, } static void -async_skip_op_callback (GInputStream *stream, - gsize count_requested, - gssize count_skipped, - gpointer data, - GError *error) +async_skip_op_callback (GObject *source_object, + GAsyncResult *res, + gpointer user_data) { - async_op_handle ((AsyncIterator *)data, count_skipped, error); + GInputStream *stream = G_INPUT_STREAM (source_object); + gssize count_skipped; + GError *error = NULL; + + count_skipped = g_input_stream_skip_finish (stream, res, &error); + + async_op_handle ((AsyncIterator *)user_data, count_skipped, error); + if (error) + g_error_free (error); } static void @@ -1097,8 +1103,8 @@ async_iterate (AsyncIterator *iterator) g_input_stream_skip_async (file->data_stream, io_data->io_size, iterator->io_priority, - async_skip_op_callback, iterator, - io_data->io_allow_cancel ? iterator->cancellable : NULL); + io_data->io_allow_cancel ? iterator->cancellable : NULL, + async_skip_op_callback, iterator); } else if (io_op == STATE_OP_WRITE) { @@ -1117,7 +1123,7 @@ run_async_state_machine (GDaemonFileOutputStream *file, state_machine_iterator iterator_cb, gpointer iterator_data, int io_priority, - gpointer callback, + GAsyncReadyCallback callback, gpointer data, GCancellable *cancellable, AsyncIteratorDone done_cb) @@ -1204,9 +1210,9 @@ g_daemon_file_output_stream_write_async (GOutputStream *stream, (state_machine_iterator)iterate_write_state_machine, op, io_priority, - callback, data, + (gpointer)callback, data, cancellable, - async_write_done); + (gpointer)async_write_done); } static void @@ -1279,7 +1285,7 @@ g_daemon_file_output_stream_close_async (GOutputStream *stream, run_async_state_machine (file, (state_machine_iterator)iterate_close_state_machine, op, io_priority, - callback, data, + (gpointer)callback, data, cancellable, - async_close_done); + (gpointer)async_close_done); } diff --git a/gio/gasyncresult.h b/gio/gasyncresult.h index 5a0f6a59..b9be3d29 100644 --- a/gio/gasyncresult.h +++ b/gio/gasyncresult.h @@ -15,8 +15,7 @@ typedef struct _GAsyncResultIface GAsyncResultIface; typedef void (*GAsyncReadyCallback) (GObject *source_object, GAsyncResult *res, - gpointer user_data); - + gpointer user_data); struct _GAsyncResultIface { diff --git a/gio/ginputstream.c b/gio/ginputstream.c index 004a3613..fd3a747a 100644 --- a/gio/ginputstream.c +++ b/gio/ginputstream.c @@ -15,31 +15,37 @@ struct _GInputStreamPrivate { GAsyncReadyCallback outstanding_callback; }; -static gssize g_input_stream_real_skip (GInputStream *stream, - gsize count, - GCancellable *cancellable, - GError **error); -static void g_input_stream_real_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -static gssize g_input_stream_real_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); -static void g_input_stream_real_skip_async (GInputStream *stream, - gsize count, - int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GCancellable *cancellable); -static void g_input_stream_real_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer data, - GCancellable *cancellable); +static gssize g_input_stream_real_skip (GInputStream *stream, + gsize count, + GCancellable *cancellable, + GError **error); +static void g_input_stream_real_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +static gssize g_input_stream_real_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_input_stream_real_skip_async (GInputStream *stream, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize g_input_stream_real_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_input_stream_real_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_input_stream_real_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); static void g_input_stream_finalize (GObject *object) @@ -68,7 +74,9 @@ g_input_stream_class_init (GInputStreamClass *klass) klass->read_async = g_input_stream_real_read_async; klass->read_finish = g_input_stream_real_read_finish; klass->skip_async = g_input_stream_real_skip_async; + klass->skip_finish = g_input_stream_real_skip_finish; klass->close_async = g_input_stream_real_close_async; + klass->close_finish = g_input_stream_real_close_finish; } static void @@ -553,68 +561,12 @@ g_input_stream_read_finish (GInputStream *stream, return class->read_finish (stream, result, error); } -typedef struct { - GAsyncResultData generic; - gsize count_requested; - gssize count_skipped; - GAsyncSkipCallback callback; -} SkipAsyncResult; - -static gboolean -call_skip_async_result (gpointer data) -{ - SkipAsyncResult *res = data; - - if (res->callback) - res->callback (res->generic.async_object, - res->count_requested, - res->count_skipped, - res->generic.user_data, - res->generic.error); - - return FALSE; -} - -static void -queue_skip_async_result (GInputStream *stream, - gsize count_requested, - gssize count_skipped, - GError *error, - GAsyncSkipCallback callback, - gpointer data) -{ - SkipAsyncResult *res; - - res = g_new0 (SkipAsyncResult, 1); - - res->count_requested = count_requested; - res->count_skipped = count_skipped; - res->callback = callback; - - _g_queue_async_result ((GAsyncResultData *)res, stream, - error, data, - call_skip_async_result); -} - -static void -skip_async_callback_wrapper (GInputStream *stream, - gsize count_requested, - gssize count_skipped, - gpointer data, - GError *error) -{ - GAsyncSkipCallback real_callback = (gpointer)stream->priv->outstanding_callback; - - stream->priv->pending = FALSE; - (*real_callback) (stream, count_requested, count_skipped, data, error); - g_object_unref (stream); -} - /** * g_input_stream_skip_async: * @stream: A #GInputStream. * @count: the number of bytes that will be skipped from the stream * @io_priority: the io priority of the request + * @cancellable: optional cancellable object * @callback: callback to call when the request is satisfied * @user_data: the data to pass to callback function * @@ -645,118 +597,94 @@ void g_input_stream_skip_async (GInputStream *stream, gsize count, int io_priority, - GAsyncSkipCallback callback, - gpointer user_data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { GInputStreamClass *class; - GError *error; + GSimpleAsyncResult *simple; g_return_if_fail (G_IS_INPUT_STREAM (stream)); g_return_if_fail (stream != NULL); if (count == 0) { - queue_skip_async_result (stream, count, 0, NULL, - callback, user_data); + simple = g_simple_async_result_new (G_OBJECT (stream), + callback, + user_data, + g_input_stream_skip_async, + NULL, NULL); + g_simple_async_result_complete_in_idle (simple); + g_object_unref (simple); return; } if (((gssize) count) < 0) { - error = NULL; - g_set_error (&error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, - _("Too large count value passed to g_input_stream_skip_async")); - queue_skip_async_result (stream, count, -1, error, - callback, user_data); + report_error (stream, + callback, + user_data, + G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + _("Too large count value passed to g_input_stream_skip_async")); return; } if (stream->priv->closed) { - error = NULL; - g_set_error (&error, G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Stream is already closed")); - queue_skip_async_result (stream, count, -1, error, - callback, user_data); + report_error (stream, + callback, + user_data, + G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Stream is already closed")); return; } if (stream->priv->pending) { - error = NULL; - g_set_error (&error, G_IO_ERROR, G_IO_ERROR_PENDING, - _("Stream has outstanding operation")); - queue_skip_async_result (stream, count, -1, error, - callback, user_data); + report_error (stream, + callback, + user_data, + G_IO_ERROR, G_IO_ERROR_PENDING, + _("Stream has outstanding operation")); return; } class = G_INPUT_STREAM_GET_CLASS (stream); stream->priv->pending = TRUE; - stream->priv->outstanding_callback = (gpointer)callback; + stream->priv->outstanding_callback = callback; g_object_ref (stream); - class->skip_async (stream, count, io_priority, skip_async_callback_wrapper, user_data, cancellable); -} - - -typedef struct { - GAsyncResultData generic; - gboolean result; - GAsyncCloseInputCallback callback; -} CloseAsyncResult; - -static gboolean -call_close_async_result (gpointer data) -{ - CloseAsyncResult *res = data; - - if (res->callback) - res->callback (res->generic.async_object, - res->result, - res->generic.user_data, - res->generic.error); - - return FALSE; + class->skip_async (stream, count, io_priority, cancellable, + async_ready_callback_wrapper, user_data); } -static void -queue_close_async_result (GInputStream *stream, - gboolean result, - GError *error, - GAsyncCloseInputCallback callback, - gpointer data) +gssize +g_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) { - CloseAsyncResult *res; - - res = g_new0 (CloseAsyncResult, 1); - - res->result = result; - res->callback = callback; + GSimpleAsyncResult *simple; + GInputStreamClass *class; - _g_queue_async_result ((GAsyncResultData *)res, stream, - error, data, - call_close_async_result); -} + if (G_IS_SIMPLE_ASYNC_RESULT (result)) + { + simple = G_SIMPLE_ASYNC_RESULT (result); + if (g_simple_async_result_propagate_error (simple, error)) + return -1; -static void -close_async_callback_wrapper (GInputStream *stream, - gboolean result, - gpointer data, - GError *error) -{ - GAsyncCloseInputCallback real_callback = (gpointer)stream->priv->outstanding_callback; + /* Special case skip of 0 bytes */ + if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async) + return 0; + } - stream->priv->pending = FALSE; - stream->priv->closed = TRUE; - (*real_callback) (stream, result, data, error); - g_object_unref (stream); + class = G_INPUT_STREAM_GET_CLASS (stream); + return class->skip_finish (stream, result, error); } /** * g_input_stream_close_async: * @stream: A #GInputStream. * @io_priority: the io priority of the request + * @cancellable: optional cancellable object * @callback: callback to call when the request is satisfied * @user_data: the data to pass to callback function * @@ -772,40 +700,70 @@ close_async_callback_wrapper (GInputStream *stream, void g_input_stream_close_async (GInputStream *stream, int io_priority, - GAsyncCloseInputCallback callback, - gpointer user_data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { GInputStreamClass *class; - GError *error; + GSimpleAsyncResult *simple; g_return_if_fail (G_IS_INPUT_STREAM (stream)); g_return_if_fail (stream != NULL); if (stream->priv->closed) { - queue_close_async_result (stream, TRUE, NULL, - callback, user_data); + simple = g_simple_async_result_new (G_OBJECT (stream), + callback, + user_data, + g_input_stream_close_async, + NULL, NULL); + g_simple_async_result_complete_in_idle (simple); + g_object_unref (simple); return; } if (stream->priv->pending) { - error = NULL; - g_set_error (&error, G_IO_ERROR, G_IO_ERROR_PENDING, - _("Stream has outstanding operation")); - queue_close_async_result (stream, FALSE, error, - callback, user_data); + report_error (stream, + callback, + user_data, + G_IO_ERROR, G_IO_ERROR_PENDING, + _("Stream has outstanding operation")); return; } class = G_INPUT_STREAM_GET_CLASS (stream); stream->priv->pending = TRUE; - stream->priv->outstanding_callback = (gpointer)callback; + stream->priv->outstanding_callback = callback; g_object_ref (stream); - class->close_async (stream, io_priority, close_async_callback_wrapper, user_data, cancellable); + class->close_async (stream, io_priority, cancellable, + async_ready_callback_wrapper, user_data); +} + +gboolean +g_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + GInputStreamClass *class; + + if (G_IS_SIMPLE_ASYNC_RESULT (result)) + { + simple = G_SIMPLE_ASYNC_RESULT (result); + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + /* Special case already closed */ + if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async) + return TRUE; + } + + class = G_INPUT_STREAM_GET_CLASS (stream); + return class->close_finish (stream, result, error); } + gboolean g_input_stream_is_closed (GInputStream *stream) { @@ -838,25 +796,6 @@ g_input_stream_set_pending (GInputStream *stream, * Default implementation of async ops * ********************************************/ -typedef struct { - GInputStream *stream; - GError *error; - gpointer data; -} InputStreamOp; - -static void -input_stream_op_free (gpointer data) -{ - InputStreamOp *op = data; - - if (op->error) - g_error_free (op->error); - - g_free (op); -} - -/************ read_async ***********/ - typedef struct { void *buffer; gsize count_requested; @@ -920,90 +859,84 @@ g_input_stream_real_read_finish (GInputStream *stream, return op->count_read; } -/**** skip_async ****/ - typedef struct { - InputStreamOp op; - gsize count_requested; - gssize count_skipped; - GAsyncSkipCallback callback; -} SkipAsyncOp; - -static void -skip_op_report (gpointer data) -{ - SkipAsyncOp *op = data; - - op->callback (op->op.stream, - op->count_requested, - op->count_skipped, - op->op.data, - op->op.error); + gsize count_requested; + gssize count_skipped; +} SkipData; -} static void -skip_op_func (GIOJob *job, - GCancellable *c, - gpointer data) +skip_async_thread (GSimpleAsyncResult *res, + gpointer op_data, + GObject *object, + GCancellable *cancellable) { - SkipAsyncOp *op = data; + SkipData *op = op_data; GInputStreamClass *class; - - if (g_cancellable_is_cancelled (c)) - { - op->count_skipped = -1; - g_set_error (&op->op.error, - G_IO_ERROR, - G_IO_ERROR_CANCELLED, - _("Operation was cancelled")); - } - else + GError *error = NULL; + + class = G_INPUT_STREAM_GET_CLASS (object); + + op->count_skipped = class->skip (G_INPUT_STREAM (object), + op->count_requested, + cancellable, &error); + if (op->count_skipped == -1) { - class = G_INPUT_STREAM_GET_CLASS (op->op.stream); - op->count_skipped = class->skip (op->op.stream, op->count_requested, - c, &op->op.error); + g_simple_async_result_set_from_error (res, error); + g_error_free (error); } - - g_io_job_send_to_mainloop (job, skip_op_report, - op, input_stream_op_free, - FALSE); } typedef struct { char *buffer; gpointer user_data; - GAsyncSkipCallback callback; + GAsyncReadyCallback callback; } SkipFallbackAsyncData; -/* static void -skip_callback_wrapper (GInputStream *stream, - void *buffer, - gsize count_requested, - gssize count_skipped, - gpointer _data, - GError *error) +skip_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) { - SkipFallbackAsyncData *data = _data; + SkipFallbackAsyncData *data = user_data; + SkipData *op; + GSimpleAsyncResult *simple; + GError *error = NULL; + + op = g_new0 (SkipData, 1); + simple = g_simple_async_result_new (source_object, + data->callback, data->user_data, + g_input_stream_real_skip_async, op, g_free); + + op->count_skipped = + g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error); + + if (op->count_skipped == -1) + { + g_simple_async_result_set_from_error (simple, error); + g_error_free (error); + } - data->callback (stream, count_requested, count_skipped, data->user_data, error); + /* Complete immediately, not in idle, since we're already in a mainloop callout */ + g_simple_async_result_complete (simple); + g_object_unref (simple); + g_free (data->buffer); g_free (data); -} -*/ + } static void g_input_stream_real_skip_async (GInputStream *stream, gsize count, int io_priority, - GAsyncSkipCallback callback, - gpointer user_data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { GInputStreamClass *class; - SkipAsyncOp *op; + SkipData *op; SkipFallbackAsyncData *data; + GSimpleAsyncResult *res; class = G_INPUT_STREAM_GET_CLASS (stream); @@ -1012,97 +945,101 @@ g_input_stream_real_skip_async (GInputStream *stream, /* Read is thread-using async fallback. Make skip use * threads too, so that we can use a possible sync skip * implementation. */ - op = g_new0 (SkipAsyncOp, 1); + op = g_new0 (SkipData, 1); - op->op.stream = stream; + res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, + g_input_stream_real_skip_async, op, g_free); op->count_requested = count; - op->callback = callback; - op->op.data = user_data; - - g_schedule_io_job (skip_op_func, - op, - NULL, - io_priority, - cancellable); + + g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable); + g_object_unref (res); } else { - /* TODO: Skip fallback uses too much memory, should do multiple calls */ - /* Custom async read function, lets use that. */ + /* TODO: Skip fallback uses too much memory, should do multiple read calls */ + + /* There is a custom async read function, lets use that. */ data = g_new (SkipFallbackAsyncData, 1); data->buffer = g_malloc (count); data->callback = callback; data->user_data = user_data; - /* TODO: uncomment when updating async - class->read_async (stream, data->buffer, count, io_priority, - skip_callback_wrapper, data, cancellable); - */ + class->read_async (stream, data->buffer, count, io_priority, cancellable, + skip_callback_wrapper, data); } } -typedef struct { - InputStreamOp op; - gboolean res; - GAsyncCloseInputCallback callback; -} CloseAsyncOp; - -static void -close_op_report (gpointer data) +static gssize +g_input_stream_real_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) { - CloseAsyncOp *op = data; + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + SkipData *op; + + g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async); - op->callback (op->op.stream, - op->res, - op->op.data, - op->op.error); + op = g_simple_async_result_get_op_data (simple); + return op->count_skipped; } +typedef struct { + gboolean res; +} CloseData; + static void -close_op_func (GIOJob *job, - GCancellable *c, - gpointer data) +close_async_thread (GSimpleAsyncResult *res, + gpointer op_data, + GObject *object, + GCancellable *cancellable) { - CloseAsyncOp *op = data; + CloseData *op = op_data; GInputStreamClass *class; + GError *error = NULL; - if (g_cancellable_is_cancelled (c)) - { - op->res = FALSE; - g_set_error (&op->op.error, - G_IO_ERROR, - G_IO_ERROR_CANCELLED, - _("Operation was cancelled")); - } - else + /* Auto handling of cancelation disabled, and ignore + cancellation, since we want to close things anyway, although + possibly in a quick-n-dirty way. At least we never want to leak + open handles */ + + class = G_INPUT_STREAM_GET_CLASS (object); + op->res = class->close (G_INPUT_STREAM (object), cancellable, &error); + if (!op->res) { - class = G_INPUT_STREAM_GET_CLASS (op->op.stream); - op->res = class->close (op->op.stream, c, &op->op.error); + g_simple_async_result_set_from_error (res, error); + g_error_free (error); } - - g_io_job_send_to_mainloop (job, close_op_report, - op, input_stream_op_free, - FALSE); } static void -g_input_stream_real_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer data, - GCancellable *cancellable) +g_input_stream_real_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { - CloseAsyncOp *op; - - op = g_new0 (CloseAsyncOp, 1); + GSimpleAsyncResult *res; + CloseData *op; + + op = g_new (CloseData, 1); + res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_close_async, op, g_free); - op->op.stream = stream; - op->callback = callback; - op->op.data = data; + g_simple_async_result_set_handle_cancellation (res, FALSE); - g_schedule_io_job (close_op_func, - op, - NULL, - io_priority, - cancellable); + g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable); + g_object_unref (res); +} + +static gboolean +g_input_stream_real_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + CloseData *op; + + g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async); + + op = g_simple_async_result_get_op_data (simple); + return op->res; } diff --git a/gio/ginputstream.h b/gio/ginputstream.h index f92e1cf4..7f407c31 100644 --- a/gio/ginputstream.h +++ b/gio/ginputstream.h @@ -20,30 +20,6 @@ typedef struct _GInputStream GInputStream; typedef struct _GInputStreamClass GInputStreamClass; typedef struct _GInputStreamPrivate GInputStreamPrivate; -typedef void (*GAsyncSkipCallback) (GInputStream *stream, - gsize count_requested, - gssize count_skipped, - gpointer user_data, - GError *error); - -/** - * GAsyncCloseInputCallback: - * @stream: a #GInputStream - * @result: %TRUE on success, %FALSE otherwis - * @error: the error, if result is %FALSE, otherwise %NULL - * - * This callback is called when an asychronous close operation - * is finished. - * - * The callback is always called, even if the operation was cancelled. - * If the operation was cancelled @result will be %FALSE, and @error - * will be %G_IO_ERROR_CANCELLED. - **/ -typedef void (*GAsyncCloseInputCallback) (GInputStream *stream, - gboolean result, - gpointer user_data, - GError *error); - struct _GInputStream { GObject parent; @@ -72,27 +48,33 @@ struct _GInputStreamClass GError **error); /* Async ops: (optional in derived classes) */ - void (* read_async) (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); - gssize (* read_finish) (GInputStream *stream, - GAsyncResult *result, - GError **error); - void (* skip_async) (GInputStream *stream, - gsize count, - int io_priority, - GAsyncSkipCallback callback, - gpointer user_data, - GCancellable *cancellable); - void (* close_async) (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer user_data, - GCancellable *cancellable); + void (* read_async) (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + gssize (* read_finish) (GInputStream *stream, + GAsyncResult *result, + GError **error); + void (* skip_async) (GInputStream *stream, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + gssize (* skip_finish) (GInputStream *stream, + GAsyncResult *result, + GError **error); + void (* close_async) (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + gboolean (* close_finish)(GInputStream *stream, + GAsyncResult *result, + GError **error); /* Padding for future expansion */ void (*_g_reserved1) (void); @@ -104,53 +86,58 @@ struct _GInputStreamClass GType g_input_stream_get_type (void) G_GNUC_CONST; -gssize g_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GCancellable *cancellable, - GError **error); -gboolean g_input_stream_read_all (GInputStream *stream, - void *buffer, - gsize count, - gsize *bytes_read, - GCancellable *cancellable, - GError **error); -gssize g_input_stream_skip (GInputStream *stream, - gsize count, - GCancellable *cancellable, - GError **error); -gboolean g_input_stream_close (GInputStream *stream, - GCancellable *cancellable, - GError **error); -void g_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -gssize g_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); -void g_input_stream_skip_async (GInputStream *stream, - gsize count, - int io_priority, - GAsyncSkipCallback callback, - gpointer user_data, - GCancellable *cancellable); -void g_input_stream_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer user_data, - GCancellable *cancellable); +gssize g_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +gboolean g_input_stream_read_all (GInputStream *stream, + void *buffer, + gsize count, + gsize *bytes_read, + GCancellable *cancellable, + GError **error); +gssize g_input_stream_skip (GInputStream *stream, + gsize count, + GCancellable *cancellable, + GError **error); +gboolean g_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error); +void g_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +gssize g_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +void g_input_stream_skip_async (GInputStream *stream, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +gssize g_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +void g_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +gboolean g_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); /* For implementations: */ -gboolean g_input_stream_is_closed (GInputStream *stream); -gboolean g_input_stream_has_pending (GInputStream *stream); -void g_input_stream_set_pending (GInputStream *stream, - gboolean pending); - +gboolean g_input_stream_is_closed (GInputStream *stream); +gboolean g_input_stream_has_pending (GInputStream *stream); +void g_input_stream_set_pending (GInputStream *stream, + gboolean pending); G_END_DECLS diff --git a/gio/gmemoryinputstream.c b/gio/gmemoryinputstream.c index aaa7595d..71d9b12d 100644 --- a/gio/gmemoryinputstream.c +++ b/gio/gmemoryinputstream.c @@ -16,39 +16,45 @@ struct _GMemoryInputStreamPrivate { }; -static gssize g_memory_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GCancellable *cancellable, - GError **error); -static gssize g_memory_input_stream_skip (GInputStream *stream, - gsize count, - GCancellable *cancellable, - GError **error); -static gboolean g_memory_input_stream_close (GInputStream *stream, - GCancellable *cancellable, - GError **error); -static void g_memory_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data); -static gssize g_memory_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); -static void g_memory_input_stream_skip_async (GInputStream *stream, - gsize count, - int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GCancellable *cancellable); -static void g_memory_input_stream_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer data, - GCancellable *cancellable); +static gssize g_memory_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static gssize g_memory_input_stream_skip (GInputStream *stream, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean g_memory_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error); +static void g_memory_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +static gssize g_memory_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_memory_input_stream_skip_async (GInputStream *stream, + gsize count, + int io_priority, + GCancellable *cancellabl, + GAsyncReadyCallback callback, + gpointer datae); +static gssize g_memory_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_memory_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellabl, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_memory_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); static void g_memory_input_stream_seekable_iface_init (GSeekableIface *iface); static goffset g_memory_input_stream_tell (GSeekable *seekable); @@ -84,8 +90,9 @@ g_memory_input_stream_class_init (GMemoryInputStreamClass *klass) istream_class->read_async = g_memory_input_stream_read_async; istream_class->read_finish = g_memory_input_stream_read_finish; istream_class->skip_async = g_memory_input_stream_skip_async; + istream_class->skip_finish = g_memory_input_stream_skip_finish; istream_class->close_async = g_memory_input_stream_close_async; - + istream_class->close_finish = g_memory_input_stream_close_finish; } static void @@ -214,31 +221,63 @@ static void g_memory_input_stream_skip_async (GInputStream *stream, gsize count, int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { - GMemoryInputStream *memory_stream; - GMemoryInputStreamPrivate *priv; - gsize nskipped; + GSimpleAsyncResult *simple; + gssize *nskipped; - memory_stream = G_MEMORY_INPUT_STREAM (stream); - priv = memory_stream->priv; - - nskipped = MIN (count, priv->len - priv->pos); + nskipped = g_new (gssize, 1); + *nskipped = g_memory_input_stream_skip (stream, count, cancellable, NULL); + simple = g_simple_async_result_new (G_OBJECT (stream), + callback, + user_data, + g_memory_input_stream_skip_async, + nskipped, g_free); + g_simple_async_result_complete_in_idle (simple); + g_object_unref (simple); +} - priv->pos += nskipped; - (*callback) (stream, count, nskipped, data, NULL); +static gssize +g_memory_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + gssize *nskipped; + + simple = G_SIMPLE_ASYNC_RESULT (result); + g_assert (g_simple_async_result_get_source_tag (simple) == g_memory_input_stream_skip_async); + + nskipped = g_simple_async_result_get_op_data (simple); + return *nskipped; } static void g_memory_input_stream_close_async (GInputStream *stream, int io_priority, - GAsyncCloseInputCallback callback, - gpointer data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { - (*callback) (stream, TRUE, data, NULL); + GSimpleAsyncResult *simple; + + simple = g_simple_async_result_new (G_OBJECT (stream), + callback, + user_data, + g_memory_input_stream_close_async, + NULL, NULL); + g_simple_async_result_complete_in_idle (simple); + g_object_unref (simple); +} + +static gboolean +g_memory_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + return TRUE; } static goffset diff --git a/gio/gsimpleasyncresult.c b/gio/gsimpleasyncresult.c index 6ad1ac59..3a6273ae 100644 --- a/gio/gsimpleasyncresult.c +++ b/gio/gsimpleasyncresult.c @@ -24,6 +24,7 @@ struct _GSimpleAsyncResult gpointer user_data; GError *error; gboolean failed; + gboolean handle_cancellation; gpointer source_tag; @@ -66,6 +67,7 @@ g_simple_async_result_class_init (GSimpleAsyncResultClass *klass) static void g_simple_async_result_init (GSimpleAsyncResult *simple) { + simple->handle_cancellation = TRUE; } GSimpleAsyncResult * @@ -153,6 +155,13 @@ g_simple_async_result_async_result_iface_init (GAsyncResultIface *iface) } +void +g_simple_async_result_set_handle_cancellation (GSimpleAsyncResult *simple, + gboolean handle_cancellation) +{ + simple->handle_cancellation = handle_cancellation; +} + gpointer g_simple_async_result_get_source_tag (GSimpleAsyncResult *simple) { @@ -279,7 +288,8 @@ run_in_thread (GIOJob *job, RunInThreadData *data = _data; GSimpleAsyncResult *simple = data->simple; - if (g_cancellable_is_cancelled (c)) + if (simple->handle_cancellation && + g_cancellable_is_cancelled (c)) { g_simple_async_result_set_error (simple, G_IO_ERROR, diff --git a/gio/gsimpleasyncresult.h b/gio/gsimpleasyncresult.h index 74127f50..080d4ed7 100644 --- a/gio/gsimpleasyncresult.h +++ b/gio/gsimpleasyncresult.h @@ -48,6 +48,8 @@ GSimpleAsyncResult *g_simple_async_result_new_from_error (GObject GError *error); gpointer g_simple_async_result_get_op_data (GSimpleAsyncResult *simple); gpointer g_simple_async_result_get_source_tag (GSimpleAsyncResult *simple); +void g_simple_async_result_set_handle_cancellation (GSimpleAsyncResult *simple, + gboolean handle_cancellation); void g_simple_async_result_complete (GSimpleAsyncResult *simple); void g_simple_async_result_complete_in_idle (GSimpleAsyncResult *simple); void g_simple_async_result_run_in_thread (GSimpleAsyncResult *simple, diff --git a/gio/gsocketinputstream.c b/gio/gsocketinputstream.c index dada09b2..4a25f045 100644 --- a/gio/gsocketinputstream.c +++ b/gio/gsocketinputstream.c @@ -24,35 +24,41 @@ struct _GSocketInputStreamPrivate { gboolean close_fd_at_close; }; -static gssize g_socket_input_stream_read (GInputStream *stream, - void *buffer, - gsize count, - GCancellable *cancellable, - GError **error); -static gboolean g_socket_input_stream_close (GInputStream *stream, - GCancellable *cancellable, - GError **error); -static void g_socket_input_stream_read_async (GInputStream *stream, - void *buffer, - gsize count, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer data); -static gssize g_socket_input_stream_read_finish (GInputStream *stream, - GAsyncResult *result, - GError **error); -static void g_socket_input_stream_skip_async (GInputStream *stream, - gsize count, - int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GCancellable *cancellable); -static void g_socket_input_stream_close_async (GInputStream *stream, - int io_priority, - GAsyncCloseInputCallback callback, - gpointer data, - GCancellable *cancellable); +static gssize g_socket_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean g_socket_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error); +static void g_socket_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize g_socket_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_socket_input_stream_skip_async (GInputStream *stream, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gssize g_socket_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static void g_socket_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_socket_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); static void g_socket_input_stream_finalize (GObject *object) @@ -79,8 +85,14 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass) stream_class->close = g_socket_input_stream_close; stream_class->read_async = g_socket_input_stream_read_async; stream_class->read_finish = g_socket_input_stream_read_finish; - stream_class->skip_async = g_socket_input_stream_skip_async; + if (0) + { + /* TODO: Implement instead of using fallbacks */ + stream_class->skip_async = g_socket_input_stream_skip_async; + stream_class->skip_finish = g_socket_input_stream_skip_finish; + } stream_class->close_async = g_socket_input_stream_close_async; + stream_class->close_finish = g_socket_input_stream_close_finish; } static void @@ -319,17 +331,27 @@ static void g_socket_input_stream_skip_async (GInputStream *stream, gsize count, int io_priority, - GAsyncSkipCallback callback, - gpointer data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data) +{ + g_assert_not_reached (); + /* TODO: Not implemented */ +} + +static gssize +g_socket_input_stream_skip_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) { g_assert_not_reached (); /* TODO: Not implemented */ } + typedef struct { GInputStream *stream; - GAsyncCloseInputCallback callback; + GAsyncReadyCallback callback; gpointer user_data; } CloseAsyncData; @@ -345,6 +367,7 @@ static gboolean close_async_cb (CloseAsyncData *data) { GSocketInputStream *socket_stream; + GSimpleAsyncResult *simple; GError *error = NULL; gboolean result; int res; @@ -371,14 +394,23 @@ close_async_cb (CloseAsyncData *data) } result = res != -1; - + out: - data->callback (data->stream, - result, - data->user_data, - error); - if (error) - g_error_free (error); + simple = g_simple_async_result_new (G_OBJECT (data->stream), + data->callback, + data->user_data, + g_socket_input_stream_close_async, + NULL, NULL); + + if (!result) + { + g_simple_async_result_set_from_error (simple, error); + g_error_free (error); + } + + /* Complete immediately, not in idle, since we're already in a mainloop callout */ + g_simple_async_result_complete (simple); + g_object_unref (simple); return FALSE; } @@ -386,9 +418,9 @@ close_async_cb (CloseAsyncData *data) static void g_socket_input_stream_close_async (GInputStream *stream, int io_priority, - GAsyncCloseInputCallback callback, - gpointer user_data, - GCancellable *cancellable) + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { GSource *idle; CloseAsyncData *data; @@ -404,3 +436,13 @@ g_socket_input_stream_close_async (GInputStream *stream, g_source_attach (idle, NULL); g_source_unref (idle); } + +static gboolean +g_socket_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + /* Failures handled in generic close_finish code */ + return TRUE; +} + diff --git a/gio/test-gio.c b/gio/test-gio.c index 827c6e2c..2ed40fe8 100755 --- a/gio/test-gio.c +++ b/gio/test-gio.c @@ -142,16 +142,23 @@ typedef struct { } AsyncData; static void -close_done (GInputStream *stream, - gboolean result, - gpointer _data, - GError *error) +close_done (GObject *source_object, + GAsyncResult *res, + gpointer user_data) { - AsyncData *data = _data; + AsyncData *data = user_data; + GInputStream *stream = G_INPUT_STREAM (source_object); + gboolean result; + GError *error = NULL; + + result = g_input_stream_close_finish (stream, res, &error); g_print ("close result: %d\n", result); if (!result) - g_print ("Close error %d: %s\n", error->code, error->message); + { + g_print ("Close error %d: %s\n", error->code, error->message); + g_error_free (error); + } g_object_unref (data->c); g_free (data); @@ -164,16 +171,18 @@ read_done (GObject *source_object, { AsyncData *data = user_data; GInputStream *stream = G_INPUT_STREAM (source_object); - gssize count_read; - GError *error = NULL; - + gssize count_read; + GError *error = NULL; count_read = g_input_stream_read_finish (stream, res, &error); g_print ("count_read: %d\n", count_read); if (count_read == -1) - g_print ("Error %d: %s\n", error->code, error->message); + { + g_print ("Error %d: %s\n", error->code, error->message); + g_error_free (error); + } else if (0) { data->buffer[count_read] = 0; @@ -186,7 +195,7 @@ read_done (GObject *source_object, //g_cancellable_cancel (data->c); } else - g_input_stream_close_async (stream, 0, close_done, data, data->c); + g_input_stream_close_async (stream, 0, data->c, close_done, data); } static void -- cgit v1.2.1