diff options
author | Carl-Anton Ingmarsson <ca.ingmarsson@gmail.com> | 2012-01-29 23:24:05 +0100 |
---|---|---|
committer | Carl-Anton Ingmarsson <ca.ingmarsson@gmail.com> | 2012-08-09 21:06:05 +0200 |
commit | c1b3b8a3e3d979a4857587b57026d91f56ff1627 (patch) | |
tree | 23d597b7ad35e20aa9f8cc129a78d77c677adc61 /daemon/gvfsafpconnection.c | |
parent | daa712e7a8848c923ce698e4296e21ab5fde6c68 (diff) | |
download | gvfs-c1b3b8a3e3d979a4857587b57026d91f56ff1627.tar.gz |
Use a worker thread in GVfsAfpConnection
Diffstat (limited to 'daemon/gvfsafpconnection.c')
-rw-r--r-- | daemon/gvfsafpconnection.c | 550 |
1 files changed, 393 insertions, 157 deletions
diff --git a/daemon/gvfsafpconnection.c b/daemon/gvfsafpconnection.c index 26c42bab..aaeebc2a 100644 --- a/daemon/gvfsafpconnection.c +++ b/daemon/gvfsafpconnection.c @@ -618,16 +618,30 @@ typedef struct { guint32 reserved; } DSIHeader; +enum { + FLAG_INITIALIZED = 1 << 0, + FLAG_PENDING_CLOSE = 1 << 1, + FLAG_CLOSED = 1 << 2 +}; + struct _GVfsAfpConnectionPrivate { GSocketConnectable *addr; - GIOStream *conn; + GIOStream *stream; + /* Flags */ + volatile gint atomic_flags; + guint16 request_id; guint32 kRequestQuanta; guint32 kServerReplayCacheSize; + GThread *worker_thread; + GMainContext *worker_context; + GMainLoop *worker_loop; + GMutex mutex; + GQueue *request_queue; GHashTable *request_hash; @@ -636,10 +650,12 @@ struct _GVfsAfpConnectionPrivate DSIHeader write_dsi_header; /* read loop */ - gboolean read_loop_running; + GCancellable *read_cancellable; DSIHeader read_dsi_header; char *reply_buf; gboolean free_reply_buf; + + GSList *pending_closes; }; typedef enum @@ -667,8 +683,58 @@ typedef struct char *reply_buf; GSimpleAsyncResult *simple; GCancellable *cancellable; + + GVfsAfpConnection *conn; } RequestData; +typedef struct +{ + GMutex mutex; + GCond cond; + GVfsAfpConnection *conn; + GCancellable *cancellable; + gboolean res; + GError **error; + void *data; +} SyncData; + +static void +sync_data_init (SyncData *data, GVfsAfpConnection *conn, GError **error) +{ + g_mutex_init (&data->mutex); + g_cond_init (&data->cond); + data->conn = conn; + data->error = error; + data->res = FALSE; +} + +static void +sync_data_clear (SyncData *data) +{ + g_mutex_clear (&data->mutex); + g_cond_clear (&data->cond); +} + +static void +sync_data_signal (SyncData *data) +{ + g_mutex_lock (&data->mutex); + g_cond_signal (&data->cond); + g_mutex_unlock (&data->mutex); +} + +static void +sync_data_wait (SyncData *data) +{ + g_mutex_lock (&data->mutex); + g_cond_wait (&data->cond, &data->mutex); + g_mutex_unlock (&data->mutex); +} + +static void send_request_unlocked (GVfsAfpConnection *afp_connection); +static void close_connection (GVfsAfpConnection *conn); +static void read_reply (GVfsAfpConnection *afp_connection); + static void free_request_data (RequestData *req_data) { @@ -682,6 +748,31 @@ free_request_data (RequestData *req_data) g_slice_free (RequestData, req_data); } +static gboolean +check_open (GVfsAfpConnection *conn, GError **error) +{ + GVfsAfpConnectionPrivate *priv = conn->priv; + + /* Acts as memory barrier */ + gint flags = g_atomic_int_get (&priv->atomic_flags); + + if (!(flags & FLAG_INITIALIZED)) + { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED, + _("The connection is not opened")); + return FALSE; + } + + else if ((flags & FLAG_CLOSED)) + { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + _("The connection is closed")); + return FALSE; + } + + return TRUE; +} + static void g_vfs_afp_connection_init (GVfsAfpConnection *afp_connection) { @@ -690,20 +781,17 @@ g_vfs_afp_connection_init (GVfsAfpConnection *afp_connection) afp_connection->priv = priv = G_TYPE_INSTANCE_GET_PRIVATE (afp_connection, G_VFS_TYPE_AFP_CONNECTION, GVfsAfpConnectionPrivate); - - priv->addr = NULL; - priv->conn = NULL; - priv->request_id = 0; - priv->kRequestQuanta = -1; priv->kServerReplayCacheSize = -1; + g_mutex_init (&priv->mutex); + priv->request_queue = g_queue_new (); priv->request_hash = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify)free_request_data); + priv->read_cancellable = g_cancellable_new (); priv->send_loop_running = FALSE; - priv->read_loop_running = FALSE; } static void @@ -712,11 +800,11 @@ g_vfs_afp_connection_finalize (GObject *object) GVfsAfpConnection *afp_connection = (GVfsAfpConnection *)object; GVfsAfpConnectionPrivate *priv = afp_connection->priv; - if (priv->addr) - g_object_unref (priv->addr); - - if (priv->conn) - g_object_unref (priv->conn); + g_clear_object (&priv->addr); + g_clear_object (&priv->stream); + g_clear_object (&priv->read_cancellable); + + g_mutex_clear (&priv->mutex); G_OBJECT_CLASS (g_vfs_afp_connection_parent_class)->finalize (object); } @@ -738,9 +826,6 @@ g_vfs_afp_connection_class_init (GVfsAfpConnectionClass *klass) G_TYPE_NONE, 1, G_TYPE_UINT); } -static void read_reply (GVfsAfpConnection *afp_connection); -static void send_request (GVfsAfpConnection *afp_connection); - static guint16 get_request_id (GVfsAfpConnection *afp_connection) { @@ -749,23 +834,6 @@ get_request_id (GVfsAfpConnection *afp_connection) return priv->request_id++; } -static void -run_loop (GVfsAfpConnection *afp_connection) -{ - GVfsAfpConnectionPrivate *priv = afp_connection->priv; - - if (!priv->send_loop_running) - { - priv->send_loop_running = TRUE; - send_request (afp_connection); - } - if (!priv->read_loop_running) - { - priv->read_loop_running = TRUE; - read_reply (afp_connection); - } -} - typedef struct { void *buffer; @@ -896,8 +964,16 @@ dispatch_reply (GVfsAfpConnection *afp_connection) req_data = g_slice_new0 (RequestData); req_data->type = REQUEST_TYPE_TICKLE; + /* take lock */ + g_mutex_lock (&priv->mutex); g_queue_push_head (priv->request_queue, req_data); - run_loop (afp_connection); + if (!priv->send_loop_running) { + priv->send_loop_running = TRUE; + send_request_unlocked (afp_connection); + } + /* release lock */ + g_mutex_unlock (&priv->mutex); + break; } @@ -928,7 +1004,7 @@ dispatch_reply (GVfsAfpConnection *afp_connection) g_simple_async_result_set_op_res_gpointer (req_data->simple, reply, g_object_unref); - g_simple_async_result_complete (req_data->simple); + g_simple_async_result_complete_in_idle (req_data->simple); g_hash_table_remove (priv->request_hash, GUINT_TO_POINTER ((guint)dsi_header->requestID)); @@ -951,6 +1027,13 @@ read_data_cb (GObject *object, GAsyncResult *res, gpointer user_data) gboolean result; GError *err = NULL; + if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE) + { + if (!priv->send_loop_running) + close_connection (afp_connection); + return; + } + result = read_all_finish (input, res, NULL, &err); if (!result) { @@ -976,6 +1059,13 @@ read_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data) gboolean result; GError *err = NULL; DSIHeader *dsi_header; + + if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE) + { + if (!priv->send_loop_running) + close_connection (afp_conn); + return; + } result = read_all_finish (input, res, NULL, &err); if (!result) @@ -1008,7 +1098,7 @@ read_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data) } read_all_async (input, priv->reply_buf, dsi_header->totalDataLength, - 0, NULL, read_data_cb, afp_conn); + 0, priv->read_cancellable, read_data_cb, afp_conn); return; } @@ -1024,10 +1114,17 @@ read_reply (GVfsAfpConnection *afp_connection) GInputStream *input; - input = g_io_stream_get_input_stream (priv->conn); + if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE) + { + if (!priv->send_loop_running) + close_connection (afp_connection); + return; + } - read_all_async (input, &priv->read_dsi_header, sizeof (DSIHeader), 0, NULL, - read_dsi_header_cb, afp_connection); + input = g_io_stream_get_input_stream (priv->stream); + + read_all_async (input, &priv->read_dsi_header, sizeof (DSIHeader), 0, + priv->read_cancellable, read_dsi_header_cb, afp_connection); } typedef struct @@ -1141,15 +1238,6 @@ write_all_finish (GOutputStream *stream, return TRUE; } -static void -remove_first (GQueue *request_queue) -{ - RequestData *req_data; - - req_data = g_queue_pop_head (request_queue); - free_request_data (req_data); -} - #define HANDLE_RES() { \ gboolean result; \ GError *err = NULL; \ @@ -1160,99 +1248,95 @@ remove_first (GQueue *request_queue) if (req_data->simple) \ { \ g_simple_async_result_set_from_error (req_data->simple, err); \ - g_simple_async_result_complete (req_data->simple); \ + g_simple_async_result_complete_in_idle (req_data->simple); \ } \ \ - remove_first (priv->request_queue); \ g_error_free (err); \ + free_request_data (req_data); \ \ - send_request (afp_conn); \ + g_mutex_lock (&priv->mutex); \ + send_request_unlocked (afp_conn); \ + g_mutex_unlock (&priv->mutex); \ return; \ } \ } - static void write_buf_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GOutputStream *output = G_OUTPUT_STREAM (object); - GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data); + RequestData *req_data = user_data; + GVfsAfpConnection *afp_conn = req_data->conn; GVfsAfpConnectionPrivate *priv = afp_conn->priv; - - RequestData *req_data; - - req_data = g_queue_peek_head (priv->request_queue); HANDLE_RES (); g_hash_table_insert (priv->request_hash, GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)), req_data); - g_queue_pop_head (priv->request_queue); - send_request (afp_conn); + + g_mutex_lock (&priv->mutex); + send_request_unlocked (afp_conn); + g_mutex_unlock (&priv->mutex); } static void write_command_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GOutputStream *output = G_OUTPUT_STREAM (object); - GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data); + RequestData *req_data = user_data; + GVfsAfpConnection *afp_conn = req_data->conn; GVfsAfpConnectionPrivate *priv = afp_conn->priv; - RequestData *req_data; - - req_data = g_queue_peek_head (priv->request_queue); - HANDLE_RES (); if (priv->write_dsi_header.command == DSI_WRITE && req_data->command->buf) { write_all_async (output, req_data->command->buf, req_data->command->buf_size, - 0, NULL, write_buf_cb, afp_conn); + 0, NULL, write_buf_cb, req_data); return; } g_hash_table_insert (priv->request_hash, GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)), req_data); - g_queue_pop_head (priv->request_queue); - send_request (afp_conn); + g_mutex_lock (&priv->mutex); + send_request_unlocked (afp_conn); + g_mutex_unlock (&priv->mutex); } static void write_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GOutputStream *output = G_OUTPUT_STREAM (object); - GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data); + RequestData *req_data = user_data; + GVfsAfpConnection *afp_conn = req_data->conn; GVfsAfpConnectionPrivate *priv = afp_conn->priv; - RequestData *req_data; - char *data; gsize size; - req_data = g_queue_peek_head (priv->request_queue); - HANDLE_RES (); if (req_data->type == REQUEST_TYPE_TICKLE) { - remove_first (priv->request_queue); - send_request (afp_conn); + g_mutex_lock (&priv->mutex); + send_request_unlocked (afp_conn); + g_mutex_unlock (&priv->mutex); return; } data = g_vfs_afp_command_get_data (req_data->command); size = g_vfs_afp_command_get_size (req_data->command); - write_all_async (output, data, size, 0, NULL, write_command_cb, afp_conn); + write_all_async (output, data, size, 0, NULL, write_command_cb, req_data); } static void -send_request (GVfsAfpConnection *afp_connection) +send_request_unlocked (GVfsAfpConnection *afp_connection) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; @@ -1260,7 +1344,7 @@ send_request (GVfsAfpConnection *afp_connection) guint32 writeOffset; guint8 dsi_command; - while ((req_data = g_queue_peek_head (priv->request_queue))) + while ((req_data = g_queue_pop_head (priv->request_queue))) { if (req_data->cancellable && g_cancellable_is_cancelled (req_data->cancellable)) { @@ -1270,9 +1354,8 @@ send_request (GVfsAfpConnection *afp_connection) g_cancellable_set_error_if_cancelled (req_data->cancellable, &err); g_simple_async_result_take_error (req_data->simple, err); - g_simple_async_result_complete (req_data->simple); + g_simple_async_result_complete_in_idle (req_data->simple); } - remove_first (priv->request_queue); } else break; @@ -1335,9 +1418,28 @@ send_request (GVfsAfpConnection *afp_connection) } - write_all_async (g_io_stream_get_output_stream (priv->conn), + write_all_async (g_io_stream_get_output_stream (priv->stream), &priv->write_dsi_header, sizeof (DSIHeader), 0, - NULL, write_dsi_header_cb, afp_connection); + NULL, write_dsi_header_cb, req_data); +} + +static gboolean +start_send_loop_func (gpointer data) +{ + GVfsAfpConnection *conn = data; + GVfsAfpConnectionPrivate *priv = conn->priv; + + g_mutex_lock (&priv->mutex); + + if (priv->send_loop_running) + goto out; + + priv->send_loop_running = TRUE; + send_request_unlocked (conn); + +out: + g_mutex_unlock (&priv->mutex); + return G_SOURCE_REMOVE; } void @@ -1350,7 +1452,15 @@ g_vfs_afp_connection_send_command (GVfsAfpConnection *afp_connection, { GVfsAfpConnectionPrivate *priv = afp_connection->priv; + GError *err = NULL; RequestData *req_data; + + if (!check_open (afp_connection, &err)) + { + g_simple_async_report_take_gerror_in_idle (G_OBJECT(afp_connection), callback, + user_data, err); + return; + } req_data = g_slice_new0 (RequestData); req_data->type = REQUEST_TYPE_COMMAND; @@ -1360,12 +1470,23 @@ g_vfs_afp_connection_send_command (GVfsAfpConnection *afp_connection, req_data->simple = g_simple_async_result_new (G_OBJECT (afp_connection), callback, user_data, g_vfs_afp_connection_send_command); + req_data->conn = afp_connection; + if (cancellable) req_data->cancellable = g_object_ref (cancellable); + /* Take lock */ + g_mutex_lock (&priv->mutex); + g_queue_push_tail (priv->request_queue, req_data); + if (!priv->send_loop_running) + { + g_main_context_invoke (priv->worker_context, start_send_loop_func, + afp_connection); + } - run_loop (afp_connection); + /* Release lock */ + g_mutex_unlock (&priv->mutex); } GVfsAfpReply * @@ -1443,25 +1564,6 @@ read_reply_sync (GInputStream *input, return TRUE; } -GVfsAfpReply * -g_vfs_afp_connection_read_reply_sync (GVfsAfpConnection *afp_connection, - GCancellable *cancellable, - GError **error) -{ - GVfsAfpConnectionPrivate *priv = afp_connection->priv; - - gboolean res; - char *data; - DSIHeader dsi_header; - - res = read_reply_sync (g_io_stream_get_input_stream (priv->conn), &dsi_header, - &data, cancellable, error); - if (!res) - return NULL; - - return g_vfs_afp_reply_new (dsi_header.errorCode, data, dsi_header.totalDataLength, TRUE); -} - static gboolean send_request_sync (GOutputStream *output, DsiCommand command, @@ -1501,79 +1603,162 @@ send_request_sync (GOutputStream *output, return TRUE; } -gboolean +static void +send_command_sync_cb (GObject *source_object, GAsyncResult *res, gpointer user_data) +{ + SyncData *sync_data = user_data; + + sync_data->data = g_object_ref (res); + sync_data_signal (sync_data); +} + +GVfsAfpReply * g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection, - GVfsAfpCommand *afp_command, + GVfsAfpCommand *command, GCancellable *cancellable, GError **error) { - GVfsAfpConnectionPrivate *priv = afp_connection->priv; + SyncData sync_data; + GVfsAfpReply *reply; - DsiCommand dsi_command; - guint16 req_id; - guint32 writeOffset; + if (!check_open (afp_connection, error)) + return FALSE; - /* set dsi_command */ - switch (afp_command->type) - { - case AFP_COMMAND_WRITE: - writeOffset = 8; - dsi_command = DSI_WRITE; - break; - case AFP_COMMAND_WRITE_EXT: - writeOffset = 20; - dsi_command = DSI_WRITE; - break; + sync_data_init (&sync_data, afp_connection, NULL); - default: - writeOffset = 0; - dsi_command = DSI_COMMAND; - break; - } + g_vfs_afp_connection_send_command (afp_connection, command, NULL, + send_command_sync_cb, cancellable, &sync_data); - req_id = get_request_id (afp_connection); - return send_request_sync (g_io_stream_get_output_stream (priv->conn), - dsi_command, req_id, writeOffset, - g_vfs_afp_command_get_size (afp_command), - g_vfs_afp_command_get_data (afp_command), - cancellable, error); + sync_data_wait (&sync_data); + + reply = g_vfs_afp_connection_send_command_finish (afp_connection, sync_data.data, + error); + g_object_unref (sync_data.data); + sync_data_clear (&sync_data); + + return reply; } -gboolean -g_vfs_afp_connection_close_sync (GVfsAfpConnection *afp_connection, - GCancellable *cancellable, - GError **error) +static void +close_connection (GVfsAfpConnection *conn) { - GVfsAfpConnectionPrivate *priv = afp_connection->priv; + GVfsAfpConnectionPrivate *priv = conn->priv; guint16 req_id; gboolean res; + GError *err = NULL; + + GQueue *request_queue; + GSList *pending_closes, *siter; + GHashTable *request_hash; + GHashTableIter iter; + RequestData *req_data; + + /* Take lock */ + g_mutex_lock (&priv->mutex); + + /* Set closed flag */ + g_atomic_int_or (&priv->atomic_flags, FLAG_CLOSED); + + request_queue = priv->request_queue; + priv->request_queue = NULL; + + request_hash = priv->request_hash; + priv->request_hash = NULL; + + pending_closes = priv->pending_closes; + priv->pending_closes = NULL; + + /* Release lock */ + g_mutex_unlock (&priv->mutex); /* close DSI session */ - req_id = get_request_id (afp_connection); - res = send_request_sync (g_io_stream_get_output_stream (priv->conn), + req_id = get_request_id (conn); + res = send_request_sync (g_io_stream_get_output_stream (priv->stream), DSI_CLOSE_SESSION, req_id, 0, 0, NULL, - cancellable, error); + NULL, &err); if (!res) + g_io_stream_close (priv->stream, NULL, NULL); + else + res = g_io_stream_close (priv->stream, NULL, &err); + + g_clear_object (&priv->stream); + +#define REQUEST_DATA_CLOSED(request_data) { \ + g_simple_async_result_set_from_error (req_data->simple, \ + g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CLOSED, "Connection was closed")); \ + \ + g_simple_async_result_complete_in_idle (req_data->simple); \ + free_request_data (req_data); \ +} + + while ((req_data = g_queue_pop_head (request_queue))) { - g_io_stream_close (priv->conn, cancellable, NULL); - g_object_unref (priv->conn); - return FALSE; + REQUEST_DATA_CLOSED (req_data); } - res = g_io_stream_close (priv->conn, cancellable, error); - g_object_unref (priv->conn); + g_hash_table_iter_init (&iter, request_hash); + while (g_hash_table_iter_next (&iter, NULL, (void **)&req_data)) + { + REQUEST_DATA_CLOSED (req_data); + } - return res; +#undef REQUEST_DATA_CLOSED + + for (siter = pending_closes; siter != NULL; siter = siter->next) + { + SyncData *close_data = siter->data; + + close_data->res = TRUE; + sync_data_signal (close_data); + } + g_slist_free (pending_closes); + + /* quit main_loop */ + g_main_loop_quit (priv->worker_loop); + g_main_loop_unref (priv->worker_loop); + g_main_context_unref (priv->worker_context); } gboolean -g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection, - GCancellable *cancellable, - GError **error) +g_vfs_afp_connection_close_sync (GVfsAfpConnection *afp_connection, + GCancellable *cancellable, + GError **error) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; + SyncData close_data; + + /* Take lock */ + g_mutex_lock (&priv->mutex); + + if (!check_open (afp_connection, error)) { + g_mutex_unlock (&priv->mutex); + return FALSE; + } + + sync_data_init (&close_data, afp_connection, error); + priv->pending_closes = g_slist_prepend (priv->pending_closes, &close_data); + + /* Release lock */ + g_mutex_unlock (&priv->mutex); + + g_atomic_int_or (&priv->atomic_flags, FLAG_PENDING_CLOSE); + g_cancellable_cancel (priv->read_cancellable); + + sync_data_wait (&close_data); + + + return close_data.res; +} + +static gpointer +open_thread_func (gpointer user_data) +{ + SyncData *data = user_data; + GVfsAfpConnection *conn = data->conn; + GVfsAfpConnectionPrivate *priv = conn->priv; + GSocketClient *client; guint16 req_id; @@ -1583,23 +1768,24 @@ g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection, guint pos; client = g_socket_client_new (); - priv->conn = G_IO_STREAM (g_socket_client_connect (client, priv->addr, cancellable, error)); + priv->stream = G_IO_STREAM (g_socket_client_connect (client, priv->addr, data->cancellable, + data->error)); g_object_unref (client); - if (!priv->conn) - return FALSE; + if (!priv->stream) + goto out; - req_id = get_request_id (afp_connection); - res = send_request_sync (g_io_stream_get_output_stream (priv->conn), + req_id = get_request_id (conn); + res = send_request_sync (g_io_stream_get_output_stream (priv->stream), DSI_OPEN_SESSION, req_id, 0, 0, NULL, - cancellable, error); + data->cancellable, data->error); if (!res) - return FALSE; + goto out; - res = read_reply_sync (g_io_stream_get_input_stream (priv->conn), - &dsi_header, &reply, cancellable, error); + res = read_reply_sync (g_io_stream_get_input_stream (priv->stream), + &dsi_header, &reply, data->cancellable, data->error); if (!res) - return FALSE; + goto out; pos = 0; while ((dsi_header.totalDataLength - pos) > 2) @@ -1634,7 +1820,57 @@ g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection, } g_free (reply); - return TRUE; +out: + if (res) + g_atomic_int_or (&priv->atomic_flags, FLAG_INITIALIZED); + + /* Signal sync call thread */ + data->res = res; + sync_data_signal (data); + + /* Return from thread on failure */ + if (!res) + { + g_clear_object (&priv->stream); + return NULL; + } + + /* Create MainLoop */ + priv->worker_context = g_main_context_new (); + priv->worker_loop = g_main_loop_new (priv->worker_context, TRUE); + + read_reply (conn); + + /* Run mainloop */ + g_main_loop_run (priv->worker_loop); + + return NULL; +} + +gboolean +g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection, + GCancellable *cancellable, + GError **error) +{ + GVfsAfpConnectionPrivate *priv = afp_connection->priv; + + SyncData data; + + sync_data_init (&data, afp_connection, error); + data.cancellable = cancellable; + + priv->worker_thread = g_thread_create (open_thread_func, &data, + FALSE, error); + if (!priv->worker_thread) + goto out; + + sync_data_wait (&data); + +out: + + sync_data_clear (&data); + + return data.res; } GVfsAfpConnection * |