summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Larsson <alexl@src.gnome.org>2007-09-13 10:34:09 +0000
committerAlexander Larsson <alexl@src.gnome.org>2007-09-13 10:34:09 +0000
commit824891203199e3dd493846c94374020f422422db (patch)
tree1b13408d07d9f5ec38896e1ec2e71bc41e89a9a4
parent5d528aa6ebe7d9a4b0b669e9654729654ee209db (diff)
downloadgvfs-824891203199e3dd493846c94374020f422422db.tar.gz
Rename GVfsReadStream to GVfsReadChannel
Make GVfsJobDBus class to share code Make GVfsJob backend be construct property Implement daemon-side support for dbus call cancellation Finish dbus call cancellation client-side Original git commit by Alexander Larsson <alex@greebo.(none)> at 1165257484 +0100 svn path=/trunk/; revision=217
-rw-r--r--TODO4
-rw-r--r--daemon/Makefile.am3
-rw-r--r--daemon/gvfsbackendtest.c24
-rw-r--r--daemon/gvfsdaemon.c70
-rw-r--r--daemon/gvfsjob.c73
-rw-r--r--daemon/gvfsjob.h3
-rw-r--r--daemon/gvfsjobcloseread.c18
-rw-r--r--daemon/gvfsjobcloseread.h6
-rw-r--r--daemon/gvfsjobopenforread.c83
-rw-r--r--daemon/gvfsjobopenforread.h29
-rw-r--r--daemon/gvfsjobread.c22
-rw-r--r--daemon/gvfsjobread.h6
-rw-r--r--daemon/gvfsjobseekread.c20
-rw-r--r--daemon/gvfsjobseekread.h6
-rw-r--r--daemon/gvfsreadchannel.c552
-rw-r--r--daemon/gvfsreadchannel.h63
-rw-r--r--daemon/gvfsreadstream.c552
-rw-r--r--daemon/gvfsreadstream.h63
-rw-r--r--gvfs/gvfsdaemondbus.c30
-rw-r--r--gvfs/test.c34
20 files changed, 899 insertions, 762 deletions
diff --git a/TODO b/TODO
index efbe01e7..7e13f45a 100644
--- a/TODO
+++ b/TODO
@@ -29,10 +29,10 @@ implement seeks in local streams
pass cancellable to async callbacks?
-handle Cancel signal on daemon side
-
rename glib lib from gvfs to gio (since it has more than gvfs)
+G_TYPE_VFS... -> G_VFS_TYPE...
+
FUTURE IDEAS:
-------------
diff --git a/daemon/Makefile.am b/daemon/Makefile.am
index 51748e5a..58c85066 100644
--- a/daemon/Makefile.am
+++ b/daemon/Makefile.am
@@ -19,9 +19,10 @@ gvfs_daemon_SOURCES = \
gvfsdaemon.c gvfsdaemon.h \
gvfsbackend.c gvfsbackend.h \
gvfsbackendtest.c gvfsbackendtest.h \
- gvfsreadstream.c gvfsreadstream.h \
+ gvfsreadchannel.c gvfsreadchannel.h \
gvfsdaemonutils.c gvfsdaemonutils.h \
gvfsjob.c gvfsjob.h \
+ gvfsjobdbus.c gvfsjobdbus.h \
gvfsjobopenforread.c gvfsjobopenforread.h \
gvfsjobread.c gvfsjobread.h \
gvfsjobseekread.c gvfsjobseekread.h \
diff --git a/daemon/gvfsbackendtest.c b/daemon/gvfsbackendtest.c
index 92b6ffc6..3ce5a6c9 100644
--- a/daemon/gvfsbackendtest.c
+++ b/daemon/gvfsbackendtest.c
@@ -50,6 +50,14 @@ open_idle_cb (gpointer data)
GVfsJobOpenForRead *job = data;
int fd;
+ if (g_vfs_job_is_cancelled (G_VFS_JOB (job)))
+ {
+ g_vfs_job_failed (G_VFS_JOB (job), G_VFS_ERROR,
+ G_VFS_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return FALSE;
+ }
+
fd = g_open (job->filename, O_RDONLY);
if (fd == -1)
{
@@ -67,6 +75,19 @@ open_idle_cb (gpointer data)
return FALSE;
}
+static void
+open_read_cancelled_cb (GVfsJob *job, gpointer data)
+{
+ guint tag = GPOINTER_TO_INT (data);
+
+ g_print ("open_read_cancelled_cb\n");
+
+ if (g_source_remove (tag))
+ g_vfs_job_failed (job, G_VFS_ERROR,
+ G_VFS_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+}
+
static gboolean
do_open_for_read (GVfsBackend *backend,
GVfsJobOpenForRead *job,
@@ -84,7 +105,8 @@ do_open_for_read (GVfsBackend *backend,
}
else
{
- g_idle_add (open_idle_cb, job);
+ guint tag = g_timeout_add (0, open_idle_cb, job);
+ g_signal_connect (job, "cancelled", (GCallback)open_read_cancelled_cb, GINT_TO_POINTER (tag));
return TRUE;
}
}
diff --git a/daemon/gvfsdaemon.c b/daemon/gvfsdaemon.c
index b5ba0ead..2faa364e 100644
--- a/daemon/gvfsdaemon.c
+++ b/daemon/gvfsdaemon.c
@@ -29,7 +29,7 @@ struct _GVfsDaemonPrivate
GQueue *pending_jobs;
GQueue *jobs; /* protected by lock */
guint queued_job_start; /* protected by lock */
- GList *read_streams; /* protected by lock */
+ GList *read_channels; /* protected by lock */
};
typedef struct {
@@ -236,7 +236,7 @@ queue_start_jobs_at_idle (GVfsDaemon *daemon)
}
static void
-handle_new_job_callback (GVfsReadStream *stream,
+handle_new_job_callback (GVfsReadChannel *channel,
GVfsJob *job,
GVfsDaemon *daemon)
{
@@ -246,14 +246,14 @@ handle_new_job_callback (GVfsReadStream *stream,
}
static void
-handle_read_stream_closed_callback (GVfsReadStream *stream,
- GVfsDaemon *daemon)
+handle_read_channel_closed_callback (GVfsReadChannel *channel,
+ GVfsDaemon *daemon)
{
g_mutex_lock (daemon->priv->lock);
- daemon->priv->read_streams = g_list_remove (daemon->priv->read_streams, stream);
- g_signal_handlers_disconnect_by_func (stream, (GCallback)handle_new_job_callback, daemon);
- g_object_unref (stream);
+ daemon->priv->read_channels = g_list_remove (daemon->priv->read_channels, channel);
+ g_signal_handlers_disconnect_by_func (channel, (GCallback)handle_new_job_callback, daemon);
+ g_object_unref (channel);
g_mutex_unlock (daemon->priv->lock);
}
@@ -272,17 +272,17 @@ job_finished_callback (GVfsJob *job,
if (G_IS_VFS_JOB_OPEN_FOR_READ (job))
{
GVfsJobOpenForRead *open_job = G_VFS_JOB_OPEN_FOR_READ (job);
- GVfsReadStream *stream;
+ GVfsReadChannel *channel;
- stream = g_vfs_job_open_for_read_steal_stream (open_job);
+ channel = g_vfs_job_open_for_read_steal_channel (open_job);
- if (stream)
+ if (channel)
{
- g_print ("Got new read stream %p for daemon %p\n", stream, daemon);
- daemon->priv->read_streams = g_list_append (daemon->priv->read_streams,
- stream);
- g_signal_connect (stream, "new_job", (GCallback)handle_new_job_callback, daemon);
- g_signal_connect (stream, "closed", (GCallback)handle_read_stream_closed_callback, daemon);
+ g_print ("Got new read channel %p for daemon %p\n", channel, daemon);
+ daemon->priv->read_channels = g_list_append (daemon->priv->read_channels,
+ channel);
+ g_signal_connect (channel, "new_job", (GCallback)handle_new_job_callback, daemon);
+ g_signal_connect (channel, "closed", (GCallback)handle_read_channel_closed_callback, daemon);
}
}
@@ -741,6 +741,46 @@ daemon_message_func (DBusConnection *conn,
return DBUS_HANDLER_RESULT_HANDLED;
}
+ if (dbus_message_is_method_call (message,
+ G_VFS_DBUS_DAEMON_INTERFACE,
+ G_VFS_DBUS_OP_CANCEL))
+ {
+ GList *l;
+ dbus_uint32_t serial;
+ GVfsJob *job_to_cancel = NULL;
+
+ g_print ("Got cancel dbus call\n");
+
+ if (dbus_message_get_args (message, NULL,
+ DBUS_TYPE_UINT32, &serial,
+ DBUS_TYPE_INVALID))
+ {
+ g_mutex_lock (daemon->priv->lock);
+ for (l = daemon->priv->jobs->head; l != NULL; l = l->next)
+ {
+ GVfsJob *job = l->data;
+
+ if (G_IS_VFS_JOB_DBUS (job) &&
+ g_vfs_job_dbus_is_serial (G_VFS_JOB_DBUS (job),
+ conn, serial))
+ {
+ job_to_cancel = g_object_ref (job);
+ break;
+ }
+ }
+ g_mutex_unlock (daemon->priv->lock);
+
+
+ if (job_to_cancel)
+ {
+ g_vfs_job_cancel (job_to_cancel);
+ g_object_unref (job_to_cancel);
+ }
+ }
+
+ return DBUS_HANDLER_RESULT_HANDLED;
+ }
+
backend = NULL;
dest = dbus_message_get_destination (message);
if (dest != NULL)
diff --git a/daemon/gvfsjob.c b/daemon/gvfsjob.c
index 00d7d07e..6207773b 100644
--- a/daemon/gvfsjob.c
+++ b/daemon/gvfsjob.c
@@ -9,9 +9,18 @@
#include <dbus/dbus.h>
#include <glib/gi18n.h>
#include "gvfsjob.h"
+#include "gvfsbackend.h"
G_DEFINE_TYPE (GVfsJob, g_vfs_job, G_TYPE_OBJECT);
+/* TODO: Real P_() */
+#define P_(_x) (_x)
+
+enum {
+ PROP_0,
+ PROP_BACKEND,
+};
+
enum {
CANCELLED,
SEND_REPLY,
@@ -21,6 +30,15 @@ enum {
static guint signals[LAST_SIGNAL] = { 0 };
+static void g_vfs_job_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec);
+static void g_vfs_job_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec);
+
static void
g_vfs_job_finalize (GObject *object)
{
@@ -41,6 +59,8 @@ g_vfs_job_class_init (GVfsJobClass *klass)
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->finalize = g_vfs_job_finalize;
+ gobject_class->set_property = g_vfs_job_set_property;
+ gobject_class->get_property = g_vfs_job_get_property;
signals[CANCELLED] =
g_signal_new ("cancelled",
@@ -67,6 +87,14 @@ g_vfs_job_class_init (GVfsJobClass *klass)
g_cclosure_marshal_VOID__VOID,
G_TYPE_NONE, 0);
+ g_object_class_install_property (gobject_class,
+ PROP_BACKEND,
+ g_param_spec_object ("backend",
+ P_("VFS Backend"),
+ P_("The implementation for this job operartion."),
+ G_TYPE_VFS_BACKEND,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
+ G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
}
static void
@@ -74,11 +102,42 @@ g_vfs_job_init (GVfsJob *job)
{
}
-void
-g_vfs_job_set_backend (GVfsJob *job,
- GVfsBackend *backend)
+static void
+g_vfs_job_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ GVfsJob *job = G_VFS_JOB (object);
+
+ switch (prop_id)
+ {
+ case PROP_BACKEND:
+ job->backend = g_value_get_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+g_vfs_job_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
{
- job->backend = backend;
+ GVfsJob *job = G_VFS_JOB (object);
+
+ switch (prop_id)
+ {
+ case PROP_BACKEND:
+ g_value_set_object (value, job->backend);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
}
gboolean
@@ -158,6 +217,12 @@ g_vfs_job_is_finished (GVfsJob *job)
return job->finished;
}
+gboolean
+g_vfs_job_is_cancelled (GVfsJob *job)
+{
+ return job->cancelled;
+}
+
/* Might be called on an i/o thread */
void
g_vfs_job_emit_finished (GVfsJob *job)
diff --git a/daemon/gvfsjob.h b/daemon/gvfsjob.h
index 8d63a95f..b9cc4702 100644
--- a/daemon/gvfsjob.h
+++ b/daemon/gvfsjob.h
@@ -52,6 +52,7 @@ struct _GVfsJobClass
GType g_vfs_job_get_type (void) G_GNUC_CONST;
gboolean g_vfs_job_is_finished (GVfsJob *job);
+gboolean g_vfs_job_is_cancelled (GVfsJob *job);
void g_vfs_job_cancel (GVfsJob *job);
gboolean g_vfs_job_start (GVfsJob *job);
void g_vfs_job_emit_finished (GVfsJob *job);
@@ -63,8 +64,6 @@ void g_vfs_job_failed (GVfsJob *job,
void g_vfs_job_failed_from_error (GVfsJob *job,
GError *error);
void g_vfs_job_succeeded (GVfsJob *job);
-void g_vfs_job_set_backend (GVfsJob *job,
- GVfsBackend *backend);
G_END_DECLS
diff --git a/daemon/gvfsjobcloseread.c b/daemon/gvfsjobcloseread.c
index 19f600ef..28aeddc2 100644
--- a/daemon/gvfsjobcloseread.c
+++ b/daemon/gvfsjobcloseread.c
@@ -7,7 +7,7 @@
#include <glib.h>
#include <glib/gi18n.h>
-#include "gvfsreadstream.h"
+#include "gvfsreadchannel.h"
#include "gvfsjobcloseread.h"
#include "gvfsdaemonutils.h"
@@ -22,7 +22,7 @@ g_vfs_job_close_read_finalize (GObject *object)
GVfsJobCloseRead *job;
job = G_VFS_JOB_CLOSE_READ (object);
- g_object_unref (job->stream);
+ g_object_unref (job->channel);
if (G_OBJECT_CLASS (g_vfs_job_close_read_parent_class)->finalize)
(*G_OBJECT_CLASS (g_vfs_job_close_read_parent_class)->finalize) (object);
@@ -46,17 +46,17 @@ g_vfs_job_close_read_init (GVfsJobCloseRead *job)
}
GVfsJob *
-g_vfs_job_close_read_new (GVfsReadStream *stream,
+g_vfs_job_close_read_new (GVfsReadChannel *channel,
GVfsBackendHandle handle,
GVfsBackend *backend)
{
GVfsJobCloseRead *job;
- job = g_object_new (G_TYPE_VFS_JOB_CLOSE_READ, NULL);
+ job = g_object_new (G_TYPE_VFS_JOB_CLOSE_READ,
+ "backend", backend,
+ NULL);
- g_vfs_job_set_backend (G_VFS_JOB (job), backend);
-
- job->stream = g_object_ref (stream);
+ job->channel = g_object_ref (channel);
job->handle = handle;
return G_VFS_JOB (job);
@@ -71,9 +71,9 @@ send_reply (GVfsJob *job)
g_print ("job_close_read send reply\n");
if (job->failed)
- g_vfs_read_stream_send_error (op_job->stream, job->error);
+ g_vfs_read_channel_send_error (op_job->channel, job->error);
else
- g_vfs_read_stream_send_closed (op_job->stream);
+ g_vfs_read_channel_send_closed (op_job->channel);
}
static gboolean
diff --git a/daemon/gvfsjobcloseread.h b/daemon/gvfsjobcloseread.h
index c64be03b..5333c006 100644
--- a/daemon/gvfsjobcloseread.h
+++ b/daemon/gvfsjobcloseread.h
@@ -3,7 +3,7 @@
#include <gvfsjob.h>
#include <gvfsbackend.h>
-#include <gvfsreadstream.h>
+#include <gvfsreadchannel.h>
G_BEGIN_DECLS
@@ -21,7 +21,7 @@ struct _GVfsJobCloseRead
{
GVfsJob parent_instance;
- GVfsReadStream *stream;
+ GVfsReadChannel *channel;
GVfsBackendHandle handle;
};
@@ -32,7 +32,7 @@ struct _GVfsJobCloseReadClass
GType g_vfs_job_close_read_get_type (void) G_GNUC_CONST;
-GVfsJob *g_vfs_job_close_read_new (GVfsReadStream *stream,
+GVfsJob *g_vfs_job_close_read_new (GVfsReadChannel *channel,
GVfsBackendHandle handle,
GVfsBackend *backend);
diff --git a/daemon/gvfsjobopenforread.c b/daemon/gvfsjobopenforread.c
index 9a9c16ca..6eec8d9f 100644
--- a/daemon/gvfsjobopenforread.c
+++ b/daemon/gvfsjobopenforread.c
@@ -8,14 +8,16 @@
#include <glib.h>
#include <dbus/dbus.h>
#include <glib/gi18n.h>
-#include "gvfsreadstream.h"
+#include "gvfsreadchannel.h"
#include "gvfsjobopenforread.h"
#include "gvfsdaemonutils.h"
-G_DEFINE_TYPE (GVfsJobOpenForRead, g_vfs_job_open_for_read, G_TYPE_VFS_JOB);
+G_DEFINE_TYPE (GVfsJobOpenForRead, g_vfs_job_open_for_read, G_TYPE_VFS_JOB_DBUS);
-static gboolean start (GVfsJob *job);
-static void send_reply (GVfsJob *job);
+static gboolean start (GVfsJob *job);
+static DBusMessage *create_reply (GVfsJob *job,
+ DBusConnection *connection,
+ DBusMessage *message);
static void
g_vfs_job_open_for_read_finalize (GObject *object)
@@ -24,16 +26,10 @@ g_vfs_job_open_for_read_finalize (GObject *object)
job = G_VFS_JOB_OPEN_FOR_READ (object);
- if (job->message)
- dbus_message_unref (job->message);
+ /* TODO: manage backend_handle if not put in read channel */
- if (job->connection)
- dbus_connection_unref (job->connection);
-
- /* TODO: manage backend_handle if not put in readstream */
-
- if (job->read_stream)
- g_object_unref (job->read_stream);
+ if (job->read_channel)
+ g_object_unref (job->read_channel);
g_free (job->filename);
@@ -46,11 +42,11 @@ g_vfs_job_open_for_read_class_init (GVfsJobOpenForReadClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GVfsJobClass *job_class = G_VFS_JOB_CLASS (klass);
+ GVfsJobDBusClass *job_dbus_class = G_VFS_JOB_DBUS_CLASS (klass);
gobject_class->finalize = g_vfs_job_open_for_read_finalize;
-
job_class->start = start;
- job_class->send_reply = send_reply;
+ job_dbus_class->create_reply = create_reply;
}
static void
@@ -84,12 +80,12 @@ g_vfs_job_open_for_read_new (DBusConnection *connection,
return NULL;
}
- job = g_object_new (G_TYPE_VFS_JOB_OPEN_FOR_READ, NULL);
+ job = g_object_new (G_TYPE_VFS_JOB_OPEN_FOR_READ,
+ "backend", backend,
+ "message", message,
+ "connection", connection,
+ NULL);
- g_vfs_job_set_backend (G_VFS_JOB (job), backend);
-
- job->connection = dbus_connection_ref (connection);
- job->message = dbus_message_ref (message);
job->filename = g_strndup (path_data, path_len);
return G_VFS_JOB (job);
@@ -126,7 +122,7 @@ create_reply (GVfsJob *job,
DBusMessage *message)
{
GVfsJobOpenForRead *open_job = G_VFS_JOB_OPEN_FOR_READ (job);
- GVfsReadStream *stream;
+ GVfsReadChannel *channel;
DBusMessage *reply;
GError *error;
int remote_fd;
@@ -137,15 +133,15 @@ create_reply (GVfsJob *job,
g_assert (open_job->backend_handle != NULL);
error = NULL;
- stream = g_vfs_read_stream_new (job->backend, &error);
- if (stream == NULL)
+ channel = g_vfs_read_channel_new (job->backend, &error);
+ if (channel == NULL)
{
reply = dbus_message_new_error_from_gerror (message, error);
g_error_free (error);
return reply;
}
- remote_fd = g_vfs_read_stream_steal_remote_fd (stream);
+ remote_fd = g_vfs_read_channel_steal_remote_fd (channel);
if (!dbus_connection_send_fd (connection,
remote_fd,
&fd_id, &error))
@@ -153,7 +149,7 @@ create_reply (GVfsJob *job,
close (remote_fd);
reply = dbus_message_new_error_from_gerror (message, error);
g_error_free (error);
- g_object_unref (stream);
+ g_object_unref (channel);
return reply;
}
close (remote_fd);
@@ -165,42 +161,21 @@ create_reply (GVfsJob *job,
DBUS_TYPE_BOOLEAN, &can_seek,
DBUS_TYPE_INVALID);
- g_vfs_read_stream_set_backend_handle (stream, open_job->backend_handle);
+ g_vfs_read_channel_set_backend_handle (channel, open_job->backend_handle);
open_job->backend_handle = NULL;
- open_job->read_stream = stream;
+ open_job->read_channel = channel;
return reply;
}
-/* Might be called on an i/o thread */
-static void
-send_reply (GVfsJob *job)
-{
- GVfsJobOpenForRead *open_job = G_VFS_JOB_OPEN_FOR_READ (job);
- DBusMessage *reply;
-
- if (job->failed)
- reply = dbus_message_new_error_from_gerror (open_job->message, job->error);
- else
- reply = create_reply (job, open_job->connection, open_job->message);
-
- g_assert (reply != NULL);
-
- /* Queues reply (threadsafely), actually sends it in mainloop */
- dbus_connection_send (open_job->connection, reply, NULL);
- dbus_message_unref (reply);
-
- g_vfs_job_emit_finished (job);
-}
-
-GVfsReadStream *
-g_vfs_job_open_for_read_steal_stream (GVfsJobOpenForRead *job)
+GVfsReadChannel *
+g_vfs_job_open_for_read_steal_channel (GVfsJobOpenForRead *job)
{
- GVfsReadStream *stream;
+ GVfsReadChannel *channel;
- stream = job->read_stream;
- job->read_stream = NULL;
+ channel = job->read_channel;
+ job->read_channel = NULL;
- return stream;
+ return channel;
}
diff --git a/daemon/gvfsjobopenforread.h b/daemon/gvfsjobopenforread.h
index c000446e..b301bb5c 100644
--- a/daemon/gvfsjobopenforread.h
+++ b/daemon/gvfsjobopenforread.h
@@ -2,10 +2,9 @@
#define __G_VFS_JOB_OPEN_FOR_READ_H__
#include <dbus/dbus.h>
-#include <gvfsjob.h>
+#include <gvfsjobdbus.h>
#include <gvfsbackend.h>
-#include <gvfsreadstream.h>
-
+#include <gvfsreadchannel.h>
G_BEGIN_DECLS
@@ -21,31 +20,29 @@ typedef struct _GVfsJobOpenForReadClass GVfsJobOpenForReadClass;
struct _GVfsJobOpenForRead
{
- GVfsJob parent_instance;
+ GVfsJobDBus parent_instance;
- DBusConnection *connection;
- DBusMessage *message;
char *filename;
GVfsBackendHandle backend_handle;
gboolean can_seek;
- GVfsReadStream *read_stream;
+ GVfsReadChannel *read_channel;
};
struct _GVfsJobOpenForReadClass
{
- GVfsJobClass parent_class;
+ GVfsJobDBusClass parent_class;
};
GType g_vfs_job_open_for_read_get_type (void) G_GNUC_CONST;
-GVfsJob * g_vfs_job_open_for_read_new (DBusConnection *connection,
- DBusMessage *message,
- GVfsBackend *backend);
-void g_vfs_job_open_for_read_set_handle (GVfsJobOpenForRead *job,
- GVfsBackendHandle handle);
-void g_vfs_job_open_for_read_set_can_seek (GVfsJobOpenForRead *job,
- gboolean can_seek);
-GVfsReadStream *g_vfs_job_open_for_read_steal_stream (GVfsJobOpenForRead *job);
+GVfsJob * g_vfs_job_open_for_read_new (DBusConnection *connection,
+ DBusMessage *message,
+ GVfsBackend *backend);
+void g_vfs_job_open_for_read_set_handle (GVfsJobOpenForRead *job,
+ GVfsBackendHandle handle);
+void g_vfs_job_open_for_read_set_can_seek (GVfsJobOpenForRead *job,
+ gboolean can_seek);
+GVfsReadChannel *g_vfs_job_open_for_read_steal_channel (GVfsJobOpenForRead *job);
G_END_DECLS
diff --git a/daemon/gvfsjobread.c b/daemon/gvfsjobread.c
index 6b5fdcce..84cbe277 100644
--- a/daemon/gvfsjobread.c
+++ b/daemon/gvfsjobread.c
@@ -7,7 +7,7 @@
#include <glib.h>
#include <glib/gi18n.h>
-#include "gvfsreadstream.h"
+#include "gvfsreadchannel.h"
#include "gvfsjobread.h"
#include "gvfsdaemonutils.h"
@@ -23,7 +23,7 @@ g_vfs_job_read_finalize (GObject *object)
job = G_VFS_JOB_READ (object);
- g_object_unref (job->stream);
+ g_object_unref (job->channel);
g_free (job->buffer);
if (G_OBJECT_CLASS (g_vfs_job_read_parent_class)->finalize)
@@ -48,18 +48,18 @@ g_vfs_job_read_init (GVfsJobRead *job)
}
GVfsJob *
-g_vfs_job_read_new (GVfsReadStream *stream,
+g_vfs_job_read_new (GVfsReadChannel *channel,
GVfsBackendHandle handle,
gsize bytes_requested,
GVfsBackend *backend)
{
GVfsJobRead *job;
- job = g_object_new (G_TYPE_VFS_JOB_READ, NULL);
-
- g_vfs_job_set_backend (G_VFS_JOB (job), backend);
+ job = g_object_new (G_TYPE_VFS_JOB_READ,
+ "backend", backend,
+ NULL);
- job->stream = g_object_ref (stream);
+ job->channel = g_object_ref (channel);
job->handle = handle;
job->buffer = g_malloc (bytes_requested);
job->bytes_requested = bytes_requested;
@@ -75,12 +75,12 @@ send_reply (GVfsJob *job)
g_print ("job_read send reply, %d bytes\n", op_job->data_count);
if (job->failed)
- g_vfs_read_stream_send_error (op_job->stream, job->error);
+ g_vfs_read_channel_send_error (op_job->channel, job->error);
else
{
- g_vfs_read_stream_send_data (op_job->stream,
- op_job->buffer,
- op_job->data_count);
+ g_vfs_read_channel_send_data (op_job->channel,
+ op_job->buffer,
+ op_job->data_count);
}
}
diff --git a/daemon/gvfsjobread.h b/daemon/gvfsjobread.h
index 9e40b6b9..f7cda3e8 100644
--- a/daemon/gvfsjobread.h
+++ b/daemon/gvfsjobread.h
@@ -3,7 +3,7 @@
#include <gvfsjob.h>
#include <gvfsbackend.h>
-#include <gvfsreadstream.h>
+#include <gvfsreadchannel.h>
G_BEGIN_DECLS
@@ -21,7 +21,7 @@ struct _GVfsJobRead
{
GVfsJob parent_instance;
- GVfsReadStream *stream;
+ GVfsReadChannel *channel;
GVfsBackendHandle handle;
gsize bytes_requested;
char *buffer;
@@ -35,7 +35,7 @@ struct _GVfsJobReadClass
GType g_vfs_job_read_get_type (void) G_GNUC_CONST;
-GVfsJob *g_vfs_job_read_new (GVfsReadStream *stream,
+GVfsJob *g_vfs_job_read_new (GVfsReadChannel *channel,
GVfsBackendHandle handle,
gsize bytes_requested,
GVfsBackend *backend);
diff --git a/daemon/gvfsjobseekread.c b/daemon/gvfsjobseekread.c
index c0ee0876..d3678b31 100644
--- a/daemon/gvfsjobseekread.c
+++ b/daemon/gvfsjobseekread.c
@@ -7,7 +7,7 @@
#include <glib.h>
#include <glib/gi18n.h>
-#include "gvfsreadstream.h"
+#include "gvfsreadchannel.h"
#include "gvfsjobseekread.h"
#include "gvfsdaemonutils.h"
@@ -22,7 +22,7 @@ g_vfs_job_seek_read_finalize (GObject *object)
GVfsJobSeekRead *job;
job = G_VFS_JOB_SEEK_READ (object);
- g_object_unref (job->stream);
+ g_object_unref (job->channel);
if (G_OBJECT_CLASS (g_vfs_job_seek_read_parent_class)->finalize)
(*G_OBJECT_CLASS (g_vfs_job_seek_read_parent_class)->finalize) (object);
@@ -46,7 +46,7 @@ g_vfs_job_seek_read_init (GVfsJobSeekRead *job)
}
GVfsJob *
-g_vfs_job_seek_read_new (GVfsReadStream *stream,
+g_vfs_job_seek_read_new (GVfsReadChannel *channel,
GVfsBackendHandle handle,
GSeekType seek_type,
goffset offset,
@@ -54,11 +54,11 @@ g_vfs_job_seek_read_new (GVfsReadStream *stream,
{
GVfsJobSeekRead *job;
- job = g_object_new (G_TYPE_VFS_JOB_SEEK_READ, NULL);
-
- g_vfs_job_set_backend (G_VFS_JOB (job), backend);
+ job = g_object_new (G_TYPE_VFS_JOB_SEEK_READ,
+ "backend", backend,
+ NULL);
- job->stream = g_object_ref (stream);
+ job->channel = g_object_ref (channel);
job->handle = handle;
job->requested_offset = offset;
job->seek_type = seek_type;
@@ -75,11 +75,11 @@ send_reply (GVfsJob *job)
g_print ("job_seek_read send reply, pos %d\n", (int)op_job->final_offset);
if (job->failed)
- g_vfs_read_stream_send_error (op_job->stream, job->error);
+ g_vfs_read_channel_send_error (op_job->channel, job->error);
else
{
- g_vfs_read_stream_send_seek_offset (op_job->stream,
- op_job->final_offset);
+ g_vfs_read_channel_send_seek_offset (op_job->channel,
+ op_job->final_offset);
}
}
diff --git a/daemon/gvfsjobseekread.h b/daemon/gvfsjobseekread.h
index 177c8769..be11be53 100644
--- a/daemon/gvfsjobseekread.h
+++ b/daemon/gvfsjobseekread.h
@@ -3,7 +3,7 @@
#include <gvfsjob.h>
#include <gvfsbackend.h>
-#include <gvfsreadstream.h>
+#include <gvfsreadchannel.h>
G_BEGIN_DECLS
@@ -21,7 +21,7 @@ struct _GVfsJobSeekRead
{
GVfsJob parent_instance;
- GVfsReadStream *stream;
+ GVfsReadChannel *channel;
GVfsBackendHandle handle;
GSeekType seek_type;
goffset requested_offset;
@@ -35,7 +35,7 @@ struct _GVfsJobSeekReadClass
GType g_vfs_job_seek_read_get_type (void) G_GNUC_CONST;
-GVfsJob *g_vfs_job_seek_read_new (GVfsReadStream *stream,
+GVfsJob *g_vfs_job_seek_read_new (GVfsReadChannel *channel,
GVfsBackendHandle handle,
GSeekType seek_type,
goffset offset,
diff --git a/daemon/gvfsreadchannel.c b/daemon/gvfsreadchannel.c
new file mode 100644
index 00000000..54a0d2c5
--- /dev/null
+++ b/daemon/gvfsreadchannel.c
@@ -0,0 +1,552 @@
+#include <config.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#include <glib.h>
+#include <glib-object.h>
+#include <glib/gi18n.h>
+#include <dbus-gmain.h>
+#include <gvfsreadchannel.h>
+#include <gvfs/ginputstreamsocket.h>
+#include <gvfs/goutputstreamsocket.h>
+#include <gvfsdaemonprotocol.h>
+#include <gvfsdaemonutils.h>
+#include <gvfsjobread.h>
+#include <gvfsjobseekread.h>
+#include <gvfsjobcloseread.h>
+
+G_DEFINE_TYPE (GVfsReadChannel, g_vfs_read_channel, G_TYPE_OBJECT);
+
+enum {
+ PROP_0,
+};
+
+enum {
+ NEW_JOB,
+ CLOSED,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+typedef struct
+{
+ GVfsReadChannel *read_channel;
+ GInputStream *command_stream;
+ char buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE];
+ int buffer_size;
+} RequestReader;
+
+struct _GVfsReadChannelPrivate
+{
+ GVfsBackend *backend;
+ gboolean connection_closed;
+ GInputStream *command_stream;
+ GOutputStream *reply_stream;
+ int remote_fd;
+ int seek_generation;
+
+ GVfsBackendHandle backend_handle;
+ GVfsJob *current_job;
+ guint32 current_job_seq_nr;
+
+ RequestReader *request_reader;
+
+ char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE];
+ int reply_buffer_pos;
+
+ char *output_data; /* Owned by job */
+ gsize output_data_size;
+ gsize output_data_pos;
+};
+
+static void start_request_reader (GVfsReadChannel *channel);
+
+static void
+g_vfs_read_channel_finalize (GObject *object)
+{
+ GVfsReadChannel *read_channel;
+
+ read_channel = G_VFS_READ_CHANNEL (object);
+
+ if (read_channel->priv->current_job)
+ g_object_unref (read_channel->priv->current_job);
+ read_channel->priv->current_job = NULL;
+
+ if (read_channel->priv->reply_stream)
+ g_object_unref (read_channel->priv->reply_stream);
+ read_channel->priv->reply_stream = NULL;
+
+ if (read_channel->priv->request_reader)
+ read_channel->priv->request_reader->read_channel = NULL;
+ read_channel->priv->request_reader = NULL;
+
+ if (read_channel->priv->command_stream)
+ g_object_unref (read_channel->priv->command_stream);
+ read_channel->priv->command_stream = NULL;
+
+ if (read_channel->priv->remote_fd != -1)
+ close (read_channel->priv->remote_fd);
+
+ g_assert (read_channel->priv->backend_handle == NULL);
+
+ if (G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize)
+ (*G_OBJECT_CLASS (g_vfs_read_channel_parent_class)->finalize) (object);
+}
+
+static void
+g_vfs_read_channel_class_init (GVfsReadChannelClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+ g_type_class_add_private (klass, sizeof (GVfsReadChannelPrivate));
+
+ gobject_class->finalize = g_vfs_read_channel_finalize;
+
+ signals[NEW_JOB] =
+ g_signal_new ("new_job",
+ G_TYPE_FROM_CLASS (gobject_class),
+ G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GVfsReadChannelClass, new_job),
+ NULL, NULL,
+ g_cclosure_marshal_VOID__POINTER,
+ G_TYPE_NONE, 1, G_TYPE_VFS_JOB);
+
+ signals[CLOSED] =
+ g_signal_new ("closed",
+ G_TYPE_FROM_CLASS (gobject_class),
+ G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GVfsReadChannelClass, closed),
+ NULL, NULL,
+ g_cclosure_marshal_VOID__VOID,
+ G_TYPE_NONE, 0);
+
+}
+
+static void
+g_vfs_read_channel_init (GVfsReadChannel *channel)
+{
+ channel->priv = G_TYPE_INSTANCE_GET_PRIVATE (channel,
+ G_TYPE_VFS_READ_CHANNEL,
+ GVfsReadChannelPrivate);
+ channel->priv->remote_fd = -1;
+}
+
+static void
+g_vfs_read_channel_connection_closed (GVfsReadChannel *channel)
+{
+ if (channel->priv->connection_closed)
+ return;
+ channel->priv->connection_closed = TRUE;
+
+ if (channel->priv->current_job == NULL &&
+ channel->priv->backend_handle != NULL)
+ {
+ channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle, channel->priv->backend);
+ channel->priv->current_job_seq_nr = 0;
+ g_signal_emit (channel, signals[NEW_JOB], 0, channel->priv->current_job);
+ }
+ /* Otherwise we'll close when current_job is finished */
+}
+
+static void
+got_command (GVfsReadChannel *channel,
+ guint32 command,
+ guint32 seq_nr,
+ guint32 arg1,
+ guint32 arg2)
+{
+ GVfsJob *job;
+ GError *error;
+ GSeekType seek_type;
+
+ g_print ("got_command %d %d %d %d\n", command, seq_nr, arg1, arg2);
+
+ if (channel->priv->current_job != NULL)
+ {
+ if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL)
+ {
+ g_warning ("Ignored non-cancel request with outstanding request");
+ return;
+ }
+
+ if (arg1 == channel->priv->current_job_seq_nr)
+ g_vfs_job_cancel (channel->priv->current_job);
+ return;
+ }
+
+ job = NULL;
+ switch (command)
+ {
+ case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ:
+ job = g_vfs_job_read_new (channel,
+ channel->priv->backend_handle,
+ arg1,
+ channel->priv->backend);
+ break;
+ case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE:
+ job = g_vfs_job_close_read_new (channel,
+ channel->priv->backend_handle,
+ channel->priv->backend);
+ break;
+ case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR:
+ case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END:
+ case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET:
+ seek_type = G_SEEK_SET;
+ if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END)
+ seek_type = G_SEEK_END;
+ else if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR)
+ seek_type = G_SEEK_CUR;
+
+ channel->priv->seek_generation++;
+ job = g_vfs_job_seek_read_new (channel,
+ channel->priv->backend_handle,
+ seek_type,
+ ((goffset)arg1) | (((goffset)arg2) << 32),
+ channel->priv->backend);
+ break;
+
+ case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL:
+ /* Ignore cancel with no outstanding job */
+ break;
+
+ default:
+ error = NULL;
+ g_set_error (&error, G_VFS_ERROR,
+ G_VFS_ERROR_INTERNAL_ERROR,
+ "Unknown stream command %d\n", command);
+ g_vfs_read_channel_send_error (channel, error);
+ g_error_free (error);
+ break;
+ }
+
+ if (job)
+ {
+ channel->priv->current_job = job;
+ channel->priv->current_job_seq_nr = seq_nr;
+ g_signal_emit (channel, signals[NEW_JOB], 0, job);
+ }
+}
+
+static void
+command_read_cb (GInputStream *input_stream,
+ void *buffer,
+ gsize count_requested,
+ gssize count_read,
+ gpointer data,
+ GError *error)
+{
+ RequestReader *reader = data;
+ GVfsDaemonSocketProtocolRequest *cmd;
+ guint32 seq_nr;
+ guint32 command;
+ guint32 arg1, arg2;
+
+ if (reader->read_channel == NULL)
+ {
+ /* ReadChannel was finalized */
+ g_object_unref (reader->command_stream);
+ g_free (reader);
+ return;
+ }
+
+ if (count_read <= 0)
+ {
+ reader->read_channel->priv->request_reader = NULL;
+ g_vfs_read_channel_connection_closed (reader->read_channel);
+ g_object_unref (reader->command_stream);
+ g_free (reader);
+ return;
+ }
+
+ reader->buffer_size += count_read;
+
+ if (reader->buffer_size < G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE)
+ {
+ g_input_stream_read_async (reader->command_stream,
+ reader->buffer + reader->buffer_size,
+ G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size,
+ 0,
+ command_read_cb,
+ reader,
+ NULL);
+ return;
+ }
+
+ cmd = (GVfsDaemonSocketProtocolRequest *)reader->buffer;
+ command = g_ntohl (cmd->command);
+ arg1 = g_ntohl (cmd->arg1);
+ arg2 = g_ntohl (cmd->arg2);
+ seq_nr = g_ntohl (cmd->seq_nr);
+ reader->buffer_size = 0;
+
+ got_command (reader->read_channel, command, seq_nr, arg1, arg2);
+
+ /* Request more commands, so can get cancel requests */
+
+ reader->buffer_size = 0;
+ g_input_stream_read_async (reader->command_stream,
+ reader->buffer + reader->buffer_size,
+ G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size,
+ 0,
+ command_read_cb,
+ reader,
+ NULL);
+}
+
+static void
+start_request_reader (GVfsReadChannel *channel)
+{
+ RequestReader *reader;
+
+ reader = g_new0 (RequestReader, 1);
+ reader->read_channel = channel;
+ reader->command_stream = g_object_ref (channel->priv->command_stream);
+ reader->buffer_size = 0;
+
+ g_input_stream_read_async (reader->command_stream,
+ reader->buffer + reader->buffer_size,
+ G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size,
+ 0,
+ command_read_cb,
+ reader,
+ NULL);
+
+ channel->priv->request_reader = reader;
+}
+
+
+static void
+send_reply_cb (GOutputStream *output_stream,
+ void *buffer,
+ gsize bytes_requested,
+ gssize bytes_written,
+ gpointer data,
+ GError *error)
+{
+ GVfsReadChannel *channel = data;
+ GVfsJob *job;
+
+ g_print ("send_reply_cb: %d\n", bytes_written);
+
+ if (bytes_written <= 0)
+ {
+ g_vfs_read_channel_connection_closed (channel);
+ goto error_out;
+ }
+
+ if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
+ {
+ channel->priv->reply_buffer_pos += bytes_written;
+
+ /* Write more of reply header if needed */
+ if (channel->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
+ {
+ g_output_stream_write_async (channel->priv->reply_stream,
+ channel->priv->reply_buffer + channel->priv->reply_buffer_pos,
+ G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - channel->priv->reply_buffer_pos,
+ 0,
+ send_reply_cb, channel,
+ NULL);
+ return;
+ }
+ bytes_written = 0;
+ }
+
+ channel->priv->output_data_pos += bytes_written;
+
+ /* Write more of output_data if needed */
+ if (channel->priv->output_data != NULL &&
+ channel->priv->output_data_pos < channel->priv->output_data_size)
+ {
+ g_output_stream_write_async (channel->priv->reply_stream,
+ channel->priv->output_data + channel->priv->output_data_pos,
+ channel->priv->output_data_size - channel->priv->output_data_pos,
+ 0,
+ send_reply_cb, channel,
+ NULL);
+ return;
+ }
+
+ error_out:
+
+ /* Sent full reply */
+ channel->priv->output_data = NULL;
+
+ job = channel->priv->current_job;
+ channel->priv->current_job = NULL;
+ g_vfs_job_emit_finished (job);
+
+ if (G_IS_VFS_JOB_CLOSE_READ (job))
+ {
+ g_signal_emit (channel, signals[CLOSED], 0);
+ channel->priv->backend_handle = NULL;
+ }
+ else if (channel->priv->connection_closed)
+ {
+ channel->priv->current_job = g_vfs_job_close_read_new (channel, channel->priv->backend_handle,
+ channel->priv->backend);
+ channel->priv->current_job_seq_nr = 0;
+ g_signal_emit (channel, signals[NEW_JOB], 0, channel->priv->current_job);
+ }
+
+ g_object_unref (job);
+ g_print ("Sent reply\n");
+}
+
+/* Might be called on an i/o thread */
+static void
+send_reply (GVfsReadChannel *channel,
+ gboolean use_header,
+ char *data,
+ gsize data_len)
+{
+
+ channel->priv->output_data = data;
+ channel->priv->output_data_size = data_len;
+ channel->priv->output_data_pos = 0;
+
+ if (use_header)
+ {
+ channel->priv->reply_buffer_pos = 0;
+
+ g_output_stream_write_async (channel->priv->reply_stream,
+ channel->priv->reply_buffer,
+ G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE,
+ 0,
+ send_reply_cb, channel,
+ NULL);
+ }
+ else
+ {
+ channel->priv->reply_buffer_pos = G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE;
+
+ g_output_stream_write_async (channel->priv->reply_stream,
+ channel->priv->output_data,
+ channel->priv->output_data_size,
+ 0,
+ send_reply_cb, channel,
+ NULL);
+ }
+}
+
+/* Might be called on an i/o thread
+ */
+void
+g_vfs_read_channel_send_error (GVfsReadChannel *read_channel,
+ GError *error)
+{
+ char *data;
+ gsize data_len;
+
+ data = g_error_to_daemon_reply (error, read_channel->priv->current_job_seq_nr, &data_len);
+ send_reply (read_channel, FALSE, data, data_len);
+}
+
+
+/* Might be called on an i/o thread
+ */
+void
+g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel,
+ goffset offset)
+{
+ GVfsDaemonSocketProtocolReply *reply;
+
+ reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer;
+ reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS);
+ reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr);
+ reply->arg1 = g_htonl (offset & 0xffffffff);
+ reply->arg2 = g_htonl (offset >> 32);
+
+ send_reply (read_channel, TRUE, NULL, 0);
+}
+
+/* Might be called on an i/o thread
+ */
+void
+g_vfs_read_channel_send_closed (GVfsReadChannel *read_channel)
+{
+ GVfsDaemonSocketProtocolReply *reply;
+
+ reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer;
+ reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED);
+ reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr);
+ reply->arg1 = g_htonl (0);
+ reply->arg2 = g_htonl (0);
+
+ send_reply (read_channel, TRUE, NULL, 0);
+}
+
+/* Might be called on an i/o thread
+ */
+void
+g_vfs_read_channel_send_data (GVfsReadChannel *read_channel,
+ char *buffer,
+ gsize count)
+{
+ GVfsDaemonSocketProtocolReply *reply;
+
+ reply = (GVfsDaemonSocketProtocolReply *)read_channel->priv->reply_buffer;
+ reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA);
+ reply->seq_nr = g_htonl (read_channel->priv->current_job_seq_nr);
+ reply->arg1 = g_htonl (count);
+ reply->arg2 = g_htonl (read_channel->priv->seek_generation);
+
+ send_reply (read_channel, TRUE, buffer, count);
+}
+
+GVfsReadChannel *
+g_vfs_read_channel_new (GVfsBackend *backend,
+ GError **error)
+{
+ GVfsReadChannel *channel;
+ int socket_fds[2];
+ int ret;
+
+ ret = socketpair (AF_UNIX, SOCK_STREAM, 0, socket_fds);
+ if (ret == -1)
+ {
+ g_set_error (error, G_FILE_ERROR,
+ g_file_error_from_errno (errno),
+ _("Error creating socket pair"));
+ return NULL;
+ }
+
+ channel = g_object_new (G_TYPE_VFS_READ_CHANNEL, NULL);
+ channel->priv->backend = backend;
+ channel->priv->command_stream = g_input_stream_socket_new (socket_fds[0], TRUE);
+ channel->priv->reply_stream = g_output_stream_socket_new (socket_fds[0], FALSE);
+ channel->priv->remote_fd = socket_fds[1];
+
+ start_request_reader (channel);
+
+ return channel;
+}
+
+int
+g_vfs_read_channel_steal_remote_fd (GVfsReadChannel *channel)
+{
+ int fd;
+ fd = channel->priv->remote_fd;
+ channel->priv->remote_fd = -1;
+ return fd;
+}
+
+GVfsBackend *
+g_vfs_read_channel_get_backend (GVfsReadChannel *read_channel)
+{
+ return read_channel->priv->backend;
+}
+
+void
+g_vfs_read_channel_set_backend_handle (GVfsReadChannel *read_channel,
+ GVfsBackendHandle backend_handle)
+{
+ read_channel->priv->backend_handle = backend_handle;
+}
diff --git a/daemon/gvfsreadchannel.h b/daemon/gvfsreadchannel.h
new file mode 100644
index 00000000..3ff59e1f
--- /dev/null
+++ b/daemon/gvfsreadchannel.h
@@ -0,0 +1,63 @@
+#ifndef __G_VFS_READ_CHANNEL_H__
+#define __G_VFS_READ_CHANNEL_H__
+
+#include <glib-object.h>
+#include <gvfsjob.h>
+#include <gvfsbackend.h>
+#include <gvfs/gvfstypes.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_VFS_READ_CHANNEL (g_vfs_read_channel_get_type ())
+#define G_VFS_READ_CHANNEL(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_VFS_READ_CHANNEL, GVfsReadChannel))
+#define G_VFS_READ_CHANNEL_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_VFS_READ_CHANNEL, GVfsReadChannelClass))
+#define G_IS_VFS_READ_CHANNEL(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_VFS_READ_CHANNEL))
+#define G_IS_VFS_READ_CHANNEL_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_VFS_READ_CHANNEL))
+#define G_VFS_READ_CHANNEL_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_VFS_READ_CHANNEL, GVfsReadChannelClass))
+
+typedef struct _GVfsReadChannel GVfsReadChannel;
+typedef struct _GVfsReadChannelClass GVfsReadChannelClass;
+typedef struct _GVfsReadChannelPrivate GVfsReadChannelPrivate;
+
+struct _GVfsReadChannel
+{
+ GObject parent_instance;
+
+ GVfsReadChannelPrivate *priv;
+};
+
+struct _GVfsReadChannelClass
+{
+ GObjectClass parent_class;
+
+ /* signals */
+
+ void (*new_job) (GVfsReadChannel *stream,
+ GVfsJob *job);
+ void (*closed) (GVfsReadChannel *stream);
+};
+
+GType g_vfs_read_channel_get_type (void) G_GNUC_CONST;
+
+GVfsReadChannel *g_vfs_read_channel_new (GVfsBackend *backend,
+ GError **error);
+int g_vfs_read_channel_steal_remote_fd (GVfsReadChannel *read_channel);
+GVfsBackend *g_vfs_read_channel_get_backend (GVfsReadChannel *read_channel);
+void g_vfs_read_channel_set_backend_handle (GVfsReadChannel *read_channel,
+ GVfsBackendHandle backend_handle);
+gboolean g_vfs_read_channel_has_job (GVfsReadChannel *read_channel);
+GVfsJob * g_vfs_read_channel_get_job (GVfsReadChannel *read_channel);
+void g_vfs_read_channel_send_data (GVfsReadChannel *read_channel,
+ char *buffer,
+ gsize count);
+void g_vfs_read_channel_send_error (GVfsReadChannel *read_channel,
+ GError *error);
+void g_vfs_read_channel_send_closed (GVfsReadChannel *read_channel);
+void g_vfs_read_channel_send_seek_offset (GVfsReadChannel *read_channel,
+ goffset offset);
+
+/* TODO: i/o priority? */
+
+G_END_DECLS
+
+#endif /* __G_VFS_READ_CHANNEL_H__ */
diff --git a/daemon/gvfsreadstream.c b/daemon/gvfsreadstream.c
deleted file mode 100644
index 41e9b3ea..00000000
--- a/daemon/gvfsreadstream.c
+++ /dev/null
@@ -1,552 +0,0 @@
-#include <config.h>
-
-#include <unistd.h>
-#include <errno.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <unistd.h>
-#include <fcntl.h>
-
-#include <glib.h>
-#include <glib-object.h>
-#include <glib/gi18n.h>
-#include <dbus-gmain.h>
-#include <gvfsreadstream.h>
-#include <gvfs/ginputstreamsocket.h>
-#include <gvfs/goutputstreamsocket.h>
-#include <gvfsdaemonprotocol.h>
-#include <gvfsdaemonutils.h>
-#include <gvfsjobread.h>
-#include <gvfsjobseekread.h>
-#include <gvfsjobcloseread.h>
-
-G_DEFINE_TYPE (GVfsReadStream, g_vfs_read_stream, G_TYPE_OBJECT);
-
-enum {
- PROP_0,
-};
-
-enum {
- NEW_JOB,
- CLOSED,
- LAST_SIGNAL
-};
-
-static guint signals[LAST_SIGNAL] = { 0 };
-
-typedef struct
-{
- GVfsReadStream *read_stream;
- GInputStream *command_stream;
- char buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE];
- int buffer_size;
-} RequestReader;
-
-struct _GVfsReadStreamPrivate
-{
- GVfsBackend *backend;
- gboolean connection_closed;
- GInputStream *command_stream;
- GOutputStream *reply_stream;
- int remote_fd;
- int seek_generation;
-
- GVfsBackendHandle backend_handle;
- GVfsJob *current_job;
- guint32 current_job_seq_nr;
-
- RequestReader *request_reader;
-
- char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE];
- int reply_buffer_pos;
-
- char *output_data; /* Owned by job */
- gsize output_data_size;
- gsize output_data_pos;
-};
-
-static void start_request_reader (GVfsReadStream *stream);
-
-static void
-g_vfs_read_stream_finalize (GObject *object)
-{
- GVfsReadStream *read_stream;
-
- read_stream = G_VFS_READ_STREAM (object);
-
- if (read_stream->priv->current_job)
- g_object_unref (read_stream->priv->current_job);
- read_stream->priv->current_job = NULL;
-
- if (read_stream->priv->reply_stream)
- g_object_unref (read_stream->priv->reply_stream);
- read_stream->priv->reply_stream = NULL;
-
- if (read_stream->priv->request_reader)
- read_stream->priv->request_reader->read_stream = NULL;
- read_stream->priv->request_reader = NULL;
-
- if (read_stream->priv->command_stream)
- g_object_unref (read_stream->priv->command_stream);
- read_stream->priv->command_stream = NULL;
-
- if (read_stream->priv->remote_fd != -1)
- close (read_stream->priv->remote_fd);
-
- g_assert (read_stream->priv->backend_handle == NULL);
-
- if (G_OBJECT_CLASS (g_vfs_read_stream_parent_class)->finalize)
- (*G_OBJECT_CLASS (g_vfs_read_stream_parent_class)->finalize) (object);
-}
-
-static void
-g_vfs_read_stream_class_init (GVfsReadStreamClass *klass)
-{
- GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
-
- g_type_class_add_private (klass, sizeof (GVfsReadStreamPrivate));
-
- gobject_class->finalize = g_vfs_read_stream_finalize;
-
- signals[NEW_JOB] =
- g_signal_new ("new_job",
- G_TYPE_FROM_CLASS (gobject_class),
- G_SIGNAL_RUN_LAST,
- G_STRUCT_OFFSET (GVfsReadStreamClass, new_job),
- NULL, NULL,
- g_cclosure_marshal_VOID__POINTER,
- G_TYPE_NONE, 1, G_TYPE_VFS_JOB);
-
- signals[CLOSED] =
- g_signal_new ("closed",
- G_TYPE_FROM_CLASS (gobject_class),
- G_SIGNAL_RUN_LAST,
- G_STRUCT_OFFSET (GVfsReadStreamClass, closed),
- NULL, NULL,
- g_cclosure_marshal_VOID__VOID,
- G_TYPE_NONE, 0);
-
-}
-
-static void
-g_vfs_read_stream_init (GVfsReadStream *stream)
-{
- stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
- G_TYPE_VFS_READ_STREAM,
- GVfsReadStreamPrivate);
- stream->priv->remote_fd = -1;
-}
-
-static void
-g_vfs_read_stream_connection_closed (GVfsReadStream *stream)
-{
- if (stream->priv->connection_closed)
- return;
- stream->priv->connection_closed = TRUE;
-
- if (stream->priv->current_job == NULL &&
- stream->priv->backend_handle != NULL)
- {
- stream->priv->current_job = g_vfs_job_close_read_new (stream, stream->priv->backend_handle, stream->priv->backend);
- stream->priv->current_job_seq_nr = 0;
- g_signal_emit (stream, signals[NEW_JOB], 0, stream->priv->current_job);
- }
- /* Otherwise we'll close when current_job is finished */
-}
-
-static void
-got_command (GVfsReadStream *stream,
- guint32 command,
- guint32 seq_nr,
- guint32 arg1,
- guint32 arg2)
-{
- GVfsJob *job;
- GError *error;
- GSeekType seek_type;
-
- g_print ("got_command %d %d %d %d\n", command, seq_nr, arg1, arg2);
-
- if (stream->priv->current_job != NULL)
- {
- if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL)
- {
- g_warning ("Ignored non-cancel request with outstanding request");
- return;
- }
-
- if (arg1 == stream->priv->current_job_seq_nr)
- g_vfs_job_cancel (stream->priv->current_job);
- return;
- }
-
- job = NULL;
- switch (command)
- {
- case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_READ:
- job = g_vfs_job_read_new (stream,
- stream->priv->backend_handle,
- arg1,
- stream->priv->backend);
- break;
- case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CLOSE:
- job = g_vfs_job_close_read_new (stream,
- stream->priv->backend_handle,
- stream->priv->backend);
- break;
- case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR:
- case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END:
- case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_SET:
- seek_type = G_SEEK_SET;
- if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_END)
- seek_type = G_SEEK_END;
- else if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SEEK_CUR)
- seek_type = G_SEEK_CUR;
-
- stream->priv->seek_generation++;
- job = g_vfs_job_seek_read_new (stream,
- stream->priv->backend_handle,
- seek_type,
- ((goffset)arg1) | (((goffset)arg2) << 32),
- stream->priv->backend);
- break;
-
- case G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL:
- /* Ignore cancel with no outstanding job */
- break;
-
- default:
- error = NULL;
- g_set_error (&error, G_VFS_ERROR,
- G_VFS_ERROR_INTERNAL_ERROR,
- "Unknown stream command %d\n", command);
- g_vfs_read_stream_send_error (stream, error);
- g_error_free (error);
- break;
- }
-
- if (job)
- {
- stream->priv->current_job = job;
- stream->priv->current_job_seq_nr = seq_nr;
- g_signal_emit (stream, signals[NEW_JOB], 0, job);
- }
-}
-
-static void
-command_read_cb (GInputStream *input_stream,
- void *buffer,
- gsize count_requested,
- gssize count_read,
- gpointer data,
- GError *error)
-{
- RequestReader *reader = data;
- GVfsDaemonSocketProtocolRequest *cmd;
- guint32 seq_nr;
- guint32 command;
- guint32 arg1, arg2;
-
- if (reader->read_stream == NULL)
- {
- /* ReadStream was finalized */
- g_object_unref (reader->command_stream);
- g_free (reader);
- return;
- }
-
- if (count_read <= 0)
- {
- reader->read_stream->priv->request_reader = NULL;
- g_vfs_read_stream_connection_closed (reader->read_stream);
- g_object_unref (reader->command_stream);
- g_free (reader);
- return;
- }
-
- reader->buffer_size += count_read;
-
- if (reader->buffer_size < G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE)
- {
- g_input_stream_read_async (reader->command_stream,
- reader->buffer + reader->buffer_size,
- G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size,
- 0,
- command_read_cb,
- reader,
- NULL);
- return;
- }
-
- cmd = (GVfsDaemonSocketProtocolRequest *)reader->buffer;
- command = g_ntohl (cmd->command);
- arg1 = g_ntohl (cmd->arg1);
- arg2 = g_ntohl (cmd->arg2);
- seq_nr = g_ntohl (cmd->seq_nr);
- reader->buffer_size = 0;
-
- got_command (reader->read_stream, command, seq_nr, arg1, arg2);
-
- /* Request more commands, so can get cancel requests */
-
- reader->buffer_size = 0;
- g_input_stream_read_async (reader->command_stream,
- reader->buffer + reader->buffer_size,
- G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size,
- 0,
- command_read_cb,
- reader,
- NULL);
-}
-
-static void
-start_request_reader (GVfsReadStream *stream)
-{
- RequestReader *reader;
-
- reader = g_new0 (RequestReader, 1);
- reader->read_stream = stream;
- reader->command_stream = g_object_ref (stream->priv->command_stream);
- reader->buffer_size = 0;
-
- g_input_stream_read_async (reader->command_stream,
- reader->buffer + reader->buffer_size,
- G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_SIZE - reader->buffer_size,
- 0,
- command_read_cb,
- reader,
- NULL);
-
- stream->priv->request_reader = reader;
-}
-
-
-static void
-send_reply_cb (GOutputStream *output_stream,
- void *buffer,
- gsize bytes_requested,
- gssize bytes_written,
- gpointer data,
- GError *error)
-{
- GVfsReadStream *stream = data;
- GVfsJob *job;
-
- g_print ("send_reply_cb: %d\n", bytes_written);
-
- if (bytes_written <= 0)
- {
- g_vfs_read_stream_connection_closed (stream);
- goto error_out;
- }
-
- if (stream->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
- {
- stream->priv->reply_buffer_pos += bytes_written;
-
- /* Write more of reply header if needed */
- if (stream->priv->reply_buffer_pos < G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE)
- {
- g_output_stream_write_async (stream->priv->reply_stream,
- stream->priv->reply_buffer + stream->priv->reply_buffer_pos,
- G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE - stream->priv->reply_buffer_pos,
- 0,
- send_reply_cb, stream,
- NULL);
- return;
- }
- bytes_written = 0;
- }
-
- stream->priv->output_data_pos += bytes_written;
-
- /* Write more of output_data if needed */
- if (stream->priv->output_data != NULL &&
- stream->priv->output_data_pos < stream->priv->output_data_size)
- {
- g_output_stream_write_async (stream->priv->reply_stream,
- stream->priv->output_data + stream->priv->output_data_pos,
- stream->priv->output_data_size - stream->priv->output_data_pos,
- 0,
- send_reply_cb, stream,
- NULL);
- return;
- }
-
- error_out:
-
- /* Sent full reply */
- stream->priv->output_data = NULL;
-
- job = stream->priv->current_job;
- stream->priv->current_job = NULL;
- g_vfs_job_emit_finished (job);
-
- if (G_IS_VFS_JOB_CLOSE_READ (job))
- {
- g_signal_emit (stream, signals[CLOSED], 0);
- stream->priv->backend_handle = NULL;
- }
- else if (stream->priv->connection_closed)
- {
- stream->priv->current_job = g_vfs_job_close_read_new (stream, stream->priv->backend_handle,
- stream->priv->backend);
- stream->priv->current_job_seq_nr = 0;
- g_signal_emit (stream, signals[NEW_JOB], 0, stream->priv->current_job);
- }
-
- g_object_unref (job);
- g_print ("Sent reply\n");
-}
-
-/* Might be called on an i/o thread */
-static void
-send_reply (GVfsReadStream *stream,
- gboolean use_header,
- char *data,
- gsize data_len)
-{
-
- stream->priv->output_data = data;
- stream->priv->output_data_size = data_len;
- stream->priv->output_data_pos = 0;
-
- if (use_header)
- {
- stream->priv->reply_buffer_pos = 0;
-
- g_output_stream_write_async (stream->priv->reply_stream,
- stream->priv->reply_buffer,
- G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE,
- 0,
- send_reply_cb, stream,
- NULL);
- }
- else
- {
- stream->priv->reply_buffer_pos = G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE;
-
- g_output_stream_write_async (stream->priv->reply_stream,
- stream->priv->output_data,
- stream->priv->output_data_size,
- 0,
- send_reply_cb, stream,
- NULL);
- }
-}
-
-/* Might be called on an i/o thread
- */
-void
-g_vfs_read_stream_send_error (GVfsReadStream *read_stream,
- GError *error)
-{
- char *data;
- gsize data_len;
-
- data = g_error_to_daemon_reply (error, read_stream->priv->current_job_seq_nr, &data_len);
- send_reply (read_stream, FALSE, data, data_len);
-}
-
-
-/* Might be called on an i/o thread
- */
-void
-g_vfs_read_stream_send_seek_offset (GVfsReadStream *read_stream,
- goffset offset)
-{
- GVfsDaemonSocketProtocolReply *reply;
-
- reply = (GVfsDaemonSocketProtocolReply *)read_stream->priv->reply_buffer;
- reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SEEK_POS);
- reply->seq_nr = g_htonl (read_stream->priv->current_job_seq_nr);
- reply->arg1 = g_htonl (offset & 0xffffffff);
- reply->arg2 = g_htonl (offset >> 32);
-
- send_reply (read_stream, TRUE, NULL, 0);
-}
-
-/* Might be called on an i/o thread
- */
-void
-g_vfs_read_stream_send_closed (GVfsReadStream *read_stream)
-{
- GVfsDaemonSocketProtocolReply *reply;
-
- reply = (GVfsDaemonSocketProtocolReply *)read_stream->priv->reply_buffer;
- reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_CLOSED);
- reply->seq_nr = g_htonl (read_stream->priv->current_job_seq_nr);
- reply->arg1 = g_htonl (0);
- reply->arg2 = g_htonl (0);
-
- send_reply (read_stream, TRUE, NULL, 0);
-}
-
-/* Might be called on an i/o thread
- */
-void
-g_vfs_read_stream_send_data (GVfsReadStream *read_stream,
- char *buffer,
- gsize count)
-{
- GVfsDaemonSocketProtocolReply *reply;
-
- reply = (GVfsDaemonSocketProtocolReply *)read_stream->priv->reply_buffer;
- reply->type = g_htonl (G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_DATA);
- reply->seq_nr = g_htonl (read_stream->priv->current_job_seq_nr);
- reply->arg1 = g_htonl (count);
- reply->arg2 = g_htonl (read_stream->priv->seek_generation);
-
- send_reply (read_stream, TRUE, buffer, count);
-}
-
-GVfsReadStream *
-g_vfs_read_stream_new (GVfsBackend *backend,
- GError **error)
-{
- GVfsReadStream *stream;
- int socket_fds[2];
- int ret;
-
- ret = socketpair (AF_UNIX, SOCK_STREAM, 0, socket_fds);
- if (ret == -1)
- {
- g_set_error (error, G_FILE_ERROR,
- g_file_error_from_errno (errno),
- _("Error creating socket pair"));
- return NULL;
- }
-
- stream = g_object_new (G_TYPE_VFS_READ_STREAM, NULL);
- stream->priv->backend = backend;
- stream->priv->command_stream = g_input_stream_socket_new (socket_fds[0], TRUE);
- stream->priv->reply_stream = g_output_stream_socket_new (socket_fds[0], FALSE);
- stream->priv->remote_fd = socket_fds[1];
-
- start_request_reader (stream);
-
- return stream;
-}
-
-int
-g_vfs_read_stream_steal_remote_fd (GVfsReadStream *stream)
-{
- int fd;
- fd = stream->priv->remote_fd;
- stream->priv->remote_fd = -1;
- return fd;
-}
-
-GVfsBackend *
-g_vfs_read_stream_get_backend (GVfsReadStream *read_stream)
-{
- return read_stream->priv->backend;
-}
-
-void
-g_vfs_read_stream_set_backend_handle (GVfsReadStream *read_stream,
- GVfsBackendHandle backend_handle)
-{
- read_stream->priv->backend_handle = backend_handle;
-}
diff --git a/daemon/gvfsreadstream.h b/daemon/gvfsreadstream.h
deleted file mode 100644
index 1a571093..00000000
--- a/daemon/gvfsreadstream.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef __G_VFS_READ_STREAM_H__
-#define __G_VFS_READ_STREAM_H__
-
-#include <glib-object.h>
-#include <gvfsjob.h>
-#include <gvfsbackend.h>
-#include <gvfs/gvfstypes.h>
-
-G_BEGIN_DECLS
-
-#define G_TYPE_VFS_READ_STREAM (g_vfs_read_stream_get_type ())
-#define G_VFS_READ_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_VFS_READ_STREAM, GVfsReadStream))
-#define G_VFS_READ_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), G_TYPE_VFS_READ_STREAM, GVfsReadStreamClass))
-#define G_IS_VFS_READ_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_VFS_READ_STREAM))
-#define G_IS_VFS_READ_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_VFS_READ_STREAM))
-#define G_VFS_READ_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), G_TYPE_VFS_READ_STREAM, GVfsReadStreamClass))
-
-typedef struct _GVfsReadStream GVfsReadStream;
-typedef struct _GVfsReadStreamClass GVfsReadStreamClass;
-typedef struct _GVfsReadStreamPrivate GVfsReadStreamPrivate;
-
-struct _GVfsReadStream
-{
- GObject parent_instance;
-
- GVfsReadStreamPrivate *priv;
-};
-
-struct _GVfsReadStreamClass
-{
- GObjectClass parent_class;
-
- /* signals */
-
- void (*new_job) (GVfsReadStream *stream,
- GVfsJob *job);
- void (*closed) (GVfsReadStream *stream);
-};
-
-GType g_vfs_read_stream_get_type (void) G_GNUC_CONST;
-
-GVfsReadStream *g_vfs_read_stream_new (GVfsBackend *backend,
- GError **error);
-int g_vfs_read_stream_steal_remote_fd (GVfsReadStream *read_stream);
-GVfsBackend *g_vfs_read_stream_get_backend (GVfsReadStream *read_stream);
-void g_vfs_read_stream_set_backend_handle (GVfsReadStream *read_stream,
- GVfsBackendHandle backend_handle);
-gboolean g_vfs_read_stream_has_job (GVfsReadStream *read_stream);
-GVfsJob * g_vfs_read_stream_get_job (GVfsReadStream *read_stream);
-void g_vfs_read_stream_send_data (GVfsReadStream *read_stream,
- char *buffer,
- gsize count);
-void g_vfs_read_stream_send_error (GVfsReadStream *read_stream,
- GError *error);
-void g_vfs_read_stream_send_closed (GVfsReadStream *read_stream);
-void g_vfs_read_stream_send_seek_offset (GVfsReadStream *read_stream,
- goffset offset);
-
-/* TODO: i/o priority? */
-
-G_END_DECLS
-
-#endif /* __G_VFS_READ_STREAM_H__ */
diff --git a/gvfs/gvfsdaemondbus.c b/gvfs/gvfsdaemondbus.c
index 810a0ea6..9661d51a 100644
--- a/gvfs/gvfsdaemondbus.c
+++ b/gvfs/gvfsdaemondbus.c
@@ -396,6 +396,7 @@ async_dbus_response (DBusPendingCall *pending,
{
AsyncDBusCall *async_call = data;
DBusMessage *reply;
+ DBusError derror;
if (async_call->cancelled_tag)
g_signal_handler_disconnect (async_call->cancellable,
@@ -404,7 +405,15 @@ async_dbus_response (DBusPendingCall *pending,
reply = dbus_pending_call_steal_reply (pending);
dbus_pending_call_unref (pending);
- async_dbus_call_finish (async_call, reply);
+ dbus_error_init (&derror);
+ if (dbus_set_error_from_message (&derror, reply))
+ {
+ _g_error_from_dbus (&derror, &async_call->io_error);
+ dbus_error_free (&derror);
+ async_dbus_call_finish (async_call, NULL);
+ }
+ else
+ async_dbus_call_finish (async_call, reply);
dbus_message_unref (reply);
}
@@ -833,6 +842,7 @@ _g_vfs_daemon_call_sync (const char *mountpoint,
if (!sent_cancel && g_cancellable_is_cancelled (cancellable))
{
+ g_print ("Sending cancel\n");
sent_cancel = TRUE;
serial = dbus_message_get_serial (message);
cancel_message =
@@ -866,6 +876,9 @@ _g_vfs_daemon_call_sync (const char *mountpoint,
reply = dbus_pending_call_steal_reply (pending);
dbus_pending_call_unref (pending);
+
+ g_print ("reply: %p, is_error: %d\n", reply,
+ dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR);
}
else
{
@@ -883,6 +896,14 @@ _g_vfs_daemon_call_sync (const char *mountpoint,
if (connection_out)
*connection_out = connection;
+
+ if (dbus_set_error_from_message (&derror, reply))
+ {
+ _g_error_from_dbus (&derror, error);
+ dbus_error_free (&derror);
+ dbus_message_unref (reply);
+ return NULL;
+ }
return reply;
}
@@ -953,6 +974,13 @@ get_connection_sync (const char *mountpoint,
dbus_error_free (&derror);
return NULL;
}
+
+ if (dbus_set_error_from_message (&derror, reply))
+ {
+ _g_error_from_dbus (&derror, error);
+ dbus_error_free (&derror);
+ return NULL;
+ }
dbus_message_get_args (reply, NULL,
DBUS_TYPE_STRING, &address1,
diff --git a/gvfs/test.c b/gvfs/test.c
index 912f2aeb..c04e4e2f 100644
--- a/gvfs/test.c
+++ b/gvfs/test.c
@@ -8,6 +8,15 @@
#include "gfileoutputstreamlocal.h"
#include "ginputstreamsocket.h"
+static gpointer
+cancel_thread (gpointer data)
+{
+ sleep (1);
+ g_print ("cancel_thread GO!\n");
+ g_cancellable_cancel (G_CANCELLABLE (data));
+ return NULL;
+}
+
static void
test_out ()
{
@@ -73,15 +82,22 @@ test_sync (char *uri, gboolean dump)
gssize res;
gboolean close_res;
GCancellable *c;
+ GError *error;
g_print ("> test_sync %s\n", uri);
c = g_cancellable_new ();
file = g_file_get_for_uri (uri);
- in = (GInputStream *)g_file_read (file, c, NULL);
+ if (0) g_thread_create (cancel_thread, c, FALSE, NULL);
+ error = NULL;
+ in = (GInputStream *)g_file_read (file, c, &error);
+ g_print ("input stream: %p\n", in);
if (in == NULL)
- goto out;
+ {
+ g_print ("open error %d: %s\n", error->code, error->message);
+ goto out;
+ }
while (1)
{
@@ -164,6 +180,8 @@ test_async_open_callback (GFile *file,
g_print ("test_async_open_callback: %p\n", stream);
if (stream)
g_input_stream_read_async (G_INPUT_STREAM (stream), data->buffer, 1024, 0, read_done, data, data->c);
+ else
+ g_print ("%s\n", error->message);
}
@@ -177,7 +195,8 @@ test_async (char *uri, gboolean dump)
data->c = g_cancellable_new ();
file = g_file_get_for_uri (uri);
- g_file_read_async (file, 0, test_async_open_callback, data, NULL, NULL);
+ g_file_read_async (file, 0, test_async_open_callback, data, NULL, data->c);
+ if (0) g_thread_create (cancel_thread, data->c, FALSE, NULL);
}
static gboolean
@@ -191,15 +210,6 @@ cancel_cancellable_cb (gpointer data)
return FALSE;
}
-static gpointer
-cancel_thread (gpointer data)
-{
- sleep (1);
- g_print ("cancel_thread GO!\n");
- g_cancellable_cancel (G_CANCELLABLE (data));
- return NULL;
-}
-
static void
test_seek (void)
{