summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksandr Byelkin <sanja@mariadb.com>2020-06-22 18:21:21 +0200
committerOleksandr Byelkin <sanja@mariadb.com>2020-10-12 17:07:33 +0200
commit1df5a92df899d033841d77c34e1512e2b118b2ec (patch)
treeff7879dbeb9f0afd8ef95cfa4aad0b5ba741ea20
parent861cd4ce286e7b41cc38facfc86c358e15161a74 (diff)
downloadmariadb-git-bb-10.5-MDEV-21916.tar.gz
MDEV-21916: COM_STMT_BULK_EXECUTE with RETURNING insert wrong valuesbb-10.5-MDEV-21916
To allocate new net buffer to avoid changing bufer we are reading.
-rw-r--r--sql/net_serv.cc20
-rw-r--r--sql/protocol.cc1
-rw-r--r--sql/sql_delete.cc8
-rw-r--r--sql/sql_error.cc37
-rw-r--r--sql/sql_error.h11
-rw-r--r--sql/sql_insert.cc24
-rw-r--r--sql/sql_parse.cc2
-rw-r--r--sql/sql_prepare.cc46
-rw-r--r--storage/perfschema/pfs.cc2
-rw-r--r--tests/mysql_client_test.c175
10 files changed, 294 insertions, 32 deletions
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index a96c43a94fe..1d2409f16cf 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -179,14 +179,26 @@ my_bool my_net_init(NET *net, Vio *vio, void *thd, uint my_flags)
DBUG_RETURN(0);
}
+
+/**
+ Allocate and assign new net buffer
+
+ @note In case of error the old buffer left
+
+ @retval TRUE error
+ @retval FALSE success
+*/
+
my_bool net_allocate_new_packet(NET *net, void *thd, uint my_flags)
{
+ uchar *tmp;
DBUG_ENTER("net_allocate_new_packet");
- if (!(net->buff=(uchar*) my_malloc(key_memory_NET_buff,
- (size_t) net->max_packet +
- NET_HEADER_SIZE + COMP_HEADER_SIZE + 1,
- MYF(MY_WME | my_flags))))
+ if (!(tmp= (uchar*) my_malloc(key_memory_NET_buff,
+ (size_t) net->max_packet +
+ NET_HEADER_SIZE + COMP_HEADER_SIZE + 1,
+ MYF(MY_WME | my_flags))))
DBUG_RETURN(1);
+ net->buff= tmp;
net->buff_end=net->buff+net->max_packet;
net->write_pos=net->read_pos = net->buff;
DBUG_RETURN(0);
diff --git a/sql/protocol.cc b/sql/protocol.cc
index dfab9e50ac9..17e34965a28 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -598,6 +598,7 @@ void Protocol::end_statement()
thd->get_stmt_da()->get_sqlstate());
break;
case Diagnostics_area::DA_EOF:
+ case Diagnostics_area::DA_EOF_BULK:
error= send_eof(thd->server_status,
thd->get_stmt_da()->statement_warn_count());
break;
diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc
index 7280236e43f..5aaff3cf623 100644
--- a/sql/sql_delete.cc
+++ b/sql/sql_delete.cc
@@ -685,8 +685,14 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
!table->prepare_triggers_for_delete_stmt_or_event())
will_batch= !table->file->start_bulk_delete();
- if (returning)
+ /*
+ thd->get_stmt_da()->is_set() means first iteration of prepared statement
+ with array binding operation execution (non optimized so it is not
+ INSERT)
+ */
+ if (returning && !thd->get_stmt_da()->is_set())
{
+ DBUG_ASSERT(thd->lex->sql_command != SQLCOM_INSERT);
if (result->send_result_set_metadata(returning->item_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
goto cleanup;
diff --git a/sql/sql_error.cc b/sql/sql_error.cc
index b3ef0d89a98..a753af2b34d 100644
--- a/sql/sql_error.cc
+++ b/sql/sql_error.cc
@@ -372,7 +372,7 @@ Diagnostics_area::set_eof_status(THD *thd)
{
DBUG_ENTER("set_eof_status");
/* Only allowed to report eof if has not yet reported an error */
- DBUG_ASSERT(! is_set());
+ DBUG_ASSERT(!is_set() || (m_status == DA_EOF_BULK && is_bulk_op()));
/*
In production, refuse to overwrite an error or a custom response
with an EOF packet.
@@ -380,16 +380,33 @@ Diagnostics_area::set_eof_status(THD *thd)
if (unlikely(is_error() || is_disabled()))
return;
- /*
- If inside a stored procedure, do not return the total
- number of warnings, since they are not available to the client
- anyway.
- */
- m_statement_warn_count= (thd->spcont ?
- 0 :
- current_statement_warn_count());
+ if (m_status == DA_EOF_BULK)
+ {
+ /*
+ If inside a stored procedure, do not return the total
+ number of warnings, since they are not available to the client
+ anyway.
+ */
+ if (!thd->spcont)
+ m_statement_warn_count+= current_statement_warn_count();
+ }
+ else
+ {
+ /*
+ If inside a stored procedure, do not return the total
+ number of warnings, since they are not available to the client
+ anyway.
+ */
+ if (thd->spcont)
+ {
+ m_statement_warn_count= 0;
+ m_affected_rows= 0;
+ }
+ else
+ m_statement_warn_count= current_statement_warn_count();
+ m_status= (is_bulk_op() ? DA_EOF_BULK : DA_EOF);
+ }
- m_status= DA_EOF;
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_error.h b/sql/sql_error.h
index a0497af78cb..c4ac88d6414 100644
--- a/sql/sql_error.h
+++ b/sql/sql_error.h
@@ -960,6 +960,8 @@ public:
DA_EOF,
/** Set whenever one calls my_ok() in PS bulk mode. */
DA_OK_BULK,
+ /** Set whenever one calls my_eof() in PS bulk mode. */
+ DA_EOF_BULK,
/** Set whenever one calls my_error() or my_message(). */
DA_ERROR,
/** Set in case of a custom response, such as one from COM_STMT_PREPARE. */
@@ -1019,8 +1021,11 @@ public:
enum_diagnostics_status status() const { return m_status; }
const char *message() const
- { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
- m_status == DA_OK_BULK); return m_message; }
+ {
+ DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
+ m_status == DA_OK_BULK || m_status == DA_EOF_BULK);
+ return m_message;
+ }
bool skip_flush() const
{
@@ -1055,7 +1060,7 @@ public:
uint statement_warn_count() const
{
DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK ||
- m_status == DA_EOF);
+ m_status == DA_EOF ||m_status == DA_EOF_BULK );
return m_statement_warn_count;
}
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 4ad4c478937..ef2fa954bd6 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -710,6 +710,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
Name_resolution_context *context;
Name_resolution_context_state ctx_state;
SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0;
+ unsigned char *readbuff= NULL;
#ifndef EMBEDDED_LIBRARY
char *query= thd->query();
@@ -771,7 +772,25 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
/* Prepares LEX::returing_list if it is not empty */
if (returning)
+ {
result->prepare(returning->item_list, NULL);
+ if (thd->is_bulk_op())
+ {
+ /*
+ It is RETURNING which needs network buffer to write result set and
+ it is array binfing which need network buffer to read parameters.
+ So we allocate yet another network buffer.
+ The old buffer will be freed at the end of operation.
+ */
+ DBUG_ASSERT(thd->protocol == &thd->protocol_binary);
+ readbuff= thd->net.buff; // old buffer
+ if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
+ {
+ readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
+ goto abort;
+ }
+ }
+ }
/* mysql_prepare_insert sets table_list->table if it was not set */
table= table_list->table;
@@ -1304,7 +1323,8 @@ values_loop_end:
thd->lex->current_select->save_leaf_tables(thd);
thd->lex->current_select->first_cond_optimization= 0;
}
-
+ if (readbuff)
+ my_free(readbuff);
DBUG_RETURN(FALSE);
abort:
@@ -1318,6 +1338,8 @@ abort:
if (!joins_freed)
free_underlaid_joins(thd, thd->lex->first_select_lex());
thd->abort_on_warning= 0;
+ if (readbuff)
+ my_free(readbuff);
DBUG_RETURN(retval);
}
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 4ef2a4c5d46..1d6e0497b63 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2331,7 +2331,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
size_t next_length_length= packet_start - packet;
unsigned char *readbuff= net->buff;
- if (net_allocate_new_packet(net, thd, MYF(0)))
+ if (net_allocate_new_packet(net, thd, MYF(MY_THREAD_SPECIFIC)))
break;
PSI_statement_locker *save_locker= thd->m_statement_psi;
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index ecb56e70f88..c144d3a8d7e 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -888,6 +888,9 @@ static bool insert_bulk_params(Prepared_statement *stmt,
case STMT_INDICATOR_IGNORE:
param->set_ignore();
break;
+ default:
+ DBUG_ASSERT(0);
+ DBUG_RETURN(1);
}
}
else
@@ -4344,6 +4347,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
uchar *packet_end_arg)
{
Reprepare_observer reprepare_observer;
+ unsigned char *readbuff= NULL;
bool error= 0;
packet= packet_arg;
packet_end= packet_end_arg;
@@ -4357,24 +4361,37 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
if (state == Query_arena::STMT_ERROR)
{
my_message(last_errno, last_error, MYF(0));
- thd->set_bulk_execution(0);
- return TRUE;
+ goto err;
}
/* Check for non zero parameter count*/
if (param_count == 0)
{
DBUG_PRINT("error", ("Statement with no parameters for bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
- thd->set_bulk_execution(0);
- return TRUE;
+ goto err;
}
if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE))
{
DBUG_PRINT("error", ("Command is not supported in bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
- thd->set_bulk_execution(0);
- return TRUE;
+ goto err;
+ }
+ /*
+ Here second buffer for not optimized commands,
+ optimized commands do it inside thier internal loop.
+ */
+ if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_OPTIMIZED) &&
+ this->lex->has_returning())
+ {
+ // Above check can be true for SELECT in future
+ DBUG_ASSERT(lex->sql_command != SQLCOM_SELECT);
+ readbuff= thd->net.buff; // old buffer
+ if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
+ {
+ readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
+ goto err;
+ }
}
#ifndef EMBEDDED_LIBRARY
@@ -4386,9 +4403,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
{
my_error(ER_WRONG_ARGUMENTS, MYF(0),
"mysqld_stmt_bulk_execute");
- reset_stmt_params(this);
- thd->set_bulk_execution(0);
- return true;
+ goto err;
}
read_types= FALSE;
@@ -4405,8 +4420,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
{
if (set_bulk_parameters(TRUE))
{
- thd->set_bulk_execution(0);
- return true;
+ goto err;
}
}
@@ -4453,8 +4467,16 @@ reexecute:
}
reset_stmt_params(this);
thd->set_bulk_execution(0);
-
+ if (readbuff)
+ my_free(readbuff);
return error;
+
+err:
+ reset_stmt_params(this);
+ thd->set_bulk_execution(0);
+ if (readbuff)
+ my_free(readbuff);
+ return true;
}
diff --git a/storage/perfschema/pfs.cc b/storage/perfschema/pfs.cc
index 1bb712f64cc..3e9198a6b6c 100644
--- a/storage/perfschema/pfs.cc
+++ b/storage/perfschema/pfs.cc
@@ -5466,6 +5466,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch(da->status())
{
case Diagnostics_area::DA_OK_BULK:
+ case Diagnostics_area::DA_EOF_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
@@ -5706,6 +5707,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch (da->status())
{
case Diagnostics_area::DA_OK_BULK:
+ case Diagnostics_area::DA_EOF_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
diff --git a/tests/mysql_client_test.c b/tests/mysql_client_test.c
index 99ae72b2417..ee2dd7fed2b 100644
--- a/tests/mysql_client_test.c
+++ b/tests/mysql_client_test.c
@@ -20547,6 +20547,179 @@ static void test_bulk_replace()
rc= mysql_query(mysql, "DROP TABLE t1");
myquery(rc);
}
+
+
+static void test_bulk_insert_returning()
+{
+ int rc;
+ MYSQL_STMT *stmt;
+ MYSQL_BIND bind[2], res_bind[2];
+ MYSQL_ROW row;
+ MYSQL_RES *result;
+ int i,
+ id[]= {1, 2, 3, 4},
+ val[]= {1, 1, 1, 1},
+ count= sizeof(id)/sizeof(id[0]);
+ unsigned long length[2];
+ my_bool is_null[2];
+ my_bool error[2];
+ int32 res[2];
+
+ rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
+ myquery(rc);
+ rc= mysql_query(mysql,
+ "CREATE TABLE t1 (id int not null primary key, active int)");
+ myquery(rc);
+
+ stmt= mysql_stmt_init(mysql);
+ rc= mysql_stmt_prepare(stmt,
+ "insert into t1 values (?, ?) returning id, active",
+ -1);
+ check_execute(stmt, rc);
+
+ memset(bind, 0, sizeof(bind));
+ bind[0].buffer_type = MYSQL_TYPE_LONG;
+ bind[0].buffer = (void *)id;
+ bind[0].buffer_length = 0;
+ bind[1].buffer_type = MYSQL_TYPE_LONG;
+ bind[1].buffer = (void *)val;
+ bind[1].buffer_length = 0;
+
+ mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count);
+ rc= mysql_stmt_bind_param(stmt, bind);
+ check_execute(stmt, rc);
+
+ rc= mysql_stmt_execute(stmt);
+ myquery(rc);
+
+ memset(bind, 0, sizeof(res_bind));
+ for (i= 0; i < 2; i++)
+ {
+ res_bind[i].buffer_type= MYSQL_TYPE_LONG;
+ res_bind[i].buffer= (char *)&res[i];
+ res_bind[i].is_null= &is_null[i];
+ res_bind[i].length= &length[i];
+ res_bind[i].error= &error[i];
+ }
+ rc= mysql_stmt_bind_result(stmt, res_bind);
+ myquery(rc);
+ rc= mysql_stmt_store_result(stmt);
+ myquery(rc);
+
+ i= 0;
+ while (!mysql_stmt_fetch(stmt))
+ {
+ i++;
+ DIE_IF(is_null[0] || is_null[1]);
+ DIE_IF(res[0] != i);
+ DIE_IF(res[1] != 1);
+ }
+ DIE_IF(i != 4);
+
+ mysql_stmt_close(stmt);
+
+ rc= mysql_query(mysql, "SELECT id,active FROM t1");
+ myquery(rc);
+
+ result= mysql_store_result(mysql);
+ mytest(result);
+
+ i= 0;
+ while ((row= mysql_fetch_row(result)))
+ {
+ i++;
+ DIE_IF(atoi(row[0]) != i);
+ DIE_IF(atoi(row[1]) != 1);
+ }
+ DIE_IF(i != 4);
+ mysql_free_result(result);
+
+
+ rc= mysql_query(mysql, "DROP TABLE t1");
+ myquery(rc);
+}
+
+static void test_bulk_delete_returning()
+{
+ int rc;
+ MYSQL_STMT *stmt;
+ MYSQL_BIND bind[2], res_bind[2];
+ MYSQL_ROW row;
+ MYSQL_RES *result;
+ int i,
+ id[]= {1, 2, 3, 4},
+ count= sizeof(id)/sizeof(id[0]);
+ unsigned long length[1];
+ my_bool is_null[1];
+ my_bool error[1];
+ int32 res[1];
+
+ rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
+ myquery(rc);
+ rc= mysql_query(mysql, "CREATE TABLE t1 (id int not null primary key)");
+ myquery(rc);
+ rc= mysql_query(mysql, "insert into t1 values (1), (2), (3), (4)");
+ myquery(rc);
+ verify_affected_rows(4);
+
+ stmt= mysql_stmt_init(mysql);
+ rc= mysql_stmt_prepare(stmt, "DELETE FROM t1 WHERE id=? RETURNING id", -1);
+ check_execute(stmt, rc);
+
+ memset(bind, 0, sizeof(bind));
+ bind[0].buffer_type = MYSQL_TYPE_LONG;
+ bind[0].buffer = (void *)id;
+ bind[0].buffer_length = 0;
+
+ mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count);
+ rc= mysql_stmt_bind_param(stmt, bind);
+ check_execute(stmt, rc);
+
+ rc= mysql_stmt_execute(stmt);
+ myquery(rc);
+
+ memset(bind, 0, sizeof(res_bind));
+ res_bind[0].buffer_type= MYSQL_TYPE_LONG;
+ res_bind[0].buffer= (char *)&res[0];
+ res_bind[0].is_null= &is_null[0];
+ res_bind[0].length= &length[0];
+ res_bind[0].error= &error[0];
+ rc= mysql_stmt_bind_result(stmt, res_bind);
+ myquery(rc);
+ rc= mysql_stmt_store_result(stmt);
+ myquery(rc);
+
+ i= 0;
+ while (!mysql_stmt_fetch(stmt))
+ {
+ i++;
+ DIE_IF(is_null[0]);
+ printf("\nXXX %d - %d (%d)", i, res[0], is_null[0]);
+ DIE_IF(res[0] != i);
+ }
+ DIE_IF(i != 4);
+
+ mysql_stmt_close(stmt);
+
+ rc= mysql_query(mysql, "SELECT id FROM t1");
+ myquery(rc);
+
+ result= mysql_store_result(mysql);
+ mytest(result);
+
+ i= 0;
+ while ((row= mysql_fetch_row(result)))
+ {
+ i++;
+ printf("\nXXX %d %s \n", i, row[0]);
+ }
+ DIE_IF(i != 0 );
+ mysql_free_result(result);
+
+
+ rc= mysql_query(mysql, "DROP TABLE t1");
+ myquery(rc);
+}
#endif
@@ -21277,6 +21450,8 @@ static struct my_tests_st my_tests[]= {
{ "test_bulk_autoinc", test_bulk_autoinc},
{ "test_bulk_delete", test_bulk_delete },
{ "test_bulk_replace", test_bulk_replace },
+ { "test_bulk_insert_returnung", test_bulk_insert_returning },
+ { "test_bulk_delete_returning", test_bulk_delete_returning },
#endif
{ "test_ps_params_in_ctes", test_ps_params_in_ctes },
{ "test_explain_meta", test_explain_meta },