summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoss Lagerwall <rosslagerwall@gmail.com>2013-11-01 10:03:52 +0200
committerRoss Lagerwall <rosslagerwall@gmail.com>2013-11-18 18:55:22 +0200
commited826fdf386cd0891cbda5c9fc3904d2a5aba03f (patch)
tree74bc37d73a067adb26c2a69aa80bf69fb85aa2d5
parent020d4e0176c82d2d19502b93757235f16b94a451 (diff)
downloadgvfs-ed826fdf386cd0891cbda5c9fc3904d2a5aba03f.tar.gz
sftp: Implement pull support
Implement pull support with a sliding window to improve the speed of sftp downloads. The implementation is based on the one from the OpenSSH sftp client. It uses up to 64 outstanding read requests. The number of outstanding requests is increased gradually up to this limit to prevent overwhelming the server. The file is fstat()ed to determine the size. When the expected size is reached, the maximum number of outstanding requests is reduced to 1. The implementation is complicated by the fact that reads can return short and they can also be serviced out of order. This patch results in substantial performance improvements, especially for high-latency links. Compared to the fallback copy implementation, other performance improvements are achieved by performing the initial lstat()/stat() and open() in parallel, as well as performing the fstat() and initial read requests in parallel. Some benchmark figures: Old behavior: Copying from local server = 6.1MB/s Copying from local server with 250ms of RTT latency = 0.251MB/s Copying many small files with 250ms of RTT latency = 0.64 files per second New behavior: Copying from local server = 13MB/s Copying from local server with 250ms of RTT latency = 6.6MB/s Copying many small files with 250ms of RTT latency = 1.24 files per second OpenSSH sftp client: Copying from local server = 14.2MB/s Copying from local server with 250ms of RTT latency = 6.4MB/s Copying many small files with 250ms of RTT latency = 1.34 files per second https://bugzilla.gnome.org/show_bug.cgi?id=532951
-rw-r--r--daemon/gvfsbackendsftp.c534
1 files changed, 534 insertions, 0 deletions
diff --git a/daemon/gvfsbackendsftp.c b/daemon/gvfsbackendsftp.c
index 36beade8..17ba4da3 100644
--- a/daemon/gvfsbackendsftp.c
+++ b/daemon/gvfsbackendsftp.c
@@ -63,6 +63,7 @@
#include "gvfsjobmakedirectory.h"
#include "gvfsjobprogress.h"
#include "gvfsjobpush.h"
+#include "gvfsjobpull.h"
#include "gvfsdaemonprotocol.h"
#include "gvfskeyring.h"
#include "sftp.h"
@@ -5396,6 +5397,538 @@ try_push (GVfsBackend *backend,
return TRUE;
}
+/* The pull sliding window mechanism is based on the one from the OpenSSH sftp
+ * client. It is complicated because requests can be returned out of order. */
+
+#define PULL_MAX_REQUESTS 64 /* Never have more than this many requests outstanding */
+#define PULL_BLOCKSIZE 32768 /* Request this much data per request */
+#define PULL_SIZE_INCOMPLETE -1 /* Indicates an incomplete fstat() request */
+#define PULL_SIZE_INVALID -2 /* Indicates that no fstat() request is in progress */
+ /* State shared by all callbacks of one pull (remote-to-local copy) job.
+ * Allocated in try_pull() and released by sftp_pull_handle_free(). */
+typedef struct {
+ /* initial job information */
+ GVfsBackendSftp *backend; /* referenced for the lifetime of the handle */
+ GVfsJob *job; /* same object as op_job; no extra reference is taken */
+ GVfsJobPull *op_job; /* referenced for the lifetime of the handle */
+ GFile *dest; /* local destination, created from local_path */
+
+ /* Open files */
+ DataBuffer *raw_handle; /* remote file handle read from the SSH_FXP_OPEN reply */
+ GOutputStream *output; /* stream writing to dest; NULL until dest is opened */
+
+ /* fstat information */
+ goffset size; /* remote size, or PULL_SIZE_INCOMPLETE / PULL_SIZE_INVALID */
+ guint32 mode; /* remote unix mode; only set when the fstat reply was SSH_FXP_ATTRS */
+
+ /* state */
+ goffset offset; /* remote offset at which the next block request starts */
+ goffset n_written; /* total bytes written to output so far */
+ int num_req; /* Number of outstanding read requests */
+ int max_req; /* Current maximum number of outstanding read requests; 0 once EOF was received */
+ GList *queued_writes; /* PullRequests whose data has arrived, appended in arrival order */
+} SftpPullHandle;
+ /* One SSH_FXP_READ request together with the data received for it. */
+typedef struct {
+ SftpPullHandle *handle; /* pull operation this request belongs to */
+ guint32 request_len; /* number of bytes requested */
+ guint64 request_offset; /* offset of requested bytes */
+ gssize response_len; /* number of bytes returned */
+ gssize write_offset; /* offset in buffer of bytes written so far */
+ char *buffer; /* response_len bytes from g_slice_alloc(); NULL until data arrives */
+} PullRequest;
+
+static void
+pull_enqueue_next_request (SftpPullHandle *handle);
+
+static void
+pull_enqueue_request (SftpPullHandle *handle, guint64 offset, guint32 len);
+
+static void
+pull_try_start_write (SftpPullHandle *handle);
+
+/* Release a PullRequest and, when one was allocated, its data buffer.
+ * The buffer goes back to the slice allocator with response_len as its
+ * size, which is the size it is allocated with in pull_read_reply(). */
+static void
+pull_request_free (PullRequest *request)
+{
+  char *data = request->buffer;
+
+  if (data != NULL)
+    g_slice_free1 (request->response_len, data);
+
+  g_slice_free (PullRequest, request);
+}
+
+/* Release a pull handle, but only once nothing can call back into it.
+ *
+ * The call is a no-op while the fstat reply, a local write or any read
+ * reply is still outstanding; the completion paths call it again, so the
+ * last one to finish performs the actual teardown. Teardown queues an
+ * SSH_FXP_CLOSE for the remote handle (if one was obtained), drops the
+ * references taken in try_pull() and frees any still-queued requests. */
+static void
+sftp_pull_handle_free (SftpPullHandle *handle)
+{
+  GDataOutputStream *close_cmd;
+
+  /* fstat reply has not arrived yet */
+  if (handle->size == PULL_SIZE_INCOMPLETE)
+    return;
+
+  /* a write to the destination is still in flight */
+  if (handle->output != NULL && g_output_stream_has_pending (handle->output))
+    return;
+
+  /* read replies are still outstanding */
+  if (handle->num_req != 0)
+    return;
+
+  if (handle->raw_handle != NULL)
+    {
+      close_cmd = new_command_stream (handle->backend, SSH_FXP_CLOSE);
+      put_data_buffer (close_cmd, handle->raw_handle);
+      queue_command_stream_and_free (handle->backend, close_cmd, NULL, handle->job, NULL);
+      data_buffer_free (handle->raw_handle);
+    }
+
+  g_clear_object (&handle->output);
+  g_object_unref (handle->backend);
+  g_object_unref (handle->op_job);
+  g_object_unref (handle->dest);
+  g_list_free_full (handle->queued_writes, (GDestroyNotify) pull_request_free);
+  g_slice_free (SftpPullHandle, handle);
+}
+
+/* Reply handler for the SSH_FXP_REMOVE sent when the source is to be
+ * removed after the copy. Completes the job: a status reply is turned
+ * into the job result, any other reply type fails the job. */
+static void
+pull_remove_source_reply (GVfsBackendSftp *backend,
+                          int reply_type,
+                          GDataInputStream *reply,
+                          guint32 len,
+                          GVfsJob *job,
+                          gpointer user_data)
+{
+  if (reply_type != SSH_FXP_STATUS)
+    {
+      g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                        _("Invalid reply received"));
+      return;
+    }
+
+  result_from_status (job, reply, -1, -1);
+}
+
+/* Completion callback for the attribute update started in pull_close_cb().
+ * The outcome of the update is not inspected. The job either succeeds
+ * right away or, when the source must be removed, is completed later by
+ * pull_remove_source_reply(). The handle is released in both cases. */
+static void
+pull_set_perms_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+  SftpPullHandle *pull = user_data;
+  GDataOutputStream *remove_cmd;
+
+  if (!pull->op_job->remove_source)
+    {
+      g_vfs_job_succeeded (pull->job);
+    }
+  else
+    {
+      remove_cmd = new_command_stream (pull->backend, SSH_FXP_REMOVE);
+      put_string (remove_cmd, pull->op_job->source);
+      queue_command_stream_and_free (pull->backend,
+                                     remove_cmd,
+                                     pull_remove_source_reply,
+                                     pull->job,
+                                     NULL);
+    }
+
+  sftp_pull_handle_free (pull);
+}
+ /* Completion callback for closing the destination stream.
+ * On success it reports final progress, then either starts copying the
+ * source's unix mode onto the destination (pull_set_perms_cb() continues
+ * from there) or completes the job directly, queueing removal of the
+ * source first when that was requested. On failure the job fails with
+ * the close error. */
+static void
+pull_close_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+ SftpPullHandle *handle = user_data;
+ GError *error = NULL;
+
+ if (g_output_stream_close_finish(handle->output, res, &error))
+ {
+ g_vfs_job_progress_callback (handle->n_written, handle->n_written, handle->job);
+
+ if (handle->size >= 0 && !(handle->op_job->flags & G_FILE_COPY_TARGET_DEFAULT_PERMS)) /* size >= 0 means the fstat reply was parsed, so mode is set */
+ {
+ GFileInfo *info = g_file_info_new ();
+ g_file_info_set_attribute_uint32 (info,
+ G_FILE_ATTRIBUTE_UNIX_MODE,
+ handle->mode);
+ g_file_set_attributes_async (handle->dest,
+ info,
+ G_FILE_QUERY_INFO_NONE,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ pull_set_perms_cb, handle);
+ g_object_unref (info);
+ return; /* pull_set_perms_cb() completes the job and frees the handle */
+ }
+
+ if (handle->op_job->remove_source)
+ {
+ GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_REMOVE);
+ put_string (command, handle->op_job->source);
+ queue_command_stream_and_free (handle->backend,
+ command,
+ pull_remove_source_reply,
+ handle->job,
+ NULL); /* the job is completed by pull_remove_source_reply() */
+ }
+ else
+ g_vfs_job_succeeded (handle->job);
+ }
+ else
+ {
+ g_vfs_job_failed_from_error (handle->job, error);
+ g_error_free (error);
+ }
+
+ sftp_pull_handle_free (handle);
+}
+
+/* Start closing the destination stream once the transfer is complete.
+ * Nothing happens unless EOF has been received (max_req is 0), the fstat
+ * reply has arrived, no write is pending on the output stream and no
+ * read requests are outstanding. pull_close_cb() takes over afterwards. */
+static void
+pull_try_finish (SftpPullHandle *handle)
+{
+  /* EOF has not been received yet */
+  if (handle->max_req != 0)
+    return;
+
+  /* fstat reply has not arrived yet */
+  if (handle->size == PULL_SIZE_INCOMPLETE)
+    return;
+
+  /* a write is still in flight */
+  if (g_output_stream_has_pending (handle->output))
+    return;
+
+  /* read replies are still outstanding */
+  if (handle->num_req != 0)
+    return;
+
+  g_output_stream_close_async (handle->output,
+                               G_PRIORITY_DEFAULT,
+                               NULL,
+                               pull_close_cb, handle);
+}
+ /* Completion callback for a local write of one PullRequest's data.
+ * A write error fails the job. A partial write is continued with the
+ * remaining bytes. Once the request is fully written this reports
+ * progress, starts the next queued write and then either re-requests the
+ * missing tail of a short read, tries to finish after EOF, or adjusts
+ * and refills the window of outstanding read requests. The request is
+ * freed at the end. */
+static void
+pull_write_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+ PullRequest *request = user_data;
+ SftpPullHandle *handle = request->handle;
+ GError *error = NULL;
+ gssize n_written;
+
+ n_written = g_output_stream_write_finish (handle->output, res, &error);
+ if (n_written == -1)
+ {
+ g_vfs_job_failed_from_error (handle->job, error);
+ g_error_free (error);
+ pull_request_free (request);
+ sftp_pull_handle_free (handle);
+ return;
+ }
+
+ handle->n_written += n_written;
+ request->write_offset += n_written;
+
+ /* If we didn't write everything, do another write */
+ if (request->write_offset < request->response_len)
+ {
+ g_output_stream_write_async (handle->output,
+ request->buffer + request->write_offset,
+ request->response_len - request->write_offset,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ pull_write_cb, request);
+ return;
+ }
+
+ if (handle->size >= 0) /* total size is only known after a successful fstat */
+ g_vfs_job_progress_callback (handle->n_written, handle->size, handle->job);
+
+ pull_try_start_write (handle); /* NOTE(review): if the seek inside
+ * pull_try_start_write() fails, it fails the job and calls
+ * sftp_pull_handle_free(); handle is still dereferenced below, so
+ * confirm the handle cannot actually have been released here. */
+
+ /* If we read short, issue another request for the remaining data. */
+ if (request->response_len < request->request_len)
+ pull_enqueue_request (handle,
+ request->request_offset + request->response_len,
+ request->request_len - request->response_len);
+ else if (handle->max_req == 0)
+ pull_try_finish (handle);
+ else
+ {
+ /* Once we have requested past the estimated EOF, request one at a
+ * time. Otherwise try increase the number of concurrent requests. */
+ if (handle->offset > handle->size) /* also true while size is negative (fstat pending or failed) */
+ handle->max_req = 1;
+ else if (handle->max_req < PULL_MAX_REQUESTS)
+ handle->max_req++;
+
+ while (handle->num_req < handle->max_req)
+ pull_enqueue_next_request (handle);
+ }
+
+ pull_request_free (request);
+}
+ /* Start writing the first queued request to the destination stream.
+ * Does nothing while a write is already pending or when nothing is
+ * queued. The stream is first positioned at the request's remote offset,
+ * so data is stored at the position it was read from regardless of the
+ * order in which replies arrived. A seek failure fails the job and
+ * releases the request and (if possible) the handle. */
+static void
+pull_try_start_write (SftpPullHandle *handle)
+{
+ GError *error = NULL;
+ PullRequest *request;
+
+ if (g_output_stream_has_pending (handle->output))
+ return;
+
+ if (!handle->queued_writes)
+ return;
+
+ request = handle->queued_writes->data;
+ handle->queued_writes = g_list_delete_link (handle->queued_writes, handle->queued_writes); /* pop the head; the request is now owned by this write */
+
+ if (!g_seekable_seek (G_SEEKABLE (handle->output),
+ request->request_offset, G_SEEK_SET,
+ NULL, &error))
+ {
+ g_vfs_job_failed_from_error (handle->job, error);
+ g_error_free (error);
+ pull_request_free (request);
+ sftp_pull_handle_free (handle);
+ return;
+ }
+
+ g_output_stream_write_async (handle->output,
+ request->buffer, request->response_len,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ pull_write_cb, request); /* pull_write_cb() frees the request */
+}
+
+/* Reply handler for one SSH_FXP_READ request.
+ *
+ * user_data is the PullRequest created by pull_enqueue_request(). The
+ * outstanding-request counter is decremented first, then:
+ *  - if the job is already finished or cancelled, the request is dropped;
+ *  - an SSH_FX_EOF status marks the end of the file (max_req = 0) and
+ *    tries to finish the transfer;
+ *  - any other status, or an unexpected reply type, fails the job;
+ *  - an SSH_FXP_DATA reply is read into a freshly allocated buffer and
+ *    queued for writing to the destination.
+ *
+ * The data length is supplied by the server, so it is validated against
+ * the requested length before it is used as an allocation and read size.
+ * A reply longer than the request is a protocol violation; accepting it
+ * would allocate an arbitrary amount of memory and, when written, would
+ * overlap the range of the following block request. */
+static void
+pull_read_reply (GVfsBackendSftp *backend,
+                 int reply_type,
+                 GDataInputStream *reply,
+                 guint32 len,
+                 GVfsJob *job,
+                 gpointer user_data)
+{
+  PullRequest *request = user_data;
+  SftpPullHandle *handle = request->handle;
+
+  handle->num_req--;
+
+  if (g_vfs_job_is_finished (job) || g_vfs_job_is_cancelled (job))
+    {
+      /* Nothing to report; the request is simply released below. */
+    }
+  else if (reply_type == SSH_FXP_STATUS)
+    {
+      guint32 code = read_status_code (reply);
+      if (code == SSH_FX_EOF)
+        {
+          pull_request_free (request);
+          handle->max_req = 0;
+          pull_try_finish (handle);
+          return;
+        }
+      else
+        result_from_status_code (job, code, -1, -1);
+    }
+  else if (reply_type != SSH_FXP_DATA)
+    {
+      g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                        _("Invalid reply received"));
+    }
+  else
+    {
+      /* Keep the wire value unsigned so the comparison below cannot be
+       * affected by a narrowing conversion to gssize. */
+      guint32 data_len = g_data_input_stream_read_uint32 (reply, NULL, NULL);
+
+      if (data_len > request->request_len)
+        {
+          /* More data than asked for: reject before allocating. */
+          g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                            _("Invalid reply received"));
+        }
+      else
+        {
+          request->response_len = data_len;
+          request->buffer = g_slice_alloc (request->response_len);
+
+          if (g_input_stream_read_all (G_INPUT_STREAM (reply),
+                                       request->buffer, request->response_len,
+                                       NULL, NULL, NULL))
+            {
+              /* Ownership of the request moves to the write queue. */
+              handle->queued_writes = g_list_append (handle->queued_writes, request);
+              pull_try_start_write (handle);
+              return;
+            }
+          else
+            g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                              _("Invalid reply received"));
+        }
+    }
+
+  pull_request_free (request);
+  sftp_pull_handle_free (handle);
+}
+
+/* Queue an SSH_FXP_READ for len bytes at offset of the remote file and
+ * count it as outstanding. A new PullRequest recording the range is
+ * passed to pull_read_reply() as its user data. */
+static void
+pull_enqueue_request (SftpPullHandle *handle, guint64 offset, guint32 len)
+{
+  GDataOutputStream *read_cmd;
+  PullRequest *req = g_slice_new0 (PullRequest);
+
+  req->handle = handle;
+  req->request_offset = offset;
+  req->request_len = len;
+
+  read_cmd = new_command_stream (handle->backend, SSH_FXP_READ);
+  put_data_buffer (read_cmd, handle->raw_handle);
+  g_data_output_stream_put_uint64 (read_cmd, offset, NULL, NULL);
+  g_data_output_stream_put_uint32 (read_cmd, len, NULL, NULL);
+  queue_command_stream_and_free (handle->backend, read_cmd, pull_read_reply, handle->job, req);
+
+  handle->num_req += 1;
+}
+
+/* Request the next PULL_BLOCKSIZE bytes of the remote file and advance
+ * the handle's offset past the requested block. */
+static void
+pull_enqueue_next_request (SftpPullHandle *handle)
+{
+  const goffset block_start = handle->offset;
+
+  pull_enqueue_request (handle, block_start, PULL_BLOCKSIZE);
+  handle->offset = block_start + PULL_BLOCKSIZE;
+}
+ /* Reply handler for the SSH_FXP_FSTAT queued in pull_dest_open_cb().
+ * Records the remote size and unix mode from an SSH_FXP_ATTRS reply; any
+ * other reply marks the size as PULL_SIZE_INVALID. Either way the fstat
+ * is no longer pending afterwards, so the transfer may now be able to
+ * finish. If the job is already finished or cancelled, the handle is
+ * released instead (when nothing else is outstanding). */
+static void
+pull_fstat_reply (GVfsBackendSftp *backend,
+ int reply_type,
+ GDataInputStream *reply,
+ guint32 len,
+ GVfsJob *job,
+ gpointer user_data)
+{
+ SftpPullHandle *handle = user_data;
+
+ if (g_vfs_job_is_finished (job) || g_vfs_job_is_cancelled (job))
+ {
+ handle->size = PULL_SIZE_INVALID; /* clear PULL_SIZE_INCOMPLETE so the handle can be freed */
+ sftp_pull_handle_free (handle);
+ return;
+ }
+
+ if (reply_type == SSH_FXP_ATTRS)
+ {
+ GFileInfo *info = g_file_info_new ();
+ parse_attributes (backend, info, NULL, reply, NULL);
+ handle->size = g_file_info_get_size (info);
+ handle->mode = g_file_info_get_attribute_uint32 (info,
+ G_FILE_ATTRIBUTE_UNIX_MODE);
+ g_object_unref (info);
+ }
+ else
+ handle->size = PULL_SIZE_INVALID; /* the copy continues without a known size or mode */
+
+ pull_try_finish (handle);
+}
+ /* Completion callback for opening the local destination file.
+ * Finishes the replace or create operation started in pull_open_reply()
+ * (chosen by G_FILE_COPY_OVERWRITE). On success it queues an fstat of the
+ * remote file and the initial read request(s) without waiting for the
+ * fstat reply; on failure the job fails with the open error. */
+static void
+pull_dest_open_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+ SftpPullHandle *handle = user_data;
+ GError *error = NULL;
+
+ if (handle->op_job->flags & G_FILE_COPY_OVERWRITE)
+ handle->output = G_OUTPUT_STREAM (g_file_replace_finish (handle->dest,
+ res,
+ &error));
+ else
+ handle->output = G_OUTPUT_STREAM (g_file_create_finish (handle->dest,
+ res,
+ &error));
+ if (handle->output)
+ {
+ /* Do an fstat() to find out the size and mode of the file. */
+ GDataOutputStream *command = new_command_stream (handle->backend,
+ SSH_FXP_FSTAT);
+ put_data_buffer (command, handle->raw_handle);
+ queue_command_stream_and_free (handle->backend,
+ command,
+ pull_fstat_reply,
+ handle->job,
+ handle);
+ handle->size = PULL_SIZE_INCOMPLETE; /* marks the fstat as in flight */
+
+ while (handle->num_req < handle->max_req) /* max_req is 1 here (set in try_pull) */
+ pull_enqueue_next_request (handle);
+ }
+ else
+ {
+ g_vfs_job_failed_from_error (handle->job, error);
+ g_error_free (error);
+ sftp_pull_handle_free (handle);
+ }
+}
+ /* Reply handler for the two commands queued by try_pull().
+ * replies[0] is treated as the answer to the stat/lstat and replies[1]
+ * as the answer to the open. If the source is a regular file and a
+ * handle was returned, the local destination is opened asynchronously
+ * and pull_dest_open_cb() continues. Every other outcome fails the job
+ * and releases the handle; a remote file handle that was obtained anyway
+ * is stored first so that it gets closed. */
+static void
+pull_open_reply (GVfsBackendSftp *backend,
+ MultiReply *replies,
+ int n_replies,
+ GVfsJob *job,
+ gpointer user_data)
+{
+ SftpPullHandle *handle = user_data;
+
+ if (replies[0].type == SSH_FXP_ATTRS)
+ {
+ GFileType type;
+ GFileInfo *info = g_file_info_new ();
+
+ parse_attributes (backend, info, NULL, replies[0].data, NULL);
+ type = g_file_info_get_file_type (info);
+ g_object_unref (info);
+
+ if (type != G_FILE_TYPE_REGULAR)
+ {
+ /* Fall back to default implementation to copy non-regular files */
+ g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ _("Not supported"));
+ }
+ else if (replies[1].type == SSH_FXP_STATUS)
+ result_from_status (job, replies[1].data, -1, -1);
+ else if (replies[1].type != SSH_FXP_HANDLE)
+ g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+ _("Invalid reply received"));
+ else
+ {
+ /* We got a valid file handle. */
+ handle->raw_handle = read_data_buffer (replies[1].data);
+
+ if (handle->op_job->flags & G_FILE_COPY_OVERWRITE)
+ g_file_replace_async (handle->dest,
+ NULL,
+ handle->op_job->flags & G_FILE_COPY_BACKUP ? TRUE : FALSE,
+ G_FILE_CREATE_REPLACE_DESTINATION,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ pull_dest_open_cb, handle);
+ else
+ g_file_create_async (handle->dest,
+ G_FILE_CREATE_NONE,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ pull_dest_open_cb, handle);
+ return; /* pull_dest_open_cb() owns the handle from here on */
+ }
+ }
+ else if (replies[0].type == SSH_FXP_STATUS)
+ result_from_status (job, replies[0].data, -1, -1);
+ else
+ g_vfs_job_failed (job,
+ G_IO_ERROR, G_IO_ERROR_FAILED,
+ "%s", _("Invalid reply received"));
+
+ /* If we got a file handle, store it. It will be closed when the
+ * SftpPullHandle is freed. */
+ if (replies[1].type == SSH_FXP_HANDLE && !handle->raw_handle)
+ handle->raw_handle = read_data_buffer (replies[1].data);
+
+ sftp_pull_handle_free (handle);
+}
+ /* Backend entry point for pulling a remote file to a local path.
+ * Allocates the SftpPullHandle, then queues a stat (lstat when
+ * G_FILE_COPY_NOFOLLOW_SYMLINKS is set) and a read-only open of the
+ * source as one batch; pull_open_reply() receives both replies.
+ * remove_source and the progress callback arguments are not used here
+ * directly; later stages read source and remove_source from the job.
+ * Always returns TRUE. */
+static gboolean
+try_pull (GVfsBackend *backend,
+ GVfsJobPull *job,
+ const char *source,
+ const char *local_path,
+ GFileCopyFlags flags,
+ gboolean remove_source,
+ GFileProgressCallback progress_callback,
+ gpointer progress_callback_data)
+{
+ GVfsBackendSftp *op_backend = G_VFS_BACKEND_SFTP (backend);
+ SftpPullHandle *handle;
+ GDataOutputStream *commands[2];
+
+ handle = g_slice_new0 (SftpPullHandle);
+ handle->backend = g_object_ref (op_backend);
+ handle->op_job = g_object_ref (job);
+ handle->job = G_VFS_JOB (job);
+ handle->dest = g_file_new_for_path (local_path);
+ handle->size = PULL_SIZE_INVALID; /* no fstat in progress yet */
+ handle->max_req = 1; /* start with a single outstanding read */
+
+ commands[0] = new_command_stream (op_backend,
+ flags & G_FILE_COPY_NOFOLLOW_SYMLINKS ? SSH_FXP_LSTAT : SSH_FXP_STAT);
+ put_string (commands[0], source);
+
+ commands[1] = new_command_stream (op_backend, SSH_FXP_OPEN);
+ put_string (commands[1], source);
+ g_data_output_stream_put_uint32 (commands[1], SSH_FXF_READ, NULL, NULL);
+ g_data_output_stream_put_uint32 (commands[1], 0, NULL, NULL);
+
+ queue_command_streams_and_free (op_backend,
+ commands, 2,
+ pull_open_reply,
+ G_VFS_JOB(job),
+ handle);
+
+ return TRUE;
+}
+
static void
g_vfs_backend_sftp_class_init (GVfsBackendSftpClass *klass)
{
@@ -5431,4 +5964,5 @@ g_vfs_backend_sftp_class_init (GVfsBackendSftpClass *klass)
backend_class->try_query_settable_attributes = try_query_settable_attributes;
backend_class->try_set_attribute = try_set_attribute;
backend_class->try_push = try_push;
+ backend_class->try_pull = try_pull;
}