summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksandr Byelkin <sanja@mariadb.com>2020-06-22 18:21:21 +0200
committerOleksandr Byelkin <sanja@mariadb.com>2021-07-15 16:28:13 +0200
commita7d880f0b095939118594ecbeba953959ae8e351 (patch)
tree4bd583a11bf5fc42327bdb8b069fb7673cb54a47
parent826eab3f9b28dbc370e5d7a2f81dc42d2f09ed40 (diff)
downloadmariadb-git-a7d880f0b095939118594ecbeba953959ae8e351.tar.gz
MDEV-21916: COM_STMT_BULK_EXECUTE with RETURNING insert wrong values
The problem is that array binding uses net buffer to read parameters for each execution while each execiting with RETURNING write in the same buffer. Solution is to allocate new net buffer to avoid changing buffer we are reading from.
-rw-r--r--sql/net_serv.cc20
-rw-r--r--sql/protocol.cc1
-rw-r--r--sql/sql_delete.cc7
-rw-r--r--sql/sql_error.cc22
-rw-r--r--sql/sql_error.h11
-rw-r--r--sql/sql_insert.cc24
-rw-r--r--sql/sql_parse.cc2
-rw-r--r--sql/sql_prepare.cc46
-rw-r--r--storage/perfschema/pfs.cc2
-rw-r--r--tests/mysql_client_test.c174
10 files changed, 282 insertions, 27 deletions
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index a96c43a94fe..1d2409f16cf 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -179,14 +179,26 @@ my_bool my_net_init(NET *net, Vio *vio, void *thd, uint my_flags)
DBUG_RETURN(0);
}
+
+/**
+ Allocate and assign new net buffer
+
+ @note In case of error the old buffer left
+
+ @retval TRUE error
+ @retval FALSE success
+*/
+
my_bool net_allocate_new_packet(NET *net, void *thd, uint my_flags)
{
+ uchar *tmp;
DBUG_ENTER("net_allocate_new_packet");
- if (!(net->buff=(uchar*) my_malloc(key_memory_NET_buff,
- (size_t) net->max_packet +
- NET_HEADER_SIZE + COMP_HEADER_SIZE + 1,
- MYF(MY_WME | my_flags))))
+ if (!(tmp= (uchar*) my_malloc(key_memory_NET_buff,
+ (size_t) net->max_packet +
+ NET_HEADER_SIZE + COMP_HEADER_SIZE + 1,
+ MYF(MY_WME | my_flags))))
DBUG_RETURN(1);
+ net->buff= tmp;
net->buff_end=net->buff+net->max_packet;
net->write_pos=net->read_pos = net->buff;
DBUG_RETURN(0);
diff --git a/sql/protocol.cc b/sql/protocol.cc
index 08b874adba1..eb50b02cf3c 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -598,6 +598,7 @@ void Protocol::end_statement()
thd->get_stmt_da()->get_sqlstate());
break;
case Diagnostics_area::DA_EOF:
+ case Diagnostics_area::DA_EOF_BULK:
error= send_eof(thd->server_status,
thd->get_stmt_da()->statement_warn_count());
break;
diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc
index 870eecdd494..1966a77aa3e 100644
--- a/sql/sql_delete.cc
+++ b/sql/sql_delete.cc
@@ -698,7 +698,12 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
!table->prepare_triggers_for_delete_stmt_or_event())
will_batch= !table->file->start_bulk_delete();
- if (returning)
+ /*
+ thd->get_stmt_da()->is_set() means first iteration of prepared statement
+ with array binding operation execution (non optimized so it is not
+ INSERT)
+ */
+ if (returning && !thd->get_stmt_da()->is_set())
{
if (result->send_result_set_metadata(returning->item_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
diff --git a/sql/sql_error.cc b/sql/sql_error.cc
index b3ef0d89a98..80cdc0bc734 100644
--- a/sql/sql_error.cc
+++ b/sql/sql_error.cc
@@ -372,7 +372,7 @@ Diagnostics_area::set_eof_status(THD *thd)
{
DBUG_ENTER("set_eof_status");
/* Only allowed to report eof if has not yet reported an error */
- DBUG_ASSERT(! is_set());
+ DBUG_ASSERT(!is_set() || (m_status == DA_EOF_BULK && is_bulk_op()));
/*
In production, refuse to overwrite an error or a custom response
with an EOF packet.
@@ -385,11 +385,23 @@ Diagnostics_area::set_eof_status(THD *thd)
number of warnings, since they are not available to the client
anyway.
*/
- m_statement_warn_count= (thd->spcont ?
- 0 :
- current_statement_warn_count());
+ if (m_status == DA_EOF_BULK)
+ {
+ if (!thd->spcont)
+ m_statement_warn_count+= current_statement_warn_count();
+ }
+ else
+ {
+ if (thd->spcont)
+ {
+ m_statement_warn_count= 0;
+ m_affected_rows= 0;
+ }
+ else
+ m_statement_warn_count= current_statement_warn_count();
+ m_status= (is_bulk_op() ? DA_EOF_BULK : DA_EOF);
+ }
- m_status= DA_EOF;
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_error.h b/sql/sql_error.h
index a0497af78cb..c4ac88d6414 100644
--- a/sql/sql_error.h
+++ b/sql/sql_error.h
@@ -960,6 +960,8 @@ public:
DA_EOF,
/** Set whenever one calls my_ok() in PS bulk mode. */
DA_OK_BULK,
+ /** Set whenever one calls my_eof() in PS bulk mode. */
+ DA_EOF_BULK,
/** Set whenever one calls my_error() or my_message(). */
DA_ERROR,
/** Set in case of a custom response, such as one from COM_STMT_PREPARE. */
@@ -1019,8 +1021,11 @@ public:
enum_diagnostics_status status() const { return m_status; }
const char *message() const
- { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
- m_status == DA_OK_BULK); return m_message; }
+ {
+ DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
+ m_status == DA_OK_BULK || m_status == DA_EOF_BULK);
+ return m_message;
+ }
bool skip_flush() const
{
@@ -1055,7 +1060,7 @@ public:
uint statement_warn_count() const
{
DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK ||
- m_status == DA_EOF);
+ m_status == DA_EOF ||m_status == DA_EOF_BULK );
return m_statement_warn_count;
}
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index ab2c479af5c..9d3444a5ac6 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -710,6 +710,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
Name_resolution_context *context;
Name_resolution_context_state ctx_state;
SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0;
+ unsigned char *readbuff= NULL;
#ifndef EMBEDDED_LIBRARY
char *query= thd->query();
@@ -786,7 +787,25 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
/* Prepares LEX::returing_list if it is not empty */
if (returning)
+ {
result->prepare(returning->item_list, NULL);
+ if (thd->is_bulk_op())
+ {
+ /*
+ It is RETURNING which needs network buffer to write result set and
+ it is array binfing which need network buffer to read parameters.
+ So we allocate yet another network buffer.
+ The old buffer will be freed at the end of operation.
+ */
+ DBUG_ASSERT(thd->protocol == &thd->protocol_binary);
+ readbuff= thd->net.buff; // old buffer
+ if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
+ {
+ readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
+ goto abort;
+ }
+ }
+ }
context= &thd->lex->first_select_lex()->context;
/*
@@ -1316,7 +1335,8 @@ values_loop_end:
thd->lex->current_select->save_leaf_tables(thd);
thd->lex->current_select->first_cond_optimization= 0;
}
-
+ if (readbuff)
+ my_free(readbuff);
DBUG_RETURN(FALSE);
abort:
@@ -1330,6 +1350,8 @@ abort:
if (!joins_freed)
free_underlaid_joins(thd, thd->lex->first_select_lex());
thd->abort_on_warning= 0;
+ if (readbuff)
+ my_free(readbuff);
DBUG_RETURN(retval);
}
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 63898ad4db7..19f80606e34 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2356,7 +2356,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
size_t next_length_length= packet_start - packet;
unsigned char *readbuff= net->buff;
- if (net_allocate_new_packet(net, thd, MYF(0)))
+ if (net_allocate_new_packet(net, thd, MYF(MY_THREAD_SPECIFIC)))
break;
PSI_statement_locker *save_locker= thd->m_statement_psi;
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index 5fac9071575..0684f8c5e51 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -894,6 +894,9 @@ static bool insert_bulk_params(Prepared_statement *stmt,
case STMT_INDICATOR_IGNORE:
param->set_ignore();
break;
+ default:
+ DBUG_ASSERT(0);
+ DBUG_RETURN(1);
}
}
else
@@ -4567,6 +4570,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
uchar *packet_end_arg)
{
Reprepare_observer reprepare_observer;
+ unsigned char *readbuff= NULL;
bool error= 0;
packet= packet_arg;
packet_end= packet_end_arg;
@@ -4580,24 +4584,37 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
if (state == Query_arena::STMT_ERROR)
{
my_message(last_errno, last_error, MYF(0));
- thd->set_bulk_execution(0);
- return TRUE;
+ goto err;
}
/* Check for non zero parameter count*/
if (param_count == 0)
{
DBUG_PRINT("error", ("Statement with no parameters for bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
- thd->set_bulk_execution(0);
- return TRUE;
+ goto err;
}
if (!(sql_command_flags[lex->sql_command] & CF_PS_ARRAY_BINDING_SAFE))
{
DBUG_PRINT("error", ("Command is not supported in bulk execution."));
my_error(ER_UNSUPPORTED_PS, MYF(0));
- thd->set_bulk_execution(0);
- return TRUE;
+ goto err;
+ }
+ /*
+ Here second buffer for not optimized commands,
+ optimized commands do it inside thier internal loop.
+ */
+ if (!(sql_command_flags[lex->sql_command] & CF_PS_ARRAY_BINDING_OPTIMIZED) &&
+ this->lex->has_returning())
+ {
+ // Above check can be true for SELECT in future
+ DBUG_ASSERT(lex->sql_command != SQLCOM_SELECT);
+ readbuff= thd->net.buff; // old buffer
+ if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
+ {
+ readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
+ goto err;
+ }
}
#ifndef EMBEDDED_LIBRARY
@@ -4609,9 +4626,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
{
my_error(ER_WRONG_ARGUMENTS, MYF(0),
"mysqld_stmt_bulk_execute");
- reset_stmt_params(this);
- thd->set_bulk_execution(0);
- return true;
+ goto err;
}
read_types= FALSE;
@@ -4628,8 +4643,7 @@ Prepared_statement::execute_bulk_loop(String *expanded_query,
{
if (set_bulk_parameters(TRUE))
{
- thd->set_bulk_execution(0);
- return true;
+ goto err;
}
}
@@ -4693,8 +4707,16 @@ reexecute:
}
reset_stmt_params(this);
thd->set_bulk_execution(0);
-
+ if (readbuff)
+ my_free(readbuff);
return error;
+
+err:
+ reset_stmt_params(this);
+ thd->set_bulk_execution(0);
+ if (readbuff)
+ my_free(readbuff);
+ return true;
}
diff --git a/storage/perfschema/pfs.cc b/storage/perfschema/pfs.cc
index 1bb712f64cc..3e9198a6b6c 100644
--- a/storage/perfschema/pfs.cc
+++ b/storage/perfschema/pfs.cc
@@ -5466,6 +5466,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch(da->status())
{
case Diagnostics_area::DA_OK_BULK:
+ case Diagnostics_area::DA_EOF_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
@@ -5706,6 +5707,7 @@ void pfs_end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch (da->status())
{
case Diagnostics_area::DA_OK_BULK:
+ case Diagnostics_area::DA_EOF_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
diff --git a/tests/mysql_client_test.c b/tests/mysql_client_test.c
index 4b3e0a5b446..7587e08e727 100644
--- a/tests/mysql_client_test.c
+++ b/tests/mysql_client_test.c
@@ -20547,6 +20547,178 @@ static void test_bulk_replace()
rc= mysql_query(mysql, "DROP TABLE t1");
myquery(rc);
}
+
+
+static void test_bulk_insert_returning()
+{
+ int rc;
+ MYSQL_STMT *stmt;
+ MYSQL_BIND bind[2], res_bind[2];
+ MYSQL_ROW row;
+ MYSQL_RES *result;
+ int i,
+ id[]= {1, 2, 3, 4},
+ val[]= {1, 1, 1, 1},
+ count= sizeof(id)/sizeof(id[0]);
+ unsigned long length[2];
+ my_bool is_null[2];
+ my_bool error[2];
+ int32 res[2];
+
+ rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
+ myquery(rc);
+ rc= mysql_query(mysql,
+ "CREATE TABLE t1 (id int not null primary key, active int)");
+ myquery(rc);
+
+ stmt= mysql_stmt_init(mysql);
+ rc= mysql_stmt_prepare(stmt,
+ "insert into t1 values (?, ?) returning id, active",
+ -1);
+ check_execute(stmt, rc);
+
+ memset(bind, 0, sizeof(bind));
+ bind[0].buffer_type = MYSQL_TYPE_LONG;
+ bind[0].buffer = (void *)id;
+ bind[0].buffer_length = 0;
+ bind[1].buffer_type = MYSQL_TYPE_LONG;
+ bind[1].buffer = (void *)val;
+ bind[1].buffer_length = 0;
+
+ mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count);
+ rc= mysql_stmt_bind_param(stmt, bind);
+ check_execute(stmt, rc);
+
+ rc= mysql_stmt_execute(stmt);
+ myquery(rc);
+
+ memset(res_bind, 0, sizeof(res_bind));
+ for (i= 0; i < 2; i++)
+ {
+ res_bind[i].buffer_type= MYSQL_TYPE_LONG;
+ res_bind[i].buffer= (char *)&res[i];
+ res_bind[i].is_null= &is_null[i];
+ res_bind[i].length= &length[i];
+ res_bind[i].error= &error[i];
+ }
+ rc= mysql_stmt_bind_result(stmt, res_bind);
+ myquery(rc);
+ rc= mysql_stmt_store_result(stmt);
+ myquery(rc);
+
+ i= 0;
+ while (!mysql_stmt_fetch(stmt))
+ {
+ i++;
+ DIE_IF(is_null[0] || is_null[1]);
+ DIE_IF(res[0] != i);
+ DIE_IF(res[1] != 1);
+ }
+ DIE_IF(i != 4);
+
+ mysql_stmt_close(stmt);
+
+ rc= mysql_query(mysql, "SELECT id,active FROM t1");
+ myquery(rc);
+
+ result= mysql_store_result(mysql);
+ mytest(result);
+
+ i= 0;
+ while ((row= mysql_fetch_row(result)))
+ {
+ i++;
+ DIE_IF(atoi(row[0]) != i);
+ DIE_IF(atoi(row[1]) != 1);
+ }
+ DIE_IF(i != 4);
+ mysql_free_result(result);
+
+
+ rc= mysql_query(mysql, "DROP TABLE t1");
+ myquery(rc);
+}
+
+static void test_bulk_delete_returning()
+{
+ int rc;
+ MYSQL_STMT *stmt;
+ MYSQL_BIND bind[2], res_bind[2];
+ MYSQL_ROW row;
+ MYSQL_RES *result;
+ int i,
+ id[]= {1, 2, 3, 4},
+ count= sizeof(id)/sizeof(id[0]);
+ unsigned long length[1];
+ my_bool is_null[1];
+ my_bool error[1];
+ int32 res[1];
+
+ rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
+ myquery(rc);
+ rc= mysql_query(mysql, "CREATE TABLE t1 (id int not null primary key)");
+ myquery(rc);
+ rc= mysql_query(mysql, "insert into t1 values (1), (2), (3), (4)");
+ myquery(rc);
+ verify_affected_rows(4);
+
+ stmt= mysql_stmt_init(mysql);
+ rc= mysql_stmt_prepare(stmt, "DELETE FROM t1 WHERE id=? RETURNING id", -1);
+ check_execute(stmt, rc);
+
+ memset(bind, 0, sizeof(bind));
+ bind[0].buffer_type = MYSQL_TYPE_LONG;
+ bind[0].buffer = (void *)id;
+ bind[0].buffer_length = 0;
+
+ mysql_stmt_attr_set(stmt, STMT_ATTR_ARRAY_SIZE, (void*)&count);
+ rc= mysql_stmt_bind_param(stmt, bind);
+ check_execute(stmt, rc);
+
+ rc= mysql_stmt_execute(stmt);
+ myquery(rc);
+
+ memset(res_bind, 0, sizeof(res_bind));
+ res_bind[0].buffer_type= MYSQL_TYPE_LONG;
+ res_bind[0].buffer= (char *)&res[0];
+ res_bind[0].is_null= &is_null[0];
+ res_bind[0].length= &length[0];
+ res_bind[0].error= &error[0];
+ rc= mysql_stmt_bind_result(stmt, res_bind);
+ myquery(rc);
+ rc= mysql_stmt_store_result(stmt);
+ myquery(rc);
+
+ i= 0;
+ while (!mysql_stmt_fetch(stmt))
+ {
+ i++;
+ DIE_IF(is_null[0]);
+ DIE_IF(res[0] != i);
+ }
+ DIE_IF(i != 4);
+
+ mysql_stmt_close(stmt);
+
+ rc= mysql_query(mysql, "SELECT id FROM t1");
+ myquery(rc);
+
+ result= mysql_store_result(mysql);
+ mytest(result);
+
+ i= 0;
+ while ((row= mysql_fetch_row(result)))
+ {
+ i++;
+ printf("\nResult (SHOULD NOT BE HERE!!!) %d %s \n", i, row[0]);
+ }
+ DIE_IF(i != 0 );
+ mysql_free_result(result);
+
+
+ rc= mysql_query(mysql, "DROP TABLE t1");
+ myquery(rc);
+}
#endif
@@ -21427,6 +21599,8 @@ static struct my_tests_st my_tests[]= {
{ "test_bulk_autoinc", test_bulk_autoinc},
{ "test_bulk_delete", test_bulk_delete },
{ "test_bulk_replace", test_bulk_replace },
+ { "test_bulk_insert_returning", test_bulk_insert_returning },
+ { "test_bulk_delete_returning", test_bulk_delete_returning },
#endif
{ "test_ps_params_in_ctes", test_ps_params_in_ctes },
{ "test_explain_meta", test_explain_meta },