diff options
author | Ross Lagerwall <rosslagerwall@gmail.com> | 2013-11-05 17:23:21 +0200 |
---|---|---|
committer | Ross Lagerwall <rosslagerwall@gmail.com> | 2013-11-13 17:01:18 +0200 |
commit | 7890d2801a7f3998b336452fadc8ad0d01d06e60 (patch) | |
tree | 55eea8ba7318c89049f8b1dce00a67d2ec194653 | |
parent | 9d90065bad034ab17fe10bb8bde9dd0979d7050a (diff) | |
download | gvfs-7890d2801a7f3998b336452fadc8ad0d01d06e60.tar.gz |
sftp: Implement push support
Implement sftp push support with a sliding window to improve the speed
of sftp uploads.
The implementation is based on the one from the OpenSSH sftp client. It
uses up to 64 outstanding write requests where each request is 32KiB in
size which gives a maximum theoretical bandwidth of 2MiB per RTT.
This patch results in substantial performance improvments, especially
for high-latency links.
Some benchmark figures:
Old behavior:
Copying from local server = 6.1MB/s
Copying from local server with 250ms of RTT latency = 0.249MB/s
Copying many small files with 250ms of RTT latency = 0.93 files per
second
New behavior:
Copying from local server = 12.2MB/s
Copying from local server with 250ms of RTT latency = 6.2MB/s
Copying many small files with 250ms of RTT latency = 1.24 files per
second
OpenSSH sftp client:
Copying from local server = 12.8MB/s
Copying from local server with 250ms of RTT latency = 6.7MB/s
Copying many small files with 250ms of RTT latency = 1.33 files per
second
https://bugzilla.gnome.org/show_bug.cgi?id=523015
-rw-r--r-- | daemon/gvfsbackendsftp.c | 688 |
1 files changed, 688 insertions, 0 deletions
diff --git a/daemon/gvfsbackendsftp.c b/daemon/gvfsbackendsftp.c index 0ae21086..99bfd5a7 100644 --- a/daemon/gvfsbackendsftp.c +++ b/daemon/gvfsbackendsftp.c @@ -62,6 +62,7 @@ #include "gvfsjobenumerate.h" #include "gvfsjobmakedirectory.h" #include "gvfsjobprogress.h" +#include "gvfsjobpush.h" #include "gvfsdaemonprotocol.h" #include "gvfskeyring.h" #include "sftp.h" @@ -4656,6 +4657,692 @@ try_set_attribute (GVfsBackend *backend, return TRUE; } +/* The push sliding window mechanism is based on the one in the OpenSSH sftp + * client. */ + +#define PUSH_BLOCKSIZE 32768 +#define PUSH_MAX_REQUESTS 64 + +typedef struct { + /* Job context */ + GVfsBackendSftp *backend; + GVfsJobPush *op_job; + GVfsJob *job; + + /* Open files */ + DataBuffer *raw_handle; + GInputStream *in; + + /* fstat information */ + goffset size; + guint32 permissions; + + /* state */ + goffset offset; + goffset n_written; + int num_req; + + /* replace data */ + char *tempname; + int temp_count; + + char buffer[PUSH_BLOCKSIZE]; +} SftpPushHandle; + +typedef struct { + SftpPushHandle *handle; + gssize count; +} PushWriteRequest; + +static void +sftp_push_handle_free (SftpPushHandle *handle) +{ + GDataOutputStream *command; + + /* Only free the handle if there are no write requests outstanding and no + * asynchronous reads pending. */ + if (handle->num_req == 0 && (!handle->in || !g_input_stream_has_pending (handle->in))) + { + if (handle->in) + { + g_input_stream_close_async (handle->in, 0, NULL, NULL, NULL); + g_object_unref (handle->in); + } + + /* If raw_handle is non-NULL, it means destination is still open. Close + * it. */ + if (handle->raw_handle) + { + command = new_command_stream (handle->backend, SSH_FXP_CLOSE); + put_data_buffer (command, handle->raw_handle); + queue_command_stream_and_free (handle->backend, command, NULL, handle->job, NULL); + data_buffer_free (handle->raw_handle); + } + + /* If tempname is non-NULL, it means we failed and should delete the temp + * file. */ + if (handle->tempname) + { + command = new_command_stream (handle->backend, SSH_FXP_REMOVE); + put_string (command, handle->tempname); + queue_command_stream_and_free (handle->backend, command, NULL, handle->job, NULL); + + g_free (handle->tempname); + } + + g_object_unref (handle->backend); + g_object_unref (handle->job); + g_slice_free (SftpPushHandle, handle); + } +} + +static void +push_read_cb (GObject *source, GAsyncResult *res, gpointer user_data); + +static void +push_enqueue_request (SftpPushHandle *handle) +{ + g_input_stream_read_async (handle->in, + handle->buffer, + PUSH_BLOCKSIZE, + G_PRIORITY_DEFAULT, + NULL, + push_read_cb, handle); + handle->num_req++; +} + +static void +push_close_moved_file (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + if (reply_type == SSH_FXP_STATUS) + { + guint32 code = read_status_code (reply); + if (code == SSH_FX_OK) + { + if (handle->op_job->remove_source) + g_unlink (handle->op_job->local_path); + + g_vfs_job_succeeded (job); + } + else + result_from_status_code (job, code, -1, -1); + } + else + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + + sftp_push_handle_free (handle); +} + +static void +push_close_deleted_file (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + if (reply_type == SSH_FXP_STATUS) + { + guint32 code = read_status_code (reply); + if (code == SSH_FX_OK) + { + /* The delete completed successfully, now rename. */ + GDataOutputStream *command = new_command_stream (backend, SSH_FXP_RENAME); + put_string (command, handle->tempname); + put_string (command, handle->op_job->destination); + queue_command_stream_and_free (backend, command, push_close_moved_file, job, handle); + + g_free (handle->tempname); + handle->tempname = NULL; + return; + } + else + result_from_status_code (job, code, -1, -1); + } + else + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + + sftp_push_handle_free (handle); +} + +static void +push_close_delete_or_succeed (SftpPushHandle *handle) +{ + if (handle->tempname) + { + /* If we wrote to a temp file, do delete then rename. */ + GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_REMOVE); + put_string (command, handle->op_job->destination); + queue_command_stream_and_free (handle->backend, command, push_close_deleted_file, handle->job, handle); + } + else + { + if (handle->op_job->remove_source) + g_unlink (handle->op_job->local_path); + + g_vfs_job_succeeded (handle->job); + sftp_push_handle_free (handle); + } +} + +static void +push_close_restore_permissions (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + /* We don't care if setting the permissions succeeded or not. */ + push_close_delete_or_succeed (user_data); +} + +static void +push_close_write_reply (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + if (reply_type == SSH_FXP_STATUS) + { + guint32 code = read_status_code (reply); + if (code == SSH_FX_OK) + { + if (handle->op_job->flags & G_FILE_COPY_TARGET_DEFAULT_PERMS) + push_close_delete_or_succeed (handle); + else + { + /* Restore the source file's permissions. */ + GDataOutputStream *command = new_command_stream (backend, SSH_FXP_SETSTAT); + put_string (command, handle->tempname ? handle->tempname : handle->op_job->destination); + g_data_output_stream_put_uint32 (command, SSH_FILEXFER_ATTR_PERMISSIONS, NULL, NULL); + g_data_output_stream_put_uint32 (command, handle->permissions, NULL, NULL); + queue_command_stream_and_free (backend, command, push_close_restore_permissions, job, handle); + } + return; + } + else + result_from_status_code (job, code, -1, -1); + } + else + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + + sftp_push_handle_free (handle); +} + +static void +push_finish (SftpPushHandle *handle) +{ + GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_CLOSE); + put_data_buffer (command, handle->raw_handle); + queue_command_stream_and_free (handle->backend, command, push_close_write_reply, handle->job, handle); + + data_buffer_free (handle->raw_handle); + handle->raw_handle = NULL; +} + +static void +push_source_close_cb (GObject *source, GAsyncResult *res, gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + g_input_stream_close_finish (handle->in, res, NULL); + g_clear_object (&handle->in); + + if (g_vfs_job_is_finished (handle->job) || g_vfs_job_is_cancelled (handle->job)) + { + sftp_push_handle_free (handle); + return; + } + + /* If there are no write requests outstanding, we are done. */ + if (handle->num_req == 0) + push_finish(handle); +} + +static void +push_write_reply (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + PushWriteRequest *request = user_data; + SftpPushHandle *handle = request->handle; + gssize count = request->count; + + g_slice_free (PushWriteRequest, request); + + handle->num_req--; + + if (g_vfs_job_is_finished (job) || g_vfs_job_is_cancelled (job)) + { + sftp_push_handle_free (handle); + return; + } + + if (reply_type == SSH_FXP_STATUS) + { + guint32 code = read_status_code (reply); + + if (code == SSH_FX_OK) + { + handle->n_written += count; + g_vfs_job_progress_callback (handle->n_written, handle->size, job); + + /* Enqueue a read op if the file is still open, and there isn't + * already one pending. */ + if (handle->in && !g_input_stream_has_pending (handle->in)) + push_enqueue_request (handle); + + /* We are done if the file is closed and there are no write requests + * oustanding. */ + if (!handle->in && handle->num_req == 0) + { + push_finish (handle); + return; + } + } + else + result_from_status_code (handle->job, code, -1, -1); + } + else + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + + sftp_push_handle_free (handle); +} + +static void +push_read_cb (GObject *source, GAsyncResult *res, gpointer user_data) +{ + PushWriteRequest *request; + GDataOutputStream *command; + gssize count; + + SftpPushHandle *handle = user_data; + GError *error = NULL; + + count = g_input_stream_read_finish (handle->in, res, &error); + + if (g_vfs_job_is_finished (handle->job) || g_vfs_job_is_cancelled (handle->job)) + { + g_clear_error (&error); + sftp_push_handle_free (handle); + return; + } + + if (error) + { + g_vfs_job_failed_from_error (handle->job, error); + g_error_free (error); + + sftp_push_handle_free (handle); + return; + } + + if (count == 0) + { + handle->num_req--; + + g_input_stream_close_async (handle->in, + 0, NULL, + push_source_close_cb, handle); + return; + } + + request = g_slice_new (PushWriteRequest); + request->handle = handle; + request->count = count; + + command = new_command_stream (handle->backend, SSH_FXP_WRITE); + put_data_buffer (command, handle->raw_handle); + g_data_output_stream_put_uint64 (command, handle->offset, NULL, NULL); + g_data_output_stream_put_uint32 (command, count, NULL, NULL); + g_output_stream_write_all (G_OUTPUT_STREAM (command), + handle->buffer, count, + NULL, NULL, NULL); + queue_command_stream_and_free (handle->backend, command, push_write_reply, handle->job, request); + handle->offset += count; + + if (handle->num_req < PUSH_MAX_REQUESTS) + push_enqueue_request (handle); +} + +static void push_create_temp (SftpPushHandle *handle); + +static void +push_truncate_original_reply (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + if (reply_type == SSH_FXP_HANDLE) + { + handle->raw_handle = read_data_buffer (reply); + push_enqueue_request (handle); + } + else if (reply_type == SSH_FXP_STATUS) + result_from_status (job, reply, -1, -1); + else + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + + sftp_push_handle_free (handle); +} + +static void +push_create_temp_reply (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + if (reply_type == SSH_FXP_STATUS) + { + guint32 code = read_status_code (reply); + if (code == SSH_FX_PERMISSION_DENIED) + { + /* The temp file creation failed. Try truncating the existing file. */ + GDataOutputStream *command; + + g_free (handle->tempname); + handle->tempname = NULL; + + command = new_command_stream (backend, SSH_FXP_OPEN); + put_string (command, handle->op_job->destination); + g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_TRUNC, NULL, NULL); + g_data_output_stream_put_uint32 (command, 0, NULL, NULL); + queue_command_stream_and_free (backend, command, push_truncate_original_reply, job, handle); + + return; + } + else if (code == SSH_FX_FAILURE) + { + push_create_temp (handle); + return; + } + else + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + } + else if (reply_type != SSH_FXP_HANDLE) + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + else + { + handle->raw_handle = read_data_buffer (reply); + push_enqueue_request (handle); + } + + sftp_push_handle_free (handle); +} + +static void +push_create_temp (SftpPushHandle *handle) +{ + GDataOutputStream *command; + char *dirname; + char basename[] = ".giosaveXXXXXX"; + + /* Write to a temp file and then rename to replace. */ + + handle->temp_count++; + + if (handle->temp_count == 100) + { + g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Unable to create temporary file")); + sftp_push_handle_free (handle); + return; + } + + g_free (handle->tempname); + dirname = g_path_get_dirname (handle->op_job->destination); + random_text (basename + 8); + handle->tempname = g_build_filename (dirname, basename, NULL); + g_free (dirname); + + command = new_command_stream (handle->backend, SSH_FXP_OPEN); + put_string (command, handle->tempname); + g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_EXCL, NULL, NULL); + g_data_output_stream_put_uint32 (command, 0, NULL, NULL); + queue_command_stream_and_free (handle->backend, command, push_create_temp_reply, handle->job, handle); +} + +static void +push_open_stat_reply (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + if (reply_type == SSH_FXP_ATTRS) + { + GFileInfo *info; + GFileType type; + + info = g_file_info_new (); + parse_attributes (backend, info, NULL, reply, NULL); + type = g_file_info_get_file_type (info); + g_object_unref (info); + + if (type == G_FILE_TYPE_DIRECTORY) + { + /* We cannot overwrite a directory. */ + if (handle->op_job->flags & G_FILE_COPY_OVERWRITE) + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_IS_DIRECTORY, + _("File is directory")); + + sftp_push_handle_free (handle); + return; + } + + push_create_temp (handle); + } + else + { + result_from_status (job, reply, -1, -1); + sftp_push_handle_free (handle); + } +} + +static void +push_open_reply (GVfsBackendSftp *backend, + int reply_type, + GDataInputStream *reply, + guint32 len, + GVfsJob *job, + gpointer user_data) +{ + SftpPushHandle *handle = user_data; + + if (reply_type == SSH_FXP_STATUS) + { + guint32 code = read_status_code (reply); + if (code == SSH_FX_NO_SUCH_FILE) + not_dir_or_not_exist_error (backend, job, handle->op_job->destination); + else if (code == SSH_FX_FAILURE) + { + if (handle->op_job->flags & G_FILE_COPY_OVERWRITE) + { + /* The destination probably exists. Let's see if we can overwrite + * it. */ + GDataOutputStream *command = new_command_stream (backend, SSH_FXP_LSTAT); + put_string (command, handle->op_job->destination); + queue_command_stream_and_free (backend, command, push_open_stat_reply, job, handle); + return; + } + else + result_from_status_code (job, code, G_IO_ERROR_EXISTS, -1); + } + else + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + } + else if (reply_type != SSH_FXP_HANDLE) + g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED, + _("Invalid reply received")); + else + { + handle->raw_handle = read_data_buffer (reply); + push_enqueue_request (handle); + } + + sftp_push_handle_free (handle); +} + +static void +push_source_fstat_cb (GObject *source, GAsyncResult *res, gpointer user_data) +{ + GFileInputStream *fin = G_FILE_INPUT_STREAM (source); + SftpPushHandle *handle = user_data; + GError *error = NULL; + GFileInfo *info; + GDataOutputStream *command; + + info = g_file_input_stream_query_info_finish (fin, res, &error); + if (info) + { + handle->permissions = g_file_info_get_attribute_uint32 (info, G_FILE_ATTRIBUTE_UNIX_MODE) & 0777; + handle->size = g_file_info_get_size (info); + + command = new_command_stream (handle->backend, SSH_FXP_OPEN); + put_string (command, handle->op_job->destination); + g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_EXCL, NULL, NULL); + g_data_output_stream_put_uint32 (command, 0, NULL, NULL); + queue_command_stream_and_free (handle->backend, command, push_open_reply, handle->job, handle); + } + else + { + g_vfs_job_failed_from_error (handle->job, error); + g_error_free (error); + sftp_push_handle_free (handle); + } +} + +static void +push_source_open_cb (GObject *source, GAsyncResult *res, gpointer user_data) +{ + GFile *source_file = G_FILE (source); + SftpPushHandle *handle = user_data; + GError *error = NULL; + GFileInputStream *fin; + + fin = g_file_read_finish (source_file, res, &error); + if (fin) + { + handle->in = G_INPUT_STREAM (fin); + + g_file_input_stream_query_info_async (fin, + G_FILE_ATTRIBUTE_STANDARD_SIZE "," + G_FILE_ATTRIBUTE_UNIX_MODE, + 0, NULL, + push_source_fstat_cb, handle); + } + else + { + if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_IS_DIRECTORY) + { + /* Fall back to default implementation to improve the error message */ + g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + _("Not supported")); + } + else + g_vfs_job_failed_from_error (handle->job, error); + + g_error_free (error); + sftp_push_handle_free (handle); + } +} + +static void +push_source_lstat_cb (GObject *source, GAsyncResult *res, gpointer user_data) +{ + GFile *source_file = G_FILE (source); + SftpPushHandle *handle = user_data; + GError *error = NULL; + GFileInfo *info; + + info = g_file_query_info_finish (source_file, res, &error); + if (!info) + { + g_vfs_job_failed_from_error (handle->job, error); + g_error_free (error); + sftp_push_handle_free (handle); + return; + } + + if ((handle->op_job->flags & G_FILE_COPY_NOFOLLOW_SYMLINKS) && + g_file_info_get_file_type (info) == G_FILE_TYPE_SYMBOLIC_LINK) + { + /* Fall back to default implementation to copy symlink */ + g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + _("Not supported")); + sftp_push_handle_free (handle); + return; + } + + g_file_read_async (source_file, 0, NULL, push_source_open_cb, handle); +} + +static gboolean +try_push (GVfsBackend *backend, + GVfsJobPush *op_job, + const char *destination, + const char *local_path, + GFileCopyFlags flags, + gboolean remove_source, + GFileProgressCallback progress_callback, + gpointer progress_callback_data) +{ + GVfsBackendSftp *op_backend = G_VFS_BACKEND_SFTP (backend); + GFile *source; + SftpPushHandle *handle; + + handle = g_slice_new0 (SftpPushHandle); + handle->backend = g_object_ref (op_backend); + handle->job = g_object_ref (G_VFS_JOB (op_job)); + handle->op_job = op_job; + + source = g_file_new_for_path (local_path); + g_file_query_info_async (source, + G_FILE_ATTRIBUTE_STANDARD_TYPE, + G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS, + 0, NULL, + push_source_lstat_cb, handle); + g_object_unref (source); + + return TRUE; +} + static void g_vfs_backend_sftp_class_init (GVfsBackendSftpClass *klass) { @@ -4690,4 +5377,5 @@ g_vfs_backend_sftp_class_init (GVfsBackendSftpClass *klass) backend_class->try_set_display_name = try_set_display_name; backend_class->try_query_settable_attributes = try_query_settable_attributes; backend_class->try_set_attribute = try_set_attribute; + backend_class->try_push = try_push; } |