summaryrefslogtreecommitdiff
path: root/sql/sql_sequence.cc
diff options
context:
space:
mode:
authorMonty <monty@mariadb.org>2017-05-29 16:08:11 +0300
committerMonty <monty@mariadb.org>2017-05-29 16:08:49 +0300
commit7e5bd1500f7149ed67b0593e021d3695a8f9d81a (patch)
treea9fb1e045b8965845512d0c549aa5dc524cb59e7 /sql/sql_sequence.cc
parentd7e3120da8be3b517b81cce160dbe53f91876ce8 (diff)
downloadmariadb-git-7e5bd1500f7149ed67b0593e021d3695a8f9d81a.tar.gz
Add locks for sequence's to ensure that there is only one writer or many readers
This is needed for MyISAM and other storage engines which normally relies on THR_LOCK's to ensure that one is not writing the same block one could be reading from.
Diffstat (limited to 'sql/sql_sequence.cc')
-rw-r--r--sql/sql_sequence.cc59
1 files changed, 45 insertions, 14 deletions
diff --git a/sql/sql_sequence.cc b/sql/sql_sequence.cc
index 1bf360a4f41..1b591f10c9f 100644
--- a/sql/sql_sequence.cc
+++ b/sql/sql_sequence.cc
@@ -342,14 +342,46 @@ bool sequence_insert(THD *thd, LEX *lex, TABLE_LIST *table_list)
SEQUENCE::SEQUENCE() :all_values_used(0), initialized(SEQ_UNINTIALIZED), table(0)
{
- mysql_mutex_init(key_LOCK_SEQUENCE, &mutex, MY_MUTEX_INIT_SLOW);
+ mysql_rwlock_init(key_LOCK_SEQUENCE, &mutex);
}
SEQUENCE::~SEQUENCE()
{
- mysql_mutex_destroy(&mutex);
+ mysql_rwlock_destroy(&mutex);
}
+/*
+ The following functions is to ensure that we when reserve new values
+ trough sequence object sequence we have only one writer at at time.
+ A sequence table can have many readers (trough normal SELECT's).
+
+ We mark that we have a write lock in the table object so that
+ ha_sequence::ha_write() can check if we have a lock. If already locked, then
+ ha_write() knows that we are running a sequence operation. If not, then
+ ha_write() knows that it's an INSERT.
+*/
+
+void SEQUENCE::write_lock(TABLE *table)
+{
+ DBUG_ASSERT(((ha_sequence*) table->file)->is_locked() == 0);
+ mysql_rwlock_wrlock(&mutex);
+ ((ha_sequence*) table->file)->write_lock();
+}
+void SEQUENCE::write_unlock(TABLE *table)
+{
+ ((ha_sequence*) table->file)->unlock();
+ mysql_rwlock_unlock(&mutex);
+}
+void SEQUENCE::read_lock(TABLE *table)
+{
+ if (!((ha_sequence*) table->file)->is_locked())
+ mysql_rwlock_rdlock(&mutex);
+}
+void SEQUENCE::read_unlock(TABLE *table)
+{
+ if (!((ha_sequence*) table->file)->is_locked())
+ mysql_rwlock_unlock(&mutex);
+}
/**
Read values from the sequence tables to table_share->sequence.
@@ -366,7 +398,7 @@ int SEQUENCE::read_initial_values(TABLE *table_arg)
if (likely(initialized != SEQ_UNINTIALIZED))
DBUG_RETURN(0);
table= table_arg;
- mysql_mutex_lock(&mutex);
+ write_lock(table);
if (likely(initialized == SEQ_UNINTIALIZED))
{
MYSQL_LOCK *lock;
@@ -422,7 +454,7 @@ int SEQUENCE::read_initial_values(TABLE *table_arg)
if (!has_active_transaction && !thd->transaction.stmt.is_empty())
trans_commit_stmt(thd);
}
- mysql_mutex_unlock(&mutex);
+ write_unlock(table);
DBUG_RETURN(error);
}
@@ -436,7 +468,6 @@ int SEQUENCE::read_stored_values()
int error;
my_bitmap_map *save_read_set;
DBUG_ENTER("SEQUENCE::read_stored_values");
- mysql_mutex_assert_owner(&mutex);
save_read_set= tmp_use_all_columns(table, table->read_set);
error= table->file->ha_read_first_row(table->record[0], MAX_KEY);
@@ -546,6 +577,7 @@ int sequence_definition::write(TABLE *table, bool all_fields)
{
int error;
MY_BITMAP *save_rpl_write_set, *save_write_set;
+ DBUG_ASSERT(((ha_sequence*) table->file)->is_locked());
save_rpl_write_set= table->rpl_write_set;
if (likely(!all_fields))
@@ -563,11 +595,8 @@ int sequence_definition::write(TABLE *table, bool all_fields)
save_write_set= table->write_set;
table->write_set= &table->s->all_set;
store_fields(table);
- /* Tell ha_sequence::write_row that we already hold the mutex */
- ((ha_sequence*) table->file)->sequence_locked= 1;
if ((error= table->file->ha_write_row(table->record[0])))
table->file->print_error(error, MYF(0));
- ((ha_sequence*) table->file)->sequence_locked= 0;
table->rpl_write_set= save_rpl_write_set;
table->write_set= save_write_set;
return error;
@@ -610,7 +639,7 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
*error= 0;
if (!second_round)
- lock();
+ write_lock(table);
res_value= next_free_value;
next_free_value= increment_value(next_free_value);
@@ -618,7 +647,7 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
if ((real_increment > 0 && res_value < reserved_until) ||
(real_increment < 0 && res_value > reserved_until))
{
- unlock();
+ write_unlock(table);
DBUG_RETURN(res_value);
}
@@ -677,11 +706,11 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
next_free_value= res_value;
}
- unlock();
+ write_unlock(table);
DBUG_RETURN(res_value);
err:
- unlock();
+ write_unlock(table);
my_error(ER_SEQUENCE_RUN_OUT, MYF(0), table->s->db.str,
table->s->table_name.str);
*error= ER_SEQUENCE_RUN_OUT;
@@ -740,7 +769,7 @@ bool SEQUENCE::set_value(TABLE *table, longlong next_val, ulonglong next_round,
ulonglong org_round= round;
DBUG_ENTER("SEQUENCE::set_value");
- lock();
+ write_lock(table);
if (is_used)
next_val= increment_value(next_val);
@@ -782,7 +811,7 @@ bool SEQUENCE::set_value(TABLE *table, longlong next_val, ulonglong next_round,
error= 0;
end:
- unlock();
+ write_unlock(table);
DBUG_RETURN(error);
}
@@ -871,6 +900,7 @@ bool Sql_cmd_alter_sequence::execute(THD *thd)
goto end;
}
+ table->s->sequence->write_lock(table);
if (!(error= new_seq->write(table, 1)))
{
/* Store the sequence values in table share */
@@ -878,6 +908,7 @@ bool Sql_cmd_alter_sequence::execute(THD *thd)
}
else
table->file->print_error(error, MYF(0));
+ table->s->sequence->write_unlock(table);
if (trans_commit_stmt(thd))
error= 1;
if (trans_commit_implicit(thd))