summaryrefslogtreecommitdiff
path: root/libpurple/queuedoutputstream.c
diff options
context:
space:
mode:
authorElliott Sales de Andrade <qulogic@pidgin.im>2019-03-29 03:20:55 -0400
committerElliott Sales de Andrade <qulogic@pidgin.im>2019-03-29 03:20:55 -0400
commita8e049b0d3d7475da38a75ba4e62f790eadbd904 (patch)
tree8dd958b780f87b23813ed5014e592b9db1531d1a /libpurple/queuedoutputstream.c
parentb7123573a7d04bcaf51ec1847b4bcf93e8a3fea4 (diff)
downloadpidgin-a8e049b0d3d7475da38a75ba4e62f790eadbd904.tar.gz
Use more standard GObject setup for queuedoutputstream.
Diffstat (limited to 'libpurple/queuedoutputstream.c')
-rw-r--r--libpurple/queuedoutputstream.c156
1 files changed, 93 insertions, 63 deletions
diff --git a/libpurple/queuedoutputstream.c b/libpurple/queuedoutputstream.c
index c0cf44dfc6..ff14d5fb56 100644
--- a/libpurple/queuedoutputstream.c
+++ b/libpurple/queuedoutputstream.c
@@ -24,68 +24,42 @@
#include "internal.h"
#include "queuedoutputstream.h"
-struct _PurpleQueuedOutputStreamPrivate {
- GAsyncQueue *queue;
- gboolean pending_queued;
-};
-
-static GObjectClass *parent_class = NULL;
-
-#define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \
- (G_TYPE_INSTANCE_GET_PRIVATE((obj), \
- PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \
- PurpleQueuedOutputStreamPrivate))
-
-G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream,
- G_TYPE_FILTER_OUTPUT_STREAM,
- G_ADD_PRIVATE(PurpleQueuedOutputStream))
-
-static void purple_queued_output_stream_dispose(GObject *object);
-static void purple_queued_output_stream_start_push_bytes_async(GTask *task);
-
-static void
-purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
+/**
+ * PurpleQueuedOutputStream:
+ *
+ * An implementation of #GFilterOutputStream which allows queuing data for
+ * output. This allows data to be queued while other data is being output.
+ * Therefore, data doesn't have to be manually stored while waiting for
+ * stream operations to finish.
+ *
+ * To create a queued output stream, use #purple_queued_output_stream_new().
+ *
+ * To queue data, use #purple_queued_output_stream_push_bytes_async().
+ *
+ * If there's a fatal stream error, it's suggested to clear the remaining
+ * bytes queued with #purple_queued_output_stream_clear_queue() to avoid
+ * excessive errors returned in
+ * #purple_queued_output_stream_push_bytes_async()'s async callback.
+ */
+struct _PurpleQueuedOutputStream
{
- GObjectClass *object_class;
-
- parent_class = g_type_class_peek_parent(klass);
-
- object_class = G_OBJECT_CLASS(klass);
- object_class->dispose = purple_queued_output_stream_dispose;
-}
+ GFilterOutputStream parent;
+};
-PurpleQueuedOutputStream *
-purple_queued_output_stream_new(GOutputStream *base_stream)
+typedef struct _PurpleQueuedOutputStreamPrivate
{
- PurpleQueuedOutputStream *stream;
-
- g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
-
- stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
- "base-stream", base_stream,
- NULL);
-
- return stream;
-}
+ GAsyncQueue *queue;
+ gboolean pending_queued;
+} PurpleQueuedOutputStreamPrivate;
-static void
-purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
-{
- stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
- stream->priv->queue =
- g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
- stream->priv->pending_queued = FALSE;
-}
+G_DEFINE_TYPE_WITH_PRIVATE(PurpleQueuedOutputStream,
+ purple_queued_output_stream, G_TYPE_FILTER_OUTPUT_STREAM)
-static void
-purple_queued_output_stream_dispose(GObject *object)
-{
- PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
+/******************************************************************************
+ * Helpers
+ *****************************************************************************/
- g_clear_pointer(&stream->priv->queue, g_async_queue_unref);
-
- G_OBJECT_CLASS(parent_class)->dispose(object);
-}
+static void purple_queued_output_stream_start_push_bytes_async(GTask *task);
static void
purple_queued_output_stream_push_bytes_async_cb(GObject *source,
@@ -93,6 +67,7 @@ purple_queued_output_stream_push_bytes_async_cb(GObject *source,
{
GTask *task = G_TASK(user_data);
PurpleQueuedOutputStream *stream = g_task_get_source_object(task);
+ PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream);
gssize written;
GBytes *bytes;
gsize size;
@@ -126,7 +101,7 @@ purple_queued_output_stream_push_bytes_async_cb(GObject *source,
if (task == NULL) {
/* Any queued data left? */
- task = g_async_queue_try_pop(stream->priv->queue);
+ task = g_async_queue_try_pop(priv->queue);
}
if (task != NULL) {
@@ -134,7 +109,7 @@ purple_queued_output_stream_push_bytes_async_cb(GObject *source,
purple_queued_output_stream_start_push_bytes_async(task);
} else {
/* All done */
- stream->priv->pending_queued = FALSE;
+ priv->pending_queued = FALSE;
g_output_stream_clear_pending(G_OUTPUT_STREAM(stream));
}
}
@@ -156,6 +131,55 @@ purple_queued_output_stream_start_push_bytes_async(GTask *task)
task);
}
+/******************************************************************************
+ * GObject Implementation
+ *****************************************************************************/
+
+static void
+purple_queued_output_stream_dispose(GObject *object)
+{
+ PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
+ PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream);
+
+ g_clear_pointer(&priv->queue, g_async_queue_unref);
+
+ G_OBJECT_CLASS(purple_queued_output_stream_parent_class)->dispose(object);
+}
+
+static void
+purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
+{
+ GObjectClass *obj_class = G_OBJECT_CLASS(klass);
+
+ obj_class->dispose = purple_queued_output_stream_dispose;
+}
+
+static void
+purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
+{
+ PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream);
+ priv->queue = g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
+ priv->pending_queued = FALSE;
+}
+
+/******************************************************************************
+ * Public API
+ *****************************************************************************/
+
+PurpleQueuedOutputStream *
+purple_queued_output_stream_new(GOutputStream *base_stream)
+{
+ PurpleQueuedOutputStream *stream;
+
+ g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
+
+ stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
+ "base-stream", base_stream,
+ NULL);
+
+ return stream;
+}
+
void
purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream,
GBytes *bytes, int io_priority, GCancellable *cancellable,
@@ -164,10 +188,13 @@ purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream,
GTask *task;
gboolean set_pending;
GError *error = NULL;
+ PurpleQueuedOutputStreamPrivate *priv = NULL;
- g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream));
+ g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream));
g_return_if_fail(bytes != NULL);
+ priv = purple_queued_output_stream_get_instance_private(stream);
+
task = g_task_new(stream, cancellable, callback, user_data);
g_task_set_task_data(task, g_bytes_ref(bytes),
(GDestroyNotify)g_bytes_unref);
@@ -183,20 +210,20 @@ purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream,
*/
if (!set_pending && (!g_error_matches(error,
G_IO_ERROR, G_IO_ERROR_PENDING) ||
- !stream->priv->pending_queued)) {
+ !priv->pending_queued)) {
g_task_return_error(task, error);
g_object_unref(task);
return;
}
- stream->priv->pending_queued = TRUE;
+ priv->pending_queued = TRUE;
if (set_pending) {
/* Start processing if there were no pending operations */
purple_queued_output_stream_start_push_bytes_async(task);
} else {
/* Otherwise queue the data */
- g_async_queue_push(stream->priv->queue, task);
+ g_async_queue_push(priv->queue, task);
}
}
@@ -216,10 +243,13 @@ void
purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream)
{
GTask *task;
+ PurpleQueuedOutputStreamPrivate *priv = NULL;
g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream));
- while ((task = g_async_queue_try_pop(stream->priv->queue)) != NULL) {
+ priv = purple_queued_output_stream_get_instance_private(stream);
+
+ while ((task = g_async_queue_try_pop(priv->queue)) != NULL) {
g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED,
"PurpleQueuedOutputStream queue cleared");
g_object_unref(task);