summaryrefslogtreecommitdiff
path: root/storage/spider/spd_conn.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/spider/spd_conn.cc')
-rw-r--r--storage/spider/spd_conn.cc758
1 files changed, 581 insertions, 177 deletions
diff --git a/storage/spider/spd_conn.cc b/storage/spider/spd_conn.cc
index 5c96fe321bd..3fa825a9d88 100644
--- a/storage/spider/spd_conn.cc
+++ b/storage/spider/spd_conn.cc
@@ -1,4 +1,4 @@
-/* Copyright (C) 2008-2015 Kentoku Shiba
+/* Copyright (C) 2008-2017 Kentoku Shiba
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -11,10 +11,12 @@
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define MYSQL_SERVER 1
+#include <my_global.h>
#include "mysql_version.h"
+#include "spd_environ.h"
#if MYSQL_VERSION_ID < 50500
#include "mysql_priv.h"
#include <mysql/plugin.h>
@@ -25,6 +27,7 @@
#include "sql_partition.h"
#include "tztime.h"
#endif
+#include "spd_err.h"
#include "spd_param.h"
#include "spd_db_include.h"
#include "spd_include.h"
@@ -36,10 +39,24 @@
#include "spd_direct_sql.h"
#include "spd_ping_table.h"
#include "spd_malloc.h"
+#include "spd_err.h"
+
+#ifdef SPIDER_HAS_NEXT_THREAD_ID
+#define SPIDER_set_next_thread_id(A)
+#else
+extern ulong *spd_db_att_thread_id;
+inline void SPIDER_set_next_thread_id(THD *A)
+{
+ pthread_mutex_lock(&LOCK_thread_count);
+ A->thread_id = (*spd_db_att_thread_id)++;
+ pthread_mutex_unlock(&LOCK_thread_count);
+}
+#endif
extern handlerton *spider_hton_ptr;
extern SPIDER_DBTON spider_dbton[SPIDER_DBTON_SIZE];
pthread_mutex_t spider_conn_id_mutex;
+pthread_mutex_t spider_ipport_conn_mutex;
ulonglong spider_conn_id = 1;
#ifndef WITHOUT_SPIDER_BG_SEARCH
@@ -47,6 +64,8 @@ extern pthread_attr_t spider_pt_attr;
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key spd_key_mutex_mta_conn;
+extern PSI_mutex_key spd_key_mutex_conn_i;
+extern PSI_cond_key spd_key_cond_conn_i;
#ifndef WITHOUT_SPIDER_BG_SEARCH
extern PSI_mutex_key spd_key_mutex_bg_conn_chain;
extern PSI_mutex_key spd_key_mutex_bg_conn_sync;
@@ -74,6 +93,9 @@ extern SPIDER_TRX *spider_global_trx;
HASH spider_open_connections;
uint spider_open_connections_id;
+HASH spider_ipport_conns;
+long spider_conn_mutex_id = 0;
+
const char *spider_open_connections_func_name;
const char *spider_open_connections_file_name;
ulong spider_open_connections_line_no;
@@ -108,6 +130,17 @@ uchar *spider_conn_get_key(
DBUG_RETURN((uchar*) conn->conn_key);
}
+uchar *spider_ipport_conn_get_key(
+ SPIDER_IP_PORT_CONN *ip_port,
+ size_t *length,
+ my_bool not_used __attribute__ ((unused))
+)
+{
+ DBUG_ENTER("spider_ipport_conn_get_key");
+ *length = ip_port->key_len;
+ DBUG_RETURN((uchar*) ip_port->key);
+}
+
int spider_reset_conn_setted_parameter(
SPIDER_CONN *conn,
THD *thd
@@ -151,10 +184,10 @@ int spider_free_conn_alloc(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_free_conn_alloc");
- spider_db_disconnect(conn);
#ifndef WITHOUT_SPIDER_BG_SEARCH
spider_free_conn_thread(conn);
#endif
+ spider_db_disconnect(conn);
if (conn->db_conn)
{
delete conn->db_conn;
@@ -174,6 +207,7 @@ void spider_free_conn_from_trx(
int *roop_count
) {
ha_spider *spider;
+ SPIDER_IP_PORT_CONN *ip_port_conn = conn->ip_port_conn;
DBUG_ENTER("spider_free_conn_from_trx");
spider_conn_clear_queue(conn);
conn->use_for_active_standby = FALSE;
@@ -251,6 +285,15 @@ void spider_free_conn_from_trx(
pthread_mutex_unlock(&spider_conn_mutex);
spider_free_conn(conn);
} else {
+ if (ip_port_conn)
+ { /* exists */
+ if (ip_port_conn->waiting_count)
+ {
+ pthread_mutex_lock(&ip_port_conn->mutex);
+ pthread_cond_signal(&ip_port_conn->cond);
+ pthread_mutex_unlock(&ip_port_conn->mutex);
+ }
+ }
if (spider_open_connections.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
@@ -408,6 +451,7 @@ SPIDER_CONN *spider_create_conn(
) {
int *need_mon;
SPIDER_CONN *conn;
+ SPIDER_IP_PORT_CONN *ip_port_conn;
char *tmp_name, *tmp_host, *tmp_username, *tmp_password, *tmp_socket;
char *tmp_wrapper, *tmp_ssl_ca, *tmp_ssl_capath, *tmp_ssl_cert;
char *tmp_ssl_cipher, *tmp_ssl_key, *tmp_default_file, *tmp_default_group;
@@ -624,6 +668,28 @@ SPIDER_CONN *spider_create_conn(
conn->dbton_id = share->hs_dbton_ids[link_idx];
}
#endif
+ if (conn->dbton_id == SPIDER_DBTON_SIZE)
+ {
+#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
+ if (conn->conn_kind == SPIDER_CONN_KIND_MYSQL)
+ {
+#endif
+ my_printf_error(
+ ER_SPIDER_SQL_WRAPPER_IS_INVALID_NUM,
+ ER_SPIDER_SQL_WRAPPER_IS_INVALID_STR,
+ MYF(0), conn->tgt_wrapper);
+ *error_num = ER_SPIDER_SQL_WRAPPER_IS_INVALID_NUM;
+#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
+ } else {
+ my_printf_error(
+ ER_SPIDER_NOSQL_WRAPPER_IS_INVALID_NUM,
+ ER_SPIDER_NOSQL_WRAPPER_IS_INVALID_STR,
+ MYF(0), conn->tgt_wrapper);
+ *error_num = ER_SPIDER_NOSQL_WRAPPER_IS_INVALID_NUM;
+ }
+#endif
+ goto error_invalid_wrapper;
+ }
if (!(conn->db_conn = spider_dbton[conn->dbton_id].create_db_conn(conn)))
{
*error_num = HA_ERR_OUT_OF_MEM;
@@ -666,6 +732,47 @@ SPIDER_CONN *spider_create_conn(
++spider_conn_id;
pthread_mutex_unlock(&spider_conn_id_mutex);
+ pthread_mutex_lock(&spider_ipport_conn_mutex);
+#ifdef SPIDER_HAS_HASH_VALUE_TYPE
+ if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search_using_hash_value(
+ &spider_ipport_conns, conn->conn_key_hash_value,
+ (uchar*)conn->conn_key, conn->conn_key_length)))
+#else
+ if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search(
+ &spider_ipport_conns, (uchar*)conn->conn_key, conn->conn_key_length)))
+#endif
+ { /* exists, +1 */
+ pthread_mutex_unlock(&spider_ipport_conn_mutex);
+ pthread_mutex_lock(&ip_port_conn->mutex);
+ if (spider_param_max_connections())
+ { /* enable conncetion pool */
+ if (ip_port_conn->ip_port_count >= spider_param_max_connections())
+ { /* bigger than the max num of connections, free conn and return NULL */
+ pthread_mutex_unlock(&ip_port_conn->mutex);
+ *error_num = ER_SPIDER_CON_COUNT_ERROR;
+ goto error_too_many_ipport_count;
+ }
+ }
+ ip_port_conn->ip_port_count++;
+ pthread_mutex_unlock(&ip_port_conn->mutex);
+ }
+ else
+ {// do not exist
+ ip_port_conn = spider_create_ipport_conn(conn);
+ if (!ip_port_conn) {
+ /* failed, always do not effect 'create conn' */
+ pthread_mutex_unlock(&spider_ipport_conn_mutex);
+ DBUG_RETURN(conn);
+ }
+ if (my_hash_insert(&spider_ipport_conns, (uchar *)ip_port_conn)) {
+ /* insert failed, always do not effect 'create conn' */
+ pthread_mutex_unlock(&spider_ipport_conn_mutex);
+ DBUG_RETURN(conn);
+ }
+ pthread_mutex_unlock(&spider_ipport_conn_mutex);
+ }
+ conn->ip_port_conn = ip_port_conn;
+
DBUG_RETURN(conn);
/*
@@ -673,10 +780,12 @@ error_init_lock_table_hash:
DBUG_ASSERT(!conn->mta_conn_mutex_file_pos.file_name);
pthread_mutex_destroy(&conn->mta_conn_mutex);
*/
+error_too_many_ipport_count:
error_mta_conn_mutex_init:
error_db_conn_init:
delete conn->db_conn;
error_db_conn_create:
+error_invalid_wrapper:
spider_free(spider_current_trx, conn, MYF(0));
error_alloc_conn:
DBUG_RETURN(NULL);
@@ -826,16 +935,27 @@ SPIDER_CONN *spider_get_conn(
#endif
{
pthread_mutex_unlock(&spider_conn_mutex);
- DBUG_PRINT("info",("spider create new conn"));
- if (!(conn = spider_create_conn(share, spider, link_idx,
- base_link_idx, conn_kind, error_num)))
- goto error;
- *conn->conn_key = *conn_key;
- if (spider)
- {
- spider->conns[base_link_idx] = conn;
- if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
- conn->use_for_active_standby = TRUE;
+ if (spider_param_max_connections())
+ { /* enable connection pool */
+ conn = spider_get_conn_from_idle_connection(share, link_idx, conn_key, spider, conn_kind, base_link_idx, error_num);
+ /* failed get conn, goto error */
+ if (!conn)
+ goto error;
+
+ }
+ else
+ { /* did not enable conncetion pool , create_conn */
+ DBUG_PRINT("info",("spider create new conn"));
+ if (!(conn = spider_create_conn(share, spider, link_idx,
+ base_link_idx, conn_kind, error_num)))
+ goto error;
+ *conn->conn_key = *conn_key;
+ if (spider)
+ {
+ spider->conns[base_link_idx] = conn;
+ if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
+ conn->use_for_active_standby = TRUE;
+ }
}
} else {
#ifdef HASH_UPDATE_WITH_HASH_VALUE
@@ -1107,6 +1227,14 @@ int spider_free_conn(
) {
DBUG_ENTER("spider_free_conn");
DBUG_PRINT("info", ("spider conn=%p", conn));
+ SPIDER_IP_PORT_CONN* ip_port_conn = conn->ip_port_conn;
+ if (ip_port_conn)
+ { /* free conn, ip_port_count-- */
+ pthread_mutex_lock(&ip_port_conn->mutex);
+ if (ip_port_conn->ip_port_count > 0)
+ ip_port_conn->ip_port_count--;
+ pthread_mutex_unlock(&ip_port_conn->mutex);
+ }
spider_free_conn_alloc(conn);
spider_free(spider_current_trx, conn, MYF(0));
DBUG_RETURN(0);
@@ -1569,6 +1697,7 @@ int spider_set_conn_bg_param(
SPIDER_RESULT_LIST *result_list = &spider->result_list;
THD *thd = spider->trx->thd;
DBUG_ENTER("spider_set_conn_bg_param");
+ DBUG_PRINT("info",("spider spider=%p", spider));
bgs_mode =
spider_param_bgs_mode(thd, share->bgs_mode);
if (bgs_mode == 0)
@@ -1609,28 +1738,44 @@ int spider_set_conn_bg_param(
if (result_list->bgs_phase > 0)
{
- for (
- roop_count = spider_conn_link_idx_next(share->link_statuses,
- spider->conn_link_idx, -1, share->link_count,
- spider->lock_mode ?
- SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK);
- roop_count < (int) share->link_count;
- roop_count = spider_conn_link_idx_next(share->link_statuses,
- spider->conn_link_idx, roop_count, share->link_count,
- spider->lock_mode ?
- SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK)
- ) {
- if ((error_num = spider_create_conn_thread(spider->conns[roop_count])))
- DBUG_RETURN(error_num);
+#ifdef SPIDER_HAS_GROUP_BY_HANDLER
+ if (spider->use_fields)
+ {
+ SPIDER_LINK_IDX_CHAIN *link_idx_chain;
+ spider_fields *fields = spider->fields;
+ fields->set_pos_to_first_link_idx_chain();
+ while ((link_idx_chain = fields->get_next_link_idx_chain()))
+ {
+ if ((error_num = spider_create_conn_thread(link_idx_chain->conn)))
+ DBUG_RETURN(error_num);
+ }
+ } else {
+#endif
+ for (
+ roop_count = spider_conn_link_idx_next(share->link_statuses,
+ spider->conn_link_idx, -1, share->link_count,
+ spider->lock_mode ?
+ SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK);
+ roop_count < (int) share->link_count;
+ roop_count = spider_conn_link_idx_next(share->link_statuses,
+ spider->conn_link_idx, roop_count, share->link_count,
+ spider->lock_mode ?
+ SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK)
+ ) {
+ if ((error_num = spider_create_conn_thread(spider->conns[roop_count])))
+ DBUG_RETURN(error_num);
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
- if ((error_num = spider_create_conn_thread(
- spider->hs_r_conns[roop_count])))
- DBUG_RETURN(error_num);
- if ((error_num = spider_create_conn_thread(
- spider->hs_w_conns[roop_count])))
- DBUG_RETURN(error_num);
+ if ((error_num = spider_create_conn_thread(
+ spider->hs_r_conns[roop_count])))
+ DBUG_RETURN(error_num);
+ if ((error_num = spider_create_conn_thread(
+ spider->hs_w_conns[roop_count])))
+ DBUG_RETURN(error_num);
#endif
+ }
+#ifdef SPIDER_HAS_GROUP_BY_HANDLER
}
+#endif
}
DBUG_RETURN(0);
}
@@ -1975,6 +2120,7 @@ int spider_bg_conn_search(
SPIDER_RESULT_LIST *result_list = &spider->result_list;
bool with_lock = FALSE;
DBUG_ENTER("spider_bg_conn_search");
+ DBUG_PRINT("info",("spider spider=%p", spider));
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_MYSQL)
{
@@ -2016,7 +2162,7 @@ int spider_bg_conn_search(
DBUG_RETURN(result_list->bgs_error);
}
}
- if (!result_list->finish_flg)
+ if (result_list->bgs_working || !result_list->finish_flg)
{
pthread_mutex_lock(&conn->bg_conn_mutex);
if (!result_list->finish_flg)
@@ -2040,10 +2186,18 @@ int spider_bg_conn_search(
result_list->bgs_error_msg, MYF(0));
DBUG_RETURN(result_list->bgs_error);
}
+ DBUG_PRINT("info",("spider result_list->quick_mode=%d",
+ result_list->quick_mode));
+ DBUG_PRINT("info",("spider result_list->bgs_current->result=%p",
+ result_list->bgs_current->result));
if (
result_list->quick_mode == 0 ||
!result_list->bgs_current->result
) {
+ DBUG_PRINT("info",("spider result_list->bgs_second_read=%lld",
+ result_list->bgs_second_read));
+ DBUG_PRINT("info",("spider result_list->bgs_split_read=%lld",
+ result_list->bgs_split_read));
result_list->split_read =
result_list->bgs_second_read > 0 ?
result_list->bgs_second_read :
@@ -2053,6 +2207,7 @@ int spider_bg_conn_search(
result_list->split_read ?
result_list->split_read :
result_list->internal_limit - result_list->record_num;
+ DBUG_PRINT("info",("spider sql_kinds=%u", spider->sql_kinds));
if (spider->sql_kinds & SPIDER_SQL_KIND_SQL)
{
if ((error_num = spider->reappend_limit_sql_part(
@@ -2094,6 +2249,9 @@ int spider_bg_conn_search(
conn->bg_target = spider;
conn->link_idx = link_idx;
conn->bg_discard_result = discard_result;
+#ifdef SPIDER_HAS_GROUP_BY_HANDLER
+ conn->link_idx_chain = spider->link_idx_chain;
+#endif
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_cond);
pthread_mutex_unlock(&conn->bg_conn_mutex);
@@ -2105,11 +2263,33 @@ int spider_bg_conn_search(
DBUG_PRINT("info",("spider bg current->finish_flg=%s",
result_list->current ?
(result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
+ if (result_list->bgs_error)
+ {
+ DBUG_PRINT("info",("spider bg error"));
+ if (result_list->bgs_error != HA_ERR_END_OF_FILE)
+ {
+ if (result_list->bgs_error_with_message)
+ my_message(result_list->bgs_error,
+ result_list->bgs_error_msg, MYF(0));
+ DBUG_RETURN(result_list->bgs_error);
+ }
+ }
}
} else {
DBUG_PRINT("info",("spider bg current->finish_flg=%s",
result_list->current ?
(result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
+ if (result_list->bgs_error)
+ {
+ DBUG_PRINT("info",("spider bg error"));
+ if (result_list->bgs_error != HA_ERR_END_OF_FILE)
+ {
+ if (result_list->bgs_error_with_message)
+ my_message(result_list->bgs_error,
+ result_list->bgs_error_msg, MYF(0));
+ DBUG_RETURN(result_list->bgs_error);
+ }
+ }
}
} else {
DBUG_PRINT("info",("spider bg search"));
@@ -2147,6 +2327,10 @@ int spider_bg_conn_search(
DBUG_PRINT("info",("spider bg next search"));
if (!result_list->current->finish_flg)
{
+ DBUG_PRINT("info",("spider result_list->quick_mode=%d",
+ result_list->quick_mode));
+ DBUG_PRINT("info",("spider result_list->bgs_current->result=%p",
+ result_list->bgs_current->result));
pthread_mutex_lock(&conn->bg_conn_mutex);
result_list->bgs_phase = 3;
if (
@@ -2159,6 +2343,7 @@ int spider_bg_conn_search(
result_list->split_read ?
result_list->split_read :
result_list->internal_limit - result_list->record_num;
+ DBUG_PRINT("info",("spider sql_kinds=%u", spider->sql_kinds));
if (spider->sql_kinds & SPIDER_SQL_KIND_SQL)
{
if ((error_num = spider->reappend_limit_sql_part(
@@ -2193,6 +2378,9 @@ int spider_bg_conn_search(
conn->bg_target = spider;
conn->link_idx = link_idx;
conn->bg_discard_result = discard_result;
+#ifdef SPIDER_HAS_GROUP_BY_HANDLER
+ conn->link_idx_chain = spider->link_idx_chain;
+#endif
result_list->bgs_working = TRUE;
conn->bg_search = TRUE;
if (with_lock)
@@ -2259,7 +2447,7 @@ void *spider_bg_conn_action(
my_thread_init();
DBUG_ENTER("spider_bg_conn_action");
/* init start */
- if (!(thd = new THD(next_thread_id())))
+ if (!(thd = SPIDER_new_THD(next_thread_id())))
{
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_sync_cond);
@@ -2267,6 +2455,7 @@ void *spider_bg_conn_action(
my_thread_end();
DBUG_RETURN(NULL);
}
+ SPIDER_set_next_thread_id(thd);
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
@@ -2393,12 +2582,23 @@ void *spider_bg_conn_action(
pthread_mutex_lock(&conn->mta_conn_mutex);
SPIDER_SET_FILE_POS(&conn->mta_conn_mutex_file_pos);
}
- if ((error_num = dbton_handler->set_sql_for_exec(sql_type,
- conn->link_idx)))
+ if (spider->use_fields)
{
- result_list->bgs_error = error_num;
- if ((result_list->bgs_error_with_message = thd->is_error()))
- strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
+ if ((error_num = dbton_handler->set_sql_for_exec(sql_type,
+ conn->link_idx, conn->link_idx_chain)))
+ {
+ result_list->bgs_error = error_num;
+ if ((result_list->bgs_error_with_message = thd->is_error()))
+ strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
+ }
+ } else {
+ if ((error_num = dbton_handler->set_sql_for_exec(sql_type,
+ conn->link_idx)))
+ {
+ result_list->bgs_error = error_num;
+ if ((result_list->bgs_error_with_message = thd->is_error()))
+ strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
+ }
}
if (!dbton_handler->need_lock_before_set_sql_for_exec(sql_type))
{
@@ -2701,7 +2901,6 @@ void *spider_bg_sts_action(
SPIDER_TRX *trx;
int error_num = 0, roop_count;
ha_spider spider;
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
int *need_mons;
SPIDER_CONN **conns;
uint *conn_link_idx;
@@ -2712,48 +2911,31 @@ void *spider_bg_sts_action(
char **hs_w_conn_keys;
#endif
spider_db_handler **dbton_hdl;
-#else
- int need_mons[share->link_count];
- SPIDER_CONN *conns[share->link_count];
- uint conn_link_idx[share->link_count];
- uchar conn_can_fo[share->link_bitmap_size];
- char *conn_keys[share->link_count];
-#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
- char *hs_r_conn_keys[share->link_count];
- char *hs_w_conn_keys[share->link_count];
-#endif
- spider_db_handler *dbton_hdl[SPIDER_DBTON_SIZE];
-#endif
THD *thd;
my_thread_init();
DBUG_ENTER("spider_bg_sts_action");
/* init start */
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
+ char *ptr;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
- if (!(need_mons = (int *)
- spider_bulk_malloc(spider_current_trx, 21, MYF(MY_WME),
- &need_mons, sizeof(int) * share->link_count,
- &conns, sizeof(SPIDER_CONN *) * share->link_count,
- &conn_link_idx, sizeof(uint) * share->link_count,
- &conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
- &conn_keys, sizeof(char *) * share->link_count,
- &hs_r_conn_keys, sizeof(char *) * share->link_count,
- &hs_w_conn_keys, sizeof(char *) * share->link_count,
- &dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
- NullS))
- )
+ ptr = (char *) my_alloca(
+ (sizeof(int) * share->link_count) +
+ (sizeof(SPIDER_CONN *) * share->link_count) +
+ (sizeof(uint) * share->link_count) +
+ (sizeof(uchar) * share->link_bitmap_size) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
#else
- if (!(need_mons = (int *)
- spider_bulk_malloc(spider_current_trx, 21, MYF(MY_WME),
- &need_mons, sizeof(int) * share->link_count,
- &conns, sizeof(SPIDER_CONN *) * share->link_count,
- &conn_link_idx, sizeof(uint) * share->link_count,
- &conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
- &conn_keys, sizeof(char *) * share->link_count,
- &dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
- NullS))
- )
-#endif
+ ptr = (char *) my_alloca(
+ (sizeof(int) * share->link_count) +
+ (sizeof(SPIDER_CONN *) * share->link_count) +
+ (sizeof(uint) * share->link_count) +
+ (sizeof(uchar) * share->link_bitmap_size) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
+#endif
+ if (!ptr)
{
pthread_mutex_lock(&share->sts_mutex);
share->bg_sts_thd_wait = FALSE;
@@ -2763,20 +2945,35 @@ void *spider_bg_sts_action(
my_thread_end();
DBUG_RETURN(NULL);
}
+ need_mons = (int *) ptr;
+ ptr += (sizeof(int) * share->link_count);
+ conns = (SPIDER_CONN **) ptr;
+ ptr += (sizeof(SPIDER_CONN *) * share->link_count);
+ conn_link_idx = (uint *) ptr;
+ ptr += (sizeof(uint) * share->link_count);
+ conn_can_fo = (uchar *) ptr;
+ ptr += (sizeof(uchar) * share->link_bitmap_size);
+ conn_keys = (char **) ptr;
+ ptr += (sizeof(char *) * share->link_count);
+#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
+ hs_r_conn_keys = (char **) ptr;
+ ptr += (sizeof(char *) * share->link_count);
+ hs_w_conn_keys = (char **) ptr;
+ ptr += (sizeof(char *) * share->link_count);
#endif
+ dbton_hdl = (spider_db_handler **) ptr;
pthread_mutex_lock(&share->sts_mutex);
- if (!(thd = new THD(next_thread_id())))
+ if (!(thd = SPIDER_new_THD(next_thread_id())))
{
share->bg_sts_thd_wait = FALSE;
share->bg_sts_kill = FALSE;
share->bg_sts_init = FALSE;
pthread_mutex_unlock(&share->sts_mutex);
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
+ SPIDER_set_next_thread_id(thd);
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
@@ -2793,9 +2990,7 @@ void *spider_bg_sts_action(
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
share->bg_sts_thd = thd;
@@ -2828,14 +3023,14 @@ void *spider_bg_sts_action(
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
spider_dbton[roop_count].create_db_handler
) {
- if ((dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
+ if (!(dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
&spider, share->dbton_share[roop_count])))
break;
if (dbton_hdl[roop_count]->init())
break;
}
}
- if (roop_count == SPIDER_DBTON_SIZE)
+ if (roop_count < SPIDER_DBTON_SIZE)
{
DBUG_PRINT("info",("spider handler init error"));
for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
@@ -2858,9 +3053,7 @@ void *spider_bg_sts_action(
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
/* init end */
@@ -2889,12 +3082,10 @@ void *spider_bg_sts_action(
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
- if (spider.search_link_idx == -1)
+ if (spider.search_link_idx < 0)
{
spider_trx_set_link_idx_for_all(&spider);
/*
@@ -2932,6 +3123,7 @@ void *spider_bg_sts_action(
spider_global_trx,
thd,
share,
+ spider.search_link_idx,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
@@ -2974,6 +3166,7 @@ void *spider_bg_sts_action(
spider_global_trx,
thd,
share,
+ spider.search_link_idx,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
@@ -3080,7 +3273,6 @@ void *spider_bg_crd_action(
int error_num = 0, roop_count;
ha_spider spider;
TABLE table;
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
int *need_mons;
SPIDER_CONN **conns;
uint *conn_link_idx;
@@ -3091,48 +3283,31 @@ void *spider_bg_crd_action(
char **hs_w_conn_keys;
#endif
spider_db_handler **dbton_hdl;
-#else
- int need_mons[share->link_count];
- SPIDER_CONN *conns[share->link_count];
- uint conn_link_idx[share->link_count];
- uchar conn_can_fo[share->link_bitmap_size];
- char *conn_keys[share->link_count];
-#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
- char *hs_r_conn_keys[share->link_count];
- char *hs_w_conn_keys[share->link_count];
-#endif
- spider_db_handler *dbton_hdl[SPIDER_DBTON_SIZE];
-#endif
THD *thd;
my_thread_init();
DBUG_ENTER("spider_bg_crd_action");
/* init start */
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
+ char *ptr;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
- if (!(need_mons = (int *)
- spider_bulk_malloc(spider_current_trx, 22, MYF(MY_WME),
- &need_mons, sizeof(int) * share->link_count,
- &conns, sizeof(SPIDER_CONN *) * share->link_count,
- &conn_link_idx, sizeof(uint) * share->link_count,
- &conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
- &conn_keys, sizeof(char *) * share->link_count,
- &hs_r_conn_keys, sizeof(char *) * share->link_count,
- &hs_w_conn_keys, sizeof(char *) * share->link_count,
- &dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
- NullS))
- )
+ ptr = (char *) my_alloca(
+ (sizeof(int) * share->link_count) +
+ (sizeof(SPIDER_CONN *) * share->link_count) +
+ (sizeof(uint) * share->link_count) +
+ (sizeof(uchar) * share->link_bitmap_size) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
#else
- if (!(need_mons = (int *)
- spider_bulk_malloc(spider_current_trx, 22, MYF(MY_WME),
- &need_mons, sizeof(int) * share->link_count,
- &conns, sizeof(SPIDER_CONN *) * share->link_count,
- &conn_link_idx, sizeof(uint) * share->link_count,
- &conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
- &conn_keys, sizeof(char *) * share->link_count,
- &dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
- NullS))
- )
-#endif
+ ptr = (char *) my_alloca(
+ (sizeof(int) * share->link_count) +
+ (sizeof(SPIDER_CONN *) * share->link_count) +
+ (sizeof(uint) * share->link_count) +
+ (sizeof(uchar) * share->link_bitmap_size) +
+ (sizeof(char *) * share->link_count) +
+ (sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE));
+#endif
+ if (!ptr)
{
pthread_mutex_lock(&share->crd_mutex);
share->bg_crd_thd_wait = FALSE;
@@ -3142,20 +3317,35 @@ void *spider_bg_crd_action(
my_thread_end();
DBUG_RETURN(NULL);
}
+ need_mons = (int *) ptr;
+ ptr += (sizeof(int) * share->link_count);
+ conns = (SPIDER_CONN **) ptr;
+ ptr += (sizeof(SPIDER_CONN *) * share->link_count);
+ conn_link_idx = (uint *) ptr;
+ ptr += (sizeof(uint) * share->link_count);
+ conn_can_fo = (uchar *) ptr;
+ ptr += (sizeof(uchar) * share->link_bitmap_size);
+ conn_keys = (char **) ptr;
+ ptr += (sizeof(char *) * share->link_count);
+#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
+ hs_r_conn_keys = (char **) ptr;
+ ptr += (sizeof(char *) * share->link_count);
+ hs_w_conn_keys = (char **) ptr;
+ ptr += (sizeof(char *) * share->link_count);
#endif
+ dbton_hdl = (spider_db_handler **) ptr;
pthread_mutex_lock(&share->crd_mutex);
- if (!(thd = new THD(next_thread_id())))
+ if (!(thd = SPIDER_new_THD(next_thread_id())))
{
share->bg_crd_thd_wait = FALSE;
share->bg_crd_kill = FALSE;
share->bg_crd_init = FALSE;
pthread_mutex_unlock(&share->crd_mutex);
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
+ SPIDER_set_next_thread_id(thd);
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
@@ -3172,9 +3362,7 @@ void *spider_bg_crd_action(
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
share->bg_crd_thd = thd;
@@ -3211,14 +3399,14 @@ void *spider_bg_crd_action(
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
spider_dbton[roop_count].create_db_handler
) {
- if ((dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
+ if (!(dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
&spider, share->dbton_share[roop_count])))
break;
if (dbton_hdl[roop_count]->init())
break;
}
}
- if (roop_count == SPIDER_DBTON_SIZE)
+ if (roop_count < SPIDER_DBTON_SIZE)
{
DBUG_PRINT("info",("spider handler init error"));
for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
@@ -3241,9 +3429,7 @@ void *spider_bg_crd_action(
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
/* init end */
@@ -3272,12 +3458,10 @@ void *spider_bg_crd_action(
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(NULL, need_mons, MYF(MY_WME));
-#endif
+ my_afree(need_mons);
DBUG_RETURN(NULL);
}
- if (spider.search_link_idx == -1)
+ if (spider.search_link_idx < 0)
{
spider_trx_set_link_idx_for_all(&spider);
/*
@@ -3315,6 +3499,7 @@ void *spider_bg_crd_action(
spider_global_trx,
thd,
share,
+ spider.search_link_idx,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
@@ -3357,6 +3542,7 @@ void *spider_bg_crd_action(
spider_global_trx,
thd,
share,
+ spider.search_link_idx,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
@@ -3406,15 +3592,9 @@ int spider_create_mon_threads(
{
char link_idx_str[SPIDER_SQL_INT_LEN];
int link_idx_str_length;
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_string conv_name_str(share->table_name_length +
- SPIDER_SQL_INT_LEN + 1);
- conv_name_str.set_charset(system_charset_info);
-#else
- char buf[share->table_name_length + SPIDER_SQL_INT_LEN + 1];
+ char *buf = (char *) my_alloca(share->table_name_length + SPIDER_SQL_INT_LEN + 1);
spider_string conv_name_str(buf, share->table_name_length +
SPIDER_SQL_INT_LEN + 1, system_charset_info);
-#endif
conv_name_str.init_calc_mem(105);
conv_name_str.length(0);
conv_name_str.q_append(share->table_name, share->table_name_length);
@@ -3424,14 +3604,26 @@ int spider_create_mon_threads(
if (share->monitoring_bg_kind[roop_count])
{
conv_name_str.length(share->table_name_length);
- link_idx_str_length = my_sprintf(link_idx_str, (link_idx_str,
- "%010d", roop_count));
+ if (share->static_link_ids[roop_count])
+ {
+ memcpy(link_idx_str, share->static_link_ids[roop_count],
+ share->static_link_ids_lengths[roop_count] + 1);
+ link_idx_str_length = share->static_link_ids_lengths[roop_count];
+ } else {
+ link_idx_str_length = my_sprintf(link_idx_str, (link_idx_str,
+ "%010d", roop_count));
+ }
conv_name_str.q_append(link_idx_str, link_idx_str_length + 1);
conv_name_str.length(conv_name_str.length() - 1);
if (!(table_mon_list = spider_get_ping_table_mon_list(trx, trx->thd,
&conv_name_str, share->table_name_length, roop_count,
+ share->static_link_ids[roop_count],
+ share->static_link_ids_lengths[roop_count],
(uint32) share->monitoring_sid[roop_count], FALSE, &error_num)))
+ {
+ my_afree(buf);
goto error_get_ping_table_mon_list;
+ }
spider_free_ping_table_mon_list(table_mon_list);
}
}
@@ -3447,6 +3639,7 @@ int spider_create_mon_threads(
NullS))
) {
error_num = HA_ERR_OUT_OF_MEM;
+ my_afree(buf);
goto error_alloc_base;
}
for (roop_count = 0; roop_count < (int) share->all_link_count;
@@ -3463,6 +3656,7 @@ int spider_create_mon_threads(
#endif
) {
error_num = HA_ERR_OUT_OF_MEM;
+ my_afree(buf);
goto error_mutex_init;
}
}
@@ -3479,6 +3673,7 @@ int spider_create_mon_threads(
#endif
) {
error_num = HA_ERR_OUT_OF_MEM;
+ my_afree(buf);
goto error_cond_init;
}
}
@@ -3495,6 +3690,7 @@ int spider_create_mon_threads(
#endif
) {
error_num = HA_ERR_OUT_OF_MEM;
+ my_afree(buf);
goto error_sleep_cond_init;
}
}
@@ -3518,6 +3714,7 @@ int spider_create_mon_threads(
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
+ my_afree(buf);
goto error_thread_create;
}
pthread_cond_wait(&share->bg_mon_conds[roop_count],
@@ -3526,6 +3723,7 @@ int spider_create_mon_threads(
}
}
share->bg_mon_init = TRUE;
+ my_afree(buf);
}
}
DBUG_RETURN(0);
@@ -3633,7 +3831,7 @@ void *spider_bg_mon_action(
DBUG_ENTER("spider_bg_mon_action");
/* init start */
pthread_mutex_lock(&share->bg_mon_mutexes[link_idx]);
- if (!(thd = new THD(next_thread_id())))
+ if (!(thd = SPIDER_new_THD(next_thread_id())))
{
share->bg_mon_kill = FALSE;
share->bg_mon_init = FALSE;
@@ -3642,6 +3840,7 @@ void *spider_bg_mon_action(
my_thread_end();
DBUG_RETURN(NULL);
}
+ SPIDER_set_next_thread_id(thd);
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
@@ -3706,6 +3905,7 @@ void *spider_bg_mon_action(
spider_global_trx,
thd,
share,
+ link_idx,
(uint32) share->monitoring_sid[link_idx],
share->table_name,
share->table_name_length,
@@ -3734,25 +3934,19 @@ int spider_conn_first_link_idx(
int roop_count, active_links = 0;
longlong balance_total = 0, balance_val;
double rand_val;
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
int *link_idxs, link_idx;
long *balances;
-#else
- int link_idxs[link_count];
- long balances[link_count];
-#endif
DBUG_ENTER("spider_conn_first_link_idx");
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- if (!(link_idxs = (int *)
- spider_bulk_malloc(spider_current_trx, 24, MYF(MY_WME),
- &link_idxs, sizeof(int) * link_count,
- &balances, sizeof(long) * link_count,
- NullS))
- ) {
+ char *ptr;
+ ptr = (char *) my_alloca((sizeof(int) * link_count) + (sizeof(long) * link_count));
+ if (!ptr)
+ {
DBUG_PRINT("info",("spider out of memory"));
- DBUG_RETURN(-1);
+ DBUG_RETURN(-2);
}
-#endif
+ link_idxs = (int *) ptr;
+ ptr += sizeof(int) * link_count;
+ balances = (long *) ptr;
for (roop_count = 0; roop_count < link_count; roop_count++)
{
DBUG_ASSERT((conn_link_idx[roop_count] - roop_count) % link_count == 0);
@@ -3768,9 +3962,7 @@ int spider_conn_first_link_idx(
if (active_links == 0)
{
DBUG_PRINT("info",("spider all links are failed"));
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
- spider_free(spider_current_trx, link_idxs, MYF(MY_WME));
-#endif
+ my_afree(link_idxs);
DBUG_RETURN(-1);
}
#if defined(MARIADB_BASE_VERSION) && MYSQL_VERSION_ID >= 100002
@@ -3797,13 +3989,9 @@ int spider_conn_first_link_idx(
}
DBUG_PRINT("info",("spider first link_idx=%d", link_idxs[roop_count]));
-#if defined(_MSC_VER) || defined(__SUNPRO_CC)
link_idx = link_idxs[roop_count];
- spider_free(spider_current_trx, link_idxs, MYF(MY_WME));
+ my_afree(link_idxs);
DBUG_RETURN(link_idx);
-#else
- DBUG_RETURN(link_idxs[roop_count]);
-#endif
}
int spider_conn_next_link_idx(
@@ -3856,6 +4044,17 @@ int spider_conn_link_idx_next(
DBUG_RETURN(link_idx);
}
+int spider_conn_get_link_status(
+ long *link_statuses,
+ uint *conn_link_idx,
+ int link_idx
+) {
+ DBUG_ENTER("spider_conn_get_link_status");
+ DBUG_PRINT("info",("spider link_status=%d",
+ (int) link_statuses[conn_link_idx[link_idx]]));
+ DBUG_RETURN((int) link_statuses[conn_link_idx[link_idx]]);
+}
+
int spider_conn_lock_mode(
ha_spider *spider
) {
@@ -4165,3 +4364,208 @@ bool spider_conn_need_open_handler(
}
DBUG_RETURN(TRUE);
}
+
+SPIDER_CONN* spider_get_conn_from_idle_connection(
+ SPIDER_SHARE *share,
+ int link_idx,
+ char *conn_key,
+ ha_spider *spider,
+ uint conn_kind,
+ int base_link_idx,
+ int *error_num
+ )
+{
+ DBUG_ENTER("spider_get_conn_from_idle_connection");
+ SPIDER_IP_PORT_CONN *ip_port_conn;
+ SPIDER_CONN *conn = NULL;
+ uint spider_max_connections = spider_param_max_connections();
+ struct timespec abstime;
+ ulonglong start, inter_val = 0;
+ longlong last_ntime = 0;
+ ulonglong wait_time = (ulonglong)spider_param_conn_wait_timeout()*1000*1000*1000; // default 10s
+
+ unsigned long ip_port_count = 0; // init 0
+
+ set_timespec(abstime, 0);
+
+ pthread_mutex_lock(&spider_ipport_conn_mutex);
+#ifdef SPIDER_HAS_HASH_VALUE_TYPE
+ if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search_using_hash_value(
+ &spider_ipport_conns, share->conn_keys_hash_value[link_idx],
+ (uchar*) share->conn_keys[link_idx], share->conn_keys_lengths[link_idx])))
+#else
+ if ((ip_port_conn = (SPIDER_IP_PORT_CONN*) my_hash_search(
+ &spider_ipport_conns,
+ (uchar*) share->conn_keys[link_idx], share->conn_keys_lengths[link_idx])))
+#endif
+ { /* exists */
+ pthread_mutex_unlock(&spider_ipport_conn_mutex);
+ pthread_mutex_lock(&ip_port_conn->mutex);
+ ip_port_count = ip_port_conn->ip_port_count;
+ } else {
+ pthread_mutex_unlock(&spider_ipport_conn_mutex);
+ }
+
+ if (
+ ip_port_conn &&
+ ip_port_count >= spider_max_connections &&
+ spider_max_connections > 0
+ ) { /* no idle conn && enable connection pool, wait */
+ pthread_mutex_unlock(&ip_port_conn->mutex);
+ start = my_hrtime().val;
+ while(1)
+ {
+ int error;
+ inter_val = my_hrtime().val - start; // us
+ last_ntime = wait_time - inter_val*1000; // *1000, to ns
+ if(last_ntime <= 0)
+ {/* wait timeout */
+ *error_num = ER_SPIDER_CON_COUNT_ERROR;
+ DBUG_RETURN(NULL);
+ }
+ set_timespec_nsec(abstime, last_ntime);
+ pthread_mutex_lock(&ip_port_conn->mutex);
+ ++ip_port_conn->waiting_count;
+ error = pthread_cond_timedwait(&ip_port_conn->cond, &ip_port_conn->mutex, &abstime);
+ --ip_port_conn->waiting_count;
+ pthread_mutex_unlock(&ip_port_conn->mutex);
+ if (error == ETIMEDOUT || error == ETIME || error != 0 )
+ {
+ *error_num = ER_SPIDER_CON_COUNT_ERROR;
+ DBUG_RETURN(NULL);
+ }
+
+ pthread_mutex_lock(&spider_conn_mutex);
+#ifdef SPIDER_HAS_HASH_VALUE_TYPE
+ if ((conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
+ &spider_open_connections, share->conn_keys_hash_value[link_idx],
+ (uchar*) share->conn_keys[link_idx],
+ share->conn_keys_lengths[link_idx])))
+#else
+ if ((conn = (SPIDER_CONN*) my_hash_search(&spider_open_connections,
+ (uchar*) share->conn_keys[link_idx],
+ share->conn_keys_lengths[link_idx])))
+#endif
+ {
+ /* get conn from spider_open_connections, then delete conn in spider_open_connections */
+#ifdef HASH_UPDATE_WITH_HASH_VALUE
+ my_hash_delete_with_hash_value(&spider_open_connections,
+ conn->conn_key_hash_value, (uchar*) conn);
+#else
+ my_hash_delete(&spider_open_connections, (uchar*) conn);
+#endif
+ pthread_mutex_unlock(&spider_conn_mutex);
+ DBUG_PRINT("info",("spider get global conn"));
+ if (spider)
+ {
+ spider->conns[base_link_idx] = conn;
+ if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
+ conn->use_for_active_standby = TRUE;
+ }
+ DBUG_RETURN(conn);
+ }
+ else
+ {
+ pthread_mutex_unlock(&spider_conn_mutex);
+ }
+ }
+ }
+ else
+ { /* create conn */
+ if (ip_port_conn)
+ pthread_mutex_unlock(&ip_port_conn->mutex);
+ DBUG_PRINT("info",("spider create new conn"));
+ if (!(conn = spider_create_conn(share, spider, link_idx, base_link_idx, conn_kind, error_num)))
+ DBUG_RETURN(conn);
+ *conn->conn_key = *conn_key;
+ if (spider)
+ {
+ spider->conns[base_link_idx] = conn;
+ if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
+ conn->use_for_active_standby = TRUE;
+ }
+ }
+
+ DBUG_RETURN(conn);
+}
+
+
+SPIDER_IP_PORT_CONN* spider_create_ipport_conn(SPIDER_CONN *conn)
+{
+ DBUG_ENTER("spider_create_ipport_conn");
+ if (conn)
+ {
+ SPIDER_IP_PORT_CONN *ret = (SPIDER_IP_PORT_CONN *) my_malloc(sizeof(*ret), MY_ZEROFILL | MY_WME);
+ if (!ret)
+ {
+ goto err_return_direct;
+ }
+
+#if MYSQL_VERSION_ID < 50500
+ if (pthread_mutex_init(&ret->mutex, MY_MUTEX_INIT_FAST))
+#else
+ if (mysql_mutex_init(spd_key_mutex_conn_i, &ret->mutex, MY_MUTEX_INIT_FAST))
+#endif
+ {
+ //error
+ goto err_malloc_key;
+ }
+
+#if MYSQL_VERSION_ID < 50500
+ if (pthread_cond_init(&ret->cond, NULL))
+#else
+ if (mysql_cond_init(spd_key_cond_conn_i, &ret->cond, NULL))
+#endif
+ {
+ pthread_mutex_destroy(&ret->mutex);
+ goto err_malloc_key;
+ //error
+ }
+
+ ret->key_len = conn->conn_key_length;
+ if (ret->key_len <= 0) {
+ pthread_cond_destroy(&ret->cond);
+ pthread_mutex_destroy(&ret->mutex);
+ goto err_malloc_key;
+ }
+
+ ret->key = (char *) my_malloc(ret->key_len, MY_ZEROFILL | MY_WME);
+ if (!ret->key) {
+ pthread_cond_destroy(&ret->cond);
+ pthread_mutex_destroy(&ret->mutex);
+ goto err_malloc_key;
+ }
+
+ memcpy(ret->key, conn->conn_key, ret->key_len);
+
+ strncpy(ret->remote_ip_str, conn->tgt_host, sizeof(ret->remote_ip_str));
+ ret->remote_port = conn->tgt_port;
+ ret->conn_id = conn->conn_id;
+ ret->ip_port_count = 1; // init
+
+#ifdef SPIDER_HAS_HASH_VALUE_TYPE
+ ret->key_hash_value = conn->conn_key_hash_value;
+#endif
+ DBUG_RETURN(ret);
+err_malloc_key:
+ spider_my_free(ret, MYF(0));
+err_return_direct:
+ DBUG_RETURN(NULL);
+ }
+ DBUG_RETURN(NULL);
+}
+
+
+void spider_free_ipport_conn(void *info)
+{
+ DBUG_ENTER("spider_free_ipport_conn");
+ if (info)
+ {
+ SPIDER_IP_PORT_CONN *p = (SPIDER_IP_PORT_CONN *)info;
+ pthread_cond_destroy(&p->cond);
+ pthread_mutex_destroy(&p->mutex);
+ spider_my_free(p->key, MYF(0));
+ spider_my_free(p, MYF(0));
+ }
+ DBUG_VOID_RETURN;
+}