summaryrefslogtreecommitdiff
path: root/daemon
diff options
context:
space:
mode:
authorCarl-Anton Ingmarsson <ca.ingmarsson@gmail.com>2012-01-29 23:24:05 +0100
committerCarl-Anton Ingmarsson <ca.ingmarsson@gmail.com>2012-08-09 21:06:05 +0200
commitc1b3b8a3e3d979a4857587b57026d91f56ff1627 (patch)
tree23d597b7ad35e20aa9f8cc129a78d77c677adc61 /daemon
parentdaa712e7a8848c923ce698e4296e21ab5fde6c68 (diff)
downloadgvfs-c1b3b8a3e3d979a4857587b57026d91f56ff1627.tar.gz
Use a worker thread in GVfsAfpConnection
Diffstat (limited to 'daemon')
-rw-r--r--daemon/gvfsafpconnection.c550
-rw-r--r--daemon/gvfsafpconnection.h6
-rw-r--r--daemon/gvfsafpserver.c71
-rw-r--r--daemon/gvfsafpvolume.c8
4 files changed, 413 insertions, 222 deletions
diff --git a/daemon/gvfsafpconnection.c b/daemon/gvfsafpconnection.c
index 26c42bab..aaeebc2a 100644
--- a/daemon/gvfsafpconnection.c
+++ b/daemon/gvfsafpconnection.c
@@ -618,16 +618,30 @@ typedef struct {
guint32 reserved;
} DSIHeader;
+enum {
+ FLAG_INITIALIZED = 1 << 0,
+ FLAG_PENDING_CLOSE = 1 << 1,
+ FLAG_CLOSED = 1 << 2
+};
+
struct _GVfsAfpConnectionPrivate
{
GSocketConnectable *addr;
- GIOStream *conn;
+ GIOStream *stream;
+ /* Flags */
+ volatile gint atomic_flags;
+
guint16 request_id;
guint32 kRequestQuanta;
guint32 kServerReplayCacheSize;
+ GThread *worker_thread;
+ GMainContext *worker_context;
+ GMainLoop *worker_loop;
+ GMutex mutex;
+
GQueue *request_queue;
GHashTable *request_hash;
@@ -636,10 +650,12 @@ struct _GVfsAfpConnectionPrivate
DSIHeader write_dsi_header;
/* read loop */
- gboolean read_loop_running;
+ GCancellable *read_cancellable;
DSIHeader read_dsi_header;
char *reply_buf;
gboolean free_reply_buf;
+
+ GSList *pending_closes;
};
typedef enum
@@ -667,8 +683,58 @@ typedef struct
char *reply_buf;
GSimpleAsyncResult *simple;
GCancellable *cancellable;
+
+ GVfsAfpConnection *conn;
} RequestData;
+typedef struct
+{
+ GMutex mutex;
+ GCond cond;
+ GVfsAfpConnection *conn;
+ GCancellable *cancellable;
+ gboolean res;
+ GError **error;
+ void *data;
+} SyncData;
+
+static void
+sync_data_init (SyncData *data, GVfsAfpConnection *conn, GError **error)
+{
+ g_mutex_init (&data->mutex);
+ g_cond_init (&data->cond);
+ data->conn = conn;
+ data->error = error;
+ data->res = FALSE;
+}
+
+static void
+sync_data_clear (SyncData *data)
+{
+ g_mutex_clear (&data->mutex);
+ g_cond_clear (&data->cond);
+}
+
+static void
+sync_data_signal (SyncData *data)
+{
+ g_mutex_lock (&data->mutex);
+ g_cond_signal (&data->cond);
+ g_mutex_unlock (&data->mutex);
+}
+
+static void
+sync_data_wait (SyncData *data)
+{
+ g_mutex_lock (&data->mutex);
+ g_cond_wait (&data->cond, &data->mutex);
+ g_mutex_unlock (&data->mutex);
+}
+
+static void send_request_unlocked (GVfsAfpConnection *afp_connection);
+static void close_connection (GVfsAfpConnection *conn);
+static void read_reply (GVfsAfpConnection *afp_connection);
+
static void
free_request_data (RequestData *req_data)
{
@@ -682,6 +748,31 @@ free_request_data (RequestData *req_data)
g_slice_free (RequestData, req_data);
}
+static gboolean
+check_open (GVfsAfpConnection *conn, GError **error)
+{
+ GVfsAfpConnectionPrivate *priv = conn->priv;
+
+ /* Acts as memory barrier */
+ gint flags = g_atomic_int_get (&priv->atomic_flags);
+
+ if (!(flags & FLAG_INITIALIZED))
+ {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
+ _("The connection is not opened"));
+ return FALSE;
+ }
+
+ else if ((flags & FLAG_CLOSED))
+ {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+ _("The connection is closed"));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
static void
g_vfs_afp_connection_init (GVfsAfpConnection *afp_connection)
{
@@ -690,20 +781,17 @@ g_vfs_afp_connection_init (GVfsAfpConnection *afp_connection)
afp_connection->priv = priv = G_TYPE_INSTANCE_GET_PRIVATE (afp_connection,
G_VFS_TYPE_AFP_CONNECTION,
GVfsAfpConnectionPrivate);
-
- priv->addr = NULL;
- priv->conn = NULL;
- priv->request_id = 0;
-
priv->kRequestQuanta = -1;
priv->kServerReplayCacheSize = -1;
+ g_mutex_init (&priv->mutex);
+
priv->request_queue = g_queue_new ();
priv->request_hash = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, (GDestroyNotify)free_request_data);
+ priv->read_cancellable = g_cancellable_new ();
priv->send_loop_running = FALSE;
- priv->read_loop_running = FALSE;
}
static void
@@ -712,11 +800,11 @@ g_vfs_afp_connection_finalize (GObject *object)
GVfsAfpConnection *afp_connection = (GVfsAfpConnection *)object;
GVfsAfpConnectionPrivate *priv = afp_connection->priv;
- if (priv->addr)
- g_object_unref (priv->addr);
-
- if (priv->conn)
- g_object_unref (priv->conn);
+ g_clear_object (&priv->addr);
+ g_clear_object (&priv->stream);
+ g_clear_object (&priv->read_cancellable);
+
+ g_mutex_clear (&priv->mutex);
G_OBJECT_CLASS (g_vfs_afp_connection_parent_class)->finalize (object);
}
@@ -738,9 +826,6 @@ g_vfs_afp_connection_class_init (GVfsAfpConnectionClass *klass)
G_TYPE_NONE, 1, G_TYPE_UINT);
}
-static void read_reply (GVfsAfpConnection *afp_connection);
-static void send_request (GVfsAfpConnection *afp_connection);
-
static guint16
get_request_id (GVfsAfpConnection *afp_connection)
{
@@ -749,23 +834,6 @@ get_request_id (GVfsAfpConnection *afp_connection)
return priv->request_id++;
}
-static void
-run_loop (GVfsAfpConnection *afp_connection)
-{
- GVfsAfpConnectionPrivate *priv = afp_connection->priv;
-
- if (!priv->send_loop_running)
- {
- priv->send_loop_running = TRUE;
- send_request (afp_connection);
- }
- if (!priv->read_loop_running)
- {
- priv->read_loop_running = TRUE;
- read_reply (afp_connection);
- }
-}
-
typedef struct
{
void *buffer;
@@ -896,8 +964,16 @@ dispatch_reply (GVfsAfpConnection *afp_connection)
req_data = g_slice_new0 (RequestData);
req_data->type = REQUEST_TYPE_TICKLE;
+ /* take lock */
+ g_mutex_lock (&priv->mutex);
g_queue_push_head (priv->request_queue, req_data);
- run_loop (afp_connection);
+ if (!priv->send_loop_running) {
+ priv->send_loop_running = TRUE;
+ send_request_unlocked (afp_connection);
+ }
+ /* release lock */
+ g_mutex_unlock (&priv->mutex);
+
break;
}
@@ -928,7 +1004,7 @@ dispatch_reply (GVfsAfpConnection *afp_connection)
g_simple_async_result_set_op_res_gpointer (req_data->simple, reply,
g_object_unref);
- g_simple_async_result_complete (req_data->simple);
+ g_simple_async_result_complete_in_idle (req_data->simple);
g_hash_table_remove (priv->request_hash,
GUINT_TO_POINTER ((guint)dsi_header->requestID));
@@ -951,6 +1027,13 @@ read_data_cb (GObject *object, GAsyncResult *res, gpointer user_data)
gboolean result;
GError *err = NULL;
+ if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE)
+ {
+ if (!priv->send_loop_running)
+ close_connection (afp_connection);
+ return;
+ }
+
result = read_all_finish (input, res, NULL, &err);
if (!result)
{
@@ -976,6 +1059,13 @@ read_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
gboolean result;
GError *err = NULL;
DSIHeader *dsi_header;
+
+ if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE)
+ {
+ if (!priv->send_loop_running)
+ close_connection (afp_conn);
+ return;
+ }
result = read_all_finish (input, res, NULL, &err);
if (!result)
@@ -1008,7 +1098,7 @@ read_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
}
read_all_async (input, priv->reply_buf, dsi_header->totalDataLength,
- 0, NULL, read_data_cb, afp_conn);
+ 0, priv->read_cancellable, read_data_cb, afp_conn);
return;
}
@@ -1024,10 +1114,17 @@ read_reply (GVfsAfpConnection *afp_connection)
GInputStream *input;
- input = g_io_stream_get_input_stream (priv->conn);
+ if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE)
+ {
+ if (!priv->send_loop_running)
+ close_connection (afp_connection);
+ return;
+ }
- read_all_async (input, &priv->read_dsi_header, sizeof (DSIHeader), 0, NULL,
- read_dsi_header_cb, afp_connection);
+ input = g_io_stream_get_input_stream (priv->stream);
+
+ read_all_async (input, &priv->read_dsi_header, sizeof (DSIHeader), 0,
+ priv->read_cancellable, read_dsi_header_cb, afp_connection);
}
typedef struct
@@ -1141,15 +1238,6 @@ write_all_finish (GOutputStream *stream,
return TRUE;
}
-static void
-remove_first (GQueue *request_queue)
-{
- RequestData *req_data;
-
- req_data = g_queue_pop_head (request_queue);
- free_request_data (req_data);
-}
-
#define HANDLE_RES() { \
gboolean result; \
GError *err = NULL; \
@@ -1160,99 +1248,95 @@ remove_first (GQueue *request_queue)
if (req_data->simple) \
{ \
g_simple_async_result_set_from_error (req_data->simple, err); \
- g_simple_async_result_complete (req_data->simple); \
+ g_simple_async_result_complete_in_idle (req_data->simple); \
} \
\
- remove_first (priv->request_queue); \
g_error_free (err); \
+ free_request_data (req_data); \
\
- send_request (afp_conn); \
+ g_mutex_lock (&priv->mutex); \
+ send_request_unlocked (afp_conn); \
+ g_mutex_unlock (&priv->mutex); \
return; \
} \
}
-
static void
write_buf_cb (GObject *object, GAsyncResult *res, gpointer user_data)
{
GOutputStream *output = G_OUTPUT_STREAM (object);
- GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
+ RequestData *req_data = user_data;
+ GVfsAfpConnection *afp_conn = req_data->conn;
GVfsAfpConnectionPrivate *priv = afp_conn->priv;
-
- RequestData *req_data;
-
- req_data = g_queue_peek_head (priv->request_queue);
HANDLE_RES ();
g_hash_table_insert (priv->request_hash,
GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)),
req_data);
- g_queue_pop_head (priv->request_queue);
- send_request (afp_conn);
+
+ g_mutex_lock (&priv->mutex);
+ send_request_unlocked (afp_conn);
+ g_mutex_unlock (&priv->mutex);
}
static void
write_command_cb (GObject *object, GAsyncResult *res, gpointer user_data)
{
GOutputStream *output = G_OUTPUT_STREAM (object);
- GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
+ RequestData *req_data = user_data;
+ GVfsAfpConnection *afp_conn = req_data->conn;
GVfsAfpConnectionPrivate *priv = afp_conn->priv;
- RequestData *req_data;
-
- req_data = g_queue_peek_head (priv->request_queue);
-
HANDLE_RES ();
if (priv->write_dsi_header.command == DSI_WRITE &&
req_data->command->buf)
{
write_all_async (output, req_data->command->buf, req_data->command->buf_size,
- 0, NULL, write_buf_cb, afp_conn);
+ 0, NULL, write_buf_cb, req_data);
return;
}
g_hash_table_insert (priv->request_hash,
GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)),
req_data);
- g_queue_pop_head (priv->request_queue);
- send_request (afp_conn);
+ g_mutex_lock (&priv->mutex);
+ send_request_unlocked (afp_conn);
+ g_mutex_unlock (&priv->mutex);
}
static void
write_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
{
GOutputStream *output = G_OUTPUT_STREAM (object);
- GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
+ RequestData *req_data = user_data;
+ GVfsAfpConnection *afp_conn = req_data->conn;
GVfsAfpConnectionPrivate *priv = afp_conn->priv;
- RequestData *req_data;
-
char *data;
gsize size;
- req_data = g_queue_peek_head (priv->request_queue);
-
HANDLE_RES ();
if (req_data->type == REQUEST_TYPE_TICKLE)
{
- remove_first (priv->request_queue);
- send_request (afp_conn);
+ g_mutex_lock (&priv->mutex);
+ send_request_unlocked (afp_conn);
+ g_mutex_unlock (&priv->mutex);
return;
}
data = g_vfs_afp_command_get_data (req_data->command);
size = g_vfs_afp_command_get_size (req_data->command);
- write_all_async (output, data, size, 0, NULL, write_command_cb, afp_conn);
+ write_all_async (output, data, size, 0, NULL, write_command_cb, req_data);
}
static void
-send_request (GVfsAfpConnection *afp_connection)
+send_request_unlocked (GVfsAfpConnection *afp_connection)
{
GVfsAfpConnectionPrivate *priv = afp_connection->priv;
@@ -1260,7 +1344,7 @@ send_request (GVfsAfpConnection *afp_connection)
guint32 writeOffset;
guint8 dsi_command;
- while ((req_data = g_queue_peek_head (priv->request_queue)))
+ while ((req_data = g_queue_pop_head (priv->request_queue)))
{
if (req_data->cancellable && g_cancellable_is_cancelled (req_data->cancellable))
{
@@ -1270,9 +1354,8 @@ send_request (GVfsAfpConnection *afp_connection)
g_cancellable_set_error_if_cancelled (req_data->cancellable, &err);
g_simple_async_result_take_error (req_data->simple, err);
- g_simple_async_result_complete (req_data->simple);
+ g_simple_async_result_complete_in_idle (req_data->simple);
}
- remove_first (priv->request_queue);
}
else
break;
@@ -1335,9 +1418,28 @@ send_request (GVfsAfpConnection *afp_connection)
}
- write_all_async (g_io_stream_get_output_stream (priv->conn),
+ write_all_async (g_io_stream_get_output_stream (priv->stream),
&priv->write_dsi_header, sizeof (DSIHeader), 0,
- NULL, write_dsi_header_cb, afp_connection);
+ NULL, write_dsi_header_cb, req_data);
+}
+
+static gboolean
+start_send_loop_func (gpointer data)
+{
+ GVfsAfpConnection *conn = data;
+ GVfsAfpConnectionPrivate *priv = conn->priv;
+
+ g_mutex_lock (&priv->mutex);
+
+ if (priv->send_loop_running)
+ goto out;
+
+ priv->send_loop_running = TRUE;
+ send_request_unlocked (conn);
+
+out:
+ g_mutex_unlock (&priv->mutex);
+ return G_SOURCE_REMOVE;
}
void
@@ -1350,7 +1452,15 @@ g_vfs_afp_connection_send_command (GVfsAfpConnection *afp_connection,
{
GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+ GError *err = NULL;
RequestData *req_data;
+
+ if (!check_open (afp_connection, &err))
+ {
+ g_simple_async_report_take_gerror_in_idle (G_OBJECT(afp_connection), callback,
+ user_data, err);
+ return;
+ }
req_data = g_slice_new0 (RequestData);
req_data->type = REQUEST_TYPE_COMMAND;
@@ -1360,12 +1470,23 @@ g_vfs_afp_connection_send_command (GVfsAfpConnection *afp_connection,
req_data->simple = g_simple_async_result_new (G_OBJECT (afp_connection), callback,
user_data,
g_vfs_afp_connection_send_command);
+ req_data->conn = afp_connection;
+
if (cancellable)
req_data->cancellable = g_object_ref (cancellable);
+ /* Take lock */
+ g_mutex_lock (&priv->mutex);
+
g_queue_push_tail (priv->request_queue, req_data);
+ if (!priv->send_loop_running)
+ {
+ g_main_context_invoke (priv->worker_context, start_send_loop_func,
+ afp_connection);
+ }
- run_loop (afp_connection);
+ /* Release lock */
+ g_mutex_unlock (&priv->mutex);
}
GVfsAfpReply *
@@ -1443,25 +1564,6 @@ read_reply_sync (GInputStream *input,
return TRUE;
}
-GVfsAfpReply *
-g_vfs_afp_connection_read_reply_sync (GVfsAfpConnection *afp_connection,
- GCancellable *cancellable,
- GError **error)
-{
- GVfsAfpConnectionPrivate *priv = afp_connection->priv;
-
- gboolean res;
- char *data;
- DSIHeader dsi_header;
-
- res = read_reply_sync (g_io_stream_get_input_stream (priv->conn), &dsi_header,
- &data, cancellable, error);
- if (!res)
- return NULL;
-
- return g_vfs_afp_reply_new (dsi_header.errorCode, data, dsi_header.totalDataLength, TRUE);
-}
-
static gboolean
send_request_sync (GOutputStream *output,
DsiCommand command,
@@ -1501,79 +1603,162 @@ send_request_sync (GOutputStream *output,
return TRUE;
}
-gboolean
+static void
+send_command_sync_cb (GObject *source_object, GAsyncResult *res, gpointer user_data)
+{
+ SyncData *sync_data = user_data;
+
+ sync_data->data = g_object_ref (res);
+ sync_data_signal (sync_data);
+}
+
+GVfsAfpReply *
g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection,
- GVfsAfpCommand *afp_command,
+ GVfsAfpCommand *command,
GCancellable *cancellable,
GError **error)
{
- GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+ SyncData sync_data;
+ GVfsAfpReply *reply;
- DsiCommand dsi_command;
- guint16 req_id;
- guint32 writeOffset;
+ if (!check_open (afp_connection, error))
+ return FALSE;
- /* set dsi_command */
- switch (afp_command->type)
- {
- case AFP_COMMAND_WRITE:
- writeOffset = 8;
- dsi_command = DSI_WRITE;
- break;
- case AFP_COMMAND_WRITE_EXT:
- writeOffset = 20;
- dsi_command = DSI_WRITE;
- break;
+ sync_data_init (&sync_data, afp_connection, NULL);
- default:
- writeOffset = 0;
- dsi_command = DSI_COMMAND;
- break;
- }
+ g_vfs_afp_connection_send_command (afp_connection, command, NULL,
+ send_command_sync_cb, cancellable, &sync_data);
- req_id = get_request_id (afp_connection);
- return send_request_sync (g_io_stream_get_output_stream (priv->conn),
- dsi_command, req_id, writeOffset,
- g_vfs_afp_command_get_size (afp_command),
- g_vfs_afp_command_get_data (afp_command),
- cancellable, error);
+ sync_data_wait (&sync_data);
+
+ reply = g_vfs_afp_connection_send_command_finish (afp_connection, sync_data.data,
+ error);
+ g_object_unref (sync_data.data);
+ sync_data_clear (&sync_data);
+
+ return reply;
}
-gboolean
-g_vfs_afp_connection_close_sync (GVfsAfpConnection *afp_connection,
- GCancellable *cancellable,
- GError **error)
+static void
+close_connection (GVfsAfpConnection *conn)
{
- GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+ GVfsAfpConnectionPrivate *priv = conn->priv;
guint16 req_id;
gboolean res;
+ GError *err = NULL;
+
+ GQueue *request_queue;
+ GSList *pending_closes, *siter;
+ GHashTable *request_hash;
+ GHashTableIter iter;
+ RequestData *req_data;
+
+ /* Take lock */
+ g_mutex_lock (&priv->mutex);
+
+ /* Set closed flag */
+ g_atomic_int_or (&priv->atomic_flags, FLAG_CLOSED);
+
+ request_queue = priv->request_queue;
+ priv->request_queue = NULL;
+
+ request_hash = priv->request_hash;
+ priv->request_hash = NULL;
+
+ pending_closes = priv->pending_closes;
+ priv->pending_closes = NULL;
+
+ /* Release lock */
+ g_mutex_unlock (&priv->mutex);
/* close DSI session */
- req_id = get_request_id (afp_connection);
- res = send_request_sync (g_io_stream_get_output_stream (priv->conn),
+ req_id = get_request_id (conn);
+ res = send_request_sync (g_io_stream_get_output_stream (priv->stream),
DSI_CLOSE_SESSION, req_id, 0, 0, NULL,
- cancellable, error);
+ NULL, &err);
if (!res)
+ g_io_stream_close (priv->stream, NULL, NULL);
+ else
+ res = g_io_stream_close (priv->stream, NULL, &err);
+
+ g_clear_object (&priv->stream);
+
+#define REQUEST_DATA_CLOSED(request_data) { \
+ g_simple_async_result_set_from_error (req_data->simple, \
+ g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CLOSED, "Connection was closed")); \
+ \
+ g_simple_async_result_complete_in_idle (req_data->simple); \
+ free_request_data (req_data); \
+}
+
+ while ((req_data = g_queue_pop_head (request_queue)))
{
- g_io_stream_close (priv->conn, cancellable, NULL);
- g_object_unref (priv->conn);
- return FALSE;
+ REQUEST_DATA_CLOSED (req_data);
}
- res = g_io_stream_close (priv->conn, cancellable, error);
- g_object_unref (priv->conn);
+ g_hash_table_iter_init (&iter, request_hash);
+ while (g_hash_table_iter_next (&iter, NULL, (void **)&req_data))
+ {
+ REQUEST_DATA_CLOSED (req_data);
+ }
- return res;
+#undef REQUEST_DATA_CLOSED
+
+ for (siter = pending_closes; siter != NULL; siter = siter->next)
+ {
+ SyncData *close_data = siter->data;
+
+ close_data->res = TRUE;
+ sync_data_signal (close_data);
+ }
+ g_slist_free (pending_closes);
+
+ /* quit main_loop */
+ g_main_loop_quit (priv->worker_loop);
+ g_main_loop_unref (priv->worker_loop);
+ g_main_context_unref (priv->worker_context);
}
gboolean
-g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
- GCancellable *cancellable,
- GError **error)
+g_vfs_afp_connection_close_sync (GVfsAfpConnection *afp_connection,
+ GCancellable *cancellable,
+ GError **error)
{
GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+ SyncData close_data;
+
+ /* Take lock */
+ g_mutex_lock (&priv->mutex);
+
+ if (!check_open (afp_connection, error)) {
+ g_mutex_unlock (&priv->mutex);
+ return FALSE;
+ }
+
+ sync_data_init (&close_data, afp_connection, error);
+ priv->pending_closes = g_slist_prepend (priv->pending_closes, &close_data);
+
+ /* Release lock */
+ g_mutex_unlock (&priv->mutex);
+
+ g_atomic_int_or (&priv->atomic_flags, FLAG_PENDING_CLOSE);
+ g_cancellable_cancel (priv->read_cancellable);
+
+ sync_data_wait (&close_data);
+
+
+ return close_data.res;
+}
+
+static gpointer
+open_thread_func (gpointer user_data)
+{
+ SyncData *data = user_data;
+ GVfsAfpConnection *conn = data->conn;
+ GVfsAfpConnectionPrivate *priv = conn->priv;
+
GSocketClient *client;
guint16 req_id;
@@ -1583,23 +1768,24 @@ g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
guint pos;
client = g_socket_client_new ();
- priv->conn = G_IO_STREAM (g_socket_client_connect (client, priv->addr, cancellable, error));
+ priv->stream = G_IO_STREAM (g_socket_client_connect (client, priv->addr, data->cancellable,
+ data->error));
g_object_unref (client);
- if (!priv->conn)
- return FALSE;
+ if (!priv->stream)
+ goto out;
- req_id = get_request_id (afp_connection);
- res = send_request_sync (g_io_stream_get_output_stream (priv->conn),
+ req_id = get_request_id (conn);
+ res = send_request_sync (g_io_stream_get_output_stream (priv->stream),
DSI_OPEN_SESSION, req_id, 0, 0, NULL,
- cancellable, error);
+ data->cancellable, data->error);
if (!res)
- return FALSE;
+ goto out;
- res = read_reply_sync (g_io_stream_get_input_stream (priv->conn),
- &dsi_header, &reply, cancellable, error);
+ res = read_reply_sync (g_io_stream_get_input_stream (priv->stream),
+ &dsi_header, &reply, data->cancellable, data->error);
if (!res)
- return FALSE;
+ goto out;
pos = 0;
while ((dsi_header.totalDataLength - pos) > 2)
@@ -1634,7 +1820,57 @@ g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
}
g_free (reply);
- return TRUE;
+out:
+ if (res)
+ g_atomic_int_or (&priv->atomic_flags, FLAG_INITIALIZED);
+
+ /* Signal sync call thread */
+ data->res = res;
+ sync_data_signal (data);
+
+ /* Return from thread on failure */
+ if (!res)
+ {
+ g_clear_object (&priv->stream);
+ return NULL;
+ }
+
+ /* Create MainLoop */
+ priv->worker_context = g_main_context_new ();
+ priv->worker_loop = g_main_loop_new (priv->worker_context, TRUE);
+
+ read_reply (conn);
+
+ /* Run mainloop */
+ g_main_loop_run (priv->worker_loop);
+
+ return NULL;
+}
+
+gboolean
+g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+
+ SyncData data;
+
+ sync_data_init (&data, afp_connection, error);
+ data.cancellable = cancellable;
+
+ priv->worker_thread = g_thread_create (open_thread_func, &data,
+ FALSE, error);
+ if (!priv->worker_thread)
+ goto out;
+
+ sync_data_wait (&data);
+
+out:
+
+ sync_data_clear (&data);
+
+ return data.res;
}
GVfsAfpConnection *
diff --git a/daemon/gvfsafpconnection.h b/daemon/gvfsafpconnection.h
index 57db8627..924a47c0 100644
--- a/daemon/gvfsafpconnection.h
+++ b/daemon/gvfsafpconnection.h
@@ -360,15 +360,11 @@ gboolean g_vfs_afp_connection_close_sync (GVfsAfpConnection *af
GCancellable *cancellable,
GError **error);
-gboolean g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection,
+GVfsAfpReply* g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection,
GVfsAfpCommand *afp_command,
GCancellable *cancellable,
GError **error);
-GVfsAfpReply* g_vfs_afp_connection_read_reply_sync (GVfsAfpConnection *afp_connection,
- GCancellable *cancellable,
- GError **error);
-
GVfsAfpReply* g_vfs_afp_connection_send_command_finish (GVfsAfpConnection *afp_connnection,
GAsyncResult *res,
GError **error);
diff --git a/daemon/gvfsafpserver.c b/daemon/gvfsafpserver.c
index e405917a..9acc2b18 100644
--- a/daemon/gvfsafpserver.c
+++ b/daemon/gvfsafpserver.c
@@ -48,7 +48,6 @@ g_vfs_afp_server_new (GNetworkAddress *addr)
afp_serv = g_object_new (G_VFS_TYPE_AFP_SERVER, NULL);
afp_serv->addr = addr;
- afp_serv->conn = g_vfs_afp_connection_new (G_SOCKET_CONNECTABLE (addr));
return afp_serv;
}
@@ -180,13 +179,9 @@ dhx2_login (GVfsAfpServer *afp_serv,
g_vfs_afp_command_put_pascal (comm, username);
g_vfs_afp_command_pad_to_even (comm);
- res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
- cancellable, error);
+ reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+ cancellable, error);
g_object_unref (comm);
- if (!res)
- goto error;
-
- reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
if (!reply)
goto error;
@@ -288,17 +283,11 @@ dhx2_login (GVfsAfpServer *afp_serv,
/* clientNonce */
g_output_stream_write_all (G_OUTPUT_STREAM (comm), clientNonce_buf, 16, NULL, NULL, NULL);
- res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+ reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
cancellable, error);
g_object_unref (comm);
- if (!res)
- goto error;
-
-
- reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
if (!reply)
goto error;
-
res_code = g_vfs_afp_reply_get_result_code (reply);
if (res_code != AFP_RESULT_AUTH_CONTINUE)
@@ -359,13 +348,9 @@ dhx2_login (GVfsAfpServer *afp_serv,
G_N_ELEMENTS (answer_buf), NULL, NULL, NULL);
- res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
- cancellable, error);
+ reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+ cancellable, error);
g_object_unref (comm);
- if (!res)
- goto error;
-
- reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
if (!reply)
goto error;
@@ -493,15 +478,11 @@ dhx_login (GVfsAfpServer *afp_serv,
g_output_stream_write_all (G_OUTPUT_STREAM(comm), ma_buf, G_N_ELEMENTS (ma_buf),
NULL, NULL, NULL);
- res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
- cancellable, error);
+ reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+ cancellable, error);
g_object_unref (comm);
- if (!res)
- goto done;
-
- reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
if (!reply)
- goto error;
+ goto error;
res_code = g_vfs_afp_reply_get_result_code (reply);
if (res_code != AFP_RESULT_AUTH_CONTINUE)
@@ -587,13 +568,9 @@ dhx_login (GVfsAfpServer *afp_serv,
G_N_ELEMENTS (answer_buf), NULL, NULL, NULL);
- res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
- cancellable, error);
+ reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+ cancellable, error);
g_object_unref (comm);
- if (!res)
- goto done;
-
- reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
if (!reply)
goto error;
@@ -643,7 +620,6 @@ do_login (GVfsAfpServer *afp_serv,
if (anonymous)
{
GVfsAfpCommand *comm;
- gboolean res;
GVfsAfpReply *reply;
AfpResultCode res_code;
@@ -659,13 +635,9 @@ do_login (GVfsAfpServer *afp_serv,
g_vfs_afp_command_put_pascal (comm, afp_version_to_string (afp_serv->version));
g_vfs_afp_command_put_pascal (comm, AFP_UAM_NO_USER);
- res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
- cancellable, error);
+ reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+ cancellable, error);
g_object_unref (comm);
- if (!res)
- return FALSE;
-
- reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
if (!reply)
return FALSE;
@@ -805,7 +777,6 @@ get_server_parms (GVfsAfpServer *server,
{
GVfsAfpCommand *comm;
GVfsAfpReply *reply;
- gboolean res;
AfpResultCode res_code;
gint32 server_time;
@@ -814,13 +785,9 @@ get_server_parms (GVfsAfpServer *server,
/* pad byte */
g_vfs_afp_command_put_byte (comm, 0);
- res = g_vfs_afp_connection_send_command_sync (server->conn, comm, cancellable,
- error);
+ reply = g_vfs_afp_connection_send_command_sync (server->conn, comm, cancellable,
+ error);
g_object_unref (comm);
- if (!res)
- return FALSE;
-
- reply = g_vfs_afp_connection_read_reply_sync (server->conn, cancellable, error);
if (!reply)
return FALSE;
@@ -849,7 +816,6 @@ get_userinfo (GVfsAfpServer *server,
{
GVfsAfpCommand *comm;
guint16 bitmap;
- gboolean res;
GVfsAfpReply *reply;
AfpResultCode res_code;
@@ -863,14 +829,9 @@ get_userinfo (GVfsAfpServer *server,
bitmap = AFP_GET_USER_INFO_BITMAP_GET_UID_BIT | AFP_GET_USER_INFO_BITMAP_GET_GID_BIT;
g_vfs_afp_command_put_uint16 (comm, bitmap);
- res = g_vfs_afp_connection_send_command_sync (server->conn,
+ reply = g_vfs_afp_connection_send_command_sync (server->conn,
comm, cancellable, error);
g_object_unref (comm);
- if (!res)
- return FALSE;
-
- reply = g_vfs_afp_connection_read_reply_sync (server->conn,
- cancellable, error);
if (!reply)
return FALSE;
@@ -1038,6 +999,7 @@ g_vfs_afp_server_login (GVfsAfpServer *server,
try_login:
/* Open connection */
+ server->conn = g_vfs_afp_connection_new (G_SOCKET_CONNECTABLE (server->addr));
res = g_vfs_afp_connection_open_sync (server->conn, cancellable, &err);
if (!res)
break;
@@ -1047,6 +1009,7 @@ try_login:
if (!res)
{
g_vfs_afp_connection_close_sync (server->conn, cancellable, NULL);
+ g_clear_object (&server->conn);
if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED))
break;
diff --git a/daemon/gvfsafpvolume.c b/daemon/gvfsafpvolume.c
index 589b21db..e257b015 100644
--- a/daemon/gvfsafpvolume.c
+++ b/daemon/gvfsafpvolume.c
@@ -112,13 +112,9 @@ g_vfs_afp_volume_mount_sync (GVfsAfpVolume *volume,
/* TODO: password? */
- res = g_vfs_afp_connection_send_command_sync (priv->server->conn, comm, cancellable,
- error);
+ reply = g_vfs_afp_connection_send_command_sync (priv->server->conn, comm, cancellable,
+ error);
g_object_unref (comm);
- if (!res)
- return FALSE;
-
- reply = g_vfs_afp_connection_read_reply_sync (priv->server->conn, cancellable, error);
if (!reply)
return FALSE;