summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIsaac Ackerman <isaac.ackerman@mariadb.com>2020-02-27 19:06:22 +0000
committerSergei Petrunia <psergey@askmonty.org>2020-03-10 11:22:33 +0300
commit5546fee6c41009daa312f72846b1f48fe32fb331 (patch)
treea26facf2c0738b0f18b39dcfd4a58281a1f6ab3f
parentb2f56c36e9a7e0d1f1573eac71a5dcec9fc7998c (diff)
downloadmariadb-git-5546fee6c41009daa312f72846b1f48fe32fb331.tar.gz
Improved connection management to clustrix supporting round robin and fail over
-rw-r--r--storage/xpand/ha_xpand.cc146
-rw-r--r--storage/xpand/xpand_connection.cc132
-rw-r--r--storage/xpand/xpand_connection.h26
3 files changed, 217 insertions, 87 deletions
diff --git a/storage/xpand/ha_xpand.cc b/storage/xpand/ha_xpand.cc
index 90d8738e332..ce713ac5bc0 100644
--- a/storage/xpand/ha_xpand.cc
+++ b/storage/xpand/ha_xpand.cc
@@ -8,6 +8,7 @@ Copyright (c) 2019, MariaDB Corporation.
#include "ha_xpand_pushdown.h"
#include "key.h"
#include <strfunc.h> /* strconvert */
+#include "my_pthread.h"
handlerton *xpand_hton = NULL;
@@ -41,69 +42,100 @@ static MYSQL_SYSVAR_INT
NULL, NULL, -1, -1, 2147483647, 0
);
-char *xpand_host;
-static MYSQL_SYSVAR_STR
+//state for load balancing
+int xpand_hosts_cur; //protected by my_atomic's
+ulong xpand_balance_algorithm;
+const char* balance_algorithm_names[]=
+{
+ "first", "round_robin", NullS
+};
+
+TYPELIB balance_algorithms=
+{
+ array_elements(balance_algorithm_names) - 1, "",
+ balance_algorithm_names, NULL
+};
+
+static void update_balance_algorithm(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ *static_cast<ulong *>(var_ptr) = *static_cast<const ulong *>(save);
+ my_atomic_store32(&xpand_hosts_cur, 0);
+}
+
+static MYSQL_SYSVAR_ENUM
(
- host,
- xpand_host,
- PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
- "Xpand host",
- NULL, NULL, "127.0.0.1"
+ balance_algorithm,
+ xpand_balance_algorithm,
+ PLUGIN_VAR_OPCMDARG,
+ "Method for managing load balancing of Clustrix nodes, can take values FIRST or ROUND_ROBIN",
+ NULL, update_balance_algorithm, XPAND_BALANCE_ROUND_ROBIN, &balance_algorithms
);
-int host_list_cnt;
-char **host_list;
+//current list of clx hosts
+static PSI_rwlock_key key_xpand_hosts;
+mysql_rwlock_t xpand_hosts_lock;
+xpand_host_list *xpand_hosts;
-static void free_host_list()
+//only call while holding lock
+static void clear_hosts()
{
- if (host_list) {
- for (int i = 0; host_list[i]; i++)
- my_free(host_list[i]);
- my_free(host_list);
- host_list = NULL;
- }
+ delete xpand_hosts;
+ xpand_hosts = NULL;
+ my_atomic_store32(&xpand_hosts_cur, 0);
}
-static void update_host_list(char *xpand_host)
+static int check_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *save, struct st_mysql_value *value)
{
- free_host_list();
+ char b;
+ int len = 0;
+ const char *val = value->val_str(value, &b, &len);
- int cnt = 0;
- for (char *p = xpand_host, *s = xpand_host; ; p++) {
- if (*p == ',' || *p == '\0') {
- if (p > s) {
- cnt++;
- }
- if (!*p)
- break;
- s = p + 1;
- }
- }
+ if (!val)
+ return HA_ERR_OUT_OF_MEM;
- DBUG_PRINT("host_cnt", ("%d", cnt));
- host_list = (char **)my_malloc(sizeof(char *) * cnt+1, MYF(MY_WME));
- host_list[cnt] = 0;
- host_list_cnt = cnt;
-
- int i = 0;
- for (char *p = xpand_host, *s = xpand_host; ; p++) {
- if (*p == ',' || *p == '\0') {
- if (p > s) {
- char *host = (char *)my_malloc(p - s + 1, MYF(MY_WME));
- host[p-s] = '\0';
- memcpy(host, s, p-s);
- DBUG_PRINT("host", ("%s", host));
- host_list[i++] = host;
- }
- if (!*p)
- break;
- s = p + 1;
- }
+ int error_code = 0;
+ xpand_host_list *host_list = xpand_host_list::create(val, thd, &error_code);
+ if (error_code)
+ return error_code;
+
+ *static_cast<xpand_host_list**>(save) = host_list;
+ return 0;
+}
+
+static void update_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ mysql_rwlock_wrlock(&xpand_hosts_lock);
+
+ xpand_host_list *from_save = *static_cast<xpand_host_list * const *>(save);
+ char* raw = from_save->full_list;
+
+ int error_code = 0;
+ xpand_host_list *new_hosts = xpand_host_list::create(raw, &error_code);
+ if (error_code) {
+ my_printf_error(error_code, "Unhandled error setting xpand hostlist", MYF(0));
+ return;
}
- DBUG_PRINT("xpand_host", ("%s", xpand_host));
+ clear_hosts();
+ xpand_hosts = new_hosts;
+ *static_cast<char**>(var_ptr) = new_hosts->full_list;
+
+ mysql_rwlock_unlock(&xpand_hosts_lock);
}
+static char *xpand_hosts_str;
+static MYSQL_SYSVAR_STR
+(
+ hosts,
+ xpand_hosts_str,
+ PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
+ "List of xpand hostnames seperated by commas, semicolons or spaces",
+ check_hosts, update_hosts, "localhost"
+);
+
char *xpand_username;
static MYSQL_SYSVAR_STR
(
@@ -1403,15 +1435,20 @@ static int xpand_init(void *p)
xpand_hton->create_select = create_xpand_select_handler;
xpand_hton->create_derived = create_xpand_derived_handler;
- update_host_list(xpand_host);
-
- DBUG_RETURN(0);
+ mysql_rwlock_init(key_xpand_hosts, &xpand_hosts_lock);
+ mysql_rwlock_wrlock(&xpand_hosts_lock);
+ int error_code = 0;
+ xpand_hosts = xpand_host_list::create(xpand_hosts_str, &error_code);
+ mysql_rwlock_unlock(&xpand_hosts_lock);
+ DBUG_RETURN(error_code);
}
static int xpand_deinit(void *p)
{
DBUG_ENTER("xpand_deinit");
- free_host_list();
+ mysql_rwlock_wrlock(&xpand_hosts_lock);
+ delete xpand_hosts;
+ mysql_rwlock_destroy(&xpand_hosts_lock);
DBUG_RETURN(0);
}
@@ -1425,7 +1462,8 @@ static struct st_mysql_sys_var* xpand_system_variables[] =
MYSQL_SYSVAR(connect_timeout),
MYSQL_SYSVAR(read_timeout),
MYSQL_SYSVAR(write_timeout),
- MYSQL_SYSVAR(host),
+ MYSQL_SYSVAR(balance_algorithm),
+ MYSQL_SYSVAR(hosts),
MYSQL_SYSVAR(username),
MYSQL_SYSVAR(password),
MYSQL_SYSVAR(port),
diff --git a/storage/xpand/xpand_connection.cc b/storage/xpand/xpand_connection.cc
index 34ba7b7dfe1..73f837e6108 100644
--- a/storage/xpand/xpand_connection.cc
+++ b/storage/xpand/xpand_connection.cc
@@ -10,6 +10,7 @@ Copyright (c) 2019, MariaDB Corporation.
#include "handler.h"
#include "table.h"
#include "sql_class.h"
+#include "my_pthread.h"
#include "tztime.h"
//#include "errmsg.h"
@@ -125,30 +126,42 @@ void xpand_connection::disconnect(bool is_destructor)
DBUG_VOID_RETURN;
}
-int host_list_next;
-extern int host_list_cnt;
-extern char **host_list;
+extern int xpand_hosts_cur;
+extern ulong xpand_balance_algorithm;
+
+extern mysql_rwlock_t xpand_hosts_lock;
+extern xpand_host_list *xpand_hosts;
int xpand_connection::connect()
{
- int error_code = 0;
- my_bool my_true = 1;
DBUG_ENTER("xpand_connection::connect");
+ int start = 0;
+ if (xpand_balance_algorithm == XPAND_BALANCE_ROUND_ROBIN)
+ start = my_atomic_add32(&xpand_hosts_cur, 1);
- // cpu concurrency by damned!
- int host_num = host_list_next;
- host_num = host_num % host_list_cnt;
- char *host = host_list[host_num];
- host_list_next = host_num + 1;
- DBUG_PRINT("host", ("%s", host));
+ mysql_rwlock_rdlock(&xpand_hosts_lock);
- /* Validate the connection parameters */
- if (!strcmp(xpand_socket, ""))
- if (!strcmp(host, "127.0.0.1"))
- if (xpand_port == MYSQL_PORT_DEFAULT)
- DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
+ //search for available host
+ int error_code = 0;
+ for (int i = 0; i < xpand_hosts->hosts_len; i++) {
+ char *host = xpand_hosts->hosts[(start + i) % xpand_hosts->hosts_len];
+ error_code = connect_direct(host);
+ if (!error_code)
+ break;
+ }
+ mysql_rwlock_unlock(&xpand_hosts_lock);
+ if (error_code)
+ my_error(error_code, MYF(0), "clustrix");
+
+ DBUG_RETURN(error_code);
+}
- //xpand_net.methods = &connection_methods;
+
+int xpand_connection::connect_direct(char *host)
+{
+ DBUG_ENTER("xpand_connection::connect_direct");
+ my_bool my_true = true;
+ DBUG_PRINT("host", ("%s", host));
if (!mysql_init(&xpand_net))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
@@ -180,29 +193,20 @@ int xpand_connection::connect()
}
#endif
+ int error_code = 0;
if (!mysql_real_connect(&xpand_net, host, xpand_username, xpand_password,
NULL, xpand_port, xpand_socket,
CLIENT_MULTI_STATEMENTS))
{
error_code = mysql_errno(&xpand_net);
disconnect();
-
- if (error_code != CR_CONN_HOST_ERROR &&
- error_code != CR_CONNECTION_ERROR)
- {
- if (error_code == ER_CON_COUNT_ERROR)
- {
- my_error(ER_CON_COUNT_ERROR, MYF(0));
- DBUG_RETURN(ER_CON_COUNT_ERROR);
- }
- my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), host);
- DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
- }
}
- xpand_net.reconnect = 1;
+ if (error_code && error_code != ER_CON_COUNT_ERROR) {
+ error_code = ER_CONNECT_TO_FOREIGN_DATA_SOURCE;
+ }
- DBUG_RETURN(0);
+ DBUG_RETURN(error_code);
}
int xpand_connection::add_status_vars()
@@ -1268,3 +1272,69 @@ int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap)
command_length += no_bytes;
return 0;
}
+
+/****************************************************************************
+** Class xpand_host_list
+****************************************************************************/
+
+xpand_host_list *xpand_host_list::create(const char *hosts, int *error_code)
+{
+ return xpand_host_list::create(hosts, NULL, error_code);
+}
+
+xpand_host_list *xpand_host_list::create(const char *hosts, THD *thd, int *error_code)
+{
+ xpand_host_list *list = static_cast<xpand_host_list*>(
+ thd ?
+ thd_calloc(thd, sizeof(xpand_host_list)) :
+ my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL)));
+ if (!list) {
+ *error_code = HA_ERR_OUT_OF_MEM;
+ return NULL;
+ }
+
+ list->full_list = thd ?
+ thd_strdup(thd, hosts) :
+ my_strdup(hosts, MYF(MY_WME));
+ list->strtok_buf = thd ?
+ thd_strdup(thd, hosts) :
+ my_strdup(hosts, MYF(MY_WME));
+ if (!list->full_list || !list->strtok_buf) {
+ *error_code = HA_ERR_OUT_OF_MEM;
+ return NULL;
+ }
+
+ const char *sep = ",; ";
+ //parse into array
+ int i = 0;
+ char *cursor = NULL;
+ char *token = NULL;
+ for (token = strtok_r(list->strtok_buf, sep, &cursor);
+ token && i < max_host_count;
+ token = strtok_r(NULL, sep, &cursor)) {
+ list->hosts[i] = token;
+ i++;
+ }
+
+ //host count out of range
+ if (i == 0 || token) {
+ my_free(list->full_list);
+ my_free(list->strtok_buf);
+ my_free(list);
+ *error_code = ER_BAD_HOST_ERROR;
+ return NULL;
+ }
+ list->hosts_len = i;
+
+ return list;
+}
+
+void xpand_host_list::operator delete(void *p)
+{
+ xpand_host_list *list = static_cast<xpand_host_list*>(p);
+ if (list) {
+ my_free(list->full_list);
+ my_free(list->strtok_buf);
+ }
+ my_free(list);
+}
diff --git a/storage/xpand/xpand_connection.h b/storage/xpand/xpand_connection.h
index e2d4761dd68..f641a105e89 100644
--- a/storage/xpand/xpand_connection.h
+++ b/storage/xpand/xpand_connection.h
@@ -21,11 +21,32 @@ Copyright (c) 2019, MariaDB Corporation.
#define XPAND_SERVER_REQUEST 30
-typedef enum xpand_lock_mode {
+enum xpand_lock_mode_t {
XPAND_NO_LOCKS,
XPAND_SHARED,
XPAND_EXCLUSIVE,
-} xpand_lock_mode_t;
+};
+
+enum xpand_balance_algorithm_enum {
+ XPAND_BALANCE_FIRST,
+ XPAND_BALANCE_ROUND_ROBIN
+};
+
+static const int max_host_count = 128;
+class xpand_host_list {
+private:
+ char *strtok_buf;
+public:
+ char *full_list;
+ int hosts_len;
+ char *hosts[max_host_count];
+
+ static xpand_host_list *create(const char *hosts, int *error_code);
+ static xpand_host_list *create(const char *hosts, THD *thd, int *error_code);
+ xpand_host_list() = delete;
+ static void operator delete(void *p);
+};
+
class xpand_connection_cursor;
class xpand_connection
@@ -50,6 +71,7 @@ public:
return xpand_net.net.vio;
}
int connect();
+ int connect_direct(char *host);
void disconnect(bool is_destructor = FALSE);
bool has_open_transaction();