diff options
author | Oleksandr Byelkin <sanja@mariadb.com> | 2016-01-05 20:44:45 +0100 |
---|---|---|
committer | Oleksandr Byelkin <sanja@mariadb.com> | 2016-03-18 17:25:29 +0100 |
commit | fd1b7d0f59d720c92135036e288f0a4fd8c5a54d (patch) | |
tree | 6ee2c4188a7c38503e088dbb563d54dc6f6327c9 | |
parent | e5377457d80ddd7ffe7c358c88b7e9985d45b972 (diff) | |
download | mariadb-git-fd1b7d0f59d720c92135036e288f0a4fd8c5a54d.tar.gz |
MDEV-9058: protocol: COM_MULTI command (part 2)
simple COM_MULTI support (no prepared statements chain yet).
-rw-r--r-- | include/mysql_com.h | 5 | ||||
-rw-r--r-- | libmysqld/lib_sql.cc | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 3 | ||||
-rw-r--r-- | sql/net_serv.cc | 22 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 7 | ||||
-rw-r--r-- | sql/sql_class.h | 4 | ||||
-rw-r--r-- | sql/sql_parse.cc | 206 | ||||
-rw-r--r-- | sql/sql_parse.h | 6 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 2 |
9 files changed, 217 insertions, 40 deletions
diff --git a/include/mysql_com.h b/include/mysql_com.h index 8a2b405c0be..c13999a1028 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -235,6 +235,8 @@ enum enum_server_command #define MARIADB_CLIENT_FLAGS_MASK 0xffffffff00000000ULL /* Client support progress indicator */ #define MARIADB_CLIENT_PROGRESS (1ULL << 32) +/* support COM_MULTI */ +#define MARIADB_CLIENT_COM_MULTI (1ULL << 33) #ifdef HAVE_COMPRESS #define CAN_CLIENT_COMPRESS CLIENT_COMPRESS @@ -271,7 +273,8 @@ enum enum_server_command MARIADB_CLIENT_PROGRESS | \ CLIENT_PLUGIN_AUTH | \ CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | \ - CLIENT_CONNECT_ATTRS) + CLIENT_CONNECT_ATTRS |\ + MARIADB_CLIENT_COM_MULTI) /* To be added later: diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc index 36164c7a68b..b9fd3ef171c 100644 --- a/libmysqld/lib_sql.cc +++ b/libmysqld/lib_sql.cc @@ -165,7 +165,7 @@ emb_advanced_command(MYSQL *mysql, enum enum_server_command command, arg_length= header_length; } - result= dispatch_command(command, thd, (char *) arg, arg_length); + result= dispatch_command(command, thd, (char *) arg, arg_length, FALSE); thd->cur_data= 0; thd->mysys_var= NULL; diff --git a/sql/log_event.cc b/sql/log_event.cc index e99ef164064..78a351e1a9e 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4429,7 +4429,8 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, thd->m_digest->reset(thd->m_token_array, max_digest_length); thd->enable_slow_log= thd->variables.sql_log_slow; - mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state, + FALSE); /* Finalize server status flags after executing a statement. */ thd->update_server_status(); log_slow_statement(thd); diff --git a/sql/net_serv.cc b/sql/net_serv.cc index d81c89fe534..f0284462206 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -121,6 +121,8 @@ extern my_bool thd_net_is_killed(); static my_bool net_write_buff(NET *, const uchar *, ulong); +my_bool net_allocate_new_packet(NET *net, void *thd, uint my_flags); + /** Init with packet info. */ my_bool my_net_init(NET *net, Vio *vio, void *thd, uint my_flags) @@ -129,14 +131,12 @@ my_bool my_net_init(NET *net, Vio *vio, void *thd, uint my_flags) DBUG_PRINT("enter", ("my_flags: %u", my_flags)); net->vio = vio; my_net_local_init(net); /* Set some limits */ - if (!(net->buff=(uchar*) my_malloc((size_t) net->max_packet+ - NET_HEADER_SIZE + COMP_HEADER_SIZE +1, - MYF(MY_WME | my_flags)))) + + if (net_allocate_new_packet(net, thd, my_flags)) DBUG_RETURN(1); - net->buff_end=net->buff+net->max_packet; + net->error=0; net->return_status=0; net->pkt_nr=net->compress_pkt_nr=0; - net->write_pos=net->read_pos = net->buff; net->last_error[0]=0; net->compress=0; net->reading_or_writing=0; net->where_b = net->remain_in_buf=0; @@ -165,6 +165,18 @@ my_bool my_net_init(NET *net, Vio *vio, void *thd, uint my_flags) DBUG_RETURN(0); } +my_bool net_allocate_new_packet(NET *net, void *thd, uint my_flags) +{ + DBUG_ENTER("net_allocate_new_packet"); + if (!(net->buff=(uchar*) my_malloc((size_t) net->max_packet+ + NET_HEADER_SIZE + COMP_HEADER_SIZE +1, + MYF(MY_WME | my_flags)))) + DBUG_RETURN(1); + net->buff_end=net->buff+net->max_packet; + net->write_pos=net->read_pos = net->buff; + DBUG_RETURN(0); +} + void net_end(NET *net) { diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 2ef64a86f45..6003a0f44b1 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -7139,3 +7139,10 @@ ER_KILL_QUERY_DENIED_ERROR ER_NO_EIS_FOR_FIELD eng "Engine-independent statistics are not collected for column '%s'" ukr "Незалежна від типу таблиці статистика не збирається для стовбця '%s'" +ER_COMMULTI_BADCONTEXT 0A000 + eng "COM_MULTI can't return a result set in the given context" + ger "COM_MULTI kann im gegebenen Kontext keine Ergebnismenge zurückgeben" + ukr "COM_MULTI не може повернути результати у цьому контексті" +ER_BAD_COMMAND_IN_MULTI + eng "Command '%s' is not allowed for COM_MULTI" + ukr "Команда '%s' не дозволена для COM_MULTI" diff --git a/sql/sql_class.h b/sql/sql_class.h index 10f47687163..6c73aeec4bb 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5365,6 +5365,10 @@ public: Do not check that wsrep snapshot is ready before allowing this command */ #define CF_SKIP_WSREP_CHECK (1U << 2) +/** + Do not allow it for COM_MULTI batch +*/ +#define CF_NO_COM_MULTI (1U << 3) /* Inline functions */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 988b03b357e..2f3e20a9744 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -110,7 +110,7 @@ #include "wsrep_thd.h" static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, - Parser_state *parser_state); + Parser_state *parser_state, bool is_next_command); /** @defgroup Runtime_Environment Runtime Environment @@ -494,7 +494,7 @@ void init_update_queries(void) memset(server_command_flags, 0, sizeof(server_command_flags)); server_command_flags[COM_STATISTICS]= CF_SKIP_QUERY_ID | CF_SKIP_QUESTIONS | CF_SKIP_WSREP_CHECK; - server_command_flags[COM_PING]= CF_SKIP_QUERY_ID | CF_SKIP_QUESTIONS | CF_SKIP_WSREP_CHECK; + server_command_flags[COM_PING]= CF_SKIP_QUERY_ID | CF_SKIP_QUESTIONS | CF_SKIP_WSREP_CHECK | CF_NO_COM_MULTI; server_command_flags[COM_QUIT]= CF_SKIP_WSREP_CHECK; server_command_flags[COM_PROCESS_INFO]= CF_SKIP_WSREP_CHECK; @@ -519,7 +519,7 @@ void init_update_queries(void) server_command_flags[COM_STMT_RESET]= CF_SKIP_QUESTIONS | CF_SKIP_WSREP_CHECK; server_command_flags[COM_STMT_EXECUTE]= CF_SKIP_WSREP_CHECK; server_command_flags[COM_STMT_SEND_LONG_DATA]= CF_SKIP_WSREP_CHECK; - server_command_flags[COM_MULTI]= CF_SKIP_WSREP_CHECK; + server_command_flags[COM_MULTI]= CF_SKIP_WSREP_CHECK | CF_NO_COM_MULTI; /* Initialize the sql command flags array. */ memset(sql_command_flags, 0, sizeof(sql_command_flags)); @@ -903,7 +903,7 @@ void execute_init_command(THD *thd, LEX_STRING *init_command, */ save_vio= thd->net.vio; thd->net.vio= 0; - dispatch_command(COM_QUERY, thd, buf, len); + dispatch_command(COM_QUERY, thd, buf, len, FALSE, FALSE); thd->client_capabilities= save_client_capabilities; thd->net.vio= save_vio; @@ -1021,7 +1021,7 @@ static void handle_bootstrap_impl(THD *thd) break; } - mysql_parse(thd, thd->query(), length, &parser_state); + mysql_parse(thd, thd->query(), length, &parser_state, FALSE); bootstrap_error= thd->is_error(); thd->protocol->end_statement(); @@ -1119,6 +1119,23 @@ void cleanup_items(Item *item) DBUG_VOID_RETURN; } +static enum enum_server_command fetch_command(THD *thd, char *packet) +{ + enum enum_server_command + command= (enum enum_server_command) (uchar) packet[0]; + NET *net= &thd->net; + DBUG_ENTER("fetch_command"); + + if (command >= COM_END || + (command >= COM_MDB_GAP_BEG && command <= COM_MDB_GAP_END)) + command= COM_END; // Wrong command + + DBUG_PRINT("info",("Command on %s = %d (%s)", + vio_description(net->vio), command, + command_name[command].str)); + DBUG_RETURN(command); +} + #ifndef EMBEDDED_LIBRARY @@ -1307,15 +1324,8 @@ bool do_command(THD *thd) /* Do not rely on my_net_read, extra safety against programming errors. */ packet[packet_length]= '\0'; /* safety */ - command= (enum enum_server_command) (uchar) packet[0]; - - if (command >= COM_END || - (command >= COM_MDB_GAP_BEG && command <= COM_MDB_GAP_END)) - command= COM_END; // Wrong command - DBUG_PRINT("info",("Command on %s = %d (%s)", - vio_description(net->vio), command, - command_name[command].str)); + command= fetch_command(thd, packet); #ifdef WITH_WSREP /* @@ -1341,7 +1351,8 @@ bool do_command(THD *thd) DBUG_ASSERT(packet_length); DBUG_ASSERT(!thd->apc_target.is_enabled()); - return_value= dispatch_command(command, thd, packet+1, (uint) (packet_length-1)); + return_value= dispatch_command(command, thd, packet+1, + (uint) (packet_length-1), FALSE, FALSE); #ifdef WITH_WSREP if (WSREP(thd)) { @@ -1359,7 +1370,7 @@ bool do_command(THD *thd) my_charset_latin1.csname); } return_value= dispatch_command(command, thd, thd->wsrep_retry_query, - thd->wsrep_retry_query_len); + thd->wsrep_retry_query_len, FALSE, FALSE); thd->variables.character_set_client = current_charset; } @@ -1451,6 +1462,44 @@ static my_bool deny_updates_if_read_only_option(THD *thd, /** + check COM_MULTI packet + + @param thd thread handle + @param packet pointer on the packet of commands + @param packet_length length of this packet + + @retval 0 - Error + @retval # - Number of commands in the batch +*/ + +uint maria_multi_check(THD *thd, char *packet, uint packet_length) +{ + uint counter= 0; + DBUG_ENTER("maria_multi_check"); + while (packet_length) + { + // length of command + 3 bytes where that length was stored + uint subpacket_length= (uint3korr(packet) + 3); + DBUG_PRINT("info", ("sub-packet length: %d command: %x", + subpacket_length, packet[3])); + + if (subpacket_length == 3 || + subpacket_length > packet_length) + { + my_message(ER_UNKNOWN_COM_ERROR, ER_THD(thd, ER_UNKNOWN_COM_ERROR), + MYF(0)); + DBUG_RETURN(0); + } + + counter++; + packet+= subpacket_length; + packet_length-= subpacket_length; + } + DBUG_RETURN(counter); +} + + +/** Perform one connection-level (COM_XXXX) command. @param command type of command to perform @@ -1459,6 +1508,8 @@ static my_bool deny_updates_if_read_only_option(THD *thd, @param packet_length length of packet + 1 (to show that data is null-terminated) except for COM_SLEEP, where it can be zero. + @param is_com_multi recursive call from COM_MULTI + @param is_next_command there will be more command in the COM_MULTI batch @todo set thd->lex->sql_command to SQLCOM_END here. @@ -1472,15 +1523,22 @@ static my_bool deny_updates_if_read_only_option(THD *thd, COM_QUIT/COM_SHUTDOWN */ bool dispatch_command(enum enum_server_command command, THD *thd, - char* packet, uint packet_length) + char* packet, uint packet_length, bool is_com_multi, + bool is_next_command) { NET *net= &thd->net; bool error= 0; bool do_end_of_statement= true; DBUG_ENTER("dispatch_command"); - DBUG_PRINT("info", ("command: %d", command)); + DBUG_PRINT("info", ("command: %d %s", command, + (command_name[command].str != 0 ? + command_name[command].str : + "<?>"))); + bool drop_more_results= 0; + + if (!is_com_multi) + inc_thread_running(); - inc_thread_running(); /* keep it withing 1 byte */ compile_time_assert(COM_END == 255); @@ -1572,6 +1630,13 @@ bool dispatch_command(enum enum_server_command command, THD *thd, beginning of each command. */ thd->server_status&= ~SERVER_STATUS_CLEAR_SET; + if (is_next_command) + { + drop_more_results= !MY_TEST(thd->server_status & + SERVER_MORE_RESULTS_EXISTS); + thd->server_status|= SERVER_MORE_RESULTS_EXISTS; + } + switch (command) { case COM_INIT_DB: { @@ -1720,9 +1785,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, break; if (WSREP_ON) - wsrep_mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + wsrep_mysql_parse(thd, thd->query(), thd->query_length(), &parser_state, + is_next_command); else - mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state, + is_next_command); while (!thd->killed && (parser_state.m_lip.found_semicolon != NULL) && ! thd->is_error()) @@ -1807,9 +1874,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, /* TODO: set thd->lex->sql_command to SQLCOM_END here */ if (WSREP_ON) - wsrep_mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); + wsrep_mysql_parse(thd, beginning_of_next_stmt, length, &parser_state, + is_next_command); else - mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); + mysql_parse(thd, beginning_of_next_stmt, length, &parser_state, + is_next_command); } @@ -1861,6 +1930,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd, } packet= arg_end + 1; thd->reset_for_next_command(); + // thd->reset_for_next_command reset state => restore it + if (is_next_command) + thd->server_status|= SERVER_MORE_RESULTS_EXISTS; lex_start(thd); /* Must be before we init the table list. */ if (lower_case_table_names) @@ -2139,6 +2211,71 @@ bool dispatch_command(enum enum_server_command command, THD *thd, general_log_print(thd, command, NullS); my_eof(thd); break; + case COM_MULTI: + { + uint counter; + uint current_com= 0; + DBUG_ASSERT(!is_com_multi); + if (!(thd->client_capabilities & CLIENT_MULTI_RESULTS)) + { + /* The client does not support multiple result sets being sent back */ + my_error(ER_COMMULTI_BADCONTEXT, MYF(0)); + break; + } + + if (!(counter= maria_multi_check(thd, packet, packet_length))) + break; + + { + /* We have to store next length because it will be destroyed by '\0' */ + uint next_subpacket_length= uint3korr(packet); + unsigned char *readbuff= net->buff; + unsigned long readbuff_max_packet= net->max_packet; + + if (net_allocate_new_packet(net, thd, MYF(0))) + break; + + while (packet_length) + { + current_com++; + uint subpacket_length= next_subpacket_length + 3; + if (subpacket_length < packet_length) + next_subpacket_length= uint3korr(packet + subpacket_length); + /* safety like in do_command() */ + packet[subpacket_length]= '\0'; + + enum enum_server_command subcommand= fetch_command(thd, (packet + 3)); + + if (server_command_flags[subcommand] & CF_NO_COM_MULTI) + { + my_error(ER_BAD_COMMAND_IN_MULTI, MYF(0), command_name[subcommand]); + goto com_multi_end; + } + + if (dispatch_command(subcommand, thd, packet + (1 + 3), + subpacket_length - (1 + 3), TRUE, + (current_com != counter))) + { + DBUG_ASSERT(thd->is_error()); + goto com_multi_end; + } + + DBUG_ASSERT(subpacket_length <= packet_length); + packet+= subpacket_length; + packet_length-= subpacket_length; + } + +com_multi_end: + /* restore buffer to the original one */ + DBUG_ASSERT(net->buff == net->write_pos); // nothing to send + my_free(net->buff); + net->buff= readbuff; + net->max_packet= readbuff_max_packet; + net->buff_end=net->buff + net->max_packet; + net->write_pos=net->read_pos = net->buff; + } + break; + } case COM_SLEEP: case COM_CONNECT: // Impossible here case COM_TIME: // Impossible from client @@ -2175,9 +2312,14 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd_proc_info(thd, "updating status"); /* Finalize server status flags after executing a command. */ thd->update_server_status(); - thd->protocol->end_statement(); - query_cache_end_of_result(thd); + if (command != COM_MULTI) + { + thd->protocol->end_statement(); + query_cache_end_of_result(thd); + } } + if (drop_more_results) + thd->server_status&= ~SERVER_MORE_RESULTS_EXISTS; if (!thd->is_error() && !thd->killed_errno()) mysql_audit_general(thd, MYSQL_AUDIT_GENERAL_RESULT, 0, 0); @@ -2201,8 +2343,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->m_statement_psi= NULL; thd->m_digest= NULL; - dec_thread_running(); - thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory + if (!is_com_multi) + { + dec_thread_running(); + thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory + } free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); #if defined(ENABLED_PROFILING) @@ -7388,7 +7533,7 @@ void mysql_init_multi_delete(LEX *lex) } static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, - Parser_state *parser_state) + Parser_state *parser_state, bool is_next_command) { #ifdef WITH_WSREP bool is_autocommit= @@ -7407,7 +7552,7 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length()); } - mysql_parse(thd, rawbuf, length, parser_state); + mysql_parse(thd, rawbuf, length, parser_state, is_next_command); if (WSREP(thd)) { /* wsrep BF abort in query exec phase */ @@ -7505,10 +7650,11 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, @param length Length of the query text @param[out] found_semicolon For multi queries, position of the character of the next query in the query text. + @param is_next_command there will be more command in the COM_MULTI batch */ void mysql_parse(THD *thd, char *rawbuf, uint length, - Parser_state *parser_state) + Parser_state *parser_state, bool is_next_command) { int error __attribute__((unused)); DBUG_ENTER("mysql_parse"); @@ -7532,6 +7678,8 @@ void mysql_parse(THD *thd, char *rawbuf, uint length, */ lex_start(thd); thd->reset_for_next_command(); + if (is_next_command) + thd->server_status|= SERVER_MORE_RESULTS_EXISTS; if (query_cache_send_result_to_client(thd, rawbuf, length) <= 0) { diff --git a/sql/sql_parse.h b/sql/sql_parse.h index 6cb49f267d2..53a9ed3b24c 100644 --- a/sql/sql_parse.h +++ b/sql/sql_parse.h @@ -35,6 +35,7 @@ enum enum_mysql_completiontype { extern "C" int test_if_data_home_dir(const char *dir); int error_if_data_home_dir(const char *path, const char *what); +my_bool net_allocate_new_packet(NET *net, void *thd, uint my_flags); bool multi_update_precheck(THD *thd, TABLE_LIST *tables); bool multi_delete_precheck(THD *thd, TABLE_LIST *tables); @@ -87,7 +88,7 @@ bool is_log_table_write_query(enum enum_sql_command command); bool alloc_query(THD *thd, const char *packet, uint packet_length); void mysql_init_select(LEX *lex); void mysql_parse(THD *thd, char *rawbuf, uint length, - Parser_state *parser_state); + Parser_state *parser_state, bool is_com_multi); bool mysql_new_select(LEX *lex, bool move_down); void create_select_for_variable(const char *var_name); void create_table_set_open_action_and_adjust_tables(LEX *lex); @@ -99,7 +100,8 @@ int mysql_execute_command(THD *thd); bool do_command(THD *thd); void do_handle_bootstrap(THD *thd); bool dispatch_command(enum enum_server_command command, THD *thd, - char* packet, uint packet_length); + char* packet, uint packet_length, + bool is_com_multi, bool is_next_command); void log_slow_statement(THD *thd); bool append_file_to_dir(THD *thd, const char **filename_ptr, const char *table_name); diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index c75f2c116ec..3bb23cf4601 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -907,7 +907,7 @@ static int run_sql_command(THD *thd, const char *query) return -1; } - mysql_parse(thd, thd->query(), thd->query_length(), &ps); + mysql_parse(thd, thd->query(), thd->query_length(), &ps, FALSE); if (thd->is_error()) { int const err= thd->get_stmt_da()->sql_errno(); |