diff options
-rw-r--r-- | include/mysql.h.pp | 3 | ||||
-rw-r--r-- | include/mysql_com.h | 13 | ||||
-rw-r--r-- | mysql-test/r/mysqld--help.result | 2 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/r/sysvars_server_embedded.result | 4 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result | 4 | ||||
-rw-r--r-- | sql/item.cc | 2 | ||||
-rw-r--r-- | sql/item.h | 4 | ||||
-rw-r--r-- | sql/sql_insert.cc | 10 | ||||
-rw-r--r-- | sql/sql_parse.cc | 7 | ||||
-rw-r--r-- | sql/sql_prepare.cc | 222 | ||||
-rw-r--r-- | sql/sql_prepare.h | 3 |
11 files changed, 191 insertions, 83 deletions
diff --git a/include/mysql.h.pp b/include/mysql.h.pp index 517516aeb30..9c5d9fa4303 100644 --- a/include/mysql.h.pp +++ b/include/mysql.h.pp @@ -12,7 +12,8 @@ enum enum_server_command COM_UNIMPLEMENTED, COM_RESET_CONNECTION, COM_MDB_GAP_BEG, - COM_MDB_GAP_END=250, + COM_MDB_GAP_END=249, + COM_STMT_BULK_EXECUTE=250, COM_SLAVE_WORKER=251, COM_SLAVE_IO=252, COM_SLAVE_SQL=253, diff --git a/include/mysql_com.h b/include/mysql_com.h index ace54767b06..0ca40dd0418 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -115,7 +115,8 @@ enum enum_server_command COM_RESET_CONNECTION, /* don't forget to update const char *command_name[] in sql_parse.cc */ COM_MDB_GAP_BEG, - COM_MDB_GAP_END=250, + COM_MDB_GAP_END=249, + COM_STMT_BULK_EXECUTE=250, COM_SLAVE_WORKER=251, COM_SLAVE_IO=252, COM_SLAVE_SQL=253, @@ -136,6 +137,13 @@ enum enum_indicator_type STMT_INDICATOR_IGNORE }; +/* + bulk PS flags +*/ +#define STMT_BULK_FLAG_CLIENT_SEND_TYPES 128 +#define STMT_BULK_FLAG_INSERT_ID_REQUEST 64 + + /* sql type stored in .frm files for virtual fields */ #define MYSQL_TYPE_VIRTUAL 245 /* @@ -311,7 +319,8 @@ enum enum_indicator_type CLIENT_SESSION_TRACK |\ CLIENT_DEPRECATE_EOF |\ CLIENT_CONNECT_ATTRS |\ - MARIADB_CLIENT_COM_MULTI) + MARIADB_CLIENT_COM_MULTI |\ + MARIADB_CLIENT_STMT_BULK_OPERATIONS) /* To be added later: diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result index cfd2ede6ef9..24827da53be 100644 --- a/mysql-test/r/mysqld--help.result +++ b/mysql-test/r/mysqld--help.result @@ -1396,7 +1396,7 @@ performance-schema-max-rwlock-instances -1 performance-schema-max-socket-classes 10 performance-schema-max-socket-instances -1 performance-schema-max-stage-classes 150 -performance-schema-max-statement-classes 187 +performance-schema-max-statement-classes 188 performance-schema-max-table-handles -1 performance-schema-max-table-instances -1 performance-schema-max-thread-classes 50 diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result index 8880c9f8ca9..bf5e267bf44 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result @@ -2867,9 +2867,9 @@ READ_ONLY YES COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME PERFORMANCE_SCHEMA_MAX_STATEMENT_CLASSES SESSION_VALUE NULL -GLOBAL_VALUE 187 +GLOBAL_VALUE 188 GLOBAL_VALUE_ORIGIN COMPILE-TIME -DEFAULT_VALUE 187 +DEFAULT_VALUE 188 VARIABLE_SCOPE GLOBAL VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_COMMENT Maximum number of statement instruments. diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index 11d6594ee80..8b062075db8 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -3063,9 +3063,9 @@ READ_ONLY YES COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME PERFORMANCE_SCHEMA_MAX_STATEMENT_CLASSES SESSION_VALUE NULL -GLOBAL_VALUE 187 +GLOBAL_VALUE 188 GLOBAL_VALUE_ORIGIN COMPILE-TIME -DEFAULT_VALUE 187 +DEFAULT_VALUE 188 VARIABLE_SCOPE GLOBAL VARIABLE_TYPE BIGINT UNSIGNED VARIABLE_COMMENT Maximum number of statement instruments. diff --git a/sql/item.cc b/sql/item.cc index afdef1ac57c..8db5cfc6442 100644 --- a/sql/item.cc +++ b/sql/item.cc @@ -3347,7 +3347,7 @@ Item_param::Item_param(THD *thd, uint pos_in_query_arg): state(NO_VALUE), /* Don't pretend to be a literal unless value for this item is set. */ item_type(PARAM_ITEM), - indicators(0), indicator(STMT_INDICATOR_NONE), + indicator(STMT_INDICATOR_NONE), set_param_func(default_set_param_func), m_out_param_info(NULL), /* diff --git a/sql/item.h b/sql/item.h index 42e9fd94331..5ed3e8acb82 100644 --- a/sql/item.h +++ b/sql/item.h @@ -2873,10 +2873,8 @@ public: }; /* - Used for bulk protocol. Indicates if we should expect - indicators byte before value of the parameter + Used for bulk protocol only. */ - my_bool indicators; enum enum_indicator_type indicator; /* diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 8e5dfb4f69c..24e4c07fdb9 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -697,9 +697,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, bool using_bulk_insert= 0; uint value_count; ulong counter = 1; - ulong iteration= 0; + /* counter of iteration in bulk PS operation*/ + ulonglong iteration= 0; ulonglong id; - ulong bulk_iterations= bulk_parameters_iterations(thd); COPY_INFO info; TABLE *table= 0; List_iterator_fast<List_item> its(values_list); @@ -767,7 +767,6 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, DBUG_RETURN(TRUE); value_count= values->elements; - DBUG_ASSERT(bulk_iterations > 0); if (mysql_prepare_insert(thd, table_list, table, fields, values, update_fields, update_values, duplic, &unused_conds, FALSE)) @@ -939,6 +938,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, } do { + DBUG_PRINT("info", ("iteration %llu", iteration)); if (iteration && bulk_parameters_set(thd)) goto abort; @@ -1059,7 +1059,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, } its.rewind(); iteration++; - } while (iteration < bulk_iterations); + } while (bulk_parameters_iterations(thd)); values_loop_end: free_underlaid_joins(thd, &thd->lex->select_lex); @@ -1206,7 +1206,7 @@ values_loop_end: retval= thd->lex->explain->send_explain(thd); goto abort; } - if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) || + if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields)) { my_ok(thd, info.copied + info.deleted + diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index e97871e3dcf..412cd1dac70 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -391,7 +391,7 @@ const LEX_STRING command_name[257]={ { 0, 0 }, //247 { 0, 0 }, //248 { 0, 0 }, //249 - { 0, 0 }, //250 + { C_STRING_WITH_LEN("Bulk_execute") }, //250 { C_STRING_WITH_LEN("Slave_worker") }, //251 { C_STRING_WITH_LEN("Slave_IO") }, //252 { C_STRING_WITH_LEN("Slave_SQL") }, //253 @@ -1749,6 +1749,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, } break; } + case COM_STMT_BULK_EXECUTE: + { + mysqld_stmt_bulk_execute(thd, packet, packet_length); + break; + } case COM_STMT_EXECUTE: { mysqld_stmt_execute(thd, packet, packet_length); diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index cd330d8ee26..11274fbbaa2 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -164,7 +164,6 @@ public: Server_side_cursor *cursor; uchar *packet; uchar *packet_end; - ulong iterations; uint param_count; uint last_errno; uint flags; @@ -183,7 +182,9 @@ public: */ uint select_number_after_prepare; char last_error[MYSQL_ERRMSG_SIZE]; + my_bool iterations; my_bool start_param; + my_bool read_types; #ifndef EMBEDDED_LIBRARY bool (*set_params)(Prepared_statement *st, uchar *data, uchar *data_end, uchar *read_pos, String *expanded_query); @@ -213,11 +214,10 @@ public: uchar *packet_arg, uchar *packet_end_arg); bool execute_bulk_loop(String *expanded_query, bool open_cursor, - uchar *packet_arg, uchar *packet_end_arg, - ulong iterations); + uchar *packet_arg, uchar *packet_end_arg); bool execute_server_runnable(Server_runnable *server_runnable); my_bool set_bulk_parameters(bool reset); - ulong bulk_iterations(); + bool bulk_iterations() { return iterations; }; /* Destroy this statement */ void deallocate(); bool execute_immediate(const char *query, uint query_length); @@ -935,6 +935,7 @@ static bool insert_params(Prepared_statement *stmt, uchar *null_array, for (Item_param **it= begin; it < end; ++it) { Item_param *param= *it; + param->indicator= STMT_INDICATOR_NONE; // only for bulk parameters if (!param->has_long_data_value()) { if (is_param_null(null_array, (uint) (it - begin))) @@ -979,10 +980,7 @@ static bool insert_bulk_params(Prepared_statement *stmt, param->reset(); if (!param->has_long_data_value()) { - if (param->indicators) - param->indicator= (enum_indicator_type) *((*read_pos)++); - else - param->indicator= STMT_INDICATOR_NONE; + param->indicator= (enum_indicator_type) *((*read_pos)++); if ((*read_pos) > data_end) DBUG_RETURN(1); switch (param->indicator) @@ -993,6 +991,8 @@ static bool insert_bulk_params(Prepared_statement *stmt, param->set_param_func(param, read_pos, (uint) (data_end - (*read_pos))); if (param->has_no_value()) DBUG_RETURN(1); + if (param->convert_str_value(stmt->thd)) + DBUG_RETURN(1); /* out of memory */ break; case STMT_INDICATOR_NULL: param->set_null(); @@ -1011,6 +1011,36 @@ static bool insert_bulk_params(Prepared_statement *stmt, DBUG_RETURN(0); } +static bool set_conversion_functions(Prepared_statement *stmt, + uchar **data, uchar *data_end) +{ + uchar *read_pos= *data; + const uint signed_bit= 1 << 15; + DBUG_ENTER("set_conversion_functions"); + /* + First execute or types altered by the client, setup the + conversion routines for all parameters (one time) + */ + Item_param **it= stmt->param_array; + Item_param **end= it + stmt->param_count; + THD *thd= stmt->thd; + for (; it < end; ++it) + { + ushort typecode; + + if (read_pos >= data_end) + DBUG_RETURN(1); + + typecode= sint2korr(read_pos); + read_pos+= 2; + (**it).unsigned_flag= MY_TEST(typecode & signed_bit); + setup_one_conversion_function(thd, *it, (uchar) (typecode & 0xff)); + } + *data= read_pos; + DBUG_RETURN(0); +} + + static bool setup_conversion_functions(Prepared_statement *stmt, uchar **data, uchar *data_end, bool bulk_protocol= 0) @@ -1024,30 +1054,9 @@ static bool setup_conversion_functions(Prepared_statement *stmt, if (*read_pos++) //types supplied / first execute { - /* - First execute or types altered by the client, setup the - conversion routines for all parameters (one time) - */ - Item_param **it= stmt->param_array; - Item_param **end= it + stmt->param_count; - THD *thd= stmt->thd; - for (; it < end; ++it) - { - ushort typecode; - const uint signed_bit= 1 << 15; - const uint indicators_bit= 1 << 14; - - if (read_pos >= data_end) - DBUG_RETURN(1); - - typecode= sint2korr(read_pos); - read_pos+= 2; - (**it).unsigned_flag= MY_TEST(typecode & signed_bit); - if (bulk_protocol) - (**it).indicators= MY_TEST(typecode & indicators_bit); - setup_one_conversion_function(thd, *it, - (uchar) (typecode & 0xff)); - } + *data= read_pos; + bool res= set_conversion_functions(stmt, data, data_end); + DBUG_RETURN(res); } *data= read_pos; DBUG_RETURN(0); @@ -3032,6 +3041,14 @@ static void reset_stmt_params(Prepared_statement *stmt) } +static void mysql_stmt_execute_common(THD *thd, + ulong stmt_id, + uchar *packet, + uchar *packet_end, + ulong cursor_flags, + bool iteration, + bool types); + /** COM_STMT_EXECUTE handler: execute a previously prepared statement. @@ -3054,20 +3071,91 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length) uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround ulong stmt_id= uint4korr(packet); ulong flags= (ulong) packet[4]; -#ifndef EMBEDDED_LIBRARY - ulong iterations= uint4korr(packet + 5); -#else - ulong iterations= 0; // no support -#endif + uchar *packet_end= packet + packet_length; + DBUG_ENTER("mysqld_stmt_execute"); + + packet+= 9; /* stmt_id + 5 bytes of flags */ + + mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, flags, FALSE, + FALSE); + DBUG_VOID_RETURN; +} + + +/** + COM_STMT_BULK_EXECUTE handler: execute a previously prepared statement. + + If there are any parameters, then replace parameter markers with the + data supplied from the client, and then execute the statement. + This function uses binary protocol to send a possible result set + to the client. + + @param thd current thread + @param packet_arg parameter types and data, if any + @param packet_length packet length, including the terminator character. + + @return + none: in case of success OK packet or a result set is sent to the + client, otherwise an error message is set in THD. +*/ + +void mysqld_stmt_bulk_execute(THD *thd, char *packet_arg, uint packet_length) +{ + uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround + ulong stmt_id= uint4korr(packet); + uint flags= (uint) uint2korr(packet + 4); + uchar *packet_end= packet + packet_length; + DBUG_ENTER("mysqld_stmt_execute_bulk"); + + if (!(thd->client_capabilities & + MARIADB_CLIENT_STMT_BULK_OPERATIONS)) + { + DBUG_PRINT("error", + ("An attempt to execute bulk operation without support")); + my_error(ER_UNSUPPORTED_PS, MYF(0)); + } + /* Check for implemented parameters */ + if (flags & (~STMT_BULK_FLAG_CLIENT_SEND_TYPES)) + { + DBUG_PRINT("error", ("unsupported bulk execute flags %x", flags)); + my_error(ER_UNSUPPORTED_PS, MYF(0)); + } + + /* stmt id and two bytes of flags */ + packet+= 4 + 2; + mysql_stmt_execute_common(thd, stmt_id, packet, packet_end, 0, TRUE, + (flags & STMT_BULK_FLAG_CLIENT_SEND_TYPES)); + DBUG_VOID_RETURN; +} + + +/** + Common part of prepared statement execution + + @param thd THD handle + @param stmt_id id of the prepared statement + @param paket packet with parameters to bind + @param packet_end pointer to the byte after parameters end + @param cursor_flags cursor flags + @param bulk_op id it bulk operation + @param read_types flag say that types muast been read +*/ + +static void mysql_stmt_execute_common(THD *thd, + ulong stmt_id, + uchar *packet, + uchar *packet_end, + ulong cursor_flags, + bool bulk_op, + bool read_types) +{ /* Query text for binary, general or slow log, if any of them is open */ String expanded_query; - uchar *packet_end= packet + packet_length; Prepared_statement *stmt; Protocol *save_protocol= thd->protocol; bool open_cursor; - DBUG_ENTER("mysqld_stmt_execute"); - - packet+= 9; /* stmt_id + 5 bytes of flags */ + DBUG_ENTER("mysqld_stmt_execute_common"); + DBUG_ASSERT((!read_types) || (read_types && bulk_op)); /* First of all clear possible warnings from the previous command */ thd->reset_for_next_command(); @@ -3079,21 +3167,21 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length) llstr(stmt_id, llbuf), "mysqld_stmt_execute"); DBUG_VOID_RETURN; } + stmt->read_types= read_types; #if defined(ENABLED_PROFILING) thd->profiling.set_query_source(stmt->query(), stmt->query_length()); #endif DBUG_PRINT("exec_query", ("%s", stmt->query())); - DBUG_PRINT("info",("stmt: %p iterations: %lu", stmt, iterations)); + DBUG_PRINT("info",("stmt: %p bulk_op %d", stmt, bulk_op)); - open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY); + open_cursor= MY_TEST(cursor_flags & (ulong) CURSOR_TYPE_READ_ONLY); thd->protocol= &thd->protocol_binary; - if (iterations <= 1) + if (!bulk_op) stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end); else - stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end, - iterations); + stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end); thd->protocol= save_protocol; sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size); @@ -3600,11 +3688,12 @@ Prepared_statement::Prepared_statement(THD *thd_arg) cursor(0), packet(0), packet_end(0), - iterations(0), param_count(0), last_errno(0), flags((uint) IS_IN_USE), + iterations(0), start_param(0), + read_types(0), m_sql_mode(thd->variables.sql_mode) { init_sql_alloc(&main_mem_root, thd_arg->variables.query_alloc_block_size, @@ -3641,7 +3730,7 @@ void Prepared_statement::setup_set_params() set_params_from_actual_params= insert_params_from_actual_params_with_log; #ifndef EMBEDDED_LIBRARY set_params= insert_params_with_log; - set_bulk_params= insert_bulk_params; // TODO: add binlog support + set_bulk_params= insert_bulk_params; // RBR is on for bulk operation #else //TODO: add bulk support for bulk parameters set_params_data= emb_insert_params_with_log; @@ -4023,7 +4112,7 @@ Prepared_statement::execute_loop(String *expanded_query, Reprepare_observer reprepare_observer; bool error; int reprepare_attempt= 0; - iterations= 0; + iterations= FALSE; /* - In mysql_sql_stmt_execute() we hide all "external" Items @@ -4126,11 +4215,11 @@ my_bool bulk_parameters_set(THD *thd) DBUG_RETURN(FALSE); } -ulong bulk_parameters_iterations(THD *thd) +my_bool bulk_parameters_iterations(THD *thd) { Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param; if (!stmt) - return 1; + return FALSE; return stmt->bulk_iterations(); } @@ -4138,7 +4227,8 @@ ulong bulk_parameters_iterations(THD *thd) my_bool Prepared_statement::set_bulk_parameters(bool reset) { DBUG_ENTER("Prepared_statement::set_bulk_parameters"); - DBUG_PRINT("info", ("iteration: %lu", iterations)); + DBUG_PRINT("info", ("iteration: %d", iterations)); + if (iterations) { #ifndef EMBEDDED_LIBRARY @@ -4152,31 +4242,24 @@ my_bool Prepared_statement::set_bulk_parameters(bool reset) reset_stmt_params(this); DBUG_RETURN(true); } - iterations--; + if (packet >= packet_end) + iterations= FALSE; } start_param= 0; DBUG_RETURN(false); } -ulong Prepared_statement::bulk_iterations() -{ - if (iterations) - return iterations; - return start_param ? 1 : 0; -} - bool Prepared_statement::execute_bulk_loop(String *expanded_query, bool open_cursor, uchar *packet_arg, - uchar *packet_end_arg, - ulong iterations_arg) + uchar *packet_end_arg) { Reprepare_observer reprepare_observer; bool error= 0; packet= packet_arg; packet_end= packet_end_arg; - iterations= iterations_arg; + iterations= TRUE; start_param= true; #ifndef DBUG_OFF Item *free_list_state= thd->free_list; @@ -4190,16 +4273,26 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, thd->set_bulk_execution(0); return TRUE; } + /* Check for non zero parameter count*/ + if (param_count == 0) + { + DBUG_PRINT("error", ("Statement with no parameters for bulk execution.")); + my_error(ER_UNSUPPORTED_PS, MYF(0)); + thd->set_bulk_execution(0); + return TRUE; + } if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE)) { + DBUG_PRINT("error", ("Command is not supported in bulk execution.")); my_error(ER_UNSUPPORTED_PS, MYF(0)); thd->set_bulk_execution(0); return TRUE; } #ifndef EMBEDDED_LIBRARY - if (setup_conversion_functions(this, &packet, packet_end, TRUE)) + if (read_types && + set_conversion_functions(this, &packet, packet_end)) #else // bulk parameters are not supported for embedded, so it will an error #endif @@ -4210,6 +4303,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, thd->set_bulk_execution(0); return true; } + read_types= FALSE; #ifdef NOT_YET_FROM_MYSQL_5_6 if (unlikely(thd->security_ctx->password_expired && diff --git a/sql/sql_prepare.h b/sql/sql_prepare.h index 820cb43e6d5..203b37b3b26 100644 --- a/sql/sql_prepare.h +++ b/sql/sql_prepare.h @@ -72,6 +72,7 @@ private: void mysqld_stmt_prepare(THD *thd, const char *packet, uint packet_length); void mysqld_stmt_execute(THD *thd, char *packet, uint packet_length); +void mysqld_stmt_execute_bulk(THD *thd, char *packet, uint packet_length); void mysqld_stmt_bulk_execute(THD *thd, char *packet, uint packet_length); void mysqld_stmt_close(THD *thd, char *packet); void mysql_sql_stmt_prepare(THD *thd); @@ -83,7 +84,7 @@ void mysqld_stmt_reset(THD *thd, char *packet); void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length); void reinit_stmt_before_use(THD *thd, LEX *lex); -ulong bulk_parameters_iterations(THD *thd); +my_bool bulk_parameters_iterations(THD *thd); my_bool bulk_parameters_set(THD *thd); /** Execute a fragment of server code in an isolated context, so that |