summary | refs | log | tree | commit | diff
path: root/storage
diff options
context:
space:
mode:
author Rich Prohaska <prohaska@tokutek.com> 2013-12-05 11:59:34 -0500
committer Rich Prohaska <prohaska@tokutek.com> 2013-12-05 11:59:34 -0500
commit b65e6e3d746ef8261ca415928df2f1c1614d40e1 (patch)
tree 2ce55528ccf1312b3cf8527fa21a9af0c8bf05c0 /storage
parent 7fe66e0cf002197554e6ab09069974035a23decf (diff)
download mariadb-git-b65e6e3d746ef8261ca415928df2f1c1614d40e1.tar.gz
#141 redo table open and close locking to avoid table opening pileup
Diffstat (limited to 'storage')
-rw-r--r-- storage/tokudb/ha_tokudb.cc 199
-rw-r--r-- storage/tokudb/ha_tokudb.h 17
-rw-r--r-- storage/tokudb/hatoku_defines.h 40
-rw-r--r-- storage/tokudb/hatoku_hton.cc 4
4 files changed, 166 insertions, 94 deletions
diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc
index 8dbdbcba028..37962ce42d0 100644
--- a/storage/tokudb/ha_tokudb.cc
+++ b/storage/tokudb/ha_tokudb.cc
@@ -205,14 +205,39 @@ exit:
return error;
}
-/** @brief
- Simple lock controls. The "share" it creates is a structure we will
- pass to each tokudb handler. Do you have to have one of these? Well, you have
- pieces that are used for locking, and they are needed to function.
-
- MUST have tokudb_mutex locked on input
+static void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) {
+ for (uint i = 0; i < MAX_KEY+1; i++) {
+ bitmap_free(&kc_info->key_filters[i]);
+ }
-*/
+ for (uint i = 0; i < MAX_KEY+1; i++) {
+ tokudb_my_free(kc_info->cp_info[i]);
+ kc_info->cp_info[i] = NULL; // 3144
+ }
+
+ tokudb_my_free(kc_info->field_lengths);
+ tokudb_my_free(kc_info->length_bytes);
+ tokudb_my_free(kc_info->blob_fields);
+}
+
+void TOKUDB_SHARE::init(void) {
+ use_count = 0;
+ thr_lock_init(&lock);
+ tokudb_pthread_mutex_init(&mutex, MY_MUTEX_INIT_FAST);
+ my_rwlock_init(&num_DBs_lock, 0);
+ tokudb_pthread_cond_init(&m_openclose_cond, NULL);
+ m_state = CLOSED;
+}
+
+void TOKUDB_SHARE::destroy(void) {
+ assert(m_state == CLOSED);
+ thr_lock_delete(&lock);
+ tokudb_pthread_mutex_destroy(&mutex);
+ rwlock_destroy(&num_DBs_lock);
+ tokudb_pthread_cond_destroy(&m_openclose_cond);
+}
+
+// MUST have tokudb_mutex locked on input
static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share) {
TOKUDB_SHARE *share = NULL;
int error = 0;
@@ -234,7 +259,8 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share)
);
assert(share);
- share->use_count = 0;
+ share->init();
+
share->table_name_length = length;
share->table_name = tmp_name;
strmov(share->table_name, table_name);
@@ -244,54 +270,30 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share)
goto exit;
}
- memset((void *) share->key_file, 0, sizeof(share->key_file));
-
error = my_hash_insert(&tokudb_open_tables, (uchar *) share);
if (error) {
+ free_key_and_col_info(&share->kc_info);
goto exit;
}
- thr_lock_init(&share->lock);
- pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
- my_rwlock_init(&share->num_DBs_lock, 0);
}
exit:
if (error) {
- pthread_mutex_destroy(&share->mutex);
+ share->destroy();
tokudb_my_free((uchar *) share);
share = NULL;
}
return share;
}
-
-static void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) {
- for (uint i = 0; i < MAX_KEY+1; i++) {
- bitmap_free(&kc_info->key_filters[i]);
- }
-
- for (uint i = 0; i < MAX_KEY+1; i++) {
- tokudb_my_free(kc_info->cp_info[i]);
- kc_info->cp_info[i] = NULL; // 3144
- }
-
- tokudb_my_free(kc_info->field_lengths);
- tokudb_my_free(kc_info->length_bytes);
- tokudb_my_free(kc_info->blob_fields);
-}
-
-//
-// MUST have tokudb_mutex locked on input
-// bool mutex_is_locked specifies if share->mutex is locked
-//
-static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
+static int free_share(TOKUDB_SHARE * share) {
int error, result = 0;
- if (mutex_is_locked) {
- pthread_mutex_unlock(&share->mutex);
- }
+ tokudb_pthread_mutex_lock(&share->mutex);
+ DBUG_PRINT("info", ("share->use_count %u", share->use_count));
if (!--share->use_count) {
- DBUG_PRINT("info", ("share->use_count %u", share->use_count));
+ share->m_state = TOKUDB_SHARE::CLOSING;
+ tokudb_pthread_mutex_unlock(&share->mutex);
//
// number of open DB's may not be equal to number of keys we have because add_index
@@ -316,13 +318,25 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
error = tokudb::close_status(&share->status_block);
assert(error == 0);
-
- my_hash_delete(&tokudb_open_tables, (uchar *) share);
- thr_lock_delete(&share->lock);
- pthread_mutex_destroy(&share->mutex);
- rwlock_destroy(&share->num_DBs_lock);
- tokudb_my_free((uchar *) share);
+ tokudb_pthread_mutex_lock(&tokudb_mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
+ share->m_state = TOKUDB_SHARE::CLOSED;
+ if (share->use_count > 0) {
+ tokudb_pthread_cond_broadcast(&share->m_openclose_cond);
+ tokudb_pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&tokudb_mutex);
+ } else {
+ my_hash_delete(&tokudb_open_tables, (uchar *) share);
+
+ tokudb_pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&tokudb_mutex);
+
+ share->destroy();
+ tokudb_my_free((uchar *) share);
+ }
+ } else {
+ tokudb_pthread_mutex_unlock(&share->mutex);
}
return result;
@@ -588,7 +602,7 @@ smart_dbt_callback_ir_rowread(DBT const *key, DBT const *row, void *context) {
// Returns:
// The value of the auto increment column in record
//
-ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record)
+static ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record)
{
const uchar *key; /* Key */
ulonglong unsigned_autoinc = 0; /* Unsigned auto-increment */
@@ -1710,8 +1724,6 @@ exit:
return error;
}
-
-
//
// Creates and opens a handle to a table which already exists in a tokudb
// database.
@@ -1788,36 +1800,49 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
goto exit;
}
- /* Init shared structure */
- pthread_mutex_lock(&tokudb_mutex);
+ // lookup or create share
+ tokudb_pthread_mutex_lock(&tokudb_mutex);
share = get_share(name, table_share);
assert(share);
thr_lock_data_init(&share->lock, &lock, NULL);
- /* Fill in shared structure, if needed */
- pthread_mutex_lock(&share->mutex);
- if (!share->use_count++) {
- ret_val = initialize_share(
- name,
- mode
- );
- if (ret_val) {
- free_share(share, true);
- pthread_mutex_unlock(&tokudb_mutex);
- goto exit;
+ tokudb_pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&tokudb_mutex);
+ share->use_count++;
+ while (share->m_state == TOKUDB_SHARE::OPENING || share->m_state == TOKUDB_SHARE::CLOSING) {
+ tokudb_pthread_cond_wait(&share->m_openclose_cond, &share->mutex);
+ }
+ if (share->m_state == TOKUDB_SHARE::CLOSED) {
+ share->m_state = TOKUDB_SHARE::OPENING;
+ tokudb_pthread_mutex_unlock(&share->mutex);
+
+ ret_val = initialize_share(name, mode);
+
+ tokudb_pthread_mutex_lock(&share->mutex);
+ if (ret_val == 0) {
+ share->m_state = TOKUDB_SHARE::OPENED;
+ } else {
+ share->m_state = TOKUDB_SHARE::ERROR;
+ share->m_error = ret_val;
}
+ tokudb_pthread_cond_broadcast(&share->m_openclose_cond);
+ }
+ if (share->m_state == TOKUDB_SHARE::ERROR) {
+ ret_val = share->m_error;
+ tokudb_pthread_mutex_unlock(&share->mutex);
+ free_share(share);
+ goto exit;
+ } else {
+ assert(share->m_state == TOKUDB_SHARE::OPENED);
+ tokudb_pthread_mutex_unlock(&share->mutex);
}
- pthread_mutex_unlock(&share->mutex);
- pthread_mutex_unlock(&tokudb_mutex);
ref_length = share->ref_length; // If second open
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
- pthread_mutex_lock(&share->mutex);
TOKUDB_TRACE("tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d\n",
this, share, share->file, table, table->s, share->use_count);
- pthread_mutex_unlock(&share->mutex);
}
key_read = false;
@@ -2128,9 +2153,7 @@ int ha_tokudb::__close() {
rec_update_buff = NULL;
alloc_ptr = NULL;
ha_tokudb::reset();
- pthread_mutex_lock(&tokudb_mutex);
- int retval = free_share(share, false);
- pthread_mutex_unlock(&tokudb_mutex);
+ int retval = free_share(share);
TOKUDB_DBUG_RETURN(retval);
}
@@ -2926,7 +2949,7 @@ DBT *ha_tokudb::pack_ext_key(
//
void ha_tokudb::init_hidden_prim_key_info() {
TOKUDB_DBUG_ENTER("ha_tokudb::init_prim_key_info");
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
if (!(share->status & STATUS_PRIMARY_KEY_INIT)) {
int error = 0;
THD* thd = ha_thd();
@@ -2966,7 +2989,7 @@ void ha_tokudb::init_hidden_prim_key_info() {
}
share->status |= STATUS_PRIMARY_KEY_INIT;
}
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
DBUG_VOID_RETURN;
}
@@ -3246,9 +3269,9 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
}
}
exit_try_table_lock:
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
share->try_table_lock = false;
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
}
DBUG_VOID_RETURN;
}
@@ -3265,9 +3288,9 @@ int ha_tokudb::end_bulk_insert(bool abort) {
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
bool using_loader = (loader != NULL);
if (ai_metadata_update_required) {
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
error = update_max_auto_inc(share->status_block, share->last_auto_increment);
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
if (error) { goto cleanup; }
}
delay_updating_ai_metadata = false;
@@ -3891,7 +3914,7 @@ int ha_tokudb::write_row(uchar * record) {
// of the auto inc field.
//
if (share->has_auto_inc && record == table->record[0]) {
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index], table),
@@ -3906,7 +3929,7 @@ int ha_tokudb::write_row(uchar * record) {
update_max_auto_inc(share->status_block, share->last_auto_increment);
}
}
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
}
//
@@ -4084,7 +4107,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
// of the auto inc field.
//
if (share->has_auto_inc && new_row == table->record[0]) {
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index], table),
@@ -4096,7 +4119,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
share->last_auto_increment = curr_auto_inc;
}
}
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
}
//
@@ -5377,6 +5400,8 @@ int ha_tokudb::index_prev(uchar * buf) {
TOKUDB_DBUG_RETURN(error);
}
+volatile int tokudb_index_first_wait = 0;
+
//
// Reads the first row from the active index (cursor) into buf, and advances cursor
// Parameters:
@@ -5398,6 +5423,8 @@ int ha_tokudb::index_first(uchar * buf) {
ha_statistic_increment(&SSV::ha_read_first_count);
+ while (tokudb_index_first_wait) sleep(1);
+
info.ha = this;
info.buf = buf;
info.keynr = tokudb_active_index;
@@ -6125,7 +6152,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
transaction = trx->sub_sp_level;
}
else {
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
// hate dealing with comparison of signed vs unsigned, so doing this
if (deleted_rows > added_rows && share->rows < (deleted_rows - added_rows)) {
share->rows = 0;
@@ -6133,7 +6160,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
else {
share->rows += (added_rows - deleted_rows);
}
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
added_rows = 0;
deleted_rows = 0;
share->rows_from_locked_table = 0;
@@ -7443,7 +7470,7 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl
ulonglong nr;
bool over;
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
if (share->auto_inc_create_value > share->last_auto_increment) {
nr = share->auto_inc_create_value;
@@ -7472,7 +7499,7 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl
}
*first_value = nr;
*nb_reserved_values = nb_desired_values;
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
DBUG_VOID_RETURN;
}
@@ -7838,18 +7865,18 @@ int ha_tokudb::tokudb_add_index(
// We have an accurate row count, might as well update share->rows
//
if(!creating_hot_index) {
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
share->rows = num_processed;
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
}
//
// now write stuff to status.tokudb
//
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
for (uint i = 0; i < num_of_keys; i++) {
write_key_name_to_status(share->status_block, key_info[i].name, txn);
}
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
error = 0;
cleanup:
diff --git a/storage/tokudb/ha_tokudb.h b/storage/tokudb/ha_tokudb.h
index f28048e279e..11179775868 100644
--- a/storage/tokudb/ha_tokudb.h
+++ b/storage/tokudb/ha_tokudb.h
@@ -127,6 +127,10 @@ typedef struct hot_optimize_context {
//
class TOKUDB_SHARE {
public:
+ void init(void);
+ void destroy(void);
+
+public:
char *table_name;
uint table_name_length, use_count;
pthread_mutex_t mutex;
@@ -184,6 +188,10 @@ public:
bool replace_into_fast;
rw_lock_t num_DBs_lock;
uint32_t num_DBs;
+
+ pthread_cond_t m_openclose_cond;
+ enum { CLOSED, OPENING, OPENED, CLOSING, ERROR } m_state;
+ int m_error;
};
typedef struct st_filter_key_part_info {
@@ -443,10 +451,7 @@ private:
int write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn);
void init_auto_increment();
bool can_replace_into_be_fast(TABLE_SHARE* table_share, KEY_AND_COL_INFO* kc_info, uint pk);
- int initialize_share(
- const char* name,
- int mode
- );
+ int initialize_share(const char* name, int mode);
void set_query_columns(uint keynr);
int prelock_range (const key_range *start_key, const key_range *end_key);
@@ -599,10 +604,10 @@ public:
int get_status(DB_TXN* trans);
void init_hidden_prim_key_info();
inline void get_auto_primary_key(uchar * to) {
- pthread_mutex_lock(&share->mutex);
+ tokudb_pthread_mutex_lock(&share->mutex);
share->auto_ident++;
hpk_num_to_char(to, share->auto_ident);
- pthread_mutex_unlock(&share->mutex);
+ tokudb_pthread_mutex_unlock(&share->mutex);
}
virtual void get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values);
bool is_optimize_blocking();
diff --git a/storage/tokudb/hatoku_defines.h b/storage/tokudb/hatoku_defines.h
index 5d3409d0967..66724457748 100644
--- a/storage/tokudb/hatoku_defines.h
+++ b/storage/tokudb/hatoku_defines.h
@@ -443,4 +443,44 @@ static inline void* tokudb_my_multi_malloc(myf myFlags, ...) {
return start;
}
+static inline void tokudb_pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) {
+ int r = pthread_mutex_init(mutex, attr);
+ assert(r == 0);
+}
+
+static inline void tokudb_pthread_mutex_destroy(pthread_mutex_t *mutex) {
+ int r = pthread_mutex_destroy(mutex);
+ assert(r == 0);
+}
+
+static inline void tokudb_pthread_mutex_lock(pthread_mutex_t *mutex) {
+ int r = pthread_mutex_lock(mutex);
+ assert(r == 0);
+}
+
+static inline void tokudb_pthread_mutex_unlock(pthread_mutex_t *mutex) {
+ int r = pthread_mutex_unlock(mutex);
+ assert(r == 0);
+}
+
+static inline void tokudb_pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) {
+ int r = pthread_cond_init(cond, attr);
+ assert(r == 0);
+}
+
+static inline void tokudb_pthread_cond_destroy(pthread_cond_t *cond) {
+ int r = pthread_cond_destroy(cond);
+ assert(r == 0);
+}
+
+static inline void tokudb_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) {
+ int r = pthread_cond_wait(cond, mutex);
+ assert(r == 0);
+}
+
+static inline void tokudb_pthread_cond_broadcast(pthread_cond_t *cond) {
+ int r = pthread_cond_broadcast(cond);
+ assert(r == 0);
+}
+
#endif
diff --git a/storage/tokudb/hatoku_hton.cc b/storage/tokudb/hatoku_hton.cc
index 93a2f1312eb..b0493a635ad 100644
--- a/storage/tokudb/hatoku_hton.cc
+++ b/storage/tokudb/hatoku_hton.cc
@@ -311,7 +311,7 @@ static int tokudb_init_func(void *p) {
tokudb_hton = (handlerton *) p;
- pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST);
+ tokudb_pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST);
(void) my_hash_init(&tokudb_open_tables, table_alias_charset, 32, 0, 0, (my_hash_get_key) tokudb_get_key, 0, 0);
tokudb_hton->state = SHOW_OPTION_YES;
@@ -529,7 +529,7 @@ static int tokudb_done_func(void *p) {
tokudb_my_free(toku_global_status_rows);
toku_global_status_rows = NULL;
my_hash_free(&tokudb_open_tables);
- pthread_mutex_destroy(&tokudb_mutex);
+ tokudb_pthread_mutex_destroy(&tokudb_mutex);
#if defined(_WIN64)
toku_ydb_destroy();
#endif