diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbPoolImpl.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbPoolImpl.cpp | 527 |
1 files changed, 527 insertions, 0 deletions
diff --git a/ndb/src/ndbapi/NdbPoolImpl.cpp b/ndb/src/ndbapi/NdbPoolImpl.cpp new file mode 100644 index 00000000000..08252d26d79 --- /dev/null +++ b/ndb/src/ndbapi/NdbPoolImpl.cpp @@ -0,0 +1,527 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "NdbPoolImpl.hpp" +#include <assert.h> +#include <string.h> + +NdbMutex *NdbPool::pool_mutex = NULL; +NdbPool *the_pool = NULL; + +NdbPool* +NdbPool::create_instance(Uint32 max_ndb_obj, + Uint32 no_conn_obj, + Uint32 init_no_ndb_objects) +{ + if (!initPoolMutex()) { + return NULL; + } + NdbMutex_Lock(pool_mutex); + NdbPool* a_pool; + if (the_pool != NULL) { + a_pool = NULL; + } else { + the_pool = new NdbPool(max_ndb_obj, no_conn_obj); + if (!the_pool->init(init_no_ndb_objects)) { + delete the_pool; + the_pool = NULL; + } + a_pool = the_pool; + } + NdbMutex* temp = pool_mutex; + if (a_pool == NULL) { + pool_mutex = NULL; + } + NdbMutex_Unlock(pool_mutex); + if (a_pool == NULL) { + NdbMutex_Destroy(temp); + } + return a_pool; +} + +void +NdbPool::drop_instance() +{ + if (pool_mutex == NULL) { + return; + } + NdbMutex_Lock(pool_mutex); + the_pool->release_all(); + delete the_pool; + the_pool = NULL; + NdbMutex* temp = pool_mutex; + NdbMutex_Unlock(temp); + NdbMutex_Destroy(temp); +} + +bool +NdbPool::initPoolMutex() +{ + bool ret_result = false; + if (pool_mutex == NULL) { + pool_mutex = NdbMutex_Create(); + ret_result = ((pool_mutex == NULL) ? false : true); + } + return ret_result; +} + +NdbPool::NdbPool(Uint32 max_no_objects, + Uint32 no_conn_objects) +{ + if (no_conn_objects > 1024) { + no_conn_objects = 1024; + } + if (max_no_objects > MAX_NDB_OBJECTS) { + max_no_objects = MAX_NDB_OBJECTS; + } else if (max_no_objects == 0) { + max_no_objects = 1; + } + m_max_ndb_objects = max_no_objects; + m_no_of_conn_objects = no_conn_objects; + m_no_of_objects = 0; + m_waiting = 0; + m_pool_reference = NULL; + m_hash_entry = NULL; + m_first_free = NULL_POOL; + m_first_not_in_use = NULL_POOL; + m_last_free = NULL_POOL; + input_pool_cond = NULL; + output_pool_cond = NULL; + m_output_queue = 0; + m_input_queue = 0; + m_signal_count = 0; +} + +NdbPool::~NdbPool() +{ + NdbCondition_Destroy(input_pool_cond); + NdbCondition_Destroy(output_pool_cond); +} + +void +NdbPool::release_all() +{ + int i; + for (i = 0; i < m_max_ndb_objects + 1; i++) { + if (m_pool_reference[i].ndb_reference != NULL) { + assert(m_pool_reference[i].in_use); + assert(m_pool_reference[i].free_entry); + delete m_pool_reference[i].ndb_reference; + } + } + delete [] m_pool_reference; + delete [] m_hash_entry; + m_pool_reference = NULL; + m_hash_entry = NULL; +} + +bool +NdbPool::init(Uint32 init_no_objects) +{ + bool ret_result = false; + int i; + do { + input_pool_cond = NdbCondition_Create(); + output_pool_cond = NdbCondition_Create(); + if (input_pool_cond == NULL || output_pool_cond == NULL) { + break; + } + if (init_no_objects > m_max_ndb_objects) { + init_no_objects = m_max_ndb_objects; + } + if (init_no_objects == 0) { + init_no_objects = 1; + } + m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1]; + m_hash_entry = new Uint8[POOL_HASH_TABLE_SIZE]; + + if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) { + delete [] m_pool_reference; + delete [] m_hash_entry; + break; + } + for (i = 0; i < m_max_ndb_objects + 1; i++) { + m_pool_reference[i].ndb_reference = NULL; + m_pool_reference[i].in_use = false; + m_pool_reference[i].next_free_object = i+1; + m_pool_reference[i].prev_free_object = i-1; + m_pool_reference[i].next_db_object = NULL_POOL; + m_pool_reference[i].prev_db_object = NULL_POOL; + } + for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) { + m_hash_entry[i] = NULL_HASH; + } + m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL; + m_pool_reference[1].prev_free_object = NULL_POOL; + m_first_not_in_use = 1; + m_no_of_objects = init_no_objects; + for (i = init_no_objects; i > 0 ; i--) { + Uint32 fake_id; + if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) { + release_all(); + break; + } + } + ret_result = true; + break; + } while (1); + return ret_result; +} + +/* +Get an Ndb object. +Input: +hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread + used the last time. +a_db_name: NULL = don't check for database specific Ndb object, otherwise + a hint of which database is preferred. +Output: +hint_id: Returns id of Ndb object returned +Return value: Ndb object pointer +*/ +Ndb* +NdbPool::get_ndb_object(Uint32 &hint_id, + const char* a_catalog_name, + const char* a_schema_name) +{ + Ndb* ret_ndb = NULL; + Uint32 hash_entry = compute_hash(a_schema_name); + NdbMutex_Lock(pool_mutex); + while (1) { + /* + We start by checking if we can use the hinted Ndb object. + */ + if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) { + break; + } + /* + The hinted Ndb object was not free. We need to allocate another object. + We start by checking for a free Ndb object connected to the same database. + */ + if (a_schema_name && (ret_ndb = get_db_hash(hint_id, + hash_entry, + a_catalog_name, + a_schema_name))) { + break; + } + /* + No Ndb object connected to the preferred database was found. + We look for a free Ndb object in general. + */ + if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) { + break; + } + /* + No free Ndb object was found. If we haven't allocated objects up until the + maximum number yet then we can allocate a new Ndb object here. + */ + if (m_no_of_objects < m_max_ndb_objects) { + if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) { + assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL); + break; + } + } + /* + We need to wait until an Ndb object becomes + available. + */ + if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) { + break; + } + /* + Not even after waiting were we able to get hold of an Ndb object. We + return NULL to indicate this problem. + */ + ret_ndb = NULL; + break; + } + NdbMutex_Unlock(pool_mutex); + if (ret_ndb != NULL) { + /* + We need to set the catalog and schema name of the Ndb object before + returning it to the caller. + */ + ret_ndb->setCatalogName(a_catalog_name); + ret_ndb->setSchemaName(a_schema_name); + } + return ret_ndb; +} + +void +NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id) +{ + NdbMutex_Lock(pool_mutex); + assert(id <= m_max_ndb_objects); + assert(id != 0); + assert(returned_ndb == m_pool_reference[id].ndb_reference); + bool wait_cond = m_waiting; + if (wait_cond) { + NdbCondition* pool_cond; + if (m_signal_count > 0) { + pool_cond = output_pool_cond; + m_signal_count--; + } else { + pool_cond = input_pool_cond; + } + add_wait_list(id); + NdbMutex_Unlock(pool_mutex); + NdbCondition_Signal(pool_cond); + } else { + add_free_list(id); + add_db_hash(id); + NdbMutex_Unlock(pool_mutex); + } +} + +bool +NdbPool::allocate_ndb(Uint32 &id, + const char* a_catalog_name, + const char* a_schema_name) +{ + Ndb* a_ndb; + if (m_first_not_in_use == NULL_POOL) { + return false; + } + if (a_schema_name) { + a_ndb = new Ndb(a_schema_name, a_catalog_name); + } else { + a_ndb = new Ndb(""); + } + if (a_ndb == NULL) { + return false; + } + a_ndb->init(m_no_of_conn_objects); + m_no_of_objects++; + + id = m_first_not_in_use; + Uint32 allocated_id = m_first_not_in_use; + m_first_not_in_use = m_pool_reference[allocated_id].next_free_object; + + m_pool_reference[allocated_id].ndb_reference = a_ndb; + m_pool_reference[allocated_id].in_use = true; + m_pool_reference[allocated_id].free_entry = false; + + add_free_list(allocated_id); + add_db_hash(allocated_id); + return true; +} + +void +NdbPool::add_free_list(Uint32 id) +{ + assert(!m_pool_reference[id].free_entry); + assert(m_pool_reference[id].in_use); + m_pool_reference[id].free_entry = true; + m_pool_reference[id].next_free_object = m_first_free; + m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL; + m_first_free = (Uint8)id; + if (m_last_free == (Uint8)NULL_POOL) { + m_last_free = (Uint8)id; + } +} + +void +NdbPool::add_db_hash(Uint32 id) +{ + Ndb* t_ndb = m_pool_reference[id].ndb_reference; + const char* schema_name = t_ndb->getSchemaName(); + Uint32 hash_entry = compute_hash(schema_name); + Uint8 next_db_entry = m_hash_entry[hash_entry]; + m_pool_reference[id].next_db_object = next_db_entry; + m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH; + m_hash_entry[hash_entry] = (Uint8)id; +} + +Ndb* +NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry) +{ + if (m_first_free == NULL_POOL) { + return NULL; + } + id = m_first_free; + Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry); + assert(ret_ndb != NULL); + return ret_ndb; +} + +Ndb* +NdbPool::get_db_hash(Uint32 &id, + Uint32 hash_entry, + const char *a_catalog_name, + const char *a_schema_name) +{ + Uint32 entry_id = m_hash_entry[hash_entry]; + bool found = false; + while (entry_id != NULL_HASH) { + Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference; + const char *a_ndb_catalog_name = t_ndb->getCatalogName(); + if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) { + const char *a_ndb_schema_name = t_ndb->getSchemaName(); + if (strcmp(a_schema_name, a_ndb_schema_name) == 0) { + found = true; + break; + } + } + entry_id = m_pool_reference[entry_id].next_db_object; + } + if (found) { + id = entry_id; + Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry); + assert(ret_ndb != NULL); + return ret_ndb; + } + return NULL; +} + +Ndb* +NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry) +{ + Ndb* ret_ndb = NULL; + do { + if ((hint_id != 0) && + (hint_id <= m_max_ndb_objects) && + (m_pool_reference[hint_id].in_use) && + (m_pool_reference[hint_id].free_entry)) { + ret_ndb = m_pool_reference[hint_id].ndb_reference; + if (ret_ndb != NULL) { + break; + } else { + assert(false); + } + } + return NULL; + } while (1); + /* + This is where we remove the entry from the free list and from the db hash + table. + */ + remove_free_list(hint_id); + remove_db_hash(hint_id, hash_entry); + return ret_ndb; +} + +void +NdbPool::remove_free_list(Uint32 id) +{ + Uint8 next_free_entry = m_pool_reference[id].next_free_object; + Uint8 prev_free_entry = m_pool_reference[id].prev_free_object; + if (prev_free_entry == (Uint8)NULL_POOL) { + m_first_free = next_free_entry; + } else { + m_pool_reference[prev_free_entry].next_free_object = next_free_entry; + } + if (next_free_entry == (Uint8)NULL_POOL) { + m_last_free = prev_free_entry; + } else { + m_pool_reference[next_free_entry].prev_free_object = prev_free_entry; + } + m_pool_reference[id].next_free_object = NULL_POOL; + m_pool_reference[id].prev_free_object = NULL_POOL; + m_pool_reference[id].free_entry = false; +} + +void +NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry) +{ + Uint8 next_free_entry = m_pool_reference[id].next_db_object; + Uint8 prev_free_entry = m_pool_reference[id].prev_db_object; + if (prev_free_entry == (Uint8)NULL_HASH) { + m_hash_entry[hash_entry] = next_free_entry; + } else { + m_pool_reference[prev_free_entry].next_db_object = next_free_entry; + } + if (next_free_entry == (Uint8)NULL_HASH) { + ; + } else { + m_pool_reference[next_free_entry].prev_db_object = prev_free_entry; + } + m_pool_reference[id].next_db_object = NULL_HASH; + m_pool_reference[id].prev_db_object = NULL_HASH; +} + +Uint32 +NdbPool::compute_hash(const char *a_schema_name) +{ + Uint32 len = strlen(a_schema_name); + Uint32 h = 147; + for (Uint32 i = 0; i < len; i++) { + Uint32 c = a_schema_name[i]; + h = (h << 5) + h + c; + } + h &= (POOL_HASH_TABLE_SIZE - 1); + return h; +} + +Ndb* +NdbPool::wait_free_ndb(Uint32 &id) +{ + int res; + int time_out = 3500; + do { + NdbCondition* tmp = input_pool_cond; + m_waiting++; + m_input_queue++; + time_out -= 500; + res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out); + if (tmp == input_pool_cond) { + m_input_queue--; + } else { + m_output_queue--; + if (m_output_queue == 0) { + switch_condition_queue(); + } + } + m_waiting--; + } while (res == 0 && m_first_wait == NULL_POOL); + if (res != 0 && m_first_wait == NULL_POOL) { + return NULL; + } + id = m_first_wait; + remove_wait_list(); + assert(m_waiting != 0 || m_first_wait == NULL_POOL); + return m_pool_reference[id].ndb_reference; +} + +void +NdbPool::remove_wait_list() +{ + Uint32 id = m_first_wait; + m_first_wait = m_pool_reference[id].next_free_object; + m_pool_reference[id].next_free_object = NULL_POOL; + m_pool_reference[id].prev_free_object = NULL_POOL; + m_pool_reference[id].free_entry = false; +} + +void +NdbPool::add_wait_list(Uint32 id) +{ + m_pool_reference[id].next_free_object = m_first_wait; + m_first_wait = id; +} + +void +NdbPool::switch_condition_queue() +{ + m_signal_count = m_input_queue; + Uint8 move_queue = m_input_queue; + m_input_queue = m_output_queue; + m_output_queue = move_queue; + + NdbCondition* move_cond = input_pool_cond; + input_pool_cond = output_pool_cond; + output_pool_cond = move_cond; +} + |