diff options
-rw-r--r-- | storage/xpand/ha_xpand.cc | 146 | ||||
-rw-r--r-- | storage/xpand/xpand_connection.cc | 132 | ||||
-rw-r--r-- | storage/xpand/xpand_connection.h | 26 |
3 files changed, 217 insertions, 87 deletions
diff --git a/storage/xpand/ha_xpand.cc b/storage/xpand/ha_xpand.cc index 90d8738e332..ce713ac5bc0 100644 --- a/storage/xpand/ha_xpand.cc +++ b/storage/xpand/ha_xpand.cc @@ -8,6 +8,7 @@ Copyright (c) 2019, MariaDB Corporation. #include "ha_xpand_pushdown.h" #include "key.h" #include <strfunc.h> /* strconvert */ +#include "my_pthread.h" handlerton *xpand_hton = NULL; @@ -41,69 +42,100 @@ static MYSQL_SYSVAR_INT NULL, NULL, -1, -1, 2147483647, 0 ); -char *xpand_host; -static MYSQL_SYSVAR_STR +//state for load balancing +int xpand_hosts_cur; //protected by my_atomic's +ulong xpand_balance_algorithm; +const char* balance_algorithm_names[]= +{ + "first", "round_robin", NullS +}; + +TYPELIB balance_algorithms= +{ + array_elements(balance_algorithm_names) - 1, "", + balance_algorithm_names, NULL +}; + +static void update_balance_algorithm(MYSQL_THD thd, struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + *static_cast<ulong *>(var_ptr) = *static_cast<const ulong *>(save); + my_atomic_store32(&xpand_hosts_cur, 0); +} + +static MYSQL_SYSVAR_ENUM ( - host, - xpand_host, - PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, - "Xpand host", - NULL, NULL, "127.0.0.1" + balance_algorithm, + xpand_balance_algorithm, + PLUGIN_VAR_OPCMDARG, + "Method for managing load balancing of Clustrix nodes, can take values FIRST or ROUND_ROBIN", + NULL, update_balance_algorithm, XPAND_BALANCE_ROUND_ROBIN, &balance_algorithms ); -int host_list_cnt; -char **host_list; +//current list of clx hosts +static PSI_rwlock_key key_xpand_hosts; +mysql_rwlock_t xpand_hosts_lock; +xpand_host_list *xpand_hosts; -static void free_host_list() +//only call while holding lock +static void clear_hosts() { - if (host_list) { - for (int i = 0; host_list[i]; i++) - my_free(host_list[i]); - my_free(host_list); - host_list = NULL; - } + delete xpand_hosts; + xpand_hosts = NULL; + my_atomic_store32(&xpand_hosts_cur, 0); } -static void update_host_list(char *xpand_host) +static int check_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var, + void *save, struct st_mysql_value *value) { - free_host_list(); + char b; + int len = 0; + const char *val = value->val_str(value, &b, &len); - int cnt = 0; - for (char *p = xpand_host, *s = xpand_host; ; p++) { - if (*p == ',' || *p == '\0') { - if (p > s) { - cnt++; - } - if (!*p) - break; - s = p + 1; - } - } + if (!val) + return HA_ERR_OUT_OF_MEM; - DBUG_PRINT("host_cnt", ("%d", cnt)); - host_list = (char **)my_malloc(sizeof(char *) * cnt+1, MYF(MY_WME)); - host_list[cnt] = 0; - host_list_cnt = cnt; - - int i = 0; - for (char *p = xpand_host, *s = xpand_host; ; p++) { - if (*p == ',' || *p == '\0') { - if (p > s) { - char *host = (char *)my_malloc(p - s + 1, MYF(MY_WME)); - host[p-s] = '\0'; - memcpy(host, s, p-s); - DBUG_PRINT("host", ("%s", host)); - host_list[i++] = host; - } - if (!*p) - break; - s = p + 1; - } + int error_code = 0; + xpand_host_list *host_list = xpand_host_list::create(val, thd, &error_code); + if (error_code) + return error_code; + + *static_cast<xpand_host_list**>(save) = host_list; + return 0; +} + +static void update_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + mysql_rwlock_wrlock(&xpand_hosts_lock); + + xpand_host_list *from_save = *static_cast<xpand_host_list * const *>(save); + char* raw = from_save->full_list; + + int error_code = 0; + xpand_host_list *new_hosts = xpand_host_list::create(raw, &error_code); + if (error_code) { + my_printf_error(error_code, "Unhandled error setting xpand hostlist", MYF(0)); + return; } - DBUG_PRINT("xpand_host", ("%s", xpand_host)); + clear_hosts(); + xpand_hosts = new_hosts; + *static_cast<char**>(var_ptr) = new_hosts->full_list; + + mysql_rwlock_unlock(&xpand_hosts_lock); } +static char *xpand_hosts_str; +static MYSQL_SYSVAR_STR +( + hosts, + xpand_hosts_str, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "List of xpand hostnames seperated by commas, semicolons or spaces", + check_hosts, update_hosts, "localhost" +); + char *xpand_username; static MYSQL_SYSVAR_STR ( @@ -1403,15 +1435,20 @@ static int xpand_init(void *p) xpand_hton->create_select = create_xpand_select_handler; xpand_hton->create_derived = create_xpand_derived_handler; - update_host_list(xpand_host); - - DBUG_RETURN(0); + mysql_rwlock_init(key_xpand_hosts, &xpand_hosts_lock); + mysql_rwlock_wrlock(&xpand_hosts_lock); + int error_code = 0; + xpand_hosts = xpand_host_list::create(xpand_hosts_str, &error_code); + mysql_rwlock_unlock(&xpand_hosts_lock); + DBUG_RETURN(error_code); } static int xpand_deinit(void *p) { DBUG_ENTER("xpand_deinit"); - free_host_list(); + mysql_rwlock_wrlock(&xpand_hosts_lock); + delete xpand_hosts; + mysql_rwlock_destroy(&xpand_hosts_lock); DBUG_RETURN(0); } @@ -1425,7 +1462,8 @@ static struct st_mysql_sys_var* xpand_system_variables[] = MYSQL_SYSVAR(connect_timeout), MYSQL_SYSVAR(read_timeout), MYSQL_SYSVAR(write_timeout), - MYSQL_SYSVAR(host), + MYSQL_SYSVAR(balance_algorithm), + MYSQL_SYSVAR(hosts), MYSQL_SYSVAR(username), MYSQL_SYSVAR(password), MYSQL_SYSVAR(port), diff --git a/storage/xpand/xpand_connection.cc b/storage/xpand/xpand_connection.cc index 34ba7b7dfe1..73f837e6108 100644 --- a/storage/xpand/xpand_connection.cc +++ b/storage/xpand/xpand_connection.cc @@ -10,6 +10,7 @@ Copyright (c) 2019, MariaDB Corporation. #include "handler.h" #include "table.h" #include "sql_class.h" +#include "my_pthread.h" #include "tztime.h" //#include "errmsg.h" @@ -125,30 +126,42 @@ void xpand_connection::disconnect(bool is_destructor) DBUG_VOID_RETURN; } -int host_list_next; -extern int host_list_cnt; -extern char **host_list; +extern int xpand_hosts_cur; +extern ulong xpand_balance_algorithm; + +extern mysql_rwlock_t xpand_hosts_lock; +extern xpand_host_list *xpand_hosts; int xpand_connection::connect() { - int error_code = 0; - my_bool my_true = 1; DBUG_ENTER("xpand_connection::connect"); + int start = 0; + if (xpand_balance_algorithm == XPAND_BALANCE_ROUND_ROBIN) + start = my_atomic_add32(&xpand_hosts_cur, 1); - // cpu concurrency by damned! - int host_num = host_list_next; - host_num = host_num % host_list_cnt; - char *host = host_list[host_num]; - host_list_next = host_num + 1; - DBUG_PRINT("host", ("%s", host)); + mysql_rwlock_rdlock(&xpand_hosts_lock); - /* Validate the connection parameters */ - if (!strcmp(xpand_socket, "")) - if (!strcmp(host, "127.0.0.1")) - if (xpand_port == MYSQL_PORT_DEFAULT) - DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); + //search for available host + int error_code = 0; + for (int i = 0; i < xpand_hosts->hosts_len; i++) { + char *host = xpand_hosts->hosts[(start + i) % xpand_hosts->hosts_len]; + error_code = connect_direct(host); + if (!error_code) + break; + } + mysql_rwlock_unlock(&xpand_hosts_lock); + if (error_code) + my_error(error_code, MYF(0), "clustrix"); + + DBUG_RETURN(error_code); +} - //xpand_net.methods = &connection_methods; + +int xpand_connection::connect_direct(char *host) +{ + DBUG_ENTER("xpand_connection::connect_direct"); + my_bool my_true = true; + DBUG_PRINT("host", ("%s", host)); if (!mysql_init(&xpand_net)) DBUG_RETURN(HA_ERR_OUT_OF_MEM); @@ -180,29 +193,20 @@ int xpand_connection::connect() } #endif + int error_code = 0; if (!mysql_real_connect(&xpand_net, host, xpand_username, xpand_password, NULL, xpand_port, xpand_socket, CLIENT_MULTI_STATEMENTS)) { error_code = mysql_errno(&xpand_net); disconnect(); - - if (error_code != CR_CONN_HOST_ERROR && - error_code != CR_CONNECTION_ERROR) - { - if (error_code == ER_CON_COUNT_ERROR) - { - my_error(ER_CON_COUNT_ERROR, MYF(0)); - DBUG_RETURN(ER_CON_COUNT_ERROR); - } - my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), host); - DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); - } } - xpand_net.reconnect = 1; + if (error_code && error_code != ER_CON_COUNT_ERROR) { + error_code = ER_CONNECT_TO_FOREIGN_DATA_SOURCE; + } - DBUG_RETURN(0); + DBUG_RETURN(error_code); } int xpand_connection::add_status_vars() @@ -1268,3 +1272,69 @@ int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap) command_length += no_bytes; return 0; } + +/**************************************************************************** +** Class xpand_host_list +****************************************************************************/ + +xpand_host_list *xpand_host_list::create(const char *hosts, int *error_code) +{ + return xpand_host_list::create(hosts, NULL, error_code); +} + +xpand_host_list *xpand_host_list::create(const char *hosts, THD *thd, int *error_code) +{ + xpand_host_list *list = static_cast<xpand_host_list*>( + thd ? + thd_calloc(thd, sizeof(xpand_host_list)) : + my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL))); + if (!list) { + *error_code = HA_ERR_OUT_OF_MEM; + return NULL; + } + + list->full_list = thd ? + thd_strdup(thd, hosts) : + my_strdup(hosts, MYF(MY_WME)); + list->strtok_buf = thd ? + thd_strdup(thd, hosts) : + my_strdup(hosts, MYF(MY_WME)); + if (!list->full_list || !list->strtok_buf) { + *error_code = HA_ERR_OUT_OF_MEM; + return NULL; + } + + const char *sep = ",; "; + //parse into array + int i = 0; + char *cursor = NULL; + char *token = NULL; + for (token = strtok_r(list->strtok_buf, sep, &cursor); + token && i < max_host_count; + token = strtok_r(NULL, sep, &cursor)) { + list->hosts[i] = token; + i++; + } + + //host count out of range + if (i == 0 || token) { + my_free(list->full_list); + my_free(list->strtok_buf); + my_free(list); + *error_code = ER_BAD_HOST_ERROR; + return NULL; + } + list->hosts_len = i; + + return list; +} + +void xpand_host_list::operator delete(void *p) +{ + xpand_host_list *list = static_cast<xpand_host_list*>(p); + if (list) { + my_free(list->full_list); + my_free(list->strtok_buf); + } + my_free(list); +} diff --git a/storage/xpand/xpand_connection.h b/storage/xpand/xpand_connection.h index e2d4761dd68..f641a105e89 100644 --- a/storage/xpand/xpand_connection.h +++ b/storage/xpand/xpand_connection.h @@ -21,11 +21,32 @@ Copyright (c) 2019, MariaDB Corporation. #define XPAND_SERVER_REQUEST 30 -typedef enum xpand_lock_mode { +enum xpand_lock_mode_t { XPAND_NO_LOCKS, XPAND_SHARED, XPAND_EXCLUSIVE, -} xpand_lock_mode_t; +}; + +enum xpand_balance_algorithm_enum { + XPAND_BALANCE_FIRST, + XPAND_BALANCE_ROUND_ROBIN +}; + +static const int max_host_count = 128; +class xpand_host_list { +private: + char *strtok_buf; +public: + char *full_list; + int hosts_len; + char *hosts[max_host_count]; + + static xpand_host_list *create(const char *hosts, int *error_code); + static xpand_host_list *create(const char *hosts, THD *thd, int *error_code); + xpand_host_list() = delete; + static void operator delete(void *p); +}; + class xpand_connection_cursor; class xpand_connection @@ -50,6 +71,7 @@ public: return xpand_net.net.vio; } int connect(); + int connect_direct(char *host); void disconnect(bool is_destructor = FALSE); bool has_open_transaction(); |