diff options
Diffstat (limited to 'storage/xpand/xpand_connection.cc')
-rw-r--r-- | storage/xpand/xpand_connection.cc | 1357 |
1 files changed, 1357 insertions, 0 deletions
diff --git a/storage/xpand/xpand_connection.cc b/storage/xpand/xpand_connection.cc new file mode 100644 index 00000000000..e6b5059322b --- /dev/null +++ b/storage/xpand/xpand_connection.cc @@ -0,0 +1,1357 @@ +/***************************************************************************** +Copyright (c) 2019, 2020, MariaDB Corporation. +*****************************************************************************/ + +/** @file xpand_connection.cc */ + +#include "xpand_connection.h" +#include "ha_xpand.h" +#include <string> +#include "handler.h" +#include "table.h" +#include "sql_class.h" +#include "my_pthread.h" +#include "tztime.h" +#include "errmsg.h" + +#ifdef _WIN32 +#include <stdlib.h> +#define htobe64 _byteswap_uint64 +#define be64toh _byteswap_uint64 +#define htobe32 _byteswap_ulong +#define be32toh _byteswap_ulong +#define htobe16 _byteswap_ushort +#define be16toh _byteswap_ushort +#endif + +#if defined(__APPLE__) +#include <libkern/OSByteOrder.h> +#define htobe64(x) OSSwapHostToBigInt64(x) +#define be64toh(x) OSSwapBigToHostInt64(x) +#define htobe32(x) OSSwapHostToBigInt32(x) +#define be32toh(x) OSSwapBigToHostInt32(x) +#define htobe16(x) OSSwapHostToBigInt16(x) +#define be16toh(x) OSSwapBigToHostInt16(x) +#endif + + +extern int xpand_connect_timeout; +extern int xpand_read_timeout; +extern int xpand_write_timeout; +extern char *xpand_username; +extern char *xpand_password; +extern uint xpand_port; +extern char *xpand_socket; + +/* + This class implements the commands that can be sent to the cluster by the + Xpand engine. All of these commands return a status to the caller, but some + commands also create open invocations on the cluster, which must be closed by + sending additional commands. + + Transactions on the cluster are started using flags attached to commands, and + transactions are committed or rolled back using separate commands. + + Methods ending with _next affect the transaction state after the next command + is sent to the cluster. Other transaction commands are sent to the cluster + immediately, and the state is changed before they return. + + _____________________ _______________________ + | | | | | | + V | | V | | + NONE --> REQUESTED --> STARTED --> NEW_STMT | + | | + `----> ROLLBACK_STMT ---` + + The commit and rollback commands will change any other state to NONE. This + includes the REQUESTED state, for which nothing will be sent to the cluster. + The rollback statement command can likewise change the state from NEW_STMT to + STARTED without sending anything to the cluster. + + In addition, the XPAND_TRANS_AUTOCOMMIT flag will cause the transactions + for commands that complete without leaving open invocations on the cluster to + be committed if successful or rolled back if there was an error. If + auto-commit is enabled, only one open invocation may be in progress at a + time. +*/ + +enum xpand_trans_state { + XPAND_TRANS_STARTED = 0, + XPAND_TRANS_REQUESTED = 1, + XPAND_TRANS_NEW_STMT = 2, + XPAND_TRANS_ROLLBACK_STMT = 4, + XPAND_TRANS_NONE = 32, +}; +const int XPAND_TRANS_STARTS_STMT = (XPAND_TRANS_NEW_STMT | + XPAND_TRANS_REQUESTED | + XPAND_TRANS_ROLLBACK_STMT); + +enum xpand_trans_post_flags { + XPAND_TRANS_AUTOCOMMIT = 8, + XPAND_TRANS_NO_POST_FLAGS = 0, +}; + +enum xpand_commands { + XPAND_WRITE_ROW = 1, + XPAND_SCAN_TABLE, + XPAND_SCAN_NEXT, + XPAND_SCAN_STOP, + XPAND_KEY_READ, + XPAND_KEY_DELETE, + XPAND_SCAN_QUERY, + XPAND_KEY_UPDATE, + XPAND_SCAN_FROM_KEY, + XPAND_UPDATE_QUERY, + XPAND_COMMIT, + XPAND_ROLLBACK, + XPAND_SCAN_TABLE_COND, +}; + +/**************************************************************************** +** Class xpand_connection +****************************************************************************/ +xpand_connection::xpand_connection() + : command_buffer(NULL), command_buffer_length(0), command_length(0), + trans_state(XPAND_TRANS_NONE), trans_flags(XPAND_TRANS_NO_POST_FLAGS) +{ + DBUG_ENTER("xpand_connection::xpand_connection"); + memset(&xpand_net, 0, sizeof(MYSQL)); + DBUG_VOID_RETURN; +} + +xpand_connection::~xpand_connection() +{ + DBUG_ENTER("xpand_connection::~xpand_connection"); + if (is_connected()) + disconnect(TRUE); + + if (command_buffer) + my_free(command_buffer); + DBUG_VOID_RETURN; +} + +void xpand_connection::disconnect(bool is_destructor) +{ + DBUG_ENTER("xpand_connection::disconnect"); + if (is_destructor) + { + /* + Connection object destruction occurs after the destruction of + the thread used by the network has begun, so usage of that + thread object now is not reliable + */ + xpand_net.net.thd = NULL; + } + mysql_close(&xpand_net); + DBUG_VOID_RETURN; +} + +extern int xpand_hosts_cur; +extern ulong xpand_balance_algorithm; + +extern mysql_rwlock_t xpand_hosts_lock; +extern xpand_host_list *xpand_hosts; + +int xpand_connection::connect() +{ + DBUG_ENTER("xpand_connection::connect"); + int start = 0; + if (xpand_balance_algorithm == XPAND_BALANCE_ROUND_ROBIN) + start = my_atomic_add32(&xpand_hosts_cur, 1); + + mysql_rwlock_rdlock(&xpand_hosts_lock); + + //search for available host + int error_code = HA_ERR_NO_CONNECTION; + for (int i = 0; i < xpand_hosts->hosts_len; i++) { + char *host = xpand_hosts->hosts[(start + i) % xpand_hosts->hosts_len]; + error_code = connect_direct(host); + if (!error_code) + break; + } + mysql_rwlock_unlock(&xpand_hosts_lock); + DBUG_RETURN(error_code); +} + + +int xpand_connection::connect_direct(char *host) +{ + DBUG_ENTER("xpand_connection::connect_direct"); + my_bool my_true = true; + DBUG_PRINT("host", ("%s", host)); + + if (!mysql_init(&xpand_net)) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + uint protocol_tcp = MYSQL_PROTOCOL_TCP; + mysql_options(&xpand_net, MYSQL_OPT_PROTOCOL, &protocol_tcp); + mysql_options(&xpand_net, MYSQL_OPT_READ_TIMEOUT, + &xpand_read_timeout); + mysql_options(&xpand_net, MYSQL_OPT_WRITE_TIMEOUT, + &xpand_write_timeout); + mysql_options(&xpand_net, MYSQL_OPT_CONNECT_TIMEOUT, + &xpand_connect_timeout); + mysql_options(&xpand_net, MYSQL_OPT_USE_REMOTE_CONNECTION, + NULL); + mysql_options(&xpand_net, MYSQL_SET_CHARSET_NAME, "utf8mb4"); + mysql_options(&xpand_net, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char *) &my_true); + mysql_options(&xpand_net, MYSQL_INIT_COMMAND,"SET autocommit=0"); + +#ifdef XPAND_CONNECTION_SSL + if (opt_ssl_ca_length | conn->tgt_ssl_capath_length | + conn->tgt_ssl_cert_length | conn->tgt_ssl_key_length) + { + mysql_ssl_set(&xpand_net, conn->tgt_ssl_key, conn->tgt_ssl_cert, + conn->tgt_ssl_ca, conn->tgt_ssl_capath, conn->tgt_ssl_cipher); + if (conn->tgt_ssl_vsc) + { + my_bool verify_flg = TRUE; + mysql_options(&xpand_net, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &verify_flg); + } + } +#endif + + int error_code = 0; + if (!mysql_real_connect(&xpand_net, host, xpand_username, xpand_password, + NULL, xpand_port, xpand_socket, + CLIENT_MULTI_STATEMENTS)) + { + sql_print_error("Error connecting to xpand: %s", mysql_error(&xpand_net)); + disconnect(); + error_code = HA_ERR_NO_CONNECTION; + } + + DBUG_RETURN(error_code); +} + +int xpand_connection::add_status_vars() +{ + DBUG_ENTER("xpand_connection::add_status_vars"); + + if (!(trans_state & XPAND_TRANS_STARTS_STMT)) + DBUG_RETURN(add_command_operand_uchar(0)); + + int error_code = 0; + system_variables vars = current_thd->variables; + if ((error_code = add_command_operand_uchar(1))) + DBUG_RETURN(error_code); + //sql mode + if ((error_code = add_command_operand_ulonglong(vars.sql_mode))) + DBUG_RETURN(error_code); + //auto increment state + if ((error_code = add_command_operand_ushort(vars.auto_increment_increment))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.auto_increment_offset))) + DBUG_RETURN(error_code); + //character sets and collations + if ((error_code = add_command_operand_ushort(vars.character_set_results->number))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.character_set_client->number))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.collation_connection->number))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.collation_server->number))) + DBUG_RETURN(error_code); + //timezone and time names + String tzone; + vars.time_zone->get_name()->print(&tzone, system_charset_info); + if ((error_code = add_command_operand_str((const uchar*)tzone.ptr(),tzone.length()))) + DBUG_RETURN(error_code); + if ((error_code = add_command_operand_ushort(vars.lc_time_names->number))) + DBUG_RETURN(error_code); + //transaction isolation + if ((error_code = add_command_operand_uchar(vars.tx_isolation))) + DBUG_RETURN(error_code); + DBUG_RETURN(0); +} + +int xpand_connection::begin_command(uchar command) +{ + if (trans_state == XPAND_TRANS_NONE) + return HA_ERR_INTERNAL_ERROR; + + command_length = 0; + int error_code = 0; + if ((error_code = add_command_operand_uchar(command))) + return error_code; + + if ((error_code = add_command_operand_uchar(trans_state | trans_flags))) + return error_code; + + if ((error_code = add_status_vars())) + return error_code; + + return error_code; +} + +int xpand_connection::send_command() +{ + /* + Please note: + * The transaction state is set before the command is sent because rolling + back a nonexistent transaction is better than leaving a tranaction open + on the cluster. + * The state may have alreadly been STARTED. + * Commit and rollback commands update the transaction state after calling + this function. + * If auto-commit is enabled, the state may also updated after the + response has been processed. We do not clear the auto-commit flag here + because it needs to be sent with each command until the transaction is + committed or rolled back. + */ + trans_state = XPAND_TRANS_STARTED; + + if (simple_command(&xpand_net, + (enum_server_command)XPAND_SERVER_REQUEST, + command_buffer, command_length, TRUE)) + return mysql_errno(&xpand_net); + return 0; +} + +int xpand_connection::read_query_response() +{ + int error_code = 0; + if (xpand_net.methods->read_query_result(&xpand_net)) + error_code = mysql_errno(&xpand_net); + auto_commit_closed(); + return error_code; +} + +bool xpand_connection::has_open_transaction() +{ + return trans_state != XPAND_TRANS_NONE; +} + +int xpand_connection::commit_transaction() +{ + DBUG_ENTER("xpand_connection::commit_transaction"); + if (trans_state == XPAND_TRANS_NONE) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + if (trans_state == XPAND_TRANS_REQUESTED) { + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(0); + } + + int error_code; + if ((error_code = begin_command(XPAND_COMMIT))) + DBUG_RETURN(error_code); + + if ((error_code = send_command())) + DBUG_RETURN(error_code); + + if ((error_code = read_query_response())) + DBUG_RETURN(error_code); + + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(error_code); +} + +int xpand_connection::rollback_transaction() +{ + DBUG_ENTER("xpand_connection::rollback_transaction"); + if (trans_state == XPAND_TRANS_NONE || + trans_state == XPAND_TRANS_REQUESTED) { + trans_state = XPAND_TRANS_NONE; + DBUG_RETURN(0); + } + + int error_code; + if ((error_code = begin_command(XPAND_ROLLBACK))) + DBUG_RETURN(error_code); + + if ((error_code = send_command())) + DBUG_RETURN(error_code); + + if ((error_code = read_query_response())) + DBUG_RETURN(error_code); + + trans_state = XPAND_TRANS_NONE; + trans_flags = XPAND_TRANS_NO_POST_FLAGS; + DBUG_RETURN(error_code); +} + +int xpand_connection::begin_transaction_next() +{ + DBUG_ENTER("xpand_connection::begin_transaction_next"); + if (trans_state != XPAND_TRANS_NONE || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_REQUESTED; + DBUG_RETURN(0); +} + +int xpand_connection::new_statement_next() +{ + DBUG_ENTER("xpand_connection::new_statement_next"); + if (trans_state != XPAND_TRANS_STARTED || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_NEW_STMT; + DBUG_RETURN(0); +} + +int xpand_connection::rollback_statement_next() +{ + DBUG_ENTER("xpand_connection::rollback_statement_next"); + if (trans_state != XPAND_TRANS_STARTED || + trans_flags != XPAND_TRANS_NO_POST_FLAGS) + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + + trans_state = XPAND_TRANS_ROLLBACK_STMT; + DBUG_RETURN(0); +} + +void xpand_connection::auto_commit_next() +{ + trans_flags |= XPAND_TRANS_AUTOCOMMIT; +} + +void xpand_connection::auto_commit_closed() +{ + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) { + trans_flags &= ~XPAND_TRANS_AUTOCOMMIT; + trans_state = XPAND_TRANS_NONE; + } +} + +int xpand_connection::run_query(String &stmt) +{ + int error_code = mysql_real_query(&xpand_net, stmt.ptr(), stmt.length()); + if (error_code) + return mysql_errno(&xpand_net); + return error_code; +} + +int xpand_connection::write_row(ulonglong xpand_table_oid, uchar *packed_row, + size_t packed_size, ulonglong *last_insert_id) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_WRITE_ROW))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_row, packed_size))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) { + if (error_code == ER_DUP_ENTRY) + return HA_ERR_FOUND_DUPP_KEY; + return error_code; + } + + *last_insert_id = xpand_net.insert_id; + return error_code; +} + +int xpand_connection::key_update(ulonglong xpand_table_oid, uchar *packed_key, + size_t packed_key_length, + MY_BITMAP *update_set, uchar *packed_new_data, + size_t packed_new_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_UPDATE))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = add_command_operand_bitmap(update_set))) + return error_code; + + if ((error_code = add_command_operand_str(packed_new_data, + packed_new_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) + return error_code; + + return error_code; +} + +int xpand_connection::key_delete(ulonglong xpand_table_oid, + uchar *packed_key, size_t packed_key_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_DELETE))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + if ((error_code = read_query_response())) + return error_code; + + return error_code; +} + +int xpand_connection::key_read(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, MY_BITMAP *read_set, + uchar *packed_key, ulong packed_key_length, + uchar **rowdata, ulonglong *rowdata_length) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_KEY_READ))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uint(index))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + ulong packet_length = cli_safe_read(&xpand_net); + if (packet_length == packet_error) + return mysql_errno(&xpand_net); + + uchar *data = xpand_net.net.read_pos; + *rowdata_length = safe_net_field_length_ll(&data, packet_length); + *rowdata = (uchar *)my_malloc(*rowdata_length, MYF(MY_WME)); + memcpy(*rowdata, data, *rowdata_length); + + packet_length = cli_safe_read(&xpand_net); + if (packet_length == packet_error) { + my_free(*rowdata); + *rowdata = NULL; + *rowdata_length = 0; + return mysql_errno(&xpand_net); + } + + return 0; +} + +class xpand_connection_cursor { + struct rowdata { + ulong length; + uchar *data; + }; + + ulong current_row; + ulong last_row; + struct rowdata *rows; + uchar *outstanding_row; // to be freed on next request. + MYSQL *xpand_net; + +public: + ulong buffer_size; + ulonglong scan_refid; + bool eof_reached; + +private: + int cache_row(uchar *rowdata, ulong rowdata_length) + { + DBUG_ENTER("xpand_connection_cursor::cache_row"); + rows[last_row].length = rowdata_length; + rows[last_row].data = (uchar *)my_malloc(rowdata_length, MYF(MY_WME)); + if (!rows[last_row].data) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + memcpy(rows[last_row].data, rowdata, rowdata_length); + last_row++; + DBUG_RETURN(0); + } + + int load_rows_impl(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::load_rows_impl"); + int error_code = 0; + ulong packet_length = cli_safe_read(xpand_net); + if (packet_length == packet_error) { + error_code = mysql_errno(xpand_net); + *stmt_completed = TRUE; + if (error_code == HA_ERR_END_OF_FILE) { + // We have read all rows for query. + eof_reached = TRUE; + DBUG_RETURN(0); + } + DBUG_RETURN(error_code); + } + + uchar *rowdata = xpand_net->net.read_pos; + ulong rowdata_length = (ulong) safe_net_field_length_ll(&rowdata, packet_length); + if (!rowdata_length) { + // We have read all rows in this batch. + DBUG_RETURN(0); + } + + if ((error_code = cache_row(rowdata, rowdata_length))) + DBUG_RETURN(error_code); + + DBUG_RETURN(load_rows_impl(stmt_completed)); + } + +public: + xpand_connection_cursor(MYSQL *xpand_net_, ulong bufsize) + { + DBUG_ENTER("xpand_connection_cursor::xpand_connection_cursor"); + xpand_net = xpand_net_; + eof_reached = FALSE; + current_row = 0; + last_row = 0; + outstanding_row = NULL; + buffer_size = bufsize; + rows = NULL; + DBUG_VOID_RETURN; + } + + ~xpand_connection_cursor() + { + DBUG_ENTER("xpand_connection_cursor::~xpand_connection_cursor"); + if (outstanding_row) + my_free(outstanding_row); + if (rows) { + while (current_row < last_row) + my_free(rows[current_row++].data); + my_free(rows); + } + DBUG_VOID_RETURN; + } + + int load_rows(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::load_rows"); + current_row = 0; + last_row = 0; + DBUG_RETURN(load_rows_impl(stmt_completed)); + } + + int initialize(bool *stmt_completed) + { + DBUG_ENTER("xpand_connection_cursor::initialize"); + ulong packet_length = cli_safe_read(xpand_net); + if (packet_length == packet_error) { + *stmt_completed = TRUE; + int error_code = mysql_errno(xpand_net); + my_printf_error(error_code, "Xpand error: %s", MYF(0), + mysql_error(xpand_net)); + DBUG_RETURN(error_code); + } + + unsigned char *pos = xpand_net->net.read_pos; + scan_refid = safe_net_field_length_ll(&pos, packet_length); + + rows = (struct rowdata *)my_malloc(buffer_size * sizeof(struct rowdata), + MYF(MY_WME)); + if (!rows) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + DBUG_RETURN(load_rows(stmt_completed)); + } + + uchar *retrieve_row(ulong *rowdata_length) + { + DBUG_ENTER("xpand_connection_cursor::retrieve_row"); + if (outstanding_row) { + my_free(outstanding_row); + outstanding_row = NULL; + } + if (current_row == last_row) + DBUG_RETURN(NULL); + *rowdata_length = rows[current_row].length; + outstanding_row = rows[current_row].data; + current_row++; + DBUG_RETURN(outstanding_row); + } +}; + +int xpand_connection::allocate_cursor(MYSQL *xpand_net, ulong buffer_size, + xpand_connection_cursor **scan) +{ + DBUG_ENTER("xpand_connection::allocate_cursor"); + *scan = new xpand_connection_cursor(xpand_net, buffer_size); + if (!*scan) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + bool stmt_completed = FALSE; + int error_code = (*scan)->initialize(&stmt_completed); + if (error_code) { + delete *scan; + *scan = NULL; + } + + if (stmt_completed) + auto_commit_closed(); + + DBUG_RETURN(error_code); +} + +int xpand_connection::scan_table(ulonglong xpand_table_oid, + xpand_lock_mode_t lock_mode, + MY_BITMAP *read_set, ushort row_req, + xpand_connection_cursor **scan, + String* pushdown_cond_sql) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if (pushdown_cond_sql != nullptr) { + if ((error_code= begin_command(XPAND_SCAN_TABLE_COND))) + return error_code; + } else { + if ((error_code= begin_command(XPAND_SCAN_TABLE))) + return error_code; + } + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if (pushdown_cond_sql != nullptr) { + if ((error_code= add_command_operand_str( + reinterpret_cast<const uchar*>(pushdown_cond_sql->ptr()), + pushdown_cond_sql->length()))) { + return error_code; + } + } + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +/** + * @brief + * Sends a command to initiate query scan. + * @details + * Sends a command over mysql protocol connection to initiate an + * arbitrary query using a query text. + * Uses field types, field metadata and nullability to explicitly + * cast result to expected data type. Exploits RBR TABLE_MAP_EVENT + * format + sends SQL text. + * @args + * stmt& Query text to send + * fieldtype* array of byte wide field types of result projection + * null_bits* fields nullability bitmap of result projection + * field_metadata* Field metadata of result projection + * scan_refid id used to reference this scan later + * Used in pushdowns to initiate query scan. + **/ +int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, + uchar *null_bits, uint null_bits_size, + uchar *field_metadata, + uint field_metadata_size, ushort row_req, + ulonglong *oids, + xpand_connection_cursor **scan) +{ + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_SCAN_QUERY))) + return error_code; + + do { + if ((error_code = add_command_operand_ulonglong(*oids))) + return error_code; + } + while (*oids++); + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) + return error_code; + + if ((error_code = add_command_operand_str(fieldtype, fields))) + return error_code; + + if ((error_code = add_command_operand_str(field_metadata, + field_metadata_size))) + return error_code; + + // This variable length string calls for an additional store w/o lcb lenth prefix. + if ((error_code = add_command_operand_vlstr(null_bits, null_bits_size))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +/** + * @brief + * Sends a command to initiate UPDATE. + * @details + * Sends a command over mysql protocol connection to initiate an + * UPDATE query using a query text. + * @args + * stmt& Query text to send + * dbname current working database + * dbname ¤t database name + **/ +int xpand_connection::update_query(String &stmt, LEX_CSTRING &dbname, + ulonglong *oids, ulonglong *affected_rows) +{ + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_UPDATE_QUERY))) + return error_code; + + do { + if ((error_code = add_command_operand_ulonglong(*oids))) + return error_code; + } + while (*oids++); + + if ((error_code = add_command_operand_str((uchar*)dbname.str, dbname.length))) + return error_code; + + if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + error_code = read_query_response(); + if (!error_code) + *affected_rows = xpand_net.affected_rows; + + return error_code; +} + +int xpand_connection::scan_from_key(ulonglong xpand_table_oid, uint index, + xpand_lock_mode_t lock_mode, + enum scan_type scan_dir, + int no_key_cols, bool sorted_scan, + MY_BITMAP *read_set, uchar *packed_key, + ulong packed_key_length, ushort row_req, + xpand_connection_cursor **scan) +{ + int error_code; + command_length = 0; + + // row based commands should not be called with auto commit. + if (trans_flags & XPAND_TRANS_AUTOCOMMIT) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = begin_command(XPAND_SCAN_FROM_KEY))) + return error_code; + + if ((error_code = add_command_operand_ushort(row_req))) + return error_code; + + if ((error_code = add_command_operand_ulonglong(xpand_table_oid))) + return error_code; + + if ((error_code = add_command_operand_uint(index))) + return error_code; + + if ((error_code = add_command_operand_uchar((uchar)lock_mode))) + return error_code; + + if ((error_code = add_command_operand_uchar(scan_dir))) + return error_code; + + if ((error_code = add_command_operand_uint(no_key_cols))) + return error_code; + + if ((error_code = add_command_operand_uchar(sorted_scan))) + return error_code; + + if ((error_code = add_command_operand_str(packed_key, packed_key_length))) + return error_code; + + if ((error_code = add_command_operand_bitmap(read_set))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return allocate_cursor(&xpand_net, row_req, scan); +} + +int xpand_connection::scan_next(xpand_connection_cursor *scan, + uchar **rowdata, ulong *rowdata_length) +{ + *rowdata = scan->retrieve_row(rowdata_length); + if (*rowdata) + return 0; + + if (scan->eof_reached) + return HA_ERR_END_OF_FILE; + + int error_code; + command_length = 0; + + if ((error_code = begin_command(XPAND_SCAN_NEXT))) + return error_code; + + // This should not happen as @@xpand_row_buffer has this limit. + if (scan->buffer_size > 65535) + return HA_ERR_INTERNAL_ERROR; + + if ((error_code = add_command_operand_ushort((ushort)scan->buffer_size))) + return error_code; + + if ((error_code = add_command_operand_lcb(scan->scan_refid))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + bool stmt_completed = FALSE; + error_code = scan->load_rows(&stmt_completed); + if (stmt_completed) + auto_commit_closed(); + if (error_code) + return error_code; + + *rowdata = scan->retrieve_row(rowdata_length); + if (!*rowdata) + return HA_ERR_END_OF_FILE; + + return 0; +} + +int xpand_connection::scan_end(xpand_connection_cursor *scan) +{ + int error_code; + command_length = 0; + ulonglong scan_refid = scan->scan_refid; + bool eof_reached = scan->eof_reached; + delete scan; + + if (eof_reached) + return 0; + + if ((error_code = begin_command(XPAND_SCAN_STOP))) + return error_code; + + if ((error_code = add_command_operand_lcb(scan_refid))) + return error_code; + + if ((error_code = send_command())) + return error_code; + + return read_query_response(); +} + +int xpand_connection::populate_table_list(LEX_CSTRING *db, + handlerton::discovered_list *result) +{ + int error_code = 0; + String stmt; + stmt.append("SHOW FULL TABLES FROM "); + stmt.append(db); + stmt.append(" WHERE table_type = 'BASE TABLE'"); + + if (mysql_real_query(&xpand_net, stmt.c_ptr(), stmt.length())) { + int error_code = mysql_errno(&xpand_net); + if (error_code == ER_BAD_DB_ERROR) + return 0; + else + return error_code; + } + + MYSQL_RES *results = mysql_store_result(&xpand_net); + if (mysql_num_fields(results) != 2) { + error_code = HA_ERR_CORRUPT_EVENT; + goto error; + } + + MYSQL_ROW row; + while((row = mysql_fetch_row(results))) + result->add_table(row[0], strlen(row[0])); + +error: + mysql_free_result(results); + return error_code; +} + + +/* + Given a table name, find its OID in the Clustrix, and save it in TABLE_SHARE + + @param db Database name + @param name Table name + @param oid OUT Return the OID here + @param share INOUT If not NULL and the share has ha_share pointer, also + update Xpand_share::xpand_table_oid. + + @return + 0 - OK + error code if an error occurred +*/ + +int xpand_connection::get_table_oid(const char *db, size_t db_len, + const char *name, size_t name_len, + ulonglong *oid, TABLE_SHARE *share) +{ + MYSQL_ROW row; + int error_code = 0; + MYSQL_RES *results_oid = NULL; + String get_oid; + DBUG_ENTER("xpand_connection::get_table_oid"); + + /* get oid */ + get_oid.append("select r.table " + "from system.databases d " + " inner join ""system.relations r on d.db = r.db " + "where d.name = '"); + get_oid.append(db, db_len); + get_oid.append("' and r.name = '"); + get_oid.append(name, name_len); + get_oid.append("'"); + + if (mysql_real_query(&xpand_net, get_oid.c_ptr(), get_oid.length())) { + if ((error_code = mysql_errno(&xpand_net))) { + DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + } + + results_oid = mysql_store_result(&xpand_net); + DBUG_PRINT("oid results", + ("rows: %llu, fields: %u", mysql_num_rows(results_oid), + mysql_num_fields(results_oid))); + + if (mysql_num_rows(results_oid) != 1) { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + + if ((row = mysql_fetch_row(results_oid))) { + DBUG_PRINT("row", ("%s", row[0])); + *oid = strtoull((const char *)row[0], NULL, 10); + } else { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + +error: + if (results_oid) + mysql_free_result(results_oid); + + DBUG_RETURN(error_code); +} + + +/* + Given a table name, fetch table definition from Clustrix and fill the TABLE_SHARE + object with details about field, indexes, etc. +*/ +int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, + THD *thd, TABLE_SHARE *share) +{ + DBUG_ENTER("xpand_connection::discover_table_details"); + int error_code = 0; + MYSQL_RES *results_create = NULL; + MYSQL_ROW row; + String show; + ulonglong oid = 0; + Xpand_share *cs; + + if ((error_code = xpand_connection::get_table_oid(db->str, db->length, + name->str, name->length, + &oid, share))) + goto error; + + if (!share->ha_share) + share->ha_share= new Xpand_share; + cs= static_cast<Xpand_share*>(share->ha_share); + cs->xpand_table_oid = oid; + + /* get show create statement */ + show.append("show simple create table "); + show.append(db); + show.append("."); + show.append("`"); + show.append(name); + show.append("`"); + if (mysql_real_query(&xpand_net, show.c_ptr(), show.length())) { + if ((error_code = mysql_errno(&xpand_net))) { + DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + } + + results_create = mysql_store_result(&xpand_net); + DBUG_PRINT("show table results", + ("rows: %llu, fields: %u", mysql_num_rows(results_create), + mysql_num_fields(results_create))); + + if (mysql_num_rows(results_create) != 1) { + error_code = HA_ERR_NO_SUCH_TABLE; + goto error; + } + + if (mysql_num_fields(results_create) != 2) { + error_code = HA_ERR_CORRUPT_EVENT; + goto error; + } + + while((row = mysql_fetch_row(results_create))) { + DBUG_PRINT("row", ("%s - %s", row[0], row[1])); + error_code = share->init_from_sql_statement_string(thd, false, row[1], + strlen(row[1])); + } + + cs->rediscover_table = false; +error: + if (results_create) + mysql_free_result(results_create); + DBUG_RETURN(error_code); +} + +#define COMMAND_BUFFER_SIZE_INCREMENT 1024 +#define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10 +int xpand_connection::expand_command_buffer(size_t add_length) +{ + size_t expanded_length; + + if (command_buffer_length >= command_length + add_length) + return 0; + + expanded_length = command_buffer_length + + ((add_length >> COMMAND_BUFFER_SIZE_INCREMENT_BITS) + << COMMAND_BUFFER_SIZE_INCREMENT_BITS) + + COMMAND_BUFFER_SIZE_INCREMENT; + + if (!command_buffer_length) + command_buffer = (uchar *) my_malloc(expanded_length, MYF(MY_WME)); + else + command_buffer = (uchar *) my_realloc(command_buffer, expanded_length, + MYF(MY_WME)); + if (!command_buffer) + return HA_ERR_OUT_OF_MEM; + + command_buffer_length = expanded_length; + + return 0; +} + +int xpand_connection::add_command_operand_uchar(uchar value) +{ + int error_code = expand_command_buffer(sizeof(value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &value, sizeof(value)); + command_length += sizeof(value); + + return 0; +} + +int xpand_connection::add_command_operand_ushort(ushort value) +{ + ushort be_value = htobe16(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_uint(uint value) +{ + uint be_value = htobe32(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_ulonglong(ulonglong value) +{ + ulonglong be_value = htobe64(value); + int error_code = expand_command_buffer(sizeof(be_value)); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, &be_value, sizeof(be_value)); + command_length += sizeof(be_value); + return 0; +} + +int xpand_connection::add_command_operand_lcb(ulonglong value) +{ + int len = net_length_size(value); + int error_code = expand_command_buffer(len); + if (error_code) + return error_code; + + net_store_length(command_buffer + command_length, value); + command_length += len; + return 0; +} + +int xpand_connection::add_command_operand_str(const uchar *str, + size_t str_length) +{ + int error_code = add_command_operand_lcb(str_length); + if (error_code) + return error_code; + + if (!str_length) + return 0; + + error_code = expand_command_buffer(str_length); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, str, str_length); + command_length += str_length; + return 0; +} + +/** + * @brief + * Puts variable length string into the buffer. + * @details + * Puts into the buffer variable length string the size + * of which is send by other means. For details see + * MDB Client/Server Protocol. + * @args + * str - string to send + * str_length - size + **/ +int xpand_connection::add_command_operand_vlstr(const uchar *str, + size_t str_length) +{ + int error_code = expand_command_buffer(str_length); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, str, str_length); + command_length += str_length; + return 0; +} + +int xpand_connection::add_command_operand_lex_string(LEX_CSTRING str) +{ + return add_command_operand_str((const uchar *)str.str, str.length); +} + +int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap) +{ + int error_code = add_command_operand_lcb(bitmap->n_bits); + if (error_code) + return error_code; + + int no_bytes = no_bytes_in_map(bitmap); + error_code = expand_command_buffer(no_bytes); + if (error_code) + return error_code; + + memcpy(command_buffer + command_length, bitmap->bitmap, no_bytes); + command_length += no_bytes; + return 0; +} + +/**************************************************************************** +** Class xpand_host_list +****************************************************************************/ + +int xpand_host_list::fill(const char *hosts) +{ + strtok_buf = my_strdup(hosts, MYF(MY_WME)); + if (!strtok_buf) { + return HA_ERR_OUT_OF_MEM; + } + + const char *sep = ",; "; + //parse into array + int i = 0; + char *cursor = NULL; + char *token = NULL; + for (token = strtok_r(strtok_buf, sep, &cursor); + token && i < max_host_count; + token = strtok_r(NULL, sep, &cursor)) { + this->hosts[i] = token; + i++; + } + + //host count out of range + if (i == 0 || token) { + my_free(strtok_buf); + return ER_BAD_HOST_ERROR; + } + hosts_len = i; + + return 0; +} + +void xpand_host_list::empty() +{ + my_free(strtok_buf); + strtok_buf = NULL; + hosts_len = 0; +} |