diff options
author | Oleksandr Byelkin <sanja@mariadb.com> | 2020-06-22 18:21:21 +0200 |
---|---|---|
committer | Oleksandr Byelkin <sanja@mariadb.com> | 2020-10-12 17:07:33 +0200 |
commit | 1df5a92df899d033841d77c34e1512e2b118b2ec (patch) | |
tree | ff7879dbeb9f0afd8ef95cfa4aad0b5ba741ea20 | |
parent | 861cd4ce286e7b41cc38facfc86c358e15161a74 (diff) | |
download | mariadb-git-bb-10.5-MDEV-21916.tar.gz |
MDEV-21916: COM_STMT_BULK_EXECUTE with RETURNING insert wrong valuesbb-10.5-MDEV-21916
To allocate new net buffer to avoid changing bufer we are reading.
-rw-r--r-- | sql/net_serv.cc | 20 | ||||
-rw-r--r-- | sql/protocol.cc | 1 | ||||
-rw-r--r-- | sql/sql_delete.cc | 8 | ||||
-rw-r--r-- | sql/sql_error.cc | 37 | ||||
-rw-r--r-- | sql/sql_error.h | 11 | ||||
-rw-r--r-- | sql/sql_insert.cc | 24 | ||||
-rw-r--r-- | sql/sql_parse.cc | 2 | ||||
-rw-r--r-- | sql/sql_prepare.cc | 46 | ||||
-rw-r--r-- | storage/perfschema/pfs.cc | 2 | ||||
-rw-r--r-- | tests/mysql_client_test.c | 175 |
10 files changed, 294 insertions, 32 deletions
diff --git a/sql/net_serv.cc b/sql/net_serv.cc index a96c43a94fe..1d2409f16cf 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -179,14 +179,26 @@ my_bool my_net_init(NET *net, Vio *vio, void *thd, uint my_flags) DBUG_RETURN(0); } + +/** + Allocate and assign new net buffer + + @note In case of error the old buffer left + + @retval TRUE error + @retval FALSE success +*/ + my_bool net_allocate_new_packet(NET *net, void *thd, uint my_flags) { + uchar *tmp; DBUG_ENTER("net_allocate_new_packet"); - if (!(net->buff=(uchar*) my_malloc(key_memory_NET_buff, - (size_t) net->max_packet + - NET_HEADER_SIZE + COMP_HEADER_SIZE + 1, - MYF(MY_WME | my_flags)))) + if (!(tmp= (uchar*) my_malloc(key_memory_NET_buff, + (size_t) net->max_packet + + NET_HEADER_SIZE + COMP_HEADER_SIZE + 1, + MYF(MY_WME | my_flags)))) DBUG_RETURN(1); + net->buff= tmp; net->buff_end=net->buff+net->max_packet; net->write_pos=net->read_pos = net->buff; DBUG_RETURN(0); diff --git a/sql/protocol.cc b/sql/protocol.cc index dfab9e50ac9..17e34965a28 100644 --- a/sql/protocol.cc +++ b/sql/protocol.cc @@ -598,6 +598,7 @@ void Protocol::end_statement() thd->get_stmt_da()->get_sqlstate()); break; case Diagnostics_area::DA_EOF: + case Diagnostics_area::DA_EOF_BULK: error= send_eof(thd->server_status, thd->get_stmt_da()->statement_warn_count()); break; diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index 7280236e43f..5aaff3cf623 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -685,8 +685,14 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, !table->prepare_triggers_for_delete_stmt_or_event()) will_batch= !table->file->start_bulk_delete(); - if (returning) + /* + thd->get_stmt_da()->is_set() means first iteration of prepared statement + with array binding operation execution (non optimized so it is not + INSERT) + */ + if (returning && !thd->get_stmt_da()->is_set()) { + DBUG_ASSERT(thd->lex->sql_command != SQLCOM_INSERT); if (result->send_result_set_metadata(returning->item_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) goto cleanup; diff --git a/sql/sql_error.cc b/sql/sql_error.cc index b3ef0d89a98..a753af2b34d 100644 --- a/sql/sql_error.cc +++ b/sql/sql_error.cc @@ -372,7 +372,7 @@ Diagnostics_area::set_eof_status(THD *thd) { DBUG_ENTER("set_eof_status"); /* Only allowed to report eof if has not yet reported an error */ - DBUG_ASSERT(! is_set()); + DBUG_ASSERT(!is_set() || (m_status == DA_EOF_BULK && is_bulk_op())); /* In production, refuse to overwrite an error or a custom response with an EOF packet. @@ -380,16 +380,33 @@ Diagnostics_area::set_eof_status(THD *thd) if (unlikely(is_error() || is_disabled())) return; - /* - If inside a stored procedure, do not return the total - number of warnings, since they are not available to the client - anyway. - */ - m_statement_warn_count= (thd->spcont ? - 0 : - current_statement_warn_count()); + if (m_status == DA_EOF_BULK) + { + /* + If inside a stored procedure, do not return the total + number of warnings, since they are not available to the client + anyway. + */ + if (!thd->spcont) + m_statement_warn_count+= current_statement_warn_count(); + } + else + { + /* + If inside a stored procedure, do not return the total + number of warnings, since they are not available to the client + anyway. + */ + if (thd->spcont) + { + m_statement_warn_count= 0; + m_affected_rows= 0; + } + else + m_statement_warn_count= current_statement_warn_count(); + m_status= (is_bulk_op() ? DA_EOF_BULK : DA_EOF); + } - m_status= DA_EOF; DBUG_VOID_RETURN; } diff --git a/sql/sql_error.h b/sql/sql_error.h index a0497af78cb..c4ac88d6414 100644 --- a/sql/sql_error.h +++ b/sql/sql_error.h @@ -960,6 +960,8 @@ public: DA_EOF, /** Set whenever one calls my_ok() in PS bulk mode. */ DA_OK_BULK, + /** Set whenever one calls my_eof() in PS bulk mode. */ + DA_EOF_BULK, /** Set whenever one calls my_error() or my_message(). */ DA_ERROR, /** Set in case of a custom response, such as one from COM_STMT_PREPARE. */ @@ -1019,8 +1021,11 @@ public: enum_diagnostics_status status() const { return m_status; } const char *message() const - { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK || - m_status == DA_OK_BULK); return m_message; } + { + DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK || + m_status == DA_OK_BULK || m_status == DA_EOF_BULK); + return m_message; + } bool skip_flush() const { @@ -1055,7 +1060,7 @@ public: uint statement_warn_count() const { DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK || - m_status == DA_EOF); + m_status == DA_EOF ||m_status == DA_EOF_BULK ); return m_statement_warn_count; } diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 4ad4c478937..ef2fa954bd6 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -710,6 +710,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, Name_resolution_context *context; Name_resolution_context_state ctx_state; SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0; + unsigned char *readbuff= NULL; #ifndef EMBEDDED_LIBRARY char *query= thd->query(); @@ -771,7 +772,25 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, /* Prepares LEX::returing_list if it is not empty */ if (returning) + { result->prepare(returning->item_list, NULL); + if (thd->is_bulk_op()) + { + /* + It is RETURNING which needs network buffer to write result set and + it is array binfing which need network buffer to read parameters. + So we allocate yet another network buffer. + The old buffer will be freed at the end of operation. + */ + DBUG_ASSERT(thd->protocol == &thd->protocol_binary); + readbuff= thd->net.buff; // old buffer + if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC))) + { + readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer + goto abort; + } + } + } /* mysql_prepare_insert sets table_list->table if it was not set */ table= table_list->table; @@ -1304,7 +1323,8 @@ values_loop_end: thd->lex->current_select->save_leaf_tables(thd); thd->lex->current_select->first_cond_optimization= 0; } - + if (readbuff) + my_free(readbuff); DBUG_RETURN(FALSE); abort: @@ -1318,6 +1338,8 @@ abort: if (!joins_freed) free_underlaid_joins(thd, thd->lex->first_select_lex()); thd->abort_on_warning= 0; + if (readbuff) + my_free(readbuff); DBUG_RETURN(retval); } diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 4ef2a4c5d46..1d6e0497b63 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2331,7 +2331,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, size_t next_length_length= packet_start - packet; unsigned char *readbuff= net->buff; - if (net_allocate_new_packet(net, thd, MYF(0))) + if (net_allocate_new_packet(net, thd, MYF(MY_THREAD_SPECIFIC))) break; PSI_statement_locker *save_locker= thd->m_statement_psi; diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index ecb56e70f88..c144d3a8d7e 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -888,6 +888,9 @@ static bool insert_bulk_params(Prepared_statement *stmt, case STMT_INDICATOR_IGNORE: param->set_ignore(); break; + default: + DBUG_ASSERT(0); + DBUG_RETURN(1); } } else @@ -4344,6 +4347,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, uchar *packet_end_arg) { Reprepare_observer reprepare_observer; + unsigned char *readbuff= NULL; bool error= 0; packet= packet_arg; packet_end= packet_end_arg; @@ -4357,24 +4361,37 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, if (state == Query_arena::STMT_ERROR) { my_message(last_errno, last_error, MYF(0)); - thd->set_bulk_execution(0); - return TRUE; + goto err; } /* Check for non zero parameter count*/ if (param_count == 0) { DBUG_PRINT("error", ("Statement with no parameters for bulk execution.")); my_error(ER_UNSUPPORTED_PS, MYF(0)); - thd->set_bulk_execution(0); - return TRUE; + goto err; } if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE)) { DBUG_PRINT("error", ("Command is not supported in bulk execution.")); my_error(ER_UNSUPPORTED_PS, MYF(0)); - thd->set_bulk_execution(0); - return TRUE; + goto err; + } + /* + Here second buffer for not optimized commands, + optimized commands do it inside thier internal loop. + */ + if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_OPTIMIZED) && + this->lex->has_returning()) + { + // Above check can be true for SELECT in future + DBUG_ASSERT(lex->sql_command != SQLCOM_SELECT); + readbuff= thd->net.buff; // old buffer + if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC))) + { + readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer + goto err; + } } #ifndef EMBEDDED_LIBRARY @@ -4386,9 +4403,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, { my_error(ER_WRONG_ARGUMENTS, MYF(0), "mysqld_stmt_bulk_execute"); - reset_stmt_params(this); - thd->set_bulk_execution(0); - return true; + goto err; } read_types= FALSE; @@ -4405,8 +4420,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, { if (set_bulk_parameters(TRUE)) { - thd->set_bulk_execution(0); - return true; + goto err; } } @@ -4453,8 +4467,16 @@ reexecute: } reset_stmt_params(this); thd->set_bulk_execution(0); - + if (readbuff) + my_free(readbuff); return error; + +err: + reset_stmt_params(this); + thd->set_bulk_execution(0); + if (readbuff) + my_free(readbuff); + return true; } diff --git a/storage/perfschema/pfs.cc b/storage/perfschema/pfs.cc index 1bb712f64cc..3e9198a6b6c 100644 --- a/storage/perfschema/pfs.cc +++ b/storage/perfschema/pfs.cc @@ -5466,6 +5466,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da) switch(da->status()) { case Diagnostics_area::DA_OK_BULK: + case Diagnostics_area::DA_EOF_BULK: case Diagnostics_area::DA_EMPTY: break; case Diagnostics_area::DA_OK: @@ -5706,6 +5707,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da) switch (da->status()) { case Diagnostics_area::DA_OK_BULK: + case Diagnostics_area::DA_EOF_BULK: case Diagnostics_area::DA_EMPTY: break; case Diagnostics_area::DA_OK: diff --git a/tests/mysql_client_test.c b/tests/mysql_client_test.c index 99ae72b2417..ee2dd7fed2b 100644 --- a/tests/mysql_client_test.c +++ b/tests/mysql_client_test.c @@ -20547,6 +20547,179 @@ static void test_bulk_replace() rc= mysql_query(mysql, "DROP TABLE t1"); myquery(rc); } + + +static void test_bulk_insert_returning() +{ + int rc; + MYSQL_STMT *stmt; + MYSQL_BIND bind[2], res_bind[2]; + MYSQL_ROW row; + MYSQL_RES *result; + int i, + id[]= {1, 2, 3, 4}, + val[]= {1, 1, 1, 1}, + count= sizeof(id)/sizeof(id[0]); + unsigned long length[2]; + my_bool is_null[2]; + my_bool error[2]; + int32 res[2]; + + rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1"); + myquery(rc); + rc= mysql_query(mysql, + "CREATE TABLE t1 (id int not null primary key, active int)"); + myquery(rc); + + stmt= mysql_stmt_init(mysql); + rc= mysql_stmt_prepare(stmt, + "insert into t1 values (?, ?) returning id, active", + -1); + check_execute(stmt, rc); + + memset(bind, 0, sizeof(bind)); + bind[0].buffer_type = MYSQL_TYPE_LONG; + bind[0].buffer = (void *)id; + bind[0].buffer_length = 0; + bind[1].buffer_type = MYSQL_TYPE_LONG; + bind[1].buffer = (void *)val; + bind[1].buffer_length = 0; + + mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count); + rc= mysql_stmt_bind_param(stmt, bind); + check_execute(stmt, rc); + + rc= mysql_stmt_execute(stmt); + myquery(rc); + + memset(bind, 0, sizeof(res_bind)); + for (i= 0; i < 2; i++) + { + res_bind[i].buffer_type= MYSQL_TYPE_LONG; + res_bind[i].buffer= (char *)&res[i]; + res_bind[i].is_null= &is_null[i]; + res_bind[i].length= &length[i]; + res_bind[i].error= &error[i]; + } + rc= mysql_stmt_bind_result(stmt, res_bind); + myquery(rc); + rc= mysql_stmt_store_result(stmt); + myquery(rc); + + i= 0; + while (!mysql_stmt_fetch(stmt)) + { + i++; + DIE_IF(is_null[0] || is_null[1]); + DIE_IF(res[0] != i); + DIE_IF(res[1] != 1); + } + DIE_IF(i != 4); + + mysql_stmt_close(stmt); + + rc= mysql_query(mysql, "SELECT id,active FROM t1"); + myquery(rc); + + result= mysql_store_result(mysql); + mytest(result); + + i= 0; + while ((row= mysql_fetch_row(result))) + { + i++; + DIE_IF(atoi(row[0]) != i); + DIE_IF(atoi(row[1]) != 1); + } + DIE_IF(i != 4); + mysql_free_result(result); + + + rc= mysql_query(mysql, "DROP TABLE t1"); + myquery(rc); +} + +static void test_bulk_delete_returning() +{ + int rc; + MYSQL_STMT *stmt; + MYSQL_BIND bind[2], res_bind[2]; + MYSQL_ROW row; + MYSQL_RES *result; + int i, + id[]= {1, 2, 3, 4}, + count= sizeof(id)/sizeof(id[0]); + unsigned long length[1]; + my_bool is_null[1]; + my_bool error[1]; + int32 res[1]; + + rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1"); + myquery(rc); + rc= mysql_query(mysql, "CREATE TABLE t1 (id int not null primary key)"); + myquery(rc); + rc= mysql_query(mysql, "insert into t1 values (1), (2), (3), (4)"); + myquery(rc); + verify_affected_rows(4); + + stmt= mysql_stmt_init(mysql); + rc= mysql_stmt_prepare(stmt, "DELETE FROM t1 WHERE id=? RETURNING id", -1); + check_execute(stmt, rc); + + memset(bind, 0, sizeof(bind)); + bind[0].buffer_type = MYSQL_TYPE_LONG; + bind[0].buffer = (void *)id; + bind[0].buffer_length = 0; + + mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count); + rc= mysql_stmt_bind_param(stmt, bind); + check_execute(stmt, rc); + + rc= mysql_stmt_execute(stmt); + myquery(rc); + + memset(bind, 0, sizeof(res_bind)); + res_bind[0].buffer_type= MYSQL_TYPE_LONG; + res_bind[0].buffer= (char *)&res[0]; + res_bind[0].is_null= &is_null[0]; + res_bind[0].length= &length[0]; + res_bind[0].error= &error[0]; + rc= mysql_stmt_bind_result(stmt, res_bind); + myquery(rc); + rc= mysql_stmt_store_result(stmt); + myquery(rc); + + i= 0; + while (!mysql_stmt_fetch(stmt)) + { + i++; + DIE_IF(is_null[0]); + printf("\nXXX %d - %d (%d)", i, res[0], is_null[0]); + DIE_IF(res[0] != i); + } + DIE_IF(i != 4); + + mysql_stmt_close(stmt); + + rc= mysql_query(mysql, "SELECT id FROM t1"); + myquery(rc); + + result= mysql_store_result(mysql); + mytest(result); + + i= 0; + while ((row= mysql_fetch_row(result))) + { + i++; + printf("\nXXX %d %s \n", i, row[0]); + } + DIE_IF(i != 0 ); + mysql_free_result(result); + + + rc= mysql_query(mysql, "DROP TABLE t1"); + myquery(rc); +} #endif @@ -21277,6 +21450,8 @@ static struct my_tests_st my_tests[]= { { "test_bulk_autoinc", test_bulk_autoinc}, { "test_bulk_delete", test_bulk_delete }, { "test_bulk_replace", test_bulk_replace }, + { "test_bulk_insert_returnung", test_bulk_insert_returning }, + { "test_bulk_delete_returning", test_bulk_delete_returning }, #endif { "test_ps_params_in_ctes", test_ps_params_in_ctes }, { "test_explain_meta", test_explain_meta }, |