/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "NdbPoolImpl.hpp" NdbMutex *NdbPool::pool_mutex = NULL; NdbPool *the_pool = NULL; NdbPool* NdbPool::create_instance(Uint32 max_ndb_obj, Uint32 no_conn_obj, Uint32 init_no_ndb_objects) { if (!initPoolMutex()) { return NULL; } NdbMutex_Lock(pool_mutex); NdbPool* a_pool; if (the_pool != NULL) { a_pool = NULL; } else { the_pool = new NdbPool(max_ndb_obj, no_conn_obj); if (!the_pool->init(init_no_ndb_objects)) { delete the_pool; the_pool = NULL; } a_pool = the_pool; } NdbMutex* temp = pool_mutex; if (a_pool == NULL) { pool_mutex = NULL; } NdbMutex_Unlock(pool_mutex); if (a_pool == NULL) { NdbMutex_Destroy(temp); } return a_pool; } void NdbPool::drop_instance() { if (pool_mutex == NULL) { return; } NdbMutex_Lock(pool_mutex); the_pool->release_all(); delete the_pool; the_pool = NULL; NdbMutex* temp = pool_mutex; NdbMutex_Unlock(temp); NdbMutex_Destroy(temp); } bool NdbPool::initPoolMutex() { bool ret_result = false; if (pool_mutex == NULL) { pool_mutex = NdbMutex_Create(); ret_result = ((pool_mutex == NULL) ? false : true); } return ret_result; } NdbPool::NdbPool(Uint32 max_no_objects, Uint32 no_conn_objects) { if (no_conn_objects > 1024) { no_conn_objects = 1024; } if (max_no_objects > MAX_NDB_OBJECTS) { max_no_objects = MAX_NDB_OBJECTS; } else if (max_no_objects == 0) { max_no_objects = 1; } m_max_ndb_objects = max_no_objects; m_no_of_conn_objects = no_conn_objects; m_no_of_objects = 0; m_waiting = 0; m_pool_reference = NULL; m_hash_entry = NULL; m_first_free = NULL_POOL; m_first_not_in_use = NULL_POOL; m_last_free = NULL_POOL; input_pool_cond = NULL; output_pool_cond = NULL; m_output_queue = 0; m_input_queue = 0; m_signal_count = 0; } NdbPool::~NdbPool() { NdbCondition_Destroy(input_pool_cond); NdbCondition_Destroy(output_pool_cond); } void NdbPool::release_all() { int i; for (i = 0; i < m_max_ndb_objects + 1; i++) { if (m_pool_reference[i].ndb_reference != NULL) { assert(m_pool_reference[i].in_use); assert(m_pool_reference[i].free_entry); delete m_pool_reference[i].ndb_reference; } } delete [] m_pool_reference; delete [] m_hash_entry; m_pool_reference = NULL; m_hash_entry = NULL; } bool NdbPool::init(Uint32 init_no_objects) { bool ret_result = false; int i; do { input_pool_cond = NdbCondition_Create(); output_pool_cond = NdbCondition_Create(); if (input_pool_cond == NULL || output_pool_cond == NULL) { break; } if (init_no_objects > m_max_ndb_objects) { init_no_objects = m_max_ndb_objects; } if (init_no_objects == 0) { init_no_objects = 1; } m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1]; m_hash_entry = new Uint8[POOL_HASH_TABLE_SIZE]; if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) { delete [] m_pool_reference; delete [] m_hash_entry; break; } for (i = 0; i < m_max_ndb_objects + 1; i++) { m_pool_reference[i].ndb_reference = NULL; m_pool_reference[i].in_use = false; m_pool_reference[i].next_free_object = i+1; m_pool_reference[i].prev_free_object = i-1; m_pool_reference[i].next_db_object = NULL_POOL; m_pool_reference[i].prev_db_object = NULL_POOL; } for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) { m_hash_entry[i] = NULL_HASH; } m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL; m_pool_reference[1].prev_free_object = NULL_POOL; m_first_not_in_use = 1; m_no_of_objects = init_no_objects; for (i = init_no_objects; i > 0 ; i--) { Uint32 fake_id; if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) { release_all(); break; } } ret_result = true; break; } while (1); return ret_result; } /* Get an Ndb object. Input: hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread used the last time. a_db_name: NULL = don't check for database specific Ndb object, otherwise a hint of which database is preferred. Output: hint_id: Returns id of Ndb object returned Return value: Ndb object pointer */ Ndb* NdbPool::get_ndb_object(Uint32 &hint_id, const char* a_catalog_name, const char* a_schema_name) { Ndb* ret_ndb = NULL; Uint32 hash_entry = compute_hash(a_schema_name); NdbMutex_Lock(pool_mutex); while (1) { /* We start by checking if we can use the hinted Ndb object. */ if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) { break; } /* The hinted Ndb object was not free. We need to allocate another object. We start by checking for a free Ndb object connected to the same database. */ if (a_schema_name && (ret_ndb = get_db_hash(hint_id, hash_entry, a_catalog_name, a_schema_name))) { break; } /* No Ndb object connected to the preferred database was found. We look for a free Ndb object in general. */ if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) { break; } /* No free Ndb object was found. If we haven't allocated objects up until the maximum number yet then we can allocate a new Ndb object here. */ if (m_no_of_objects < m_max_ndb_objects) { if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) { assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL); break; } } /* We need to wait until an Ndb object becomes available. */ if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) { break; } /* Not even after waiting were we able to get hold of an Ndb object. We return NULL to indicate this problem. */ ret_ndb = NULL; break; } NdbMutex_Unlock(pool_mutex); if (ret_ndb != NULL) { /* We need to set the catalog and schema name of the Ndb object before returning it to the caller. */ ret_ndb->setCatalogName(a_catalog_name); ret_ndb->setSchemaName(a_schema_name); } return ret_ndb; } void NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id) { NdbMutex_Lock(pool_mutex); assert(id <= m_max_ndb_objects); assert(id != 0); assert(returned_ndb == m_pool_reference[id].ndb_reference); bool wait_cond = m_waiting; if (wait_cond) { NdbCondition* pool_cond; if (m_signal_count > 0) { pool_cond = output_pool_cond; m_signal_count--; } else { pool_cond = input_pool_cond; } add_wait_list(id); NdbMutex_Unlock(pool_mutex); NdbCondition_Signal(pool_cond); } else { add_free_list(id); add_db_hash(id); NdbMutex_Unlock(pool_mutex); } } bool NdbPool::allocate_ndb(Uint32 &id, const char* a_catalog_name, const char* a_schema_name) { Ndb* a_ndb; if (m_first_not_in_use == NULL_POOL) { return false; } if (a_schema_name) { a_ndb = new Ndb(a_schema_name, a_catalog_name); } else { a_ndb = new Ndb(""); } if (a_ndb == NULL) { return false; } a_ndb->init(m_no_of_conn_objects); m_no_of_objects++; id = m_first_not_in_use; Uint32 allocated_id = m_first_not_in_use; m_first_not_in_use = m_pool_reference[allocated_id].next_free_object; m_pool_reference[allocated_id].ndb_reference = a_ndb; m_pool_reference[allocated_id].in_use = true; m_pool_reference[allocated_id].free_entry = false; add_free_list(allocated_id); add_db_hash(allocated_id); return true; } void NdbPool::add_free_list(Uint32 id) { assert(!m_pool_reference[id].free_entry); assert(m_pool_reference[id].in_use); m_pool_reference[id].free_entry = true; m_pool_reference[id].next_free_object = m_first_free; m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL; m_first_free = (Uint8)id; if (m_last_free == (Uint8)NULL_POOL) { m_last_free = (Uint8)id; } } void NdbPool::add_db_hash(Uint32 id) { Ndb* t_ndb = m_pool_reference[id].ndb_reference; const char* schema_name = t_ndb->getSchemaName(); Uint32 hash_entry = compute_hash(schema_name); Uint8 next_db_entry = m_hash_entry[hash_entry]; m_pool_reference[id].next_db_object = next_db_entry; m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH; m_hash_entry[hash_entry] = (Uint8)id; } Ndb* NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry) { if (m_first_free == NULL_POOL) { return NULL; } id = m_first_free; Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry); assert(ret_ndb != NULL); return ret_ndb; } Ndb* NdbPool::get_db_hash(Uint32 &id, Uint32 hash_entry, const char *a_catalog_name, const char *a_schema_name) { Uint32 entry_id = m_hash_entry[hash_entry]; bool found = false; while (entry_id != NULL_HASH) { Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference; const char *a_ndb_catalog_name = t_ndb->getCatalogName(); if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) { const char *a_ndb_schema_name = t_ndb->getSchemaName(); if (strcmp(a_schema_name, a_ndb_schema_name) == 0) { found = true; break; } } entry_id = m_pool_reference[entry_id].next_db_object; } if (found) { id = entry_id; Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry); assert(ret_ndb != NULL); return ret_ndb; } return NULL; } Ndb* NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry) { Ndb* ret_ndb = NULL; do { if ((hint_id != 0) && (hint_id <= m_max_ndb_objects) && (m_pool_reference[hint_id].in_use) && (m_pool_reference[hint_id].free_entry)) { ret_ndb = m_pool_reference[hint_id].ndb_reference; if (ret_ndb != NULL) { break; } else { assert(false); } } return NULL; } while (1); /* This is where we remove the entry from the free list and from the db hash table. */ remove_free_list(hint_id); remove_db_hash(hint_id, hash_entry); return ret_ndb; } void NdbPool::remove_free_list(Uint32 id) { Uint8 next_free_entry = m_pool_reference[id].next_free_object; Uint8 prev_free_entry = m_pool_reference[id].prev_free_object; if (prev_free_entry == (Uint8)NULL_POOL) { m_first_free = next_free_entry; } else { m_pool_reference[prev_free_entry].next_free_object = next_free_entry; } if (next_free_entry == (Uint8)NULL_POOL) { m_last_free = prev_free_entry; } else { m_pool_reference[next_free_entry].prev_free_object = prev_free_entry; } m_pool_reference[id].next_free_object = NULL_POOL; m_pool_reference[id].prev_free_object = NULL_POOL; m_pool_reference[id].free_entry = false; } void NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry) { Uint8 next_free_entry = m_pool_reference[id].next_db_object; Uint8 prev_free_entry = m_pool_reference[id].prev_db_object; if (prev_free_entry == (Uint8)NULL_HASH) { m_hash_entry[hash_entry] = next_free_entry; } else { m_pool_reference[prev_free_entry].next_db_object = next_free_entry; } if (next_free_entry == (Uint8)NULL_HASH) { ; } else { m_pool_reference[next_free_entry].prev_db_object = prev_free_entry; } m_pool_reference[id].next_db_object = NULL_HASH; m_pool_reference[id].prev_db_object = NULL_HASH; } Uint32 NdbPool::compute_hash(const char *a_schema_name) { Uint32 len = strlen(a_schema_name); Uint32 h = 147; for (Uint32 i = 0; i < len; i++) { Uint32 c = a_schema_name[i]; h = (h << 5) + h + c; } h &= (POOL_HASH_TABLE_SIZE - 1); return h; } Ndb* NdbPool::wait_free_ndb(Uint32 &id) { int res; int time_out = 3500; do { NdbCondition* tmp = input_pool_cond; m_waiting++; m_input_queue++; time_out -= 500; res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out); if (tmp == input_pool_cond) { m_input_queue--; } else { m_output_queue--; if (m_output_queue == 0) { switch_condition_queue(); } } m_waiting--; } while (res == 0 && m_first_wait == NULL_POOL); if (res != 0 && m_first_wait == NULL_POOL) { return NULL; } id = m_first_wait; remove_wait_list(); assert(m_waiting != 0 || m_first_wait == NULL_POOL); return m_pool_reference[id].ndb_reference; } void NdbPool::remove_wait_list() { Uint32 id = m_first_wait; m_first_wait = m_pool_reference[id].next_free_object; m_pool_reference[id].next_free_object = NULL_POOL; m_pool_reference[id].prev_free_object = NULL_POOL; m_pool_reference[id].free_entry = false; } void NdbPool::add_wait_list(Uint32 id) { m_pool_reference[id].next_free_object = m_first_wait; m_first_wait = id; } void NdbPool::switch_condition_queue() { m_signal_count = m_input_queue; Uint8 move_queue = m_input_queue; m_input_queue = m_output_queue; m_output_queue = move_queue; NdbCondition* move_cond = input_pool_cond; input_pool_cond = output_pool_cond; output_pool_cond = move_cond; }