From a10fdaeeb4421794f073fe348020435aa6dcef4e Mon Sep 17 00:00:00 2001 From: Oleksandr Byelkin Date: Wed, 13 Apr 2016 11:09:38 +0200 Subject: MDEV-9059: draft for integration with client --- include/mysql_com.h | 5 +++- sql/sql_acl.cc | 60 ++++++++++++++++++++++++++++++++++++++++++++---- sql/sql_class.cc | 11 +++++++++ sql/sql_class.h | 2 ++ sql/sql_connect.cc | 29 +++++++++++++++++++---- sql/sql_parse.cc | 39 +++++++++++++++---------------- sql/sql_parse.h | 1 + sql/threadpool_common.cc | 16 ++++++++++++- 8 files changed, 133 insertions(+), 30 deletions(-) diff --git a/include/mysql_com.h b/include/mysql_com.h index 461800f3ce7..65ad3d9f9cd 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -256,6 +256,8 @@ enum enum_server_command #define MARIADB_CLIENT_PROGRESS (1ULL << 32) /* support COM_MULTI */ #define MARIADB_CLIENT_COM_MULTI (1ULL << 33) +/* support bundle first command with the authentication packet */ +#define MARIADB_CLIENT_COM_IN_AUTH (1ULL << 34) #ifdef HAVE_COMPRESS #define CAN_CLIENT_COMPRESS CLIENT_COMPRESS @@ -295,7 +297,8 @@ enum enum_server_command CLIENT_SESSION_TRACK |\ CLIENT_DEPRECATE_EOF |\ CLIENT_CONNECT_ATTRS |\ - MARIADB_CLIENT_COM_MULTI) + MARIADB_CLIENT_COM_MULTI |\ + MARIADB_CLIENT_COM_IN_AUTH) /* To be added later: diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index ab6a8032348..31b792b8af2 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -11687,38 +11687,78 @@ static bool find_mpvio_user(MPVIO_EXT *mpvio) } static bool -read_client_connect_attrs(char **ptr, char *end, CHARSET_INFO *from_cs) +read_bundle_length (size_t *length, char **ptr, char *end) { - size_t length; char *ptr_save= *ptr; /* not enough bytes to hold the length */ if (ptr_save >= end) return true; - length= safe_net_field_length_ll((uchar **) ptr, end - ptr_save); + *length= safe_net_field_length_ll((uchar **) ptr, end - ptr_save); /* cannot even read the length */ if (*ptr == NULL) return true; /* length says there're more data than can fit into the packet */ - if (*ptr + length > end) + if (*ptr + *length > end) + return true; + + return false; +} + +static bool +read_client_connect_attrs(char **ptr, char *end, CHARSET_INFO *from_cs) +{ + size_t length; + + if (read_bundle_length(&length, ptr, end)) return true; /* impose an artificial length limit of 64k */ if (length > 65535) return true; + #ifdef HAVE_PSI_THREAD_INTERFACE if (PSI_THREAD_CALL(set_thread_connect_attrs)(*ptr, length, from_cs) && current_thd->variables.log_warnings) sql_print_warning("Connection attributes of length %lu were truncated", (unsigned long) length); #endif + *ptr+= length; return false; } +static LEX_STRING +read_client_bundle_com(char **ptr, char *end) +{ + LEX_STRING res= {0, packet_error}; + + if (read_bundle_length(&res.length, ptr, end)) + return res; + + if (!res.length) + return res; + + /* do_command add \0 to the end so we need allocate more */ + res.str= (char *)my_malloc(res.length + 1, MYF(MY_WME)); + + if (likely(res.str)) + { + memcpy(res.str, *ptr, res.length); + *ptr+= res.length; + } + else + { + *ptr+= res.length; + res.length= packet_error; + } + + return res; +} + #endif /* the packet format is described in send_change_user_packet() */ @@ -12096,6 +12136,18 @@ static ulong parse_client_handshake_packet(MPVIO_EXT *mpvio, mpvio->thd->charset())) return packet_error; + if (thd->client_capabilities & MARIADB_CLIENT_COM_IN_AUTH) + { + thd->bundle_command= + read_client_bundle_com(&next_field, + ((char *)net->read_pos) + pkt_len); + if (thd->bundle_command.length == packet_error) + { + thd->bundle_command.length= 0; + return packet_error; + } + } + /* if the acl_user needs a different plugin to authenticate (specified in GRANT ... AUTHENTICATED VIA plugin_name ..) diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 6433786a079..8bb7adfd50a 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1077,6 +1077,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) prepare_derived_at_open= FALSE; create_tmp_table_for_derived= FALSE; save_prep_leaf_list= FALSE; + bundle_command.str= NULL; + bundle_command.length= 0; /* Restore THR_THD */ set_current_thd(old_THR_THD); inc_thread_count(); @@ -1473,6 +1475,8 @@ void THD::init(void) #endif //EMBEDDED_LIBRARY apc_target.init(&LOCK_thd_data); + bundle_command.str= NULL; + bundle_command.length= 0; DBUG_VOID_RETURN; } @@ -1582,6 +1586,13 @@ void THD::cleanup(void) DBUG_ENTER("THD::cleanup"); DBUG_ASSERT(cleanup_done == 0); + if (bundle_command.str) + { + my_free(bundle_command.str); + bundle_command.str= 0; + bundle_command.length= 0; + } + killed= KILL_CONNECTION; #ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE if (transaction.xid_state.xa_state == XA_PREPARED) diff --git a/sql/sql_class.h b/sql/sql_class.h index 994a161a646..e0dbd1645f0 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -4263,6 +4263,8 @@ public: current_linfo= 0; mysql_mutex_unlock(&LOCK_thread_count); } + /* Auth packet bundle packet */ + LEX_STRING bundle_command; }; inline void add_to_active_threads(THD *thd) diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index fe6a101f999..a25fc0a6034 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -1294,6 +1294,8 @@ void do_handle_one_connection(CONNECT *connect) { ulonglong thr_create_utime= microsecond_interval_timer(); THD *thd; + bool close_conn= false; + if (connect->scheduler->init_new_connection_thread() || !(thd= connect->create_thd(NULL))) { @@ -1341,18 +1343,37 @@ void do_handle_one_connection(CONNECT *connect) { bool create_user= TRUE; + mysql_socket_set_thread_owner(thd->net.vio->mysql_socket); if (thd_prepare_connection(thd)) { create_user= FALSE; goto end_thread; - } + } - while (thd_is_connection_alive(thd)) + if (thd->bundle_command.str) { + thd->bundle_command.str[thd->bundle_command.length]= '\0'; /* safety */ + + enum enum_server_command command= + fetch_command(thd, thd->bundle_command.str); + + close_conn= dispatch_command(command, thd, thd->bundle_command.str + 1, + (uint) (thd->bundle_command.length - 1), + FALSE, FALSE); + mysql_audit_release(thd); - if (do_command(thd)) - break; + } + + if (!close_conn) + { + while (thd_is_connection_alive(thd)) + { + DBUG_ASSERT(thd->bundle_command.str == NULL); + mysql_audit_release(thd); + if (do_command(thd)) + break; + } } end_connection(thd); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index f4ac33d77fa..3c58391b290 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1131,7 +1131,7 @@ void cleanup_items(Item *item) DBUG_VOID_RETURN; } -static enum enum_server_command fetch_command(THD *thd, char *packet) +enum enum_server_command fetch_command(THD *thd, char *packet) { enum enum_server_command command= (enum enum_server_command) (uchar) packet[0]; @@ -1338,25 +1338,6 @@ bool do_command(THD *thd) command= fetch_command(thd, packet); -#ifdef WITH_WSREP - /* - Bail out if DB snapshot has not been installed. - */ - if (!(server_command_flags[command] & CF_SKIP_WSREP_CHECK) && - !wsrep_node_is_ready(thd)) - { - thd->protocol->end_statement(); - - /* Performance Schema Interface instrumentation end. */ - MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); - thd->m_statement_psi= NULL; - thd->m_digest= NULL; - - return_value= FALSE; - goto out; - } -#endif - /* Restore read timeout value */ my_net_set_read_timeout(net, thd->variables.net_read_timeout); @@ -1549,6 +1530,24 @@ bool dispatch_command(enum enum_server_command command, THD *thd, command_name[command].str : ""))); bool drop_more_results= 0; +#ifdef WITH_WSREP + /* + Bail out if DB snapshot has not been installed. + */ + if (!(server_command_flags[command] & CF_SKIP_WSREP_CHECK) && + !wsrep_node_is_ready(thd)) + { + thd->protocol->end_statement(); + + /* Performance Schema Interface instrumentation end. */ + MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); + thd->m_statement_psi= NULL; + thd->m_digest= NULL; + + DBUG_RETURN (FALSE); + } +#endif + if (!is_com_multi) inc_thread_running(); diff --git a/sql/sql_parse.h b/sql/sql_parse.h index ad29bb2cdd3..10e4a128084 100644 --- a/sql/sql_parse.h +++ b/sql/sql_parse.h @@ -101,6 +101,7 @@ pthread_handler_t handle_bootstrap(void *arg); int mysql_execute_command(THD *thd); bool do_command(THD *thd); void do_handle_bootstrap(THD *thd); +enum enum_server_command fetch_command(THD *thd, char *packet); bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length, bool is_com_multi, bool is_next_command); diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 2308f4277d6..b5f0e9133ad 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -23,6 +23,7 @@ #include #include #include +#include "sql_parse.h" /* Threadpool parameters */ @@ -46,7 +47,6 @@ static int threadpool_process_request(THD *thd); static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data); extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); -extern bool do_command(THD*); static inline TP_connection *get_TP_connection(THD *thd) { @@ -255,6 +255,20 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) } } } + if (thd && thd->bundle_command.str) + { + thd->bundle_command.str[thd->bundle_command.length]= '\0'; /* safety */ + enum enum_server_command command= + fetch_command(thd, thd->bundle_command.str); + + /* it is not a real error, just QUIT */ + error= dispatch_command(command, thd, thd->bundle_command.str + 1, + (uint) (thd->bundle_command.length - 1), + FALSE, FALSE); + net_flush(&thd->net); + mysql_audit_release(thd); + } + if (error) { threadpool_remove_connection(thd); -- cgit v1.2.1