summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorAlexey Botchkov <holyfoot@askmonty.org>2016-03-23 12:16:39 +0400
committerAlexey Botchkov <holyfoot@askmonty.org>2016-03-23 12:16:39 +0400
commit2783fc7d14bc8ad16acfeb509d3b19615023f47a (patch)
treeadfd8aeaa00e43cb12f35642e579a18fdfccacc7 /sql
parente4435b5ec304be1439475f6f6084fbf9f1fd9e1f (diff)
downloadmariadb-git-2783fc7d14bc8ad16acfeb509d3b19615023f47a.tar.gz
MDEV-717 LP:1003679 - Wrong binlog order on concurrent DROP schema and CREATE function.
The cause of the issue is when DROP DATABASE takes metadata lock and is in progress through it's execution, a concurrently running CREATE FUNCTION checks for the existence of database which it succeeds and then it waits on the metadata lock. Once DROP DATABASE writes to BINLOG and finally releases the metadata lock on schema object, the CREATE FUNCTION waiting on metadata lock gets in it's code path and succeeds and writes to binlog.
Diffstat (limited to 'sql')
-rw-r--r--sql/events.cc29
-rw-r--r--sql/sp.cc79
-rw-r--r--sql/sp.h2
-rw-r--r--sql/sql_parse.cc54
4 files changed, 82 insertions, 82 deletions
diff --git a/sql/events.cc b/sql/events.cc
index b80ec993ac4..77dbb8b82e5 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -333,6 +333,10 @@ Events::create_event(THD *thd, Event_parse_data *parse_data)
if (check_access(thd, EVENT_ACL, parse_data->dbname.str, NULL, NULL, 0, 0))
DBUG_RETURN(TRUE);
+ if (lock_object_name(thd, MDL_key::EVENT,
+ parse_data->dbname.str, parse_data->name.str))
+ DBUG_RETURN(TRUE);
+
if (check_db_dir_existence(parse_data->dbname.str))
{
my_error(ER_BAD_DB_ERROR, MYF(0), parse_data->dbname.str);
@@ -347,10 +351,6 @@ Events::create_event(THD *thd, Event_parse_data *parse_data)
*/
save_binlog_format= thd->set_current_stmt_binlog_format_stmt();
- if (lock_object_name(thd, MDL_key::EVENT,
- parse_data->dbname.str, parse_data->name.str))
- DBUG_RETURN(TRUE);
-
if (thd->lex->create_info.or_replace() && event_queue)
event_queue->drop_event(thd, parse_data->dbname, parse_data->name);
@@ -454,6 +454,16 @@ Events::update_event(THD *thd, Event_parse_data *parse_data,
if (check_access(thd, EVENT_ACL, parse_data->dbname.str, NULL, NULL, 0, 0))
DBUG_RETURN(TRUE);
+ if (lock_object_name(thd, MDL_key::EVENT,
+ parse_data->dbname.str, parse_data->name.str))
+ DBUG_RETURN(TRUE);
+
+ if (check_db_dir_existence(parse_data->dbname.str))
+ {
+ my_error(ER_BAD_DB_ERROR, MYF(0), parse_data->dbname.str);
+ DBUG_RETURN(TRUE);
+ }
+
if (new_dbname) /* It's a rename */
{
@@ -476,6 +486,13 @@ Events::update_event(THD *thd, Event_parse_data *parse_data,
if (check_access(thd, EVENT_ACL, new_dbname->str, NULL, NULL, 0, 0))
DBUG_RETURN(TRUE);
+ /*
+ Acquire mdl exclusive lock on target database name.
+ */
+ if (lock_object_name(thd, MDL_key::EVENT,
+ new_dbname->str, new_name->str))
+ DBUG_RETURN(TRUE);
+
/* Check that the target database exists */
if (check_db_dir_existence(new_dbname->str))
{
@@ -490,10 +507,6 @@ Events::update_event(THD *thd, Event_parse_data *parse_data,
*/
save_binlog_format= thd->set_current_stmt_binlog_format_stmt();
- if (lock_object_name(thd, MDL_key::EVENT,
- parse_data->dbname.str, parse_data->name.str))
- DBUG_RETURN(TRUE);
-
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->update_event(thd, parse_data,
new_dbname, new_name)))
diff --git a/sql/sp.cc b/sql/sp.cc
index 6ec59143720..3cc24089c40 100644
--- a/sql/sp.cc
+++ b/sql/sp.cc
@@ -34,6 +34,10 @@
#include <my_user.h>
+/* Used in error handling only */
+#define SP_TYPE_STRING(type) \
+ (type == TYPE_ENUM_FUNCTION ? "FUNCTION" : "PROCEDURE")
+
static int
db_load_routine(THD *thd, stored_procedure_type type, sp_name *name,
sp_head **sphp,
@@ -1007,15 +1011,16 @@ sp_drop_routine_internal(THD *thd, stored_procedure_type type,
followed by an implicit grant (sp_grant_privileges())
and this subsequent call opens and closes mysql.procs_priv.
- @return Error code. SP_OK is returned on success. Other
- SP_ constants are used to indicate about errors.
+ @return Error status.
+ @retval FALSE on success
+ @retval TRUE on error
*/
-int
+bool
sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
{
LEX *lex= thd->lex;
- int ret;
+ bool ret= TRUE;
TABLE *table;
char definer_buf[USER_HOST_BUFF_SIZE];
LEX_STRING definer;
@@ -1040,7 +1045,22 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
/* Grab an exclusive MDL lock. */
if (lock_object_name(thd, mdl_type, sp->m_db.str, sp->m_name.str))
- DBUG_RETURN(SP_OPEN_TABLE_FAILED);
+ {
+ my_error(ER_BAD_DB_ERROR, MYF(0), sp->m_db.str);
+ DBUG_RETURN(TRUE);
+ }
+
+ /*
+ Check that a database directory with this name
+ exists. Design note: This won't work on virtual databases
+ like information_schema.
+ */
+ if (check_db_dir_existence(sp->m_db.str))
+ {
+ my_error(ER_BAD_DB_ERROR, MYF(0), sp->m_db.str);
+ DBUG_RETURN(TRUE);
+ }
+
/* Reset sql_mode during data dictionary operations. */
thd->variables.sql_mode= 0;
@@ -1049,7 +1069,9 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
thd->count_cuted_fields= CHECK_FIELD_WARN;
if (!(table= open_proc_table_for_update(thd)))
- ret= SP_OPEN_TABLE_FAILED;
+ {
+ my_error(ER_SP_STORE_FAILED, MYF(0), SP_TYPE_STRING(type),sp->m_name.str);
+ }
else
{
/* Checking if the routine already exists */
@@ -1065,11 +1087,10 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
ER_SP_ALREADY_EXISTS,
ER_THD(thd, ER_SP_ALREADY_EXISTS),
- type == TYPE_ENUM_FUNCTION ?
- "FUNCTION" : "PROCEDURE",
+ SP_TYPE_STRING(type),
lex->spname->m_name.str);
- ret= SP_OK;
+ ret= FALSE;
// Setting retstr as it is used for logging.
if (sp->m_type == TYPE_ENUM_FUNCTION)
@@ -1078,7 +1099,8 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
}
else
{
- ret= SP_WRITE_ROW_FAILED;
+ my_error(ER_SP_ALREADY_EXISTS, MYF(0),
+ SP_TYPE_STRING(type), sp->m_name.str);
goto done;
}
}
@@ -1090,7 +1112,8 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
if (table->s->fields < MYSQL_PROC_FIELD_COUNT)
{
- ret= SP_GET_FIELD_FAILED;
+ my_error(ER_SP_STORE_FAILED, MYF(0),
+ SP_TYPE_STRING(type), sp->m_name.str);
goto done;
}
@@ -1099,12 +1122,12 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
sp->m_name.str+sp->m_name.length) >
table->field[MYSQL_PROC_FIELD_NAME]->char_length())
{
- ret= SP_BAD_IDENTIFIER;
+ my_error(ER_TOO_LONG_IDENT, MYF(0), sp->m_name.str);
goto done;
}
if (sp->m_body.length > table->field[MYSQL_PROC_FIELD_BODY]->field_length)
{
- ret= SP_BODY_TOO_LONG;
+ my_error(ER_TOO_LONG_BODY, MYF(0), sp->m_name.str);
goto done;
}
@@ -1193,17 +1216,13 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
if (access == SP_CONTAINS_SQL ||
access == SP_MODIFIES_SQL_DATA)
{
- my_message(ER_BINLOG_UNSAFE_ROUTINE,
- ER_THD(thd, ER_BINLOG_UNSAFE_ROUTINE), MYF(0));
- ret= SP_INTERNAL_ERROR;
+ my_error(ER_BINLOG_UNSAFE_ROUTINE, MYF(0));
goto done;
}
}
if (!(thd->security_ctx->master_access & SUPER_ACL))
{
- my_message(ER_BINLOG_CREATE_ROUTINE_NEED_SUPER,
- ER_THD(thd, ER_BINLOG_CREATE_ROUTINE_NEED_SUPER), MYF(0));
- ret= SP_INTERNAL_ERROR;
+ my_error(ER_BINLOG_CREATE_ROUTINE_NEED_SUPER,MYF(0));
goto done;
}
}
@@ -1234,22 +1253,24 @@ sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp)
if (store_failed)
{
- ret= SP_FLD_STORE_FAILED;
+ my_error(ER_CANT_CREATE_SROUTINE, MYF(0), sp->m_name.str);
goto done;
}
- ret= SP_OK;
if (table->file->ha_write_row(table->record[0]))
- ret= SP_WRITE_ROW_FAILED;
+ {
+ my_error(ER_SP_ALREADY_EXISTS, MYF(0),
+ SP_TYPE_STRING(type), sp->m_name.str);
+ goto done;
+ }
/* Make change permanent and avoid 'table is marked as crashed' errors */
table->file->extra(HA_EXTRA_FLUSH);
- if (ret == SP_OK)
- sp_cache_invalidate();
+ sp_cache_invalidate();
}
log:
- if (ret == SP_OK && mysql_bin_log.is_open())
+ if (mysql_bin_log.is_open())
{
thd->clear_error();
@@ -1268,7 +1289,7 @@ log:
&(thd->lex->definer->host),
saved_mode))
{
- ret= SP_INTERNAL_ERROR;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto done;
}
/* restore sql_mode when binloging */
@@ -1277,9 +1298,13 @@ log:
if (thd->binlog_query(THD::STMT_QUERY_TYPE,
log_query.ptr(), log_query.length(),
FALSE, FALSE, FALSE, 0))
- ret= SP_INTERNAL_ERROR;
+ {
+ my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), "binary log", -1);
+ goto done;
+ }
thd->variables.sql_mode= 0;
}
+ ret= FALSE;
done:
thd->count_cuted_fields= saved_count_cuted_fields;
diff --git a/sql/sp.h b/sql/sp.h
index 4bfb0577fcc..df60482f8fd 100644
--- a/sql/sp.h
+++ b/sql/sp.h
@@ -126,7 +126,7 @@ sp_exist_routines(THD *thd, TABLE_LIST *procs, bool is_proc);
bool
sp_show_create_routine(THD *thd, stored_procedure_type type, sp_name *name);
-int
+bool
sp_create_routine(THD *thd, stored_procedure_type type, sp_head *sp);
int
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 248a15458d7..6fcb549f097 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -118,8 +118,6 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
*/
/* Used in error handling only */
-#define SP_TYPE_STRING(LP) \
- ((LP)->sphead->m_type == TYPE_ENUM_FUNCTION ? "FUNCTION" : "PROCEDURE")
#define SP_COM_STRING(LP) \
((LP)->sql_command == SQLCOM_CREATE_SPFUNCTION || \
(LP)->sql_command == SQLCOM_ALTER_FUNCTION || \
@@ -5104,7 +5102,6 @@ end_with_restore_list:
{
uint namelen;
char *name;
- int sp_result= SP_INTERNAL_ERROR;
DBUG_ASSERT(lex->sphead != 0);
DBUG_ASSERT(lex->sphead->m_db.str); /* Must be initialized in the parser */
@@ -5115,23 +5112,12 @@ end_with_restore_list:
if (check_db_name(&lex->sphead->m_db))
{
my_error(ER_WRONG_DB_NAME, MYF(0), lex->sphead->m_db.str);
- goto create_sp_error;
+ goto error;
}
if (check_access(thd, CREATE_PROC_ACL, lex->sphead->m_db.str,
NULL, NULL, 0, 0))
- goto create_sp_error;
-
- /*
- Check that a database directory with this name
- exists. Design note: This won't work on virtual databases
- like information_schema.
- */
- if (check_db_dir_existence(lex->sphead->m_db.str))
- {
- my_error(ER_BAD_DB_ERROR, MYF(0), lex->sphead->m_db.str);
- goto create_sp_error;
- }
+ goto error;
/* Checking the drop permissions if CREATE OR REPLACE is used */
if (lex->create_info.or_replace())
@@ -5139,7 +5125,7 @@ end_with_restore_list:
if (check_routine_access(thd, ALTER_PROC_ACL, lex->spname->m_db.str,
lex->spname->m_name.str,
lex->sql_command == SQLCOM_DROP_PROCEDURE, 0))
- goto create_sp_error;
+ goto error;
}
name= lex->sphead->name(&namelen);
@@ -5151,18 +5137,17 @@ end_with_restore_list:
if (udf)
{
my_error(ER_UDF_EXISTS, MYF(0), name);
- goto create_sp_error;
+ goto error;
}
}
#endif
if (sp_process_definer(thd))
- goto create_sp_error;
+ goto error;
WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
- res= (sp_result= sp_create_routine(thd, lex->sphead->m_type, lex->sphead));
- switch (sp_result) {
- case SP_OK: {
+ if (!sp_create_routine(thd, lex->sphead->m_type, lex->sphead))
+ {
#ifndef NO_EMBEDDED_ACCESS_CHECKS
/* only add privileges if really neccessary */
@@ -5227,31 +5212,8 @@ end_with_restore_list:
}
#endif
- break;
}
- case SP_WRITE_ROW_FAILED:
- my_error(ER_SP_ALREADY_EXISTS, MYF(0), SP_TYPE_STRING(lex), name);
- break;
- case SP_BAD_IDENTIFIER:
- my_error(ER_TOO_LONG_IDENT, MYF(0), name);
- break;
- case SP_BODY_TOO_LONG:
- my_error(ER_TOO_LONG_BODY, MYF(0), name);
- break;
- case SP_FLD_STORE_FAILED:
- my_error(ER_CANT_CREATE_SROUTINE, MYF(0), name);
- break;
- default:
- my_error(ER_SP_STORE_FAILED, MYF(0), SP_TYPE_STRING(lex), name);
- break;
- } /* end switch */
-
- /*
- Capture all errors within this CASE and
- clean up the environment.
- */
-create_sp_error:
- if (sp_result != SP_OK )
+ else
goto error;
my_ok(thd);
break; /* break super switch */