diff options
author | Seppo Jaakola <seppo.jaakola@codership.com> | 2012-04-26 13:09:06 +0300 |
---|---|---|
committer | Seppo Jaakola <seppo.jaakola@codership.com> | 2012-04-26 13:09:06 +0300 |
commit | f96fd3f40f37c0080e71e45f85e53bd156aa27f5 (patch) | |
tree | 7aa68fd52aed378a26ef8ae804e78fdbc0e99bd5 | |
parent | 2fc1ec43560b453b4694adbc1aac11f3f23b1761 (diff) | |
download | mariadb-git-f96fd3f40f37c0080e71e45f85e53bd156aa27f5.tar.gz |
Added wsrep specific files
-rwxr-xr-x | BUILD/compile-amd64-debug-wsrep | 11 | ||||
-rwxr-xr-x | BUILD/compile-amd64-wsrep | 9 | ||||
-rwxr-xr-x | BUILD/compile-pentium-debug-wsrep | 12 | ||||
-rwxr-xr-x | BUILD/compile-pentium-wsrep | 11 | ||||
-rwxr-xr-x | BUILD/compile-pentium64-wsrep | 28 | ||||
-rw-r--r-- | Docs/README-wsrep | 466 | ||||
-rw-r--r-- | cmake/wsrep.cmake | 59 | ||||
-rw-r--r-- | scripts/wsrep_sst_mysqldump.sh | 133 | ||||
-rw-r--r-- | scripts/wsrep_sst_rsync.sh | 214 | ||||
-rw-r--r-- | sql/wsrep_check_opts.cc | 392 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 419 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 1060 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 297 | ||||
-rw-r--r-- | sql/wsrep_notify.cc | 107 | ||||
-rw-r--r-- | sql/wsrep_priv.h | 231 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 932 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 447 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 505 | ||||
-rw-r--r-- | support-files/wsrep.cnf.sh | 125 | ||||
-rw-r--r-- | support-files/wsrep_notify.sh | 102 | ||||
-rw-r--r-- | wsrep/CMakeLists.txt | 24 | ||||
-rw-r--r-- | wsrep/Makefile.am | 2 | ||||
-rw-r--r-- | wsrep/wsrep_api.h | 875 | ||||
-rw-r--r-- | wsrep/wsrep_dummy.c | 368 | ||||
-rw-r--r-- | wsrep/wsrep_loader.c | 199 | ||||
-rw-r--r-- | wsrep/wsrep_uuid.c | 78 |
26 files changed, 7106 insertions, 0 deletions
diff --git a/BUILD/compile-amd64-debug-wsrep b/BUILD/compile-amd64-debug-wsrep new file mode 100755 index 00000000000..995a8afcca9 --- /dev/null +++ b/BUILD/compile-amd64-debug-wsrep @@ -0,0 +1,11 @@ +#! /bin/sh + +path=`dirname $0` +. "$path/SETUP.sh" + +extra_flags="$amd64_cflags $debug_cflags -g -O0 $wsrep_cflags" +c_warnings="$c_warnings $debug_extra_warnings" +cxx_warnings="$cxx_warnings $debug_extra_warnings" +extra_configs="$amd64_configs $debug_configs $wsrep_configs --with-wsrep" + +. "$path/FINISH.sh" diff --git a/BUILD/compile-amd64-wsrep b/BUILD/compile-amd64-wsrep new file mode 100755 index 00000000000..57dfbdd6da2 --- /dev/null +++ b/BUILD/compile-amd64-wsrep @@ -0,0 +1,9 @@ +#! /bin/sh + +path=`dirname $0` +. "$path/SETUP.sh" + +extra_flags="$amd64_cflags $fast_cflags -g $wsrep_cflags" +extra_configs="$amd64_configs $wsrep_configs --with-wsrep" + +. "$path/FINISH.sh" diff --git a/BUILD/compile-pentium-debug-wsrep b/BUILD/compile-pentium-debug-wsrep new file mode 100755 index 00000000000..ee68e3fd0c1 --- /dev/null +++ b/BUILD/compile-pentium-debug-wsrep @@ -0,0 +1,12 @@ +#! /bin/sh -x + +path=`dirname $0` +set -- "$@" --with-debug=full +. "$path/SETUP.sh" + +extra_flags="$pentium_cflags $debug_cflags -g -O0 $wsrep_cflags" +c_warnings="$c_warnings $debug_extra_warnings" +cxx_warnings="$cxx_warnings $debug_extra_warnings" +extra_configs="$pentium_configs $debug_configs $wsrep_configs --with-wsrep" + +. "$path/FINISH.sh" diff --git a/BUILD/compile-pentium-wsrep b/BUILD/compile-pentium-wsrep new file mode 100755 index 00000000000..eeb14310e4e --- /dev/null +++ b/BUILD/compile-pentium-wsrep @@ -0,0 +1,11 @@ +#! /bin/sh + +path=`dirname $0` +. "$path/SETUP.sh" + +extra_flags="$pentium_cflags $fast_cflags $wsrep_cflags" +extra_configs="$pentium_configs $wsrep_configs --with-wsrep" + +#strip=yes + +. "$path/FINISH.sh" diff --git a/BUILD/compile-pentium64-wsrep b/BUILD/compile-pentium64-wsrep new file mode 100755 index 00000000000..0bccb34e753 --- /dev/null +++ b/BUILD/compile-pentium64-wsrep @@ -0,0 +1,28 @@ +#! /bin/sh + +# Copyright (C) 2006, 2007 MySQL AB +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Library General Public +# License as published by the Free Software Foundation; version 2 +# of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU Library General Public +# License along with this library; if not, write to the Free +# Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, +# MA 02111-1307, USA + +path=`dirname $0` +. "$path/SETUP.sh" + +extra_flags="$pentium64_cflags $fast_cflags -g $wsrep_cflags" +extra_configs="$pentium_configs $static_link $wsrep_configs --with-wsrep" +CC="$CC --pipe" +strip=yes + +. "$path/FINISH.sh" diff --git a/Docs/README-wsrep b/Docs/README-wsrep new file mode 100644 index 00000000000..ea6a62621ed --- /dev/null +++ b/Docs/README-wsrep @@ -0,0 +1,466 @@ +Codership Oy +http://www.codership.com +<info@codership.com> + +DISCLAIMER + +THIS SOFTWARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER +EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. +IN NO EVENT SHALL CODERSHIP OY BE HELD LIABLE TO ANY PARTY FOR ANY DAMAGES +RESULTING DIRECTLY OR INDIRECTLY FROM THE USE OF THIS SOFTWARE. + +Trademark Information. + +MySQL is a trademark or registered trademark of Oracle and/or its affiliates. +Other trademarks are the property of their respective owners. + +Licensing Information. + +Please see file COPYING that came with this distribution + +Source code can be found at +wsrep API: https://launchpad.net/wsrep +MySQL patch: https://launchpad.net/codership-mysql + + +ABOUT THIS DOCUMENT + +This document covers installation and configuration issues specific to this +wsrep-patched MySQL distribution by Codership. It does not cover the use or +administration of MySQL server per se. The reader is assumed to know how to +install, configure, administer and use standard MySQL server version 5.1.xx. + + + MYSQL-5.5.x/wsrep-23.x + +CONTENTS: +========= +1. WHAT IS WSREP PATCH FOR MYSQL +2. INSTALLATION +3. FIRST TIME SETUP + 3.1 CONFIGURATION FILES + 3.2 DATABASE PRIVILEGES + 3.3 CHECK AND CORRECT FIREWALL SETTINGS + 3.4 SELINUX + 3.5 APPARMOR + 3.6 CONNECT TO CLUSTER +4. UPGRADING FROM MySQL 5.1.x +5. CONFIGURATION OPTIONS + 5.1 MANDATORY MYSQL OPTIONS + 5.2 WSREP OPTIONS +6. ONLINE SCHEMA UPGRADE + 6.1 TOTAL ORDER ISOLATION (TOI) + 6.2 ROLLING SCHEMA UPGRADE (RSU) +7. LIMITATIONS + + +1. WHAT IS WSREP PATCH FOR MYSQL/INNODB + +Wsrep API developed by Codership Oy is a modern generic (database-agnostic) +replication API for transactional databases with a goal to make database +replication/logging subsystem completely modular and pluggable. It is developed +with flexibility and completeness in mind to satisfy broad range of modern +replication scenarios. It is equally suitable for synchronous and asynchronous, +master-slave and multi-master replication. + +wsrep stands for Write Set REPlication. + +Wsrep patch for MySQL/InnoDB allows MySQL server to load and use various wsrep +API implementations ("wsrep providers") with different qualities of service. +Without wsrep provider MySQL-wsrep server will function like a regular +standalone server. + + +2. INSTALLATION + +In the examples below mysql authentication options are omitted for brevity. + +2.1 Download and install mysql-wsrep package. + +Download binary package for your Linux distribution from +https://launchpad.net/codership-mysql/ + +2.1.1 On Debian and Debian-derived distributions. + +Upgrade from mysql-server-5.0 to mysql-wsrep is not supported yet, please +upgrade to mysql-server-5.1 first. + +If you're installing over an existing mysql installation, mysql-server-wsrep +will conflict with mysql-server-5.1 package, so remove it first: + +$ sudo apt-get remove mysql-server-5.1 mysql-server-core-5.1 + +mysql-server-wsrep requires psmisc and mysql-client-5.1.47 (or later). +MySQL 5.1 packages can be found from backports repositories. +For further information about configuring and using Debian or Ubuntu +backports, see: + +* http://backports.debian.org + +* https://help.ubuntu.com/community/UbuntuBackports + +For example, installation of required packages on Debian Lenny: + +$ sudo apt-get install psmisc +$ sudo apt-get -t lenny-backports install mysql-client-5.1 + +Now you should be able to install mysql-wsrep package: + +$ sudo dpkg -i <mysql-server-wsrep DEB> + +2.1.2 On CentOS and similar RPM-based distributions. + +If you're migrating from existing MySQL installation, there are two variants: + + a) If you're already using official MySQL-server-community 5.1.x RPM from + Oracle: + + # rpm -e mysql-server + + b) If you're upgrading from the stock mysql-5.0.77 on CentOS: + + 1) Make sure that the following packages are not installed: + # rpm --nodeps --allmatches -e mysql-server mysql-test mysql-bench + + 2) Install *official* MySQL-shared-compat-5.1.x from + http://dev.mysql.com/downloads/mysql/5.1.html + +Actual installation: + + # rpm -Uvh <MySQL-server-wsrep RPM> + + If this fails due to unsatisfied dependencies, install missing packages + (e.g. yum install perl-DBI) and retry. + +Additional packages to consider (if not yet installed): + * galera (multi-master replication provider, https://launchpad.net/galera) + * MySQL-client-community (for connecting to server and mysqldump-based SST) + * rsync (for rsync-based SST) + +2.2 Upgrade system tables. + +If you're upgrading a previous MySQL installation, it might be advisable to +upgrade system tables. To do that start mysqld and run mysql_upgrade command. +Consult MySQL documentation in case of errors. Normally they are not critical +and can be ignored unless specific functionality is needed. + + +3. FIRST TIME SETUP + +Unless you're upgrading an already installed mysql-wsrep package, you will need +to set up a few things to prepare server for operation. + +3.1 CONFIGURATION FILES + +* Make sure system-wide my.cnf does not bind mysqld to 127.0.0.1. That is, if + you have the following line in [mysqld] section, comment it out: + + #bind-address = 127.0.0.1 + +* Make sure system-wide my.cnf contains "!includedir /etc/mysql/conf.d/" line. + +* Edit /etc/mysql/conf.d/wsrep.cnf and set wsrep_provider option by specifying + a path to provider library. If you don't have a provider, leave it as it is. + +* When a new node joins the cluster it'll have to receive a state snapshot from + one of the peers. This requires a privileged MySQL account with access from + the rest of the cluster. Edit /etc/mysql/conf.d/wsrep.cnf and set mysql + login/password pair for SST, for example: + + wsrep_sst_auth=wsrep_sst:wspass + +* See CONFIGURATION section below about other configuration parameters that you + might want to change at this point. + +3.2 DATABASE PRIVILEGES + +Restart MySQL server and connect to it as root to grant privileges to SST +account (empty users confuse MySQL authentication matching rules, we need to +delete them too): + +$ mysql -e "SET wsrep_on=OFF; DELETE FROM mysql.user WHERE user='';" +$ mysql -e "SET wsrep_on=OFF; GRANT ALL ON *.* TO wsrep_sst@'%' IDENTIFIED BY 'wspass'"; + +3.3 CHECK AND CORRECT FIREWALL SETTINGS. + +MySQL-wsrep server needs to be accessible from other cluster members through +its client listening socket and through wsrep provider socket. See your +distribution and wsrep provider documentation for details. For example on +CentOS you might need to do something along these lines: + +# iptables --insert RH-Firewall-1-INPUT 1 --proto tcp --source <my IP>/24 --destination <my IP>/32 --dport 3306 -j ACCEPT +# iptables --insert RH-Firewall-1-INPUT 1 --proto tcp --source <my IP>/24 --destination <my IP>/32 --dport 4567 -j ACCEPT + +If there is a NAT firewall between the nodes, it must be configured to allow +direct connections between the nodes (e.g. via port forwarding). + +3.4 SELINUX + +If you have SELinux enabled, it may block mysqld from doing required operations. +You'll need to either disable it or configure to allow mysqld to run external +programs and open listen sockets at unprivileged ports (i.e. things that +an unprivileged user can do). See SELinux documentation about it. + +To quickly disable SELinux: +1) run 'setenforce 0' as root. +2) set 'SELINUX=permissive' in /etc/selinux/config + +3.5 APPARMOR + +AppArmor automatically comes with Ubuntu and may also prevent mysqld to from +opening additional ports or run scripts. See AppArmor documentation about its +configuration. To disable AppArmor for mysqld: + +$ cd /etc/apparmor.d/disable/ +$ sudo ln -s /etc/apparmor.d/usr.sbin.mysqld +$ sudo service apparmor restart + + +3.6 CONNECT TO CLUSTER + +Now you're ready to connect to cluster by setting wsrep_cluster_address variable +and monitor status of wsrep provider: + +mysql> SET GLOBAL wsrep_cluster_address='<cluster address string>'; +mysql> SHOW STATUS LIKE 'wsrep%'; + + +4 UPGRADING FROM MySQL 5.1.x + +!!! THESE INSTRUCTIONS ARE PRELIMINARY AND INCOMPLETE !!! + +1) BEFORE UPGRADE (while running 5.1.x): + - comment out 'wsrep_provider' setting from configuration files + (my.cnf and/or wsrep.cnf) + - If performing a rolling upgrade on a running cluster, set + wsrep_sst_method=mysqldump. + You might also need to configure wsrep_sst_receive_address and + wsrep_sst_auth appropriately. Mysqldump is the only way to transfer data + from 5.1.x to 5.5.x reliably. + - remove innodb_plugin settings from configuration files. + +2) Perform upgrade as usual: + http://dev.mysql.com/doc/refman/5.5/en/upgrading-from-previous-series.html + Don't forget to run 'mysql_upgrade' command. + +3) AFTER UPGRADING individual node: + - uncomment 'wsrep_provider' line in configuration file. + - restart the server and join the cluster. + +4) AFTER UPGRADING the whole cluster: + - revert to usual wsrep SST settings if not 'mysqldump'. + + +5. CONFIGURATION OPTIONS + +5.1 MANDATORY MYSQL OPTIONS + +binlog_format=ROW + This option is required to use row-level replication as opposed to + statement-level. For performance and consistency considerations don't change + that. As a side effect, binlog, if turned on, can be ROW only. In future this + option won't have special meaning. + +innodb_autoinc_lock_mode=2 + This is a required parameter. Without it INSERTs into tables with + AUTO_INCREMENT column may fail. + autoinc lock modes 0 and 1 can cause unresolved deadlock, and make + system unresponsive. + +innodb_locks_unsafe_for_binlog=1 + This option is required for parallel applying. + +5.2 WSREP OPTIONS + +All options are optional except for wsrep_provider, wsrep_cluster_address, and +wsrep_sst_auth. + +wsrep_provider=none + A full path to the library that implements WSREP interface. If none is + specified, the server behaves like a regular mysqld. + +wsrep_provider_options= + Provider-specific option string. Check wsrep provider documentation or + http://www.codership.com/wiki + +wsrep_cluster_address= + Provider-specific cluster address string. This is used to connect a node to + the desired cluster. This option can be given either on mysqld startup or set + during runtime. See wsrep provider documentation for possible values. + +wsrep_cluster_name="my_wsrep_cluster" + Logical cluster name, must be the same for all nodes of the cluster. + +wsrep_node_name= + Human readable node name (for easier log reading only). Defaults to hostname. + +wsrep_slave_threads=1 + Number of threads dedicated to processing of writesets from other nodes. + For best performance should be few per CPU core. + +wsrep_dbug_option + Options for the built-in DBUG library (independent from what MySQL uses). + Empty by default. Not currently in use. + +wsrep_debug=0 + Enable debug-level logging. + +wsrep_convert_LOCK_to_trx=0 + Implicitly convert locking sessions into transactions inside mysqld. By + itself it does not mean support for locking sessions, but it prevents the + database from going into logically inconsistent state. Note however, that + loading large database dump with LOCK statements might result in abnormally + large transactions and cause an out-of-memory condition + +wsrep_retry_autocommit=1 + Retry autocommit queries and single statement transactions should they fail + certification test. This is analogous to rescheduling an autocommit query + should it go into deadlock with other transactions in the database lock + manager. + +wsrep_auto_increment_control=1 + Automatically adjust auto_increment_increment and auto_increment_offset + variables based on the number of nodes in the cluster. Significantly reduces + certification conflict rate for INSERTS. + +wsrep_drupal_282555_workaround=1 + MySQL seems to have an obscure bug when INSERT into table with + AUTO_INCREMENT column with NULL value for that column can fail with a + duplicate key error. When this option is on, it retries such INSERTs. + Required for stable Drupal operation. Documented at: + http://bugs.mysql.com/bug.php?id=41984 + http://drupal.org/node/282555 + +wsrep_causal_reads=0 + Enforce strict READ COMMITTED semantics on reads and transactions. May + result in additional latencies. It is a session variable. + +wsrep_OSU_method=TOI + Online Schema Upgrade (OSU) can be performed with two alternative methods: + Total Order Isolation (TOI) runs DDL statement in all cluster nodes in + same total order sequence locking the affected table for the duration of the + operation. This may result in the whole cluster being blocked for the + duration of the operation. + Rolling Schema Upgrade (RSU) executes the DDL statement only locally, thus + blocking only one cluster node. During the DDL processing, the node + is not replicating and may be unable to process replication events (due to + table lock). Once DDL operation is complete, the node will catch up and sync + with the cluster to become fully operational again. The DDL statement or + its effects are not replicated, so it is user's responsibility to manually + perform this operation on each of the nodes. + +wsrep_forced_binlog_format=none + Force every transaction to use given binlog format. When this variable is + set to something else than NONE, all transactions will use the given forced + format, regardless of what the client session has specified in binlog_format. + Valid choices for wsrep_forced_binlog_format are: ROW, STATEMENT, MIXED and + special value NONE, meaning that there is no forced binlog format in effect. + This variable was intruduced to support STATEMENT format replication during + rolling schema upgrade processing. However, in most cases ROW replication + is valid for asymmetrict schema replication. + +State snapshot transfer options. + +When a new node joins the cluster it has to synchronize its initial state with +the other cluster members by transferring state snapshot from one of them. +The options below govern how this happens and should be set up before attempting +to join or start a cluster. + +wsrep_sst_method=mysqldump + What method to use to copy database state to a newly joined node. Supported + methods: + - mysqldump: slow (except for small datasets) but most tested. + - rsync: much faster on large datasets. + - rsync_wan: same as rsync but with deltaxfer to minimize network traffic. + +wsrep_sst_receive_address= + Address (hostname:port) at which this node wants to receive state snapshot. + Defaults to mysqld bind address, and if that is not specified (0.0.0.0) - + to the first IP of eth0 + mysqld bind port. + NOTE: check that your firewall allows connections to this address from other + cluster nodes. + +wsrep_sst_auth= + Authentication information needed for state transfer. Depends on the state + transfer method. For mysqldump-based SST it is + <mysql_root_user>:<mysql_root_password> + and should be the same on all nodes - it is used to authenticate with both + state snapshot receiver and state snapshot donor. + +wsrep_sst_donor= + A name of the node which should serve as state snapshot donor. This allows + to control which node will serve state snapshot request. By default the + most suitable node is chosen by wsrep provider. This is the same as given in + wsrep_node_name. + + +6. ONLINE SCHEMA UPGRADE + + Schema upgrades mean any data definition statements (DDL statemnents) run + for the database. They change the database structure and are non- + transactional. + + Release 22.3 brings a new method for performing schema upgrades. User can + now choose whether to use the traditional total order isolation or new + rolling schema upgrade method. The OSU method choice is done by global + parameter: 'wsrep_OSU_method'. + +6.1 Total Order Isolation (TOI) + + With earlier releases, DDL processing happened always by Total Order + Isolation (TOI) method. With TOI, the DDL was scheduled to be processed in + same transaction seqeuncing 'slot' in each cluster node. + The processing is secured by locking the affected table from any other use. + With TOI method, the whole cluster has part of the database locked for the + duration of the DDL processing. + +6.2 Rolling Schema Upgrade (RSU) + + Rolling schema upgrade is new DDL processing method, where DDL will be + processed locally for the node. The node is disconnected of the replication + for the duration of the DDL processing, so that there is only DDL statement + processing in the node and it does not block the rest of the cluster. When + the DDL processing is complete, the node applies delayed replication events + and synchronizes back with the cluster. + The DDL can then be executed cluster-wide by running the same DDL statement + for each node in turn. When this rolling schema upgrade proceeds, part of + the cluster will have old schema structure and part of the cluster will have + new schema structure. + + +7. LIMITATIONS + +1) Currently replication works only with InnoDB storage engine. Any writes to + tables of other types, including system (mysql.*) tables are not replicated. + However, DDL statements are replicated in statement level, and changes + to mysql.* tables will get replicated that way. + So, you can safely issue: CREATE USER..., + but issuing: INSERT INTO mysql.user..., will not be replicated. + +2) DELETE operation is unsupported on tables without primary key. Also rows in + tables without primary key may appear in different order on different nodes. + As a result SELECT...LIMIT... may return slightly different sets. + +3) Unsupported queries: + * LOCK/UNLOCK TABLES cannot be supported in multi-master setups. + * lock functions (GET_LOCK(), RELEASE_LOCK()... ) + +4) Query log cannot be directed to table. If you enable query logging, + you must forward the log to a file: + log_output = FILE + Use general_log and general_log_file to choose query logging and the + log file name + +5) Maximum allowed transaction size is defined by wsrep_max_ws_rows and + wsrep_max_ws_size. Anything bigger (e.g. huge LOAD DATA) will be rejected. + +6) Due to cluster level optimistic concurrency control, transaction issuing + COMMIT may still be aborted at that stage. There can be two transactions. + writing to same rows and committing in separate cluster nodes, and only one + of the them can successfully commit. The failing one will be aborted. + For cluster level aborts, MySQL/galera cluster gives back deadlock error. + code (Error: 1213 SQLSTATE: 40001 (ER_LOCK_DEADLOCK)). + +7) XA transactions can not be supported due to possible rollback on commit. + diff --git a/cmake/wsrep.cmake b/cmake/wsrep.cmake new file mode 100644 index 00000000000..1fd747cddd6 --- /dev/null +++ b/cmake/wsrep.cmake @@ -0,0 +1,59 @@ +# Copyright (c) 2011, Codership Oy <info@codership.com>. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +# We need to generate a proper spec file even without --with-wsrep flag, +# so WSREP_VERSION is produced regardless + +# Set the patch version +SET(WSREP_PATCH_VERSION "5") + +# Obtain patch revision number +SET(WSREP_PATCH_REVNO $ENV{WSREP_REV}) +IF(NOT WSREP_PATCH_REVNO) + EXECUTE_PROCESS( + COMMAND bzr revno + OUTPUT_VARIABLE WSREP_PATCH_REVNO + RESULT_VARIABLE RESULT + ) +STRING(REGEX REPLACE "(\r?\n)+$" "" WSREP_PATCH_REVNO "${WSREP_PATCH_REVNO}") +#FILE(WRITE "wsrep_config" "Debug: WSREP_PATCH_REVNO result: ${RESULT}\n") +ENDIF() +IF(NOT WSREP_PATCH_REVNO) + SET(WSREP_PATCH_REVNO "XXXX") +ENDIF() + +# Obtain wsrep API version +EXECUTE_PROCESS( + COMMAND sh -c "grep WSREP_INTERFACE_VERSION ${MySQL_SOURCE_DIR}/wsrep/wsrep_api.h | cut -d '\"' -f 2" + OUTPUT_VARIABLE WSREP_API_VERSION + RESULT_VARIABLE RESULT +) +#FILE(WRITE "wsrep_config" "Debug: WSREP_API_VERSION result: ${RESULT}\n") +STRING(REGEX REPLACE "(\r?\n)+$" "" WSREP_API_VERSION "${WSREP_API_VERSION}") + +SET(WSREP_VERSION + "${WSREP_API_VERSION}.${WSREP_PATCH_VERSION}.r${WSREP_PATCH_REVNO}" +) + +OPTION(WITH_WSREP "WSREP replication API (to use, e.g. Galera Replication library)" OFF) +IF (WITH_WSREP) + SET(WSREP_C_FLAGS "-DWITH_WSREP -DWSREP_PROC_INFO -DMYSQL_MAX_VARIABLE_VALUE_LEN=2048") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${WSREP_C_FLAGS}") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${WSREP_C_FLAGS}") + SET(COMPILATION_COMMENT "${COMPILATION_COMMENT}, wsrep_${WSREP_VERSION}") + SET(WITH_EMBEDDED_SERVER OFF) +ENDIF() + +# diff --git a/scripts/wsrep_sst_mysqldump.sh b/scripts/wsrep_sst_mysqldump.sh new file mode 100644 index 00000000000..8106850e918 --- /dev/null +++ b/scripts/wsrep_sst_mysqldump.sh @@ -0,0 +1,133 @@ +#!/bin/sh -e +# Copyright (C) 2009 Codership Oy +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; see the file COPYING. If not, write to the +# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston +# MA 02110-1301 USA. + +# This is a reference script for mysqldump-based state snapshot tansfer + +USER=$1 +PSWD=$2 +HOST=$3 +PORT=$4 +LOCAL_HOST="127.0.0.1" +LOCAL_PORT=$5 +UUID=$6 +SEQNO=$7 +BYPASS=$8 + +EINVAL=22 + +err() +{ + echo "SST error: $*" >&2 +} + +local_ip() +{ + PATH=$PATH:/usr/sbin:/usr/bin:/sbin:/bin + + [ "$1" = "127.0.0.1" ] && return 0 + [ "$1" = "localhost" ] && return 0 + [ "$1" = "$(hostname -s)" ] && return 0 + [ "$1" = "$(hostname -f)" ] && return 0 + [ "$1" = "$(hostname -d)" ] && return 0 + + # Now if ip program is not found in the path, we can't return 0 since + # it would block any address. Thankfully grep should fail in this case + ip route get "$1" | grep local >/dev/null && return 0 + + return 1 +} + +if test -z "$USER"; then err "USER cannot be nil"; exit $EINVAL; fi +if test -z "$HOST"; then err "HOST cannot be nil"; exit $EINVAL; fi +if test -z "$PORT"; then err "PORT cannot be nil"; exit $EINVAL; fi +if test -z "$LOCAL_PORT"; then err "LOCAL_PORT cannot be nil"; exit $EINVAL; fi +if test -z "$UUID"; then err "UUID cannot be nil"; exit $EINVAL; fi +if test -z "$SEQNO"; then err "SEQNO cannot be nil"; exit $EINVAL; fi + +if local_ip $HOST && [ "$PORT" = "$LOCAL_PORT" ] +then + err "destination address '$HOST:$PORT' matches source address." + exit $EINVAL +fi + +# Check client version +if ! mysql --version | grep 'Distrib 5.5' >/dev/null +then + mysql --version >&2 + err "this procedure requires MySQL client version 5.5.x" + exit $EINVAL +fi + +AUTH="-u$USER" +if test -n "$PSWD"; then AUTH="$AUTH -p$PSWD"; fi + +STOP_WSREP="SET wsrep_on=OFF;" + +# NOTE: we don't use --routines here because we're dumping mysql.proc table +#MYSQLDUMP="@bindir@/mysqldump $AUTH -h$LOCAL_HOST -P$LOCAL_PORT \ +MYSQLDUMP="mysqldump $AUTH -h$LOCAL_HOST -P$LOCAL_PORT \ +--add-drop-database --add-drop-table --skip-add-locks --create-options \ +--disable-keys --extended-insert --skip-lock-tables --quick --set-charset \ +--skip-comments --flush-privileges --all-databases" + +# mysqldump cannot restore CSV tables, fix this issue +CSV_TABLES_FIX=" +set sql_mode=''; + +USE mysql; + +SET @str = IF (@@have_csv = 'YES', 'CREATE TABLE IF NOT EXISTS general_log (event_time TIMESTAMP NOT NULL, user_host MEDIUMTEXT NOT NULL, thread_id INTEGER NOT NULL, server_id INTEGER UNSIGNED NOT NULL, command_type VARCHAR(64) NOT NULL,argument MEDIUMTEXT NOT NULL) engine=CSV CHARACTER SET utf8 comment=\"General log\"', 'SET @dummy = 0'); + +PREPARE stmt FROM @str; +EXECUTE stmt; +DROP PREPARE stmt; + +SET @str = IF (@@have_csv = 'YES', 'CREATE TABLE IF NOT EXISTS slow_log (start_time TIMESTAMP NOT NULL, user_host MEDIUMTEXT NOT NULL, query_time TIME NOT NULL, lock_time TIME NOT NULL, rows_sent INTEGER NOT NULL, rows_examined INTEGER NOT NULL, db VARCHAR(512) NOT NULL, last_insert_id INTEGER NOT NULL, insert_id INTEGER NOT NULL, server_id INTEGER UNSIGNED NOT NULL, sql_text MEDIUMTEXT NOT NULL) engine=CSV CHARACTER SET utf8 comment=\"Slow log\"', 'SET @dummy = 0'); + +PREPARE stmt FROM @str; +EXECUTE stmt; +DROP PREPARE stmt;" + +SET_START_POSITION="SET GLOBAL wsrep_start_position='$UUID:$SEQNO';" + +#MYSQL="@bindir@/mysql -u'$USER' -p'$PSWD' -h'$HOST' -P'$PORT'" +MYSQL="mysql $AUTH -h$HOST -P$PORT --disable-reconnect --connect_timeout=10" + +# need to disable logging when loading the dump +# reason is that dump contains ALTER TABLE for log tables, and +# this causes an error if logging is enabled +GENERAL_LOG_OPT=`$MYSQL --skip-column-names -e"$STOP_WSREP SELECT @@GENERAL_LOG"` +SLOW_LOG_OPT=`$MYSQL --skip-column-names -e"$STOP_WSREP SELECT @@SLOW_QUERY_LOG"` +$MYSQL -e"$STOP_WSREP SET GLOBAL GENERAL_LOG=OFF" +$MYSQL -e"$STOP_WSREP SET GLOBAL SLOW_QUERY_LOG=OFF" + +# commands to restore log settings +RESTORE_GENERAL_LOG="SET GLOBAL GENERAL_LOG=$GENERAL_LOG_OPT;" +RESTORE_SLOW_QUERY_LOG="SET GLOBAL SLOW_QUERY_LOG=$SLOW_LOG_OPT;" + +if [ $BYPASS -eq 0 ] +then + (echo $STOP_WSREP && $MYSQLDUMP && echo $CSV_TABLES_FIX \ + && echo $RESTORE_GENERAL_LOG && echo $RESTORE_SLOW_QUERY_LOG \ + && echo $SET_START_POSITION \ + || echo "SST failed to complete;") | $MYSQL +else + echo "Bypassing state dump." >&2 + echo $SET_START_POSITION | $MYSQL +fi + +# diff --git a/scripts/wsrep_sst_rsync.sh b/scripts/wsrep_sst_rsync.sh new file mode 100644 index 00000000000..c0b54159eda --- /dev/null +++ b/scripts/wsrep_sst_rsync.sh @@ -0,0 +1,214 @@ +#!/bin/bash -ue + +# Copyright (C) 2010 Codership Oy +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; see the file COPYING. If not, write to the +# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston +# MA 02110-1301 USA. + +# This is a reference script for rsync-based state snapshot tansfer + +RSYNC_PID= +RSYNC_CONF= + +cleanup_joiner() +{ + echo "Joiner cleanup:" >&2 +set -x + local PID=$(cat "$RSYNC_PID" 2>/dev/null || echo 0) + [ "0" != "$PID" ] && kill $PID && sleep 0.5 && kill -9 $PID || : +set +x + rm -rf "$RSYNC_CONF" + rm -rf "$MAGIC_FILE" + rm -rf "$RSYNC_PID" +} + +check_pid() +{ + local pid_file=$1 + [ -r $pid_file ] && ps -p $(cat $pid_file) >/dev/null 2>&1 +} + +check_pid_and_port() +{ + local pid_file=$1 + local rsync_pid=$(cat $pid_file) + local rsync_port=$2 + + check_pid $pid_file && \ + netstat -anpt 2>/dev/null | \ + grep LISTEN | grep \:$rsync_port | grep $rsync_pid/rsync >/dev/null +} + +ROLE=$1 +ADDR=$2 +AUTH=$3 +DATA=$4 +CONF=$5 + +MAGIC_FILE="$DATA/rsync_sst_complete" +rm -rf "$MAGIC_FILE" + +if [ "$ROLE" = "donor" ] +then + UUID=$6 + SEQNO=$7 + BYPASS=$8 + + if [ $BYPASS -eq 0 ] + then + + FLUSHED="$DATA/tables_flushed" + rm -rf "$FLUSHED" + + # Use deltaxfer only for WAN + inv=$(basename $0) + [ "$inv" = "wsrep_sst_rsync_wan" ] && WHOLE_FILE_OPT="" \ + || WHOLE_FILE_OPT="--whole-file" + + echo "flush tables" + + # wait for tables flushed and state ID written to the file + while [ ! -r "$FLUSHED" ] && ! grep -q ':' "$FLUSHED" >/dev/null 2>&1 + do + sleep 0.2 + done + + STATE="$(cat $FLUSHED)" + rm -rf "$FLUSHED" + + sync + + # Old filter - include everything except selected + # FILTER=(--exclude '*.err' --exclude '*.pid' --exclude '*.sock' \ + # --exclude '*.conf' --exclude core --exclude 'galera.*' \ + # --exclude grastate.txt --exclude '*.pem' \ + # --exclude '*.[0-9][0-9][0-9][0-9][0-9][0-9]' --exclude '*.index') + + # New filter - exclude everything except dirs (schemas) and innodb files + FILTER=(-f '+ /ibdata*' -f '+ /ib_logfile*' -f '+ */' -f '-! */*') + + RC=0 + rsync --archive --no-times --ignore-times --inplace --delete --quiet \ + $WHOLE_FILE_OPT "${FILTER[@]}" "$DATA" rsync://$ADDR || RC=$? + + [ $RC -ne 0 ] && echo "rsync returned code $RC:" >> /dev/stderr + + case $RC in + 0) RC=0 # Success + ;; + 12) RC=71 # EPROTO + echo "rsync server on the other end has incompatible protocol. " \ + "Make sure you have the same version of rsync on all nodes."\ + >> /dev/stderr + ;; + 22) RC=12 # ENOMEM + ;; + *) RC=255 # unknown error + ;; + esac + + [ $RC -ne 0 ] && exit $RC + + else # BYPASS + STATE="$UUID:$SEQNO" + fi + + echo "continue" # now server can resume updating data + + echo "$STATE" > "$MAGIC_FILE" + rsync -aqc "$MAGIC_FILE" rsync://$ADDR + + echo "done $STATE" + +elif [ "$ROLE" = "joiner" ] +then + MYSQLD_PID=$6 + + MODULE="rsync_sst" + + RSYNC_PID="$DATA/$MODULE.pid" + + if check_pid $RSYNC_PID + then + echo "rsync daemon already running." + exit 114 # EALREADY + fi + rm -rf "$RSYNC_PID" + + RSYNC_PORT=$(echo $ADDR | awk -F ':' '{ print $2 }') + if [ -z "$RSYNC_PORT" ] + then + RSYNC_PORT=4444 + ADDR="$(echo $ADDR | awk -F ':' '{ print $1 }'):$RSYNC_PORT" + fi + + trap "exit 32" HUP PIPE + trap "exit 3" INT TERM + trap cleanup_joiner EXIT + + MYUID=$(id -u) + MYGID=$(id -g) + RSYNC_CONF="$DATA/$MODULE.conf" + +cat << EOF > "$RSYNC_CONF" +pid file = $RSYNC_PID +use chroot = no +[$MODULE] + path = $DATA + read only = no + timeout = 300 + uid = $MYUID + gid = $MYGID +EOF + +# rm -rf "$DATA"/ib_logfile* # we don't want old logs around + + # listen at all interfaces (for firewalled setups) + rsync --daemon --port $RSYNC_PORT --config "$RSYNC_CONF" + + until check_pid_and_port $RSYNC_PID $RSYNC_PORT + do + sleep 0.2 + done + + echo "ready $ADDR/$MODULE" + + # wait for SST to complete by monitoring magic file + while [ ! -r "$MAGIC_FILE" ] && check_pid "$RSYNC_PID" && \ + ps -p $MYSQLD_PID >/dev/null + do + sleep 1 + done + + if ! ps -p $MYSQLD_PID >/dev/null + then + echo "Parent mysqld process (PID:$MYSQLD_PID) terminated unexpectedly." >&2 + exit 32 + fi + + if [ -r "$MAGIC_FILE" ] + then + cat "$MAGIC_FILE" # output UUID:seqno + else + # this message should cause joiner to abort + echo "rsync process ended without creating '$MAGIC_FILE'" + fi + +# cleanup_joiner +else + echo "Unrecognized role: $ROLE" + exit 22 # EINVAL +fi + +exit 0 diff --git a/sql/wsrep_check_opts.cc b/sql/wsrep_check_opts.cc new file mode 100644 index 00000000000..5764be39093 --- /dev/null +++ b/sql/wsrep_check_opts.cc @@ -0,0 +1,392 @@ +/* Copyright 2011 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +//#include <mysqld.h> +#include <sql_class.h> +//#include <sql_plugin.h> +//#include <set_var.h> + +#include "wsrep_mysqld.h" + +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <ctype.h> + +/* This file is about checking for correctness of mysql configuration options */ + +struct opt +{ + const char* const name; + const char* value; +}; + +/* A list of options to check. + * At first we assume default values and then see if they are changed on CLI or + * in my.cnf */ +static struct opt opts[] = +{ + { "wsrep_slave_threads", "1" }, // mysqld.cc + { "bind_address", "0.0.0.0" }, // mysqld.cc + { "wsrep_sst_method","mysqldump" }, // mysqld.cc + { "wsrep_sst_receive_address","AUTO"}, // mysqld.cc + { "binlog_format", "ROW" }, // mysqld.cc + { "wsrep_provider", "none" }, // mysqld.cc + { "query_cache_type", "0" }, // mysqld.cc + { "query_cache_size", "0" }, // mysqld.cc + { "locked_in_memory", "0" }, // mysqld.cc + { "wsrep_cluster_address", "0" }, // mysqld.cc + { "locks_unsafe_for_binlog", "0" }, // ha_innodb.cc + { "autoinc_lock_mode", "1" }, // ha_innodb.cc + { 0, 0 } +}; + +enum +{ + WSREP_SLAVE_THREADS, + BIND_ADDRESS, + WSREP_SST_METHOD, + WSREP_SST_RECEIVE_ADDRESS, + BINLOG_FORMAT, + WSREP_PROVIDER, + QUERY_CACHE_TYPE, + QUERY_CACHE_SIZE, + LOCKED_IN_MEMORY, + WSREP_CLUSTER_ADDRESS, + LOCKS_UNSAFE_FOR_BINLOG, + AUTOINC_LOCK_MODE +}; + + +/* A class to make a copy of argv[] vector */ +struct argv_copy +{ + int const argc_; + char** argv_; + + argv_copy (int const argc, const char* const argv[]) : + argc_ (argc), + argv_ (reinterpret_cast<char**>(calloc(argc_, sizeof(char*)))) + { + if (argv_) + { + for (int i = 0; i < argc_; ++i) + { + argv_[i] = strdup(argv[i]); + + if (!argv_[i]) + { + argv_free (); // free whatever bee allocated + return; + } + } + } + } + + ~argv_copy () { argv_free (); } + +private: + argv_copy (const argv_copy&); + argv_copy& operator= (const argv_copy&); + + void argv_free() + { + if (argv_) + { + for (int i = 0; (i < argc_) && argv_[i] ; ++i) free (argv_[i]); + free (argv_); + argv_ = 0; + } + } +}; + +/* a short corresponding to '--' byte sequence */ +static short const long_opt_prefix ('-' + ('-' << 8)); + +/* Normalizes long options to have '_' instead of '-' */ +static int +normalize_opts (argv_copy& a) +{ + if (a.argv_) + { + for (int i = 0; i < a.argc_; ++i) + { + char* ptr = a.argv_[i]; + if (long_opt_prefix == *(short*)ptr) // long option + { + ptr += 2; + const char* end = strchr(ptr, '='); + + if (!end) end = ptr + strlen(ptr); + + for (; ptr != end; ++ptr) if ('-' == *ptr) *ptr = '_'; + } + } + + return 0; + } + + return EINVAL; +} + +/* Find required options in the argument list and change their values */ +static int +find_opts (argv_copy& a, struct opt* const opts) +{ + for (int i = 0; i < a.argc_; ++i) + { + char* ptr = a.argv_[i] + 2; // we're interested only in long options + + struct opt* opt = opts; + for (; 0 != opt->name; ++opt) + { + if (!strstr(ptr, opt->name)) continue; // try next option + + /* 1. try to find value after the '=' */ + opt->value = strchr(ptr, '=') + 1; + + /* 2. if no '=', try next element in the argument vector */ + if (reinterpret_cast<void*>(1) == opt->value) + { + /* also check that the next element is not an option itself */ + if (i + 1 < a.argc_ && *(a.argv_[i + 1]) != '-') + { + ++i; + opt->value = a.argv_[i]; + } + else opt->value = ""; // no value supplied (like boolean opt) + } + + break; // option found, break inner loop + } + } + + return 0; +} + +/* Parses string for an integer. Returns 0 on success. */ +int get_long_long (const struct opt& opt, long long* const val, int const base) +{ + const char* const str = opt.value; + + if ('\0' != *str) + { + char* endptr; + + *val = strtoll (str, &endptr, base); + + if ('k' == *endptr || 'K' == *endptr) + { + *val *= 1024L; + endptr++; + } + else if ('m' == *endptr || 'M' == *endptr) + { + *val *= 1024L * 1024L; + endptr++; + } + else if ('g' == *endptr || 'G' == *endptr) + { + *val *= 1024L * 1024L * 1024L; + endptr++; + } + + if ('\0' == *endptr) return 0; // the whole string was a valid integer + } + + WSREP_ERROR ("Bad value for *%s: '%s'. Should be integer.", + opt.name, opt.value); + + return EINVAL; +} + +/* This is flimzy coz hell knows how mysql interprets boolean strings... + * and, no, I'm not going to become versed in how mysql handles options - + * I'd rather sing. + + Aha, http://dev.mysql.com/doc/refman/5.1/en/dynamic-system-variables.html: + Variables that have a type of “boolean” can be set to 0, 1, ON or OFF. (If you + set them on the command line or in an option file, use the numeric values.) + + So it is '0' for FALSE, '1' or empty string for TRUE + + */ +int get_bool (const struct opt& opt, bool* const val) +{ + const char* str = opt.value; + + while (isspace(*str)) ++str; // skip initial whitespaces + + ssize_t str_len = strlen(str); + switch (str_len) + { + case 0: + *val = true; + return 0; + case 1: + if ('0' == *str || '1' == *str) + { + *val = ('1' == *str); + return 0; + } + } + + WSREP_ERROR ("Bad value for *%s: '%s'. Should be '0', '1' or empty string.", + opt.name, opt.value); + + return EINVAL; +} + +static int +check_opts (int const argc, const char* const argv[], struct opt opts[]) +{ + /* First, make a copy of argv to be able to manipulate it */ + argv_copy a(argc, argv); + + if (!a.argv_) + { + WSREP_ERROR ("Could not copy argv vector: not enough memory."); + return ENOMEM; + } + + int err = normalize_opts (a); + if (err) + { + WSREP_ERROR ("Failed to normalize options."); + return err; + } + + err = find_opts (a, opts); + if (err) + { + WSREP_ERROR ("Failed to parse options."); + return err; + } + + /* At this point we have updated default values in our option list to + what has been specified on the command line / my.cnf */ + + long long slave_threads; + err = get_long_long (opts[WSREP_SLAVE_THREADS], &slave_threads, 10); + if (err) return err; + + int rcode = 0; + + if (slave_threads > 1) + /* Need to check AUTOINC_LOCK_MODE and LOCKS_UNSAFE_FOR_BINLOG */ + { + long long autoinc_lock_mode; + err = get_long_long (opts[AUTOINC_LOCK_MODE], &autoinc_lock_mode, 10); + if (err) return err; + + bool locks_unsafe_for_binlog; + err = get_bool (opts[LOCKS_UNSAFE_FOR_BINLOG],&locks_unsafe_for_binlog); + if (err) return err; + + if (autoinc_lock_mode != 2) + { + WSREP_ERROR ("Parallel applying (wsrep_slave_threads > 1) requires" + " innodb_autoinc_lock_mode = 2."); + rcode = EINVAL; + } + } + + long long query_cache_size, query_cache_type; + if ((err = get_long_long (opts[QUERY_CACHE_SIZE], &query_cache_size, 10))) + return err; + if ((err = get_long_long (opts[QUERY_CACHE_TYPE], &query_cache_type, 10))) + return err; + + if (0 != query_cache_size && 0 != query_cache_type) + { + WSREP_ERROR ("Query cache is not supported (size=%lld type=%lld)", + query_cache_size, query_cache_type); + rcode = EINVAL; + } + + bool locked_in_memory; + err = get_bool (opts[LOCKED_IN_MEMORY], &locked_in_memory); + if (err) { WSREP_ERROR("get_bool error: %s", strerror(err)); return err; } + if (locked_in_memory) + { + WSREP_ERROR ("Memory locking is not supported (locked_in_memory=%s)", + locked_in_memory ? "ON" : "OFF"); + rcode = EINVAL; + } + + if (!strcasecmp(opts[WSREP_SST_METHOD].value,"mysqldump")) + { + if (!strcasecmp(opts[BIND_ADDRESS].value, "127.0.0.1") || + !strcasecmp(opts[BIND_ADDRESS].value, "localhost")) + { + WSREP_ERROR ("wsrep_sst_method is set to 'mysqldump' yet " + "mysqld bind_address is set to '%s', which makes it " + "impossible to receive state transfer from another " + "node, since mysqld won't accept such connections. " + "If you wish to use mysqldump state transfer method, " + "set bind_address to allow mysql client connections " + "from other cluster members (e.g. 0.0.0.0).", + opts[BIND_ADDRESS].value); + rcode = EINVAL; + } + } + else + { + // non-mysqldump SST requires wsrep_cluster_address on startup + if (strlen(opts[WSREP_CLUSTER_ADDRESS].value) == 0) + { + WSREP_ERROR ("%s SST method requires wsrep_cluster_address to be " + "configured on startup.",opts[WSREP_SST_METHOD].value); + rcode = EINVAL; + } + } + + if (strcasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value, "AUTO")) + { + if (!strncasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value, + "127.0.0.1", strlen("127.0.0.1")) || + !strncasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value, + "localhost", strlen("localhost"))) + { + WSREP_WARN ("wsrep_sst_receive_address is set to '%s' which " + "makes it impossible for another host to reach this " + "one. Please set it to the address which this node " + "can be connected at by other cluster members.", + opts[WSREP_SST_RECEIVE_ADDRESS].value); +// rcode = EINVAL; + } + } + + if (strcasecmp(opts[WSREP_PROVIDER].value, "none")) + { + if (strcasecmp(opts[BINLOG_FORMAT].value, "ROW")) + { + WSREP_ERROR ("Only binlog_format = 'ROW' is currently supported. " + "Configured value: '%s'. Please adjust your " + "configuration.", opts[BINLOG_FORMAT].value); + + rcode = EINVAL; + } + } + + return rcode; +} + +int +wsrep_check_opts (int const argc, char* const* const argv) +{ + return check_opts (argc, argv, opts); +} + diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc new file mode 100644 index 00000000000..9986d7c79cd --- /dev/null +++ b/sql/wsrep_hton.cc @@ -0,0 +1,419 @@ +/* Copyright 2008 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <mysqld.h> +#include "sql_base.h" +#include <sql_class.h> +#include "wsrep_mysqld.h" +#include "wsrep_priv.h" +#include <cstdio> +#include <cstdlib> + +extern handlerton *binlog_hton; +extern int binlog_close_connection(handlerton *hton, THD *thd); +extern ulonglong thd_to_trx_id(THD *thd); + +extern "C" int thd_binlog_format(const MYSQL_THD thd); +// todo: share interface with ha_innodb.c + +enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); + +/* + a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct. + Is called by THD::transactions.cleanup() +*/ +void wsrep_cleanup_transaction(THD *thd) +{ + if (thd->thread_id == 0) return; + if (thd->wsrep_exec_mode == LOCAL_COMMIT) + { + if (thd->variables.wsrep_on && + thd->wsrep_conflict_state != MUST_REPLAY) + { + if (thd->wsrep_seqno_changed) + { + if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle)) + { + DBUG_PRINT("wsrep", ("set committed fail")); + WSREP_WARN("set committed fail: %llu %d", + (long long)thd->real_id, thd->stmt_da->status()); + } + } + //else + //WSREP_DEBUG("no trx handle for %s", thd->query()); + thd_binlog_trx_reset(thd); + thd->wsrep_seqno_changed = false; + } + thd->wsrep_exec_mode= LOCAL_STATE; + } +} + +/* + wsrep hton +*/ +handlerton *wsrep_hton; + +void wsrep_register_hton(THD* thd, bool all) +{ + THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; + for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next()) + { + if (i->ht()->db_type == DB_TYPE_INNODB) + { + trans_register_ha(thd, all, wsrep_hton); + thd->ha_data[wsrep_hton->slot].ha_info[all].set_trx_read_write(); + break; + } + } +} + +/* + wsrep exploits binlog's caches even if binlogging itself is not + activated. In such case connection close needs calling + actual binlog's method. + Todo: split binlog hton from its caches to use ones by wsrep + without referring to binlog's stuff. +*/ +static int +wsrep_close_connection(handlerton* hton, THD* thd) +{ + DBUG_ENTER("wsrep_close_connection"); + if (thd_get_ha_data(thd, binlog_hton) != NULL) + binlog_hton->close_connection (binlog_hton, thd); + DBUG_RETURN(0); +} + +/* + prepare/wsrep_run_wsrep_commit can fail in two ways + - certification test or an equivalent. As a result, + the current transaction just rolls back + Error codes: + WSREP_TRX_ROLLBACK, WSREP_TRX_ERROR + - a post-certification failure makes this server unable to + commit its own WS and therefore the server must abort +*/ +static int wsrep_prepare(handlerton *hton, THD *thd, bool all) +{ +#ifndef DBUG_OFF + //wsrep_seqno_t old = thd->wsrep_trx_seqno; +#endif + DBUG_ENTER("wsrep_prepare"); + if ((all || + !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && + (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) + { + switch (wsrep_run_wsrep_commit(thd, hton, all)) + { + case WSREP_TRX_OK: + // DBUG_ASSERT(thd->wsrep_trx_seqno > old || + // thd->wsrep_exec_mode == REPL_RECV || + // thd->wsrep_exec_mode == TOTAL_ORDER); + break; + case WSREP_TRX_ROLLBACK: + case WSREP_TRX_ERROR: + DBUG_RETURN(1); + } + } + DBUG_RETURN(0); +} + +static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv) +{ + if (!wsrep_emulate_bin_log) return 0; + int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv); + return rcode; +} +static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) +{ + if (!wsrep_emulate_bin_log) return 0; + int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv); + return rcode; +} + +static int wsrep_rollback(handlerton *hton, THD *thd, bool all) +{ + DBUG_ENTER("wsrep_rollback"); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && + (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) + { + if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) + { + DBUG_PRINT("wsrep", ("setting rollback fail")); + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + (long long)thd->real_id, thd->query()); + } + } + + int rcode = 0; + if (!wsrep_emulate_bin_log) + { + if (all) thd_binlog_trx_reset(thd); + } + + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + DBUG_RETURN(rcode); +} + +int wsrep_commit(handlerton *hton, THD *thd, bool all) +{ + DBUG_ENTER("wsrep_commit"); + + DBUG_RETURN(0); +} + +extern my_bool opt_log_slave_updates; +enum wsrep_trx_status +wsrep_run_wsrep_commit( + THD *thd, handlerton *hton, bool all) +{ + int rcode = -1; + uint data_len = 0; + uchar *rbr_data = NULL; + IO_CACHE *cache; + int replay_round= 0; + + if (thd->stmt_da->is_error()) { + WSREP_ERROR("commit issue, error: %d %s", + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + } + + DBUG_ENTER("wsrep_run_wsrep_commit"); + if (thd->slave_thread && !opt_log_slave_updates) { + DBUG_RETURN(WSREP_TRX_OK); + } + if (thd->wsrep_exec_mode == REPL_RECV) { + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) { + if (wsrep_debug) + WSREP_INFO("WSREP: must abort for BF"); + DBUG_PRINT("wsrep", ("BF apply commit fail")); + thd->wsrep_conflict_state = NO_CONFLICT; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + // + // TODO: test all calls of the rollback. + // rollback must happen automagically innobase_rollback(hton, thd, 1); + // + DBUG_RETURN(WSREP_TRX_ERROR); + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } + if (thd->wsrep_exec_mode != LOCAL_STATE) { + DBUG_RETURN(WSREP_TRX_OK); + } + if (thd->wsrep_consistency_check) { + WSREP_DEBUG("commit for consistency check: %s", thd->query()); + DBUG_RETURN(WSREP_TRX_OK); + } + + DBUG_PRINT("wsrep", ("replicating commit")); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) { + DBUG_PRINT("wsrep", ("replicate commit fail")); + thd->wsrep_conflict_state = ABORTED; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + if (wsrep_debug) { + WSREP_INFO("innobase_commit, abort %s", + (thd->query()) ? thd->query() : "void"); + } + DBUG_RETURN(WSREP_TRX_ROLLBACK); + } + + mysql_mutex_lock(&LOCK_wsrep_replaying); + + while (wsrep_replaying > 0 && + thd->wsrep_conflict_state == NO_CONFLICT && + thd->killed == NOT_KILLED && + !shutdown_in_progress) { + + mysql_mutex_unlock(&LOCK_wsrep_replaying); + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep waiting on replaying"); + thd->mysys_var->current_mutex= &LOCK_wsrep_replaying; + thd->mysys_var->current_cond= &COND_wsrep_replaying; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + mysql_mutex_lock(&LOCK_wsrep_replaying); + // Using timedwait is a hack to avoid deadlock in case if BF victim + // misses the signal. + struct timespec wtime = {0, 1000000}; + mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying, + &wtime); + if (replay_round++ % 100000 == 0) + WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)", + wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round); + + mysql_mutex_unlock(&LOCK_wsrep_replaying); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + mysql_mutex_lock(&LOCK_wsrep_replaying); + } + mysql_mutex_unlock(&LOCK_wsrep_replaying); + + if (thd->wsrep_conflict_state == MUST_ABORT) { + DBUG_PRINT("wsrep", ("replicate commit fail")); + thd->wsrep_conflict_state = ABORTED; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + WSREP_DEBUG("innobase_commit abort after replaying wait %s", + (thd->query()) ? thd->query() : "void"); + DBUG_RETURN(WSREP_TRX_ROLLBACK); + } thd->wsrep_query_state = QUERY_COMMITTING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + cache = get_trans_log(thd); + rcode = 0; + if (cache) { + thd->binlog_flush_pending_rows_event(true); + rcode = wsrep_write_cache(cache, &rbr_data, &data_len); + if (rcode) { + WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode); + if (data_len) my_free(rbr_data); + DBUG_RETURN(WSREP_TRX_ROLLBACK); + } + } + if (data_len == 0) + { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_exec_mode = LOCAL_COMMIT; + WSREP_DEBUG("empty rbr buffer, query: %s", thd->query()); + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + DBUG_RETURN(WSREP_TRX_OK); + } + if (!rcode) { + rcode = wsrep->pre_commit( + wsrep, + (wsrep_conn_id_t)thd->thread_id, + &thd->wsrep_trx_handle, + rbr_data, + data_len, + (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL, + &thd->wsrep_trx_seqno); + if (rcode == WSREP_TRX_MISSING) { + rcode = WSREP_OK; + } else if (rcode == WSREP_BF_ABORT) { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state = MUST_REPLAY; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying++; + WSREP_DEBUG("replaying increased: %d, thd: %lu", + wsrep_replaying, thd->thread_id); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } + thd->wsrep_seqno_changed = true; + } else { + WSREP_ERROR("I/O error reading from thd's binlog iocache: " + "errno=%d, io cache code=%d", my_errno, cache->error); + if (data_len) my_free(rbr_data); + DBUG_ASSERT(0); // failure like this can not normally happen + DBUG_RETURN(WSREP_TRX_ERROR); + } + + if (data_len) { + my_free(rbr_data); + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + switch(rcode) { + case 0: + thd->wsrep_exec_mode = LOCAL_COMMIT; + /* Override XID iff it was generated by mysql */ + if (thd->transaction.xid_state.xid.get_my_xid()) + { + wsrep_xid_init(&thd->transaction.xid_state.xid, + wsrep_cluster_uuid(), + thd->wsrep_trx_seqno); + } + DBUG_PRINT("wsrep", ("replicating commit success")); + + break; + case WSREP_TRX_FAIL: + case WSREP_BF_ABORT: + WSREP_DEBUG("commit failed for reason: %d", rcode); + DBUG_PRINT("wsrep", ("replicating commit fail")); + + thd->wsrep_query_state= QUERY_EXEC; + + if (thd->wsrep_conflict_state == MUST_ABORT) { + thd->wsrep_conflict_state= ABORTED; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + DBUG_RETURN(WSREP_TRX_ROLLBACK); + + case WSREP_CONN_FAIL: + WSREP_ERROR("connection failure"); + DBUG_RETURN(WSREP_TRX_ERROR); + default: + WSREP_ERROR("unknown connection failure"); + DBUG_RETURN(WSREP_TRX_ERROR); + } + + thd->wsrep_query_state= QUERY_EXEC; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + DBUG_RETURN(WSREP_TRX_OK); +} + + +static int wsrep_hton_init(void *p) +{ + wsrep_hton= (handlerton *)p; + //wsrep_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO; + wsrep_hton->state= SHOW_OPTION_YES; + wsrep_hton->db_type=DB_TYPE_WSREP; + wsrep_hton->savepoint_offset= sizeof(my_off_t); + wsrep_hton->close_connection= wsrep_close_connection; + wsrep_hton->savepoint_set= wsrep_savepoint_set; + wsrep_hton->savepoint_rollback= wsrep_savepoint_rollback; + wsrep_hton->commit= wsrep_commit; + wsrep_hton->rollback= wsrep_rollback; + wsrep_hton->prepare= wsrep_prepare; + wsrep_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; // todo: fix flags + wsrep_hton->slot= 0; + return 0; +} + + +struct st_mysql_storage_engine wsrep_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + + +mysql_declare_plugin(wsrep) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &wsrep_storage_engine, + "wsrep", + "Codership Oy", + "A pseudo storage engine to represent transactions in multi-master synchornous replication", + PLUGIN_LICENSE_GPL, + wsrep_hton_init, /* Plugin Init */ + NULL, /* Plugin Deinit */ + 0x0100 /* 1.0 */, + NULL, /* status variables */ + NULL, /* system variables */ + NULL, /* config options */ + 0, /* flags */ +} +mysql_declare_plugin_end; diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc new file mode 100644 index 00000000000..5a5250652b8 --- /dev/null +++ b/sql/wsrep_mysqld.cc @@ -0,0 +1,1060 @@ +/* Copyright 2008 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <mysqld.h> +#include <sql_class.h> +#include <sql_parse.h> +#include "wsrep_priv.h" +#include <cstdio> +#include <cstdlib> +#include "log_event.h" + +wsrep_t *wsrep = NULL; +my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface + +/* + * Begin configuration options and their default values + */ + +const char* wsrep_data_home_dir = NULL; + +#define WSREP_NODE_INCOMING_AUTO "AUTO" +const char* wsrep_node_incoming_address = WSREP_NODE_INCOMING_AUTO; +const char* wsrep_dbug_option = ""; + +long wsrep_slave_threads = 1; // # of slave action appliers wanted +my_bool wsrep_debug = 0; // enable debug level logging +my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx +ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx +my_bool wsrep_auto_increment_control = 1; // control auto increment variables +my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey +my_bool wsrep_incremental_data_collection = 0; // incremental data collection +long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size +long wsrep_max_ws_rows = 65536; // max number of rows in ws +int wsrep_to_isolation = 0; // # of active TO isolation threads +my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key +long wsrep_max_protocol_version = 1; // maximum protocol version to use +ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; +my_bool wsrep_recovery = 0; // recovery + +/* + * End configuration options + */ + +static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; +const wsrep_uuid_t* wsrep_cluster_uuid() +{ + return &cluster_uuid; +} +static char cluster_uuid_str[40]= { 0, }; +static const char* cluster_status_str[WSREP_VIEW_MAX] = +{ + "Primary", + "non-Primary", + "Disconnected" +}; + +static char provider_name[256]= { 0, }; +static char provider_version[256]= { 0, }; +static char provider_vendor[256]= { 0, }; + +/* + * wsrep status variables + */ +my_bool wsrep_connected = FALSE; +my_bool wsrep_ready = FALSE; // node can accept queries +const char* wsrep_cluster_state_uuid = cluster_uuid_str; +long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED; +const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED]; +long wsrep_cluster_size = 0; +long wsrep_local_index = -1; +const char* wsrep_provider_name = provider_name; +const char* wsrep_provider_version = provider_version; +const char* wsrep_provider_vendor = provider_vendor; +/* End wsrep status variables */ + + +wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; +wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; +wsp::node_status local_status; +long wsrep_protocol_version = 1; + +// action execute callback +extern wsrep_status_t wsrep_apply_cb(void *ctx, + const void* buf, size_t buf_len, + wsrep_seqno_t global_seqno); + +extern wsrep_status_t wsrep_commit_cb (void *ctx, + wsrep_seqno_t global_seqno, + bool commit); + +static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { + switch (level) { + case WSREP_LOG_INFO: + sql_print_information("WSREP: %s", msg); + break; + case WSREP_LOG_WARN: + sql_print_warning("WSREP: %s", msg); + break; + case WSREP_LOG_ERROR: + case WSREP_LOG_FATAL: + sql_print_error("WSREP: %s", msg); + break; + case WSREP_LOG_DEBUG: + if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg); + default: + break; + } +} + +static void wsrep_log_states (wsrep_log_level_t level, + wsrep_uuid_t* group_uuid, + wsrep_seqno_t group_seqno, + wsrep_uuid_t* node_uuid, + wsrep_seqno_t node_seqno) +{ + char uuid_str[37]; + char msg[256]; + + wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str)); + snprintf (msg, 255, "WSREP: Group state: %s:%lld", + uuid_str, (long long)group_seqno); + wsrep_log_cb (level, msg); + + wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str)); + snprintf (msg, 255, "WSREP: Local state: %s:%lld", + uuid_str, (long long)node_seqno); + wsrep_log_cb (level, msg); +} + +static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) +{ + XID* xid= reinterpret_cast<XID*>(arg); + handlerton* hton= plugin_data(plugin, handlerton *); + if (hton->db_type == DB_TYPE_INNODB) + { + const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); + char uuid_str[40] = {0, }; + wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); + WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld", + uuid_str, (long long)wsrep_xid_seqno(xid)); + hton->wsrep_set_checkpoint(hton, xid); + } + return FALSE; +} + +void wsrep_set_SE_checkpoint(XID* xid) +{ + plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); +} + +static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) +{ + XID* xid= reinterpret_cast<XID*>(arg); + handlerton* hton= plugin_data(plugin, handlerton *); + if (hton->db_type == DB_TYPE_INNODB) + { + hton->wsrep_get_checkpoint(hton, xid); + const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); + char uuid_str[40] = {0, }; + wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); + WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld", + uuid_str, (long long)wsrep_xid_seqno(xid)); + + } + return FALSE; +} + +void wsrep_get_SE_checkpoint(XID* xid) +{ + plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); +} + +static void wsrep_view_handler_cb (void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len, + void** sst_req, + ssize_t* sst_req_len) +{ + wsrep_member_status_t new_status= local_status.get(); + + if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t))) + { + cluster_uuid= view->uuid; + wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, + sizeof(cluster_uuid_str)); + } + + wsrep_cluster_conf_id= view->view; + wsrep_cluster_status= cluster_status_str[view->status]; + wsrep_cluster_size= view->memb_num; + wsrep_local_index= view->my_idx; + + WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " + "number of nodes: %ld, my index: %ld, protocol version %d", + wsrep_cluster_state_uuid, (long long)view->seqno, + (long long)wsrep_cluster_conf_id, wsrep_cluster_status, + wsrep_cluster_size, wsrep_local_index, view->proto_ver); + + /* Proceed further only if view is PRIMARY */ + if (WSREP_VIEW_PRIMARY != view->status) { + wsrep_ready= FALSE; + new_status= WSREP_MEMBER_UNDEFINED; + /* Always record local_uuid and local_seqno in non-prim since this + * may lead to re-initializing provider and start position is + * determined according to these variables */ + // WRONG! local_uuid should be the last primary configuration uuid we were + // a member of. local_seqno should be updated in commit calls. + // local_uuid= cluster_uuid; + // local_seqno= view->first - 1; + goto out; + } + + switch (view->proto_ver) + { + case 0: + case 1: + // version change + if (view->proto_ver != wsrep_protocol_version) + { + my_bool wsrep_ready_saved= wsrep_ready; + wsrep_ready= FALSE; + WSREP_INFO("closing client connections for " + "protocol change %ld -> %d", + wsrep_protocol_version, view->proto_ver); + wsrep_close_client_connections(TRUE); + wsrep_protocol_version= view->proto_ver; + wsrep_ready= wsrep_ready_saved; + } + break; + default: + WSREP_ERROR("Unsupported application protocol version: %d", + view->proto_ver); + unireg_abort(1); + } + + if (view->state_gap) + { + WSREP_WARN("Gap in state sequence. Need state transfer."); + + /* After that wsrep will call wsrep_sst_prepare. */ + /* keep ready flag 0 until we receive the snapshot */ + wsrep_ready= FALSE; + + /* Close client connections to ensure that they don't interfere + * with SST */ + WSREP_DEBUG("[debug]: closing client connections for PRIM"); + wsrep_close_client_connections(TRUE); + + *sst_req_len= wsrep_sst_prepare (sst_req); + + if (*sst_req_len < 0) + { + int err = *sst_req_len; + WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err)); + new_status= WSREP_MEMBER_UNDEFINED; + } + else + { + new_status= WSREP_MEMBER_JOINER; + } + } + else + { + /* + * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized + * before. + */ + if (!memcmp (&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t))) + { + if (wsrep_init_first()) + { + wsrep_SE_init_grab(); + // Signal init thread to continue + wsrep_sst_complete (&cluster_uuid, view->seqno, false); + // and wait for SE initialization + wsrep_SE_init_wait(); + } + else + { + local_uuid= cluster_uuid; + local_seqno= view->seqno; + } + /* Init storage engine XIDs from first view */ + XID xid; + wsrep_xid_init(&xid, &local_uuid, local_seqno); + wsrep_set_SE_checkpoint(&xid); + new_status= WSREP_MEMBER_JOINED; + } + else // just some sanity check + { + if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) + { + WSREP_ERROR("Undetected state gap. Can't continue."); + wsrep_log_states (WSREP_LOG_FATAL, &cluster_uuid, view->seqno, + &local_uuid, -1); + abort(); + } + } + } + + if (wsrep_auto_increment_control) + { + global_system_variables.auto_increment_offset= view->my_idx + 1; + global_system_variables.auto_increment_increment= view->memb_num; + } + +out: + + local_status.set(new_status, view); +} + +// Wait until wsrep has reached ready state +void wsrep_ready_wait () +{ + if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); + while (!wsrep_ready) + { + WSREP_INFO("Waiting to reach ready state"); + mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready); + } + WSREP_INFO("ready state reached"); + mysql_mutex_unlock (&LOCK_wsrep_ready); +} + +static void wsrep_synced_cb(void* app_ctx) +{ + WSREP_INFO("Synchronized with group, ready for connections"); + if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); + if (!wsrep_ready) + { + wsrep_ready= TRUE; + mysql_cond_signal (&COND_wsrep_ready); + } + local_status.set(WSREP_MEMBER_SYNCED); + mysql_mutex_unlock (&LOCK_wsrep_ready); +} + +static void wsrep_init_position() +{ + /* read XIDs from storage engines */ + XID xid; + memset(&xid, 0, sizeof(xid)); + xid.formatID= -1; + wsrep_get_SE_checkpoint(&xid); + + if (xid.formatID == -1) + { + WSREP_INFO("Read nil XID from storage engines, skipping position init"); + return; + } + else if (!wsrep_is_wsrep_xid(&xid)) + { + WSREP_WARN("Read non-wsrep XID from storage engines, skipping position init"); + return; + } + + const wsrep_uuid_t* uuid= wsrep_xid_uuid(&xid); + const wsrep_seqno_t seqno= wsrep_xid_seqno(&xid); + + char uuid_str[40] = {0, }; + wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); + WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno); + + + if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) && + local_seqno == WSREP_SEQNO_UNDEFINED) + { + // Initial state + local_uuid= *uuid; + local_seqno= seqno; + } + else if (memcmp(&local_uuid, uuid, sizeof(local_uuid)) || + local_seqno != seqno) + { + WSREP_WARN("Initial position was provided by configuration or SST, " + "avoiding override"); + } +} + + +int wsrep_init() +{ + int rcode= -1; + + wsrep_ready= FALSE; + assert(wsrep_provider); + + wsrep_init_position(); + + if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK) + { + if (strcasecmp(wsrep_provider, WSREP_NONE)) + { + WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.", + wsrep_provider, strerror(rcode), rcode); + strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack + (void) wsrep_init(); + return rcode; + } + else /* this is for recursive call above */ + { + WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.", + strerror(rcode), rcode); + unireg_abort(1); + } + } + + if (strlen(wsrep_provider)== 0 || + !strcmp(wsrep_provider, WSREP_NONE)) + { + // enable normal operation in case no provider is specified + wsrep_ready= TRUE; + global_system_variables.wsrep_on = 0; + } + else + { + global_system_variables.wsrep_on = 1; + strncpy(provider_name, + wsrep->provider_name, sizeof(provider_name) - 1); + strncpy(provider_version, + wsrep->provider_version, sizeof(provider_version) - 1); + strncpy(provider_vendor, + wsrep->provider_vendor, sizeof(provider_vendor) - 1); + } + + struct wsrep_init_args wsrep_args; + + if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) + wsrep_data_home_dir = mysql_real_data_home; + + if (strcmp (wsrep_provider, WSREP_NONE) && + (!wsrep_node_incoming_address || + !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) { + static char inc_addr[256]; + size_t inc_addr_max = sizeof (inc_addr); + size_t ret = default_address (inc_addr, inc_addr_max); + if (ret > 0 && ret < inc_addr_max) { + wsrep_node_incoming_address = inc_addr; + } + else { + wsrep_node_incoming_address = NULL; + } + } + + wsrep_args.data_dir = wsrep_data_home_dir; + wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; + wsrep_args.node_address = (wsrep_node_address) ? wsrep_node_address : ""; + wsrep_args.node_incoming = wsrep_node_incoming_address; + wsrep_args.options = (wsrep_provider_options) ? + wsrep_provider_options : ""; + wsrep_args.proto_ver = wsrep_max_protocol_version; + + wsrep_args.state_uuid = &local_uuid; + wsrep_args.state_seqno = local_seqno; + + wsrep_args.logger_cb = wsrep_log_cb; + wsrep_args.view_handler_cb = wsrep_view_handler_cb; + wsrep_args.apply_cb = wsrep_apply_cb; + wsrep_args.commit_cb = wsrep_commit_cb; + wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; + wsrep_args.synced_cb = wsrep_synced_cb; + + rcode = wsrep->init(wsrep, &wsrep_args); + + if (rcode) + { + DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode)); + WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode); + free(wsrep); + wsrep = NULL; + } + + return rcode; +} + +extern "C" int wsrep_on(void *); + +void wsrep_init_startup (bool first) +{ + if (wsrep_init()) unireg_abort(1); + + wsrep_thr_lock_init(wsrep_thd_is_brute_force, wsrep_abort_thd, + wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on); + + /* Skip replication start if no cluster address */ + if (!wsrep_cluster_address || strlen(wsrep_cluster_address) == 0) return; + + if (first) wsrep_sst_grab(); // do it so we can wait for SST below + + if (!wsrep_start_replication()) unireg_abort(1); + + wsrep_create_rollbacker(); + wsrep_create_appliers(1); + + if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed +} + + +void wsrep_deinit() +{ + wsrep_unload(wsrep); + wsrep= 0; + provider_name[0]= '\0'; + provider_version[0]= '\0'; + provider_vendor[0]= '\0'; +} + +void wsrep_recover() +{ + XID xid; + memset(&xid, 0, sizeof(xid)); + xid.formatID= -1; + wsrep_get_SE_checkpoint(&xid); + char uuid_str[40]; + wsrep_uuid_print(wsrep_xid_uuid(&xid), uuid_str, sizeof(uuid_str)); + WSREP_INFO("Recovered position: %s:%lld", uuid_str, + (long long)wsrep_xid_seqno(&xid)); +} + + +void wsrep_stop_replication(THD *thd) +{ + WSREP_INFO("Stop replication"); + if (!wsrep) + { + WSREP_INFO("Provider was not loaded, in stop replication"); + return; + } + + /* disconnect from group first to get wsrep_ready == FALSE */ + WSREP_DEBUG("Provider disconnect"); + wsrep->disconnect(wsrep); + + wsrep_connected= FALSE; + + wsrep_close_client_connections(TRUE); + + /* wait until appliers have stopped */ + wsrep_wait_appliers_close(thd); + + return; +} + + +bool wsrep_start_replication() +{ + wsrep_status_t rcode; + + /* + if provider is trivial, don't even try to connect, + but resume local node operation + */ + if (strlen(wsrep_provider)== 0 || + !strcmp(wsrep_provider, WSREP_NONE)) + { + // enable normal operation in case no provider is specified + wsrep_ready = TRUE; + return true; + } + + if (!wsrep_cluster_address || strlen(wsrep_cluster_address)== 0) + { + // if provider is non-trivial, but no address is specified, wait for address + wsrep_ready = FALSE; + return true; + } + + WSREP_INFO("Start replication"); + + if ((rcode = wsrep->connect(wsrep, + wsrep_cluster_name, + wsrep_cluster_address, + wsrep_sst_donor))) + { + if (-ESOCKTNOSUPPORT == rcode) + { + DBUG_PRINT("wsrep",("unrecognized cluster address: '%s', rcode: %d", + wsrep_cluster_address, rcode)); + WSREP_ERROR("unrecognized cluster address: '%s', rcode: %d", + wsrep_cluster_address, rcode); + } + else + { + DBUG_PRINT("wsrep",("wsrep->connect() failed: %d", rcode)); + WSREP_ERROR("wsrep::connect() failed: %d", rcode); + } + + return false; + } + else + { + wsrep_connected= TRUE; + + uint64_t caps = wsrep->capabilities (wsrep); + + wsrep_incremental_data_collection = + (caps & WSREP_CAP_WRITE_SET_INCREMENTS); + + char* opts= wsrep->options_get(wsrep); + if (opts) + { + wsrep_provider_options_init(opts); + free(opts); + } + else + { + WSREP_WARN("Failed to get wsrep options"); + } + } + + return true; +} + +bool +wsrep_causal_wait (THD* thd) +{ + if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on && + !thd->in_active_multi_stmt_transaction()) + { + // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 + // TODO: modify to check if thd has locked any rows. + wsrep_seqno_t seqno; + wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno); + + if (unlikely(WSREP_OK != ret)) + { + const char* msg; + int err; + + // Possibly relevant error codes: + // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY, + // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET, + // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED + + switch (ret) + { + case WSREP_NOT_IMPLEMENTED: + msg= "consistent reads by wsrep backend. " + "Please unset wsrep_causal_reads variable."; + err= ER_NOT_SUPPORTED_YET; + break; + default: + msg= "Causal wait failed."; + err= ER_ERROR_ON_READ; + } + + my_error(err, MYF(0), msg); + + return true; + } + } + + return false; +} + +bool wsrep_prepare_key_for_isolation(const char* db, + const char* table, + wsrep_key_part_t* key, + size_t* key_len) +{ + if (*key_len < 2) return false; + + switch (wsrep_protocol_version) + { + case 0: + *key_len= 0; + break; + case 1: + { + *key_len= 0; + if (db) + { + // sql_print_information("%s.%s", db, table); + if (db) + { + key[*key_len].buf= db; + key[*key_len].buf_len= strlen(db); + ++(*key_len); + if (table) + { + key[*key_len].buf= table; + key[*key_len].buf_len= strlen(table); + ++(*key_len); + } + } + } + break; + } + default: + return false; + } + + return true; +} + +bool wsrep_prepare_key_for_innodb(const uchar* cache_key, + size_t cache_key_len, + const uchar* row_id, + size_t row_id_len, + wsrep_key_part_t* key, + size_t* key_len) +{ + if (*key_len < 3) return false; + + *key_len= 0; + switch (wsrep_protocol_version) + { + case 0: + { + key[*key_len].buf = cache_key; + key[*key_len].buf_len = cache_key_len; + ++(*key_len); + break; + } + case 1: + { + key[*key_len].buf = cache_key; + key[*key_len].buf_len = strlen( (char*)cache_key ); + ++(*key_len); + key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1; + key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) ); + ++(*key_len); + break; + } + default: + return false; + } + + key[*key_len].buf = row_id; + key[*key_len].buf_len = row_id_len; + ++(*key_len); + + return true; +} + +/* + * Construct Query_log_Event from thd query and serialize it + * into buffer. + * + * Return 0 in case of success, 1 in case of error. + */ +int wsrep_to_buf_helper( + THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len) +{ + IO_CACHE tmp_io_cache; + if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, + 65536, MYF(MY_WME))) + return 1; + Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); + int ret(0); + if (ev.write(&tmp_io_cache)) ret= 1; + if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1; + close_cached_file(&tmp_io_cache); + return ret; +} + +#include "sql_show.h" +static int +create_view_query(THD *thd, uchar** buf, uint* buf_len) +{ + LEX *lex= thd->lex; + SELECT_LEX *select_lex= &lex->select_lex; + TABLE_LIST *first_table= select_lex->table_list.first; + TABLE_LIST *views = first_table; + + String buff; + const LEX_STRING command[3]= + {{ C_STRING_WITH_LEN("CREATE ") }, + { C_STRING_WITH_LEN("ALTER ") }, + { C_STRING_WITH_LEN("CREATE OR REPLACE ") }}; + + buff.append(command[thd->lex->create_view_mode].str, + command[thd->lex->create_view_mode].length); + + if (!lex->definer) + { + /* + DEFINER-clause is missing; we have to create default definer in + persistent arena to be PS/SP friendly. + If this is an ALTER VIEW then the current user should be set as + the definer. + */ + + if (!(lex->definer= create_default_definer(thd))) + { + WSREP_WARN("view default definer issue"); + } + } + + views->algorithm = lex->create_view_algorithm; + views->definer.user = lex->definer->user; + views->definer.host = lex->definer->host; + views->view_suid = lex->create_view_suid; + views->with_check = lex->create_view_check; + + view_store_options(thd, views, &buff); + buff.append(STRING_WITH_LEN("VIEW ")); + /* Test if user supplied a db (ie: we did not use thd->db) */ + if (views->db && views->db[0] && + (thd->db == NULL || strcmp(views->db, thd->db))) + { + append_identifier(thd, &buff, views->db, + views->db_length); + buff.append('.'); + } + append_identifier(thd, &buff, views->table_name, + views->table_name_length); + if (lex->view_list.elements) + { + List_iterator_fast<LEX_STRING> names(lex->view_list); + LEX_STRING *name; + int i; + + for (i= 0; (name= names++); i++) + { + buff.append(i ? ", " : "("); + append_identifier(thd, &buff, name->str, name->length); + } + buff.append(')'); + } + buff.append(STRING_WITH_LEN(" AS ")); + //buff.append(views->source.str, views->source.length); + buff.append(thd->lex->create_view_select.str, + thd->lex->create_view_select.length); + //int errcode= query_error_code(thd, TRUE); + //if (thd->binlog_query(THD::STMT_QUERY_TYPE, + // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod + return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); +} + +static int wsrep_TOI_begin(THD *thd, char *db_, char *table_) +{ + wsrep_status_t ret(WSREP_WARNING); + uchar* buf(0); + uint buf_len(0); + int buf_err; + + wsrep_key_part_t wkey_part[2]; + wsrep_key_t wkey = {wkey_part, 2}; + WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + thd->wsrep_exec_mode, thd->query() ); + switch (thd->lex->sql_command) + { + case SQLCOM_CREATE_VIEW: + buf_err= create_view_query(thd, &buf, &buf_len); + break; + case SQLCOM_CREATE_PROCEDURE: + case SQLCOM_CREATE_SPFUNCTION: + buf_err= wsrep_create_sp(thd, &buf, &buf_len); + break; + case SQLCOM_CREATE_TRIGGER: + buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len); + break; + case SQLCOM_CREATE_EVENT: + buf_err= wsrep_create_event_query(thd, &buf, &buf_len); + break; + default: + buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf, + &buf_len); + break; + } + + if (!buf_err && + wsrep_prepare_key_for_isolation(db_, table_, wkey_part, + &wkey.key_parts_len) && + WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, + &wkey, 1, + buf, buf_len, + &thd->wsrep_trx_seqno))) + { + thd->wsrep_exec_mode= TOTAL_ORDER; + wsrep_to_isolation++; + if (buf) my_free(buf); + WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, + thd->wsrep_exec_mode); + } + else { + /* jump to error handler in mysql_execute_command() */ + WSREP_WARN("TO isolation failed for: %d, sql: %s. Check wsrep " + "connection state and retry the query.", + ret, (thd->query()) ? thd->query() : "void"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " + "your wsrep connection state and retry the query."); + if (buf) my_free(buf); + return -1; + } + return 0; +} + +static void wsrep_TOI_end(THD *thd) { + wsrep_status_t ret; + wsrep_to_isolation--; + WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void") + if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { + WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); + } + else { + WSREP_WARN("TO isolation end failed for: %d, sql: %s", + ret, (thd->query()) ? thd->query() : "void"); + } +} + +static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) +{ + wsrep_status_t ret(WSREP_WARNING); + WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + thd->wsrep_exec_mode, thd->query() ); + + ret = wsrep->desync(wsrep); + if (ret != WSREP_OK) + { + WSREP_WARN("desync failed %d for %s", ret, thd->query()); + return(ret); + } + wsrep_seqno_t seqno = wsrep->pause(wsrep); + if (seqno == WSREP_SEQNO_UNDEFINED) + { + WSREP_WARN("pause failed %lld for %s", (long long)seqno, thd->query()); + return(1); + } + WSREP_DEBUG("paused at %lld", (long long)seqno); + thd->variables.wsrep_on = 0; + return 0; +} + +static void wsrep_RSU_end(THD *thd) +{ + wsrep_status_t ret(WSREP_WARNING); + WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + thd->wsrep_exec_mode, thd->query() ); + + ret = wsrep->resume(wsrep); + if (ret != WSREP_OK) + { + WSREP_WARN("resume failed %d for %s", ret, thd->query()); + } + ret = wsrep->resync(wsrep); + if (ret != WSREP_OK) + { + WSREP_WARN("resync failed %d for %s", ret, thd->query()); + return; + } + thd->variables.wsrep_on = 1; + return; +} + +int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_) +{ + int ret= 0; + if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) + { + switch (wsrep_OSU_method_options) { + case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_); break; + case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break; + } + if (!ret) + { + thd->wsrep_exec_mode= TOTAL_ORDER; + } + } + return ret; +} + +void wsrep_to_isolation_end(THD *thd) { + if (thd->wsrep_exec_mode==TOTAL_ORDER) + { + switch(wsrep_OSU_method_options) + { + case WSREP_OSU_TOI: return wsrep_TOI_end(thd); + case WSREP_OSU_RSU: return wsrep_RSU_end(thd); + } + } +} + +#define WSREP_MDL_LOG(severity, msg, req, gra) \ + WSREP_##severity( \ + "%s\n" \ + "request: (%lu \tseqno %lld \tmode %d \tQstate \t%d cmd %d %d \t%s)\n" \ + "granted: (%lu \tseqno %lld \tmode %d \tQstate \t%d cmd %d %d \t%s)", \ + msg, \ + req->thread_id, (long long)req->wsrep_trx_seqno, \ + req->wsrep_exec_mode, req->wsrep_query_state, \ + req->command, req->lex->sql_command, req->query(), \ + gra->thread_id, (long long)gra->wsrep_trx_seqno, \ + gra->wsrep_exec_mode, gra->wsrep_query_state, \ + gra->command, gra->lex->sql_command, gra->query()); + +bool +wsrep_grant_mdl_exception(MDL_context *requestor_ctx, + MDL_ticket *ticket +) { + if (!WSREP_ON) return FALSE; + + THD *request_thd = requestor_ctx->get_thd(); + THD *granted_thd = ticket->get_ctx()->get_thd(); + + mysql_mutex_lock(&request_thd->LOCK_wsrep_thd); + if (request_thd->wsrep_exec_mode == TOTAL_ORDER || + request_thd->wsrep_exec_mode == REPL_RECV) + { + mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd); + WSREP_MDL_LOG(DEBUG, "MDL conflict", request_thd, granted_thd); + + mysql_mutex_lock(&granted_thd->LOCK_wsrep_thd); + if (granted_thd->wsrep_exec_mode == TOTAL_ORDER || + granted_thd->wsrep_exec_mode == REPL_RECV) + { + WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", request_thd, granted_thd); + mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); + return TRUE; + } + else if (granted_thd->lex->sql_command == SQLCOM_FLUSH) + { + WSREP_DEBUG("mdl granted over FLUSH BF"); + mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); + return TRUE; + } + else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE) + { + WSREP_DEBUG("DROP caused BF abort"); + mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); + wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); + return FALSE; + } + else if (granted_thd->wsrep_query_state == QUERY_COMMITTING) + { + WSREP_DEBUG("mdl granted, but commiting thd abort scheduled"); + mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); + wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); + return FALSE; + } + else + { + WSREP_MDL_LOG(INFO, "MDL conflict -> BF abort", request_thd, granted_thd); + mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); + wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); + return FALSE; + } + } + else + { + mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd); + } + return FALSE; +} diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h new file mode 100644 index 00000000000..90498c266ee --- /dev/null +++ b/sql/wsrep_mysqld.h @@ -0,0 +1,297 @@ +/* Copyright 2008 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef WSREP_MYSQLD_H +#define WSREP_MYSQLD_H + +#include "mysqld.h" +typedef struct st_mysql_show_var SHOW_VAR; +//#include <mysql.h> +#include <sql_priv.h> +#include "../wsrep/wsrep_api.h" +//#include <sql_class.h> + +class set_var; +class THD; + +// Global wsrep parameters +extern wsrep_t* wsrep; + +// MySQL wsrep options +extern const char* wsrep_provider; +extern const char* wsrep_provider_options; +extern const char* wsrep_cluster_name; +extern const char* wsrep_cluster_address; +extern const char* wsrep_node_name; +extern const char* wsrep_node_address; +extern const char* wsrep_node_incoming_address; +extern const char* wsrep_data_home_dir; +extern const char* wsrep_dbug_option; +extern long wsrep_slave_threads; +extern my_bool wsrep_debug; +extern my_bool wsrep_convert_LOCK_to_trx; +extern ulong wsrep_retry_autocommit; +extern my_bool wsrep_auto_increment_control; +extern my_bool wsrep_drupal_282555_workaround; +extern my_bool wsrep_incremental_data_collection; +extern const char* wsrep_sst_method; +extern const char* wsrep_sst_receive_address; +extern char* wsrep_sst_auth; +extern const char* wsrep_sst_donor; +extern const char* wsrep_start_position; +extern long long wsrep_max_ws_size; +extern long wsrep_max_ws_rows; +extern const char* wsrep_notify_cmd; +extern my_bool wsrep_certify_nonPK; +extern long wsrep_max_protocol_version; +extern long wsrep_protocol_version; +extern ulong wsrep_forced_binlog_format; +extern ulong wsrep_OSU_method_options; +extern my_bool wsrep_recovery; + +enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU }; + +// MySQL status variables +extern my_bool wsrep_connected; +extern my_bool wsrep_ready; +extern const char* wsrep_cluster_state_uuid; +extern long long wsrep_cluster_conf_id; +extern const char* wsrep_cluster_status; +extern long wsrep_cluster_size; +extern long wsrep_local_index; +extern const char* wsrep_provider_name; +extern const char* wsrep_provider_version; +extern const char* wsrep_provider_vendor; +extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff); + +#define WSREP_SST_ADDRESS_AUTO "AUTO" +// MySQL variables funcs + +#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var) +#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type) +#define DEFAULT_ARGS (THD* thd, enum_var_type var_type) +#define INIT_ARGS (const char* opt) + +extern int wsrep_init_vars(); + +extern bool wsrep_on_update UPDATE_ARGS; +extern void wsrep_causal_reads_update UPDATE_ARGS; +extern bool wsrep_start_position_check CHECK_ARGS; +extern bool wsrep_start_position_update UPDATE_ARGS; +extern void wsrep_start_position_init INIT_ARGS; + +extern bool wsrep_provider_check CHECK_ARGS; +extern bool wsrep_provider_update UPDATE_ARGS; +extern void wsrep_provider_init INIT_ARGS; + +extern bool wsrep_provider_options_check CHECK_ARGS; +extern bool wsrep_provider_options_update UPDATE_ARGS; +extern void wsrep_provider_options_init INIT_ARGS; + +extern bool wsrep_cluster_address_check CHECK_ARGS; +extern bool wsrep_cluster_address_update UPDATE_ARGS; +extern void wsrep_cluster_address_init INIT_ARGS; + +extern bool wsrep_cluster_name_check CHECK_ARGS; +extern bool wsrep_cluster_name_update UPDATE_ARGS; + +extern bool wsrep_node_name_check CHECK_ARGS; +extern bool wsrep_node_name_update UPDATE_ARGS; + +extern bool wsrep_node_address_check CHECK_ARGS; +extern bool wsrep_node_address_update UPDATE_ARGS; +extern void wsrep_node_address_init INIT_ARGS; + +extern bool wsrep_sst_method_check CHECK_ARGS; +extern bool wsrep_sst_method_update UPDATE_ARGS; +extern void wsrep_sst_method_init INIT_ARGS; + +extern bool wsrep_sst_receive_address_check CHECK_ARGS; +extern bool wsrep_sst_receive_address_update UPDATE_ARGS; + +extern bool wsrep_sst_auth_check CHECK_ARGS; +extern bool wsrep_sst_auth_update UPDATE_ARGS; +extern void wsrep_sst_auth_init INIT_ARGS; + +extern bool wsrep_sst_donor_check CHECK_ARGS; +extern bool wsrep_sst_donor_update UPDATE_ARGS; + + +extern bool wsrep_init_first(); // initialize wsrep before storage + // engines or after +extern int wsrep_init(); +extern void wsrep_deinit(); +extern void wsrep_recover(); + +/* wsrep initialization sequence at startup + * @param first wsrep_init_first() value */ +extern void wsrep_init_startup(bool first); + +extern void wsrep_close_client_connections(my_bool wait_to_end); +extern void wsrep_close_applier(THD *thd); +extern void wsrep_wait_appliers_close(THD *thd); +extern void wsrep_create_appliers(long threads = wsrep_slave_threads); +extern void wsrep_create_rollbacker(); +extern void wsrep_kill_mysql(THD *thd); + +/* new defines */ +extern void wsrep_stop_replication(THD *thd); +extern bool wsrep_start_replication(); +extern bool wsrep_causal_wait(THD* thd); +extern int wsrep_check_opts (int argc, char* const* argv); +extern void wsrep_prepend_PATH (const char* path); + +/* Other global variables */ +extern wsrep_seqno_t wsrep_locked_seqno; + +#define WSREP_ON \ + (global_system_variables.wsrep_on) + +#define WSREP(thd) \ + (WSREP_ON && (thd && thd->variables.wsrep_on)) + +#define WSREP_EMULATE_BINLOG(thd) \ + (WSREP(thd) && wsrep_emulate_bin_log) + +// MySQL logging functions don't seem to understand long long length modifer. +// This is a workaround. It also prefixes all messages with "WSREP" +#define WSREP_LOG(fun, ...) \ + { \ + char msg[256] = {'\0'}; \ + snprintf(msg, sizeof(msg) - 1, ## __VA_ARGS__); \ + fun("WSREP: %s", msg); \ + } + +#define WSREP_DEBUG(...) \ + if (wsrep_debug) WSREP_LOG(sql_print_information, ##__VA_ARGS__) +#define WSREP_INFO(...) WSREP_LOG(sql_print_information, ##__VA_ARGS__) +#define WSREP_WARN(...) WSREP_LOG(sql_print_warning, ##__VA_ARGS__) +#define WSREP_ERROR(...) WSREP_LOG(sql_print_error, ##__VA_ARGS__) + +/*! Synchronizes applier thread start with init thread */ +extern void wsrep_sst_grab(); +/*! Init thread waits for SST completion */ +extern bool wsrep_sst_wait(); +/*! Signals wsrep that initialization is complete, writesets can be applied */ +extern void wsrep_sst_continue(); + +extern void wsrep_SE_init_grab(); /*! grab init critical section */ +extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */ +extern void wsrep_SE_init_done(); /*! signal that SE init is complte */ +extern void wsrep_SE_initialized(); /*! mark SE initialization complete */ + +extern void wsrep_ready_wait(); + +enum wsrep_trx_status { + WSREP_TRX_OK, + WSREP_TRX_ROLLBACK, + WSREP_TRX_ERROR, + }; + +extern enum wsrep_trx_status +wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); +class Ha_trx_info; +struct THD_TRANS; +void wsrep_register_hton(THD* thd, bool all); + +/*! + * @param db Database string + * @param table Table string + * @param key Array of wsrep_key_t + * @param key_len In: number of elements in key array, Out: number of + * elements populated + * + * @return true if preparation was successful, otherwise false. + */ +bool wsrep_prepare_key_for_isolation(const char* db, + const char* table, + wsrep_key_part_t* key, + size_t *key_len); + +void wsrep_replication_process(THD *thd); +void wsrep_rollback_process(THD *thd); +void wsrep_brute_force_killer(THD *thd); +int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id); +extern "C" bool wsrep_consistency_check(void *thd_ptr); +extern "C" int wsrep_thd_is_brute_force(void *thd_ptr); +extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, + my_bool signal); +extern "C" int wsrep_thd_in_locking_session(void *thd_ptr); +void *wsrep_prepare_bf_thd(THD *thd); +void wsrep_return_from_bf_mode(void *shadow, THD *thd); + +/* this is visible for client build so that innodb plugin gets this */ +typedef struct wsrep_aborting_thd { + struct wsrep_aborting_thd *next; + THD *aborting_thd; +} *wsrep_aborting_thd_t; + +extern mysql_mutex_t LOCK_wsrep_ready; +extern mysql_cond_t COND_wsrep_ready; +extern mysql_mutex_t LOCK_wsrep_sst; +extern mysql_cond_t COND_wsrep_sst; +extern mysql_mutex_t LOCK_wsrep_sst_init; +extern mysql_cond_t COND_wsrep_sst_init; +extern mysql_mutex_t LOCK_wsrep_rollback; +extern mysql_cond_t COND_wsrep_rollback; +extern int wsrep_replaying; +extern mysql_mutex_t LOCK_wsrep_replaying; +extern mysql_cond_t COND_wsrep_replaying; +extern wsrep_aborting_thd_t wsrep_aborting_thd; +extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; +extern my_bool wsrep_convert_LOCK_to_trx; +extern ulong wsrep_retry_autocommit; +extern my_bool wsrep_emulate_bin_log; +extern my_bool wsrep_auto_increment_control; +extern my_bool wsrep_drupal_282555_workaround; +extern long long wsrep_max_ws_size; +extern long wsrep_max_ws_rows; +extern int wsrep_to_isolation; +extern my_bool wsrep_certify_nonPK; + +extern PSI_mutex_key key_LOCK_wsrep_ready; +extern PSI_mutex_key key_COND_wsrep_ready; +extern PSI_mutex_key key_LOCK_wsrep_sst; +extern PSI_cond_key key_COND_wsrep_sst; +extern PSI_mutex_key key_LOCK_wsrep_sst_init; +extern PSI_cond_key key_COND_wsrep_sst_init; +extern PSI_mutex_key key_LOCK_wsrep_sst_thread; +extern PSI_cond_key key_COND_wsrep_sst_thread; +extern PSI_mutex_key key_LOCK_wsrep_rollback; +extern PSI_cond_key key_COND_wsrep_rollback; +extern PSI_mutex_key key_LOCK_wsrep_replaying; +extern PSI_cond_key key_COND_wsrep_replaying; + +int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_); +void wsrep_to_isolation_end(THD *thd); + +void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); +void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*); +int wsrep_to_buf_helper( + THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len); +int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len); +int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len); +int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len); + +const wsrep_uuid_t* wsrep_cluster_uuid(); +struct xid_t; +void wsrep_set_SE_checkpoint(xid_t*); + +void wsrep_xid_init(xid_t*, const wsrep_uuid_t*, wsrep_seqno_t); +const wsrep_uuid_t* wsrep_xid_uuid(const xid_t*); +wsrep_seqno_t wsrep_xid_seqno(const xid_t*); +extern "C" int wsrep_is_wsrep_xid(const void* xid); + +#endif /* WSREP_MYSQLD_H */ diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc new file mode 100644 index 00000000000..ff997d01183 --- /dev/null +++ b/sql/wsrep_notify.cc @@ -0,0 +1,107 @@ +/* Copyright 2010 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <mysqld.h> +#include "wsrep_priv.h" + +const char* wsrep_notify_cmd=""; + +static const char* _status_str(wsrep_member_status_t status) +{ + switch (status) + { + case WSREP_MEMBER_UNDEFINED: return "Undefined"; + case WSREP_MEMBER_JOINER: return "Joiner"; + case WSREP_MEMBER_DONOR: return "Donor"; + case WSREP_MEMBER_JOINED: return "Joined"; + case WSREP_MEMBER_SYNCED: return "Synced"; + default: return "Error(?)"; + } +} + +void wsrep_notify_status (wsrep_member_status_t status, + const wsrep_view_info_t* view) +{ + if (!wsrep_notify_cmd || 0 == strlen(wsrep_notify_cmd)) + { + WSREP_INFO("wsrep_notify_cmd is not defined, skipping notification."); + return; + } + + char cmd_buf[1 << 16]; // this can be long + long cmd_len = sizeof(cmd_buf) - 1; + char* cmd_ptr = cmd_buf; + long cmd_off = 0; + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, "%s", + wsrep_notify_cmd); + + if (status >= WSREP_MEMBER_UNDEFINED && status < WSREP_MEMBER_ERROR) + { + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --status %s", + _status_str(status)); + } + else + { + /* here we preserve provider error codes */ + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --status 'Error(%d)'", status); + } + + if (0 != view) + { + char uuid_str[40]; + + wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str)); + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --uuid %s", uuid_str); + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --primary %s", view->view >= 0 ? "yes" : "no"); + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --index %d", view->my_idx); + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --members"); + + for (int i = 0; i < view->memb_num; i++) + { + wsrep_uuid_print (&view->members[i].id, uuid_str, sizeof(uuid_str)); + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + "%c%s/%s/%s", i > 0 ? ',' : ' ', + uuid_str, view->members[i].name, + view->members[i].incoming); + } + } + + if (cmd_off == cmd_len) + { + WSREP_ERROR("Notification buffer too short (%ld). Aborting notification.", + cmd_len); + return; + } + + wsp::process p(cmd_ptr, "r"); + + p.wait(); + int err = p.error(); + + if (err) + { + WSREP_ERROR("Notification command failed: %d (%s): \"%s\"", + err, strerror(err), cmd_ptr); + } +} + diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h new file mode 100644 index 00000000000..4db8abf68de --- /dev/null +++ b/sql/wsrep_priv.h @@ -0,0 +1,231 @@ +/* Copyright 2010 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +//! @file declares symbols private to wsrep integration layer + +#ifndef WSREP_PRIV_H +#define WSREP_PRIV_H + +#include "wsrep_mysqld.h" +#include "../wsrep/wsrep_api.h" + +#include <log.h> +#include <pthread.h> +#include <cstdio> + +extern ssize_t wsrep_sst_prepare (void** msg); +extern int wsrep_sst_donate_cb (void* app_ctx, + void* recv_ctx, + const void* msg, size_t msg_len, + const wsrep_uuid_t* current_uuid, + wsrep_seqno_t current_seqno, + const char* state, size_t state_len, + bool bypass); + +extern size_t default_ip (char* buf, size_t buf_len); +extern size_t default_address(char* buf, size_t buf_len); + +extern wsrep_uuid_t local_uuid; +extern wsrep_seqno_t local_seqno; + +/*! SST thread signals init thread about sst completion */ +extern void wsrep_sst_complete(wsrep_uuid_t* uuid, wsrep_seqno_t seqno, bool); + +extern void wsrep_notify_status (wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0); + +namespace wsp { +class node_status +{ +public: + node_status() : status(WSREP_MEMBER_UNDEFINED) {} + void set(wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0) + { + if (status != new_status || 0 != view) + { + wsrep_notify_status(new_status, view); + status = new_status; + } + } + wsrep_member_status_t get() const { return status; } +private: + wsrep_member_status_t status; +}; +} /* namespace wsp */ + +extern wsp::node_status local_status; + +namespace wsp { +/* A small class to run external programs. */ +class process +{ +private: + const char* const str_; + FILE* io_; + int err_; + pid_t pid_; + +public: +/*! @arg type is a pointer to a null-terminated string which must contain + either the letter 'r' for reading or the letter 'w' for writing. + */ + process (const char* cmd, const char* type); + ~process (); + + FILE* pipe () { return io_; } + int error() { return err_; } + int wait (); + const char* cmd() { return str_; } +}; +#ifdef REMOVED +class lock +{ + pthread_mutex_t* const mtx_; + +public: + + lock (pthread_mutex_t* mtx) : mtx_(mtx) + { + int err = pthread_mutex_lock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex lock failed: %s", strerror(err)); + abort(); + } + } + + virtual ~lock () + { + int err = pthread_mutex_unlock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex unlock failed: %s", strerror(err)); + abort(); + } + } + + inline void wait (pthread_cond_t* cond) + { + pthread_cond_wait (cond, mtx_); + } + +private: + + lock (const lock&); + lock& operator=(const lock&); + +}; + +class monitor +{ + int mutable refcnt; + pthread_mutex_t mutable mtx; + pthread_cond_t mutable cond; + +public: + + monitor() : refcnt(0) + { + pthread_mutex_init (&mtx, NULL); + pthread_cond_init (&cond, NULL); + } + + ~monitor() + { + pthread_mutex_destroy (&mtx); + pthread_cond_destroy (&cond); + } + + void enter() const + { + lock l(&mtx); + + while (refcnt) + { + l.wait(&cond); + } + refcnt++; + } + + void leave() const + { + lock l(&mtx); + + refcnt--; + if (refcnt == 0) + { + pthread_cond_signal (&cond); + } + } + +private: + + monitor (const monitor&); + monitor& operator= (const monitor&); +}; + +class critical +{ + const monitor& mon; + +public: + + critical(const monitor& m) : mon(m) { mon.enter(); } + + ~critical() { mon.leave(); } + +private: + + critical (const critical&); + critical& operator= (const critical&); +}; +#endif + +class thd +{ + class thd_init + { + public: + thd_init() { my_thread_init(); } + ~thd_init() { my_thread_end(); } + } + init; + + thd (const thd&); + thd& operator= (const thd&); + +public: + + thd(); + ~thd(); + THD* const ptr; +}; + +class string +{ +public: + string() : string_(0) {} + void set(char* str) { if (string_) free (string_); string_ = str; } + ~string() { set (0); } +private: + char* string_; +}; + +} // namespace wsrep +#endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc new file mode 100644 index 00000000000..4ce7fe84b0f --- /dev/null +++ b/sql/wsrep_sst.cc @@ -0,0 +1,932 @@ +/* Copyright 2008-2011 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <mysqld.h> +#include <sql_class.h> +#include <set_var.h> +#include <sql_acl.h> +#include <sql_reload.h> +#include <sql_parse.h> +#include "wsrep_priv.h" +#include <cstdio> +#include <cstdlib> + +extern const char wsrep_defaults_file[]; + +#define WSREP_SST_MYSQLDUMP "mysqldump" +#define WSREP_SST_DEFAULT WSREP_SST_MYSQLDUMP +#define WSREP_SST_ADDRESS_AUTO "AUTO" +#define WSREP_SST_AUTH_MASK "********" + +const char* wsrep_sst_method = WSREP_SST_DEFAULT; +const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO; +const char* wsrep_sst_donor = ""; + char* wsrep_sst_auth = NULL; + +// container for real auth string +static const char* sst_auth_real = NULL; + +static const char *sst_methods[] = { + "mysqldump", + "rsync", + "rsync_wan", + "xtrabackup", + NULL +}; + +bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) +{ + char buff[FN_REFLEN]; + String str(buff, sizeof(buff), system_charset_info), *res; + const char* c_str = NULL; + + if ((res = var->value->val_str(&str))) { + c_str = res->c_ptr(); + int i = 0; + + while (sst_methods[i] && strcasecmp(sst_methods[i], c_str)) i++; + if (!sst_methods[i]) { + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL"); + return 1; + } + } + return 0; +} + +bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type) +{ + return 0; +} + +static bool sst_receive_address_check (const char* str) +{ + if (!strncasecmp(str, "127.0.0.1", strlen("127.0.0.1")) || + !strncasecmp(str, "localhost", strlen("localhost"))) + { + return 1; + } + + return 0; +} + +bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var) +{ + const char* c_str = var->value->str_value.c_ptr(); + + if (sst_receive_address_check (c_str)) + { + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL"); + return 1; + } + + return 0; +} + +bool wsrep_sst_receive_address_update (sys_var *self, THD* thd, + enum_var_type type) +{ + return 0; +} + +bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var) +{ + return 0; +} +static bool sst_auth_real_set (const char* value) +{ + const char* v = strdup (value); + + if (v) + { + if (sst_auth_real) free (const_cast<char*>(sst_auth_real)); + sst_auth_real = v; + + if (strlen(sst_auth_real)) + { + if (wsrep_sst_auth) + { + my_free ((void*)wsrep_sst_auth); + wsrep_sst_auth = my_strdup(WSREP_SST_AUTH_MASK, MYF(0)); + //strncpy (wsrep_sst_auth, WSREP_SST_AUTH_MASK, + // sizeof(wsrep_sst_auth) - 1); + } + else + wsrep_sst_auth = my_strdup (WSREP_SST_AUTH_MASK, MYF(0)); + } + return 0; + } + + return 1; +} + +bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type) +{ + return sst_auth_real_set (wsrep_sst_auth); +} + +void wsrep_sst_auth_init (const char* value) +{ + if (wsrep_sst_auth == value) wsrep_sst_auth = NULL; + if (value) sst_auth_real_set (value); +} + +bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var) +{ + return 0; +} + +bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type) +{ + return 0; +} + +static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; + +bool wsrep_init_first() +{ + return (wsrep_provider != NULL + && strcmp (wsrep_provider, WSREP_NONE) + && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP)); +} + +static bool sst_complete = false; +static bool sst_needed = false; + +void wsrep_sst_grab () +{ + WSREP_INFO("wsrep_sst_grab()"); + if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); + sst_complete = false; + mysql_mutex_unlock (&LOCK_wsrep_sst); +} + +// Wait for end of SST +bool wsrep_sst_wait () +{ + if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); + while (!sst_complete) + { + WSREP_INFO("Waiting for SST to complete."); + mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst); + } + + if (local_seqno >= 0) + { + WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno); + } + else + { + WSREP_ERROR("SST failed: %d (%s)", + int(-local_seqno), strerror(-local_seqno)); + } + + mysql_mutex_unlock (&LOCK_wsrep_sst); + + return (local_seqno >= 0); +} + +// Signal end of SST +void wsrep_sst_complete (wsrep_uuid_t* sst_uuid, + wsrep_seqno_t sst_seqno, + bool needed) +{ + if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); + if (!sst_complete) + { + sst_complete = true; + sst_needed = needed; + local_uuid = *sst_uuid; + local_seqno = sst_seqno; + mysql_cond_signal (&COND_wsrep_sst); + } + else + { + WSREP_WARN("Nobody is waiting for SST."); + } + mysql_mutex_unlock (&LOCK_wsrep_sst); +} + +// Let applier threads to continue +void wsrep_sst_continue () +{ + if (sst_needed) + { + WSREP_INFO("Signalling provider to continue."); + wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + } +} + +struct sst_thread_arg +{ + const char* cmd; + int err; + char* ret_str; + mysql_mutex_t lock; + mysql_cond_t cond; + + sst_thread_arg (const char* c) : cmd(c), err(-1), ret_str(0) + { + mysql_mutex_init(key_LOCK_wsrep_sst_thread, + &lock, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL); + } + + ~sst_thread_arg() + { + mysql_cond_destroy (&cond); + mysql_mutex_unlock (&lock); + mysql_mutex_destroy (&lock); + } +}; + +static int sst_scan_uuid_seqno (const char* str, + wsrep_uuid_t* uuid, wsrep_seqno_t* seqno) +{ + int offt = wsrep_uuid_scan (str, strlen(str), uuid); + if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt]) + { + *seqno = strtoll (str + offt + 1, NULL, 10); + if (*seqno != LLONG_MAX || errno != ERANGE) + { + return 0; + } + } + + WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str); + return EINVAL; +} + +// get rid of trailing \n +static char* my_fgets (char* buf, size_t buf_len, FILE* stream) +{ + char* ret= fgets (buf, buf_len, stream); + + if (ret) + { + size_t len = strlen(ret); + if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0'; + } + + return ret; +} + +static void* sst_joiner_thread (void* a) +{ + sst_thread_arg* arg= (sst_thread_arg*) a; + int err= 1; + + { + const char magic[] = "ready"; + const size_t magic_len = sizeof(magic) - 1; + const size_t out_len = 512; + char out[out_len]; + + WSREP_INFO("Running: '%s'", arg->cmd); + + wsp::process proc (arg->cmd, "r"); + + if (proc.pipe() && !proc.error()) + { + const char* tmp= my_fgets (out, out_len, proc.pipe()); + + if (!tmp || strlen(tmp) < (magic_len + 2) || + strncasecmp (tmp, magic, magic_len)) + { + WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'", + magic, arg->cmd, tmp); + proc.wait(); + if (proc.error()) err = proc.error(); + } + else + { + err = 0; + } + } + else + { + err = proc.error(); + WSREP_ERROR("Failed to execute: %s : %d (%s)", + arg->cmd, err, strerror(err)); + } + + // signal sst_prepare thread with ret code, + // it will go on sending SST request + mysql_mutex_lock (&arg->lock); + if (!err) + { + arg->ret_str = strdup (out + magic_len + 1); + if (!arg->ret_str) err = ENOMEM; + } + arg->err = -err; + mysql_cond_signal (&arg->cond); + mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. + + if (err) return NULL; /* lp:808417 - return immediately, don't signal + * initializer thread to ensure single thread of + * shutdown. */ + + wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; + wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED; + + // in case of successfull receiver start, wait for SST completion/end + char* tmp = my_fgets (out, out_len, proc.pipe()); + + proc.wait(); + err= EINVAL; + + if (!tmp) + { + WSREP_ERROR("Failed to read uuid:seqno from joiner script."); + if (proc.error()) err = proc.error(); + } + else + { + err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno); + } + + if (err) + { + ret_uuid= WSREP_UUID_UNDEFINED; + ret_seqno= -err; + } + + // Tell initializer thread that SST is complete + wsrep_sst_complete (&ret_uuid, ret_seqno, true); + } + + return NULL; +} + +static ssize_t sst_prepare_other (const char* method, + const char* addr_in, + const char** addr_out) +{ + ssize_t cmd_len= 1024; + char cmd_str[cmd_len]; + const char* sst_dir= mysql_real_data_home; + + int ret= snprintf (cmd_str, cmd_len, + "wsrep_sst_%s 'joiner' '%s' '%s' '%s' '%s' '%d' 2>sst.err", + method, addr_in, (sst_auth_real) ? sst_auth_real : "", + sst_dir, wsrep_defaults_file, (int)getpid()); + + if (ret < 0 || ret >= cmd_len) + { + WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret); + return (ret < 0 ? ret : -EMSGSIZE); + } + + pthread_t tmp; + sst_thread_arg arg(cmd_str); + mysql_mutex_lock (&arg.lock); + pthread_create (&tmp, NULL, sst_joiner_thread, &arg); + mysql_cond_wait (&arg.cond, &arg.lock); + + *addr_out= arg.ret_str; + + if (!arg.err) + ret = strlen(*addr_out); + else + { + assert (arg.err < 0); + ret = arg.err; + } + + pthread_detach (tmp); + + return ret; +} + +//extern ulong my_bind_addr; +extern uint mysqld_port; + +/*! Just tells donor where ti sent mysqldump */ +static ssize_t sst_prepare_mysqldump (const char* addr_in, + const char** addr_out) +{ + ssize_t ret = strlen (addr_in); + + if (!strrchr(addr_in, ':')) + { + ssize_t s = ret + 7; + char* tmp = (char*) malloc (s); + + if (tmp) + { + ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port); + + if (ret > 0 && ret < s) + { + *addr_out= tmp; + return ret; + } + if (ret > 0) /* buffer too short */ ret = -EMSGSIZE; + free (tmp); + } + else { + ret= -ENOMEM; + } + + sql_print_error ("WSREP: Could not prepare state transfer request: " + "adding default port failed: %zd.", ret); + } + else { + *addr_out= addr_in; + } + + return ret; +} + +static bool SE_initialized = false; + +ssize_t wsrep_sst_prepare (void** msg) +{ + const ssize_t ip_max= 256; + char ip_buf[ip_max]; + const char* addr_in= NULL; + const char* addr_out= NULL; + + // Figure out SST address. Common for all SST methods + if (wsrep_sst_receive_address && + strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) + { + addr_in= wsrep_sst_receive_address; + } + else if (wsrep_node_address && strlen(wsrep_node_address)) + { + const char* const colon= strchr (wsrep_node_address, ':'); + if (colon) + { + ptrdiff_t const len= colon - wsrep_node_address; + strncpy (ip_buf, wsrep_node_address, len); + ip_buf[len]= '\0'; + addr_in= ip_buf; + } + else + { + addr_in= wsrep_node_address; + } + } + else + { + ssize_t ret= default_ip (ip_buf, ip_max); + + if (ret && ret < ip_max) + { + addr_in= ip_buf; + } + else + { + WSREP_ERROR("Could not prepare state transfer request: " + "failed to guess address to accept state transfer at. " + "wsrep_sst_receive_address must be set manually."); + unireg_abort(1); + } + } + + ssize_t addr_len= -ENOSYS; + if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP)) + { + addr_len= sst_prepare_mysqldump (addr_in, &addr_out); + if (addr_len < 0) unireg_abort(1); + } + else + { + /*! A heuristic workaround until we learn how to stop and start engines */ + if (SE_initialized) + { + // we already did SST at initializaiton, now engines are running + // sql_print_information() is here because the message is too long + // for WSREP_INFO. + sql_print_information ("WSREP: " + "You have configured '%s' state snapshot transfer method " + "which cannot be performed on a running server. " + "Wsrep provider won't be able to fall back to it " + "if other means of state transfer are unavailable. " + "In that case you will need to restart the server.", + wsrep_sst_method); + *msg = 0; + return 0; + } + + addr_len = sst_prepare_other (wsrep_sst_method, addr_in, &addr_out); + if (addr_len < 0) + { + WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.", + wsrep_sst_method); + unireg_abort(1); + } + } + + size_t const method_len(strlen(wsrep_sst_method)); + size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/); + + *msg = malloc (msg_len); + if (NULL != *msg) { + char* const method_ptr(reinterpret_cast<char*>(*msg)); + strcpy (method_ptr, wsrep_sst_method); + char* const addr_ptr(method_ptr + method_len + 1); + strcpy (addr_ptr, addr_out); + + WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr); + } + else { + WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.", + msg_len); + unireg_abort(1); + } + + if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out); + + return msg_len; +} + +// helper method for donors +static int sst_run_shell (const char* cmd_str, int max_tries) +{ + int ret = 0; + + for (int tries=1; tries <= max_tries; tries++) + { + wsp::process proc (cmd_str, "r"); + + if (NULL != proc.pipe()) + { + proc.wait(); + } + + if ((ret = proc.error())) + { + WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)", + tries, max_tries, proc.cmd(), ret, strerror(ret)); + sleep (1); + } + else + { + WSREP_DEBUG("SST script successfully completed."); + break; + } + } + + return -ret; +} + +static int sst_mysqldump_check_addr (const char* user, const char* pswd, + const char* host, const char* port) +{ + return 0; +} + +static int sst_donate_mysqldump (const char* addr, + const wsrep_uuid_t* uuid, + const char* uuid_str, + wsrep_seqno_t seqno, + bool bypass) +{ + size_t host_len; + const char* port = strchr (addr, ':'); + + if (port) + { + port += 1; + host_len = port - addr; + } + else + { + port = ""; + host_len = strlen (addr) + 1; + } + + char host[host_len]; + + strncpy (host, addr, host_len - 1); + host[host_len - 1] = '\0'; + + const char* auth = sst_auth_real; + const char* pswd = (auth) ? strchr (auth, ':') : NULL; + size_t user_len; + + if (pswd) + { + pswd += 1; + user_len = pswd - auth; + } + else + { + pswd = ""; + user_len = (auth) ? strlen (auth) + 1 : 1; + } + + char user[user_len]; + + strncpy (user, (auth) ? auth : "", user_len - 1); + user[user_len - 1] = '\0'; + + int ret = sst_mysqldump_check_addr (user, pswd, host, port); + if (!ret) + { + size_t cmd_len= 1024; + char cmd_str[cmd_len]; + + snprintf (cmd_str, cmd_len, + "wsrep_sst_mysqldump '%s' '%s' '%s' '%s' '%u' '%s' '%lld' '%d'", + user, pswd, host, port, mysqld_port, uuid_str, (long long)seqno, + bypass); + + WSREP_DEBUG("Running: '%s'", cmd_str); + + ret= sst_run_shell (cmd_str, 3); + } + + wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); + + return ret; +} + +wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; + +static int run_sql_command(THD *thd, const char *query) +{ + thd->set_query((char *)query, strlen(query)); + + Parser_state ps; + if (ps.init(thd, thd->query(), thd->query_length())) + { + WSREP_ERROR("SST query: %s failed", query); + return -1; + } + + mysql_parse(thd, thd->query(), thd->query_length(), &ps); + if (thd->is_error()) + { + int const err= thd->stmt_da->sql_errno(); + WSREP_WARN ("error executing '%s': %d (%s)%s", + query, err, thd->stmt_da->message(), + err == ER_UNKNOWN_SYSTEM_VARIABLE ? + ". Was mysqld built with --with-innodb-disallow-writes ?" : ""); + thd->clear_error(); + return -1; + } + return 0; +} + +static int sst_flush_tables(THD* thd) +{ + WSREP_INFO("Flushing tables for SST..."); + + int err; + int not_used; + if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK")) + { + WSREP_ERROR("Failed to flush and lock tables"); + err = -1; + } + else + { + /* make sure logs are flushed after global read lock acquired */ + err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG, + (TABLE_LIST*) 0, ¬_used); + } + + if (err) + { + WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err)); + } + else + { + WSREP_INFO("Tables flushed."); + const char base_name[]= "tables_flushed"; + ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; + char real_name[full_len]; + sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); + char tmp_name[full_len + 4]; + sprintf(tmp_name, "%s.tmp", real_name); + + FILE* file= fopen(tmp_name, "w+"); + if (0 == file) + { + err= errno; + WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err)); + } + else + { + fprintf(file, "%s:%lld\n", + wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno); + fsync(fileno(file)); + fclose(file); + if (rename(tmp_name, real_name) == -1) + { + err= errno; + WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", + tmp_name, real_name, err,strerror(err)); + } + } + } + + return err; +} + +static void sst_disallow_writes (THD* thd, bool yes) +{ + char query_str[64] = { 0, }; + ssize_t const query_max = sizeof(query_str) - 1; + snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d", + yes ? 1 : 0); + + if (run_sql_command(thd, query_str)) + { + WSREP_ERROR("Failed to disallow InnoDB writes"); + } +} + +static void* sst_donor_thread (void* a) +{ + sst_thread_arg* arg= (sst_thread_arg*)a; + + WSREP_INFO("Running: '%s'", arg->cmd); + + int err= 1; + bool locked= false; + + const char* out= NULL; + const size_t out_len= 128; + char out_buf[out_len]; + + wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED; + wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST + + wsp::thd thd; + wsp::process proc(arg->cmd, "r"); + + err= proc.error(); + +/* Inform server about SST script startup and release TO isolation */ + mysql_mutex_lock (&arg->lock); + arg->err = -err; + mysql_cond_signal (&arg->cond); + mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. + + if (proc.pipe() && !err) + { +wait_signal: + out= my_fgets (out_buf, out_len, proc.pipe()); + + if (out) + { + const char magic_flush[]= "flush tables"; + const char magic_cont[]= "continue"; + const char magic_done[]= "done"; + + if (!strcasecmp (out, magic_flush)) + { + err= sst_flush_tables (thd.ptr); + if (!err) + { + sst_disallow_writes (thd.ptr, true); + locked= true; + goto wait_signal; + } + } + else if (!strcasecmp (out, magic_cont)) + { + if (locked) + { + sst_disallow_writes (thd.ptr, false); + thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); + locked= false; + } + err= 0; + goto wait_signal; + } + else if (!strncasecmp (out, magic_done, strlen(magic_done))) + { + err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1, + &ret_uuid, &ret_seqno); + } + else + { + WSREP_WARN("Received unknown signal: '%s'", out); + } + } + else + { + WSREP_ERROR("Failed to read from: %s", proc.cmd()); + } + if (err && proc.error()) err= proc.error(); + } + else + { + WSREP_ERROR("Failed to execute: %s : %d (%s)", + proc.cmd(), err, strerror(err)); + } + + if (locked) // don't forget to unlock server before return + { + sst_disallow_writes (thd.ptr, false); + thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); + } + + // signal to donor that SST is over + wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); + proc.wait(); + + return NULL; +} + +static int sst_donate_other (const char* method, + const char* addr, + const char* uuid, + wsrep_seqno_t seqno, + bool bypass) +{ + ssize_t cmd_len = 4096; + char cmd_str[cmd_len]; + + int ret= snprintf (cmd_str, cmd_len, + "wsrep_sst_%s 'donor' '%s' '%s' '%s' '%s' '%s' '%lld' '%d'" + , + method, addr, sst_auth_real, mysql_real_data_home, + wsrep_defaults_file, uuid, (long long) seqno, bypass); + + if (ret < 0 || ret >= cmd_len) + { + WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); + return (ret < 0 ? ret : -EMSGSIZE); + } + + pthread_t tmp; + sst_thread_arg arg(cmd_str); + mysql_mutex_lock (&arg.lock); + pthread_create (&tmp, NULL, sst_donor_thread, &arg); + mysql_cond_wait (&arg.cond, &arg.lock); + + WSREP_INFO("sst_donor_thread signaled with %d", arg.err); + return arg.err; +} + +int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, + const void* msg, size_t msg_len, + const wsrep_uuid_t* current_uuid, + wsrep_seqno_t current_seqno, + const char* state, size_t state_len, + bool bypass) +{ + /* This will be reset when sync callback is called. + * Should we set wsrep_ready to FALSE here too? */ +// wsrep_notify_status(WSREP_MEMBER_DONOR); + local_status.set(WSREP_MEMBER_DONOR); + + const char* method = (char*)msg; + size_t method_len = strlen (method); + const char* data = method + method_len + 1; + + char uuid_str[37]; + wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); + + int ret; + if (!strcmp (WSREP_SST_MYSQLDUMP, method)) + { + ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, + bypass); + } + else + { + ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); + } + + return (ret > 0 ? 0 : ret); +} + +void wsrep_SE_init_grab() +{ + if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort(); +} + +void wsrep_SE_init_wait() +{ + mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init); + mysql_mutex_unlock (&LOCK_wsrep_sst_init); +} + +void wsrep_SE_init_done() +{ + mysql_cond_signal (&COND_wsrep_sst_init); + mysql_mutex_unlock (&LOCK_wsrep_sst_init); +} + +void wsrep_SE_initialized() +{ + SE_initialized = true; +} diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc new file mode 100644 index 00000000000..f39353eda44 --- /dev/null +++ b/sql/wsrep_utils.cc @@ -0,0 +1,447 @@ +/* Copyright 2010 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +//! @file declares symbols private to wsrep integration layer + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag +#endif + +#include <spawn.h> // posix_spawn() +#include <unistd.h> // pipe() +#include <errno.h> // errno +#include <string.h> // strerror() +#include <sys/wait.h> // waitpid() + +#include <sql_class.h> +#include "wsrep_priv.h" + +extern char** environ; // environment variables + +static wsp::string wsrep_PATH; + +void +wsrep_prepend_PATH (const char* path) +{ + int count = 0; + + while (environ[count]) + { + if (strncmp (environ[count], "PATH=", 5)) + { + count++; + continue; + } + + char* const old_path (environ[count]); + + if (strstr (old_path, path)) return; // path already there + + size_t const new_path_len(strlen(old_path) + strlen(":") + + strlen(path) + 1); + + char* const new_path (reinterpret_cast<char*>(malloc(new_path_len))); + + if (new_path) + { + snprintf (new_path, new_path_len, "PATH=%s:%s", path, + old_path + strlen("PATH=")); + + wsrep_PATH.set (new_path); + environ[count] = new_path; + } + else + { + WSREP_ERROR ("Failed to allocate 'PATH' environment variable " + "buffer of size %zu.", new_path_len); + } + + return; + } + + WSREP_ERROR ("Failed to find 'PATH' environment variable. " + "State snapshot transfer may not be working."); +} + +namespace wsp +{ + +#define PIPE_READ 0 +#define PIPE_WRITE 1 +#define STDIN_FD 0 +#define STDOUT_FD 1 + +#ifndef POSIX_SPAWN_USEVFORK +# define POSIX_SPAWN_USEVFORK 0 +#endif + +process::process (const char* cmd, const char* type) + : str_(cmd ? strdup(cmd) : strdup("")), io_(NULL), err_(EINVAL), pid_(0) +{ + if (0 == str_) + { + WSREP_ERROR ("Can't allocate command line of size: %zu", strlen(cmd)); + err_ = ENOMEM; + return; + } + + if (0 == strlen(str_)) + { + WSREP_ERROR ("Can't start a process: null or empty command line."); + return; + } + + if (NULL == type || (strcmp (type, "w") && strcmp(type, "r"))) + { + WSREP_ERROR ("type argument should be either \"r\" or \"w\"."); + return; + } + + int pipe_fds[2] = { -1, }; + if (::pipe(pipe_fds)) + { + err_ = errno; + WSREP_ERROR ("pipe() failed: %d (%s)", err_, strerror(err_)); + return; + } + + // which end of pipe will be returned to parent + int const parent_end (strcmp(type,"w") ? PIPE_READ : PIPE_WRITE); + int const child_end (parent_end == PIPE_READ ? PIPE_WRITE : PIPE_READ); + int const close_fd (parent_end == PIPE_READ ? STDOUT_FD : STDIN_FD); + + char* const pargv[4] = { strdup("sh"), strdup("-c"), strdup(str_), NULL }; + if (!(pargv[0] && pargv[1] && pargv[2])) + { + err_ = ENOMEM; + WSREP_ERROR ("Failed to allocate pargv[] array."); + goto cleanup_pipe; + } + + posix_spawnattr_t attr; + err_ = posix_spawnattr_init (&attr); + if (err_) + { + WSREP_ERROR ("posix_spawnattr_init() failed: %d (%s)", + err_, strerror(err_)); + goto cleanup_pipe; + } + + err_ = posix_spawnattr_setflags (&attr, POSIX_SPAWN_SETSIGDEF | + POSIX_SPAWN_USEVFORK); + if (err_) + { + WSREP_ERROR ("posix_spawnattr_setflags() failed: %d (%s)", + err_, strerror(err_)); + goto cleanup_attr; + } + + posix_spawn_file_actions_t fact; + err_ = posix_spawn_file_actions_init (&fact); + if (err_) + { + WSREP_ERROR ("posix_spawn_file_actions_init() failed: %d (%s)", + err_, strerror(err_)); + goto cleanup_attr; + } + + // close child's stdout|stdin depending on what we returning + err_ = posix_spawn_file_actions_addclose (&fact, close_fd); + if (err_) + { + WSREP_ERROR ("posix_spawn_file_actions_addclose() failed: %d (%s)", + err_, strerror(err_)); + goto cleanup_fact; + } + + // substitute our pipe descriptor in place of the closed one + err_ = posix_spawn_file_actions_adddup2 (&fact, + pipe_fds[child_end], close_fd); + if (err_) + { + WSREP_ERROR ("posix_spawn_file_actions_addup2() failed: %d (%s)", + err_, strerror(err_)); + goto cleanup_fact; + } + + err_ = posix_spawnp (&pid_, pargv[0], &fact, &attr, pargv, environ); + if (err_) + { + WSREP_ERROR ("posix_spawnp(%s) failed: %d (%s)", + pargv[2], err_, strerror(err_)); + pid_ = 0; // just to make sure it was not messed up in the call + goto cleanup_fact; + } + + io_ = fdopen (pipe_fds[parent_end], type); + + if (io_) + { + pipe_fds[parent_end] = -1; // skip close on cleanup + } + else + { + err_ = errno; + WSREP_ERROR ("fdopen() failed: %d (%s)", err_, strerror(err_)); + } + +cleanup_fact: + int err; // to preserve err_ code + err = posix_spawn_file_actions_destroy (&fact); + if (err) + { + WSREP_ERROR ("posix_spawn_file_actions_destroy() failed: %d (%s)\n", + err, strerror(err)); + } + +cleanup_attr: + err = posix_spawnattr_destroy (&attr); + if (err) + { + WSREP_ERROR ("posix_spawnattr_destroy() failed: %d (%s)", + err, strerror(err)); + } + +cleanup_pipe: + if (pipe_fds[0] >= 0) close (pipe_fds[0]); + if (pipe_fds[1] >= 0) close (pipe_fds[1]); + + free (pargv[0]); + free (pargv[1]); + free (pargv[2]); +} + +process::~process () +{ + if (io_) + { + assert (pid_); + assert (str_); + + WSREP_WARN("Closing pipe to child process: %s, PID(%ld) " + "which might still be running.", str_, (long)pid_); + + if (fclose (io_) == -1) + { + err_ = errno; + WSREP_ERROR("fclose() failed: %d (%s)", err_, strerror(err_)); + } + } + + if (str_) free (const_cast<char*>(str_)); +} + +int +process::wait () +{ + if (pid_) + { + int status; + if (-1 == waitpid(pid_, &status, 0)) + { + err_ = errno; assert (err_); + WSREP_ERROR("Waiting for process failed: %s, PID(%ld): %d (%s)", + str_, (long)pid_, err_, strerror (err_)); + } + else + { // command completed, check exit status + if (WIFEXITED (status)) { + err_ = WEXITSTATUS (status); + } + else { // command didn't complete with exit() + WSREP_ERROR("Process was aborted."); + err_ = errno ? errno : ECHILD; + } + + if (err_) { + switch (err_) /* Translate error codes to more meaningful */ + { + case 126: err_ = EACCES; break; /* Permission denied */ + case 127: err_ = ENOENT; break; /* No such file or directory */ + } + WSREP_ERROR("Process completed with error: %s: %d (%s)", + str_, err_, strerror(err_)); + } + + pid_ = 0; + if (io_) fclose (io_); + io_ = NULL; + } + } + else { + assert (NULL == io_); + WSREP_ERROR("Command did not run: %s", str_); + } + + return err_; +} + +thd::thd () : init(), ptr(new THD) +{ + if (ptr) + { + ptr->thread_stack= (char*) &ptr; + ptr->store_globals(); + ptr->variables.option_bits&= ~OPTION_BIN_LOG; // disable binlog + ptr->security_ctx->master_access= ~(ulong)0; + lex_start(ptr); + } +} + +thd::~thd () +{ + if (ptr) + { + delete ptr; + my_pthread_setspecific_ptr (THR_THD, 0); + } +} + +} // namespace wsp + +extern ulong my_bind_addr; +extern uint mysqld_port; + +size_t default_ip (char* buf, size_t buf_len) +{ + size_t ip_len = 0; + + if (htonl(INADDR_NONE) == my_bind_addr) { + WSREP_ERROR("Networking not configured, cannot receive state transfer."); + return 0; + } + + if (htonl(INADDR_ANY) == my_bind_addr) { + // binds to all interfaces, try to find the address of the first one +#if (TARGET_OS_LINUX == 1) + const char cmd[] = "/sbin/ifconfig | " + "grep -m1 -1 -E '^[a-z]?eth[0-9]' | tail -n 1 | " + "awk '{ print $2 }' | awk -F : '{ print $2 }'"; +#elif defined(__sun__) + const char cmd[] = "/sbin/ifconfig -a | " + "grep -m1 -1 -E 'net[0-9]:' | tail -n 1 | awk '{ print $2 }'"; +#else + char *cmd; +#error "OS not supported" +#endif + wsp::process proc (cmd, "r"); + + if (NULL != proc.pipe()) { + char* ret; + + ret = fgets (buf, buf_len, proc.pipe()); + + if (proc.wait()) return 0; + + if (NULL == ret) { + WSREP_ERROR("Failed to read output of: '%s'", cmd); + return 0; + } + } + else { + WSREP_ERROR("Failed to execute: '%s'", cmd); + return 0; + } + + // clear possible \n at the end of ip string left by fgets() + ip_len = strlen (buf); + if (ip_len > 0 && '\n' == buf[ip_len - 1]) { + ip_len--; + buf[ip_len] = '\0'; + } + + if (INADDR_NONE == inet_addr(buf)) { + if (strlen(buf) != 0) { + WSREP_WARN("Shell command returned invalid address: '%s'", buf); + } + return 0; + } + } + else { + uint8_t* b = (uint8_t*)&my_bind_addr; + ip_len = snprintf (buf, buf_len, + "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]); + } + + return ip_len; +} + +size_t default_address(char* buf, size_t buf_len) +{ + size_t addr_len = default_ip (buf, buf_len); + + if (addr_len && addr_len < buf_len) { + addr_len += snprintf (buf + addr_len, buf_len - addr_len, + ":%u", mysqld_port); + } + + return addr_len; +} + +/* + * WSREPXid + */ + +#define WSREP_XID_PREFIX "WSREPXid" +#define WSREP_XID_PREFIX_LEN MYSQL_XID_PREFIX_LEN +#define WSREP_XID_UUID_OFFSET 8 +#define WSREP_XID_SEQNO_OFFSET (WSREP_XID_UUID_OFFSET + sizeof(wsrep_uuid_t)) +#define WSREP_XID_GTRID_LEN (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) + +void wsrep_xid_init(XID* xid, const wsrep_uuid_t* uuid, wsrep_seqno_t seqno) +{ + xid->formatID= 1; + xid->gtrid_length= WSREP_XID_GTRID_LEN; + xid->bqual_length= 0; + memset(xid->data, 0, sizeof(xid->data)); + memcpy(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN); + memcpy(xid->data + WSREP_XID_UUID_OFFSET, uuid, sizeof(wsrep_uuid_t)); + memcpy(xid->data + WSREP_XID_SEQNO_OFFSET, &seqno, sizeof(wsrep_seqno_t)); +} + +const wsrep_uuid_t* wsrep_xid_uuid(const XID* xid) +{ + if (wsrep_is_wsrep_xid(xid)) + return reinterpret_cast<const wsrep_uuid_t*>(xid->data + + WSREP_XID_UUID_OFFSET); + else + return &WSREP_UUID_UNDEFINED; +} + +wsrep_seqno_t wsrep_xid_seqno(const XID* xid) +{ + + if (wsrep_is_wsrep_xid(xid)) + { + wsrep_seqno_t seqno; + memcpy(&seqno, xid->data + WSREP_XID_SEQNO_OFFSET, sizeof(wsrep_seqno_t)); + return seqno; + } + else + { + return WSREP_SEQNO_UNDEFINED; + } +} + +extern "C" +int wsrep_is_wsrep_xid(const void* xid_ptr) +{ + const XID* xid= reinterpret_cast<const XID*>(xid_ptr); + return (xid->formatID == 1 && + xid->gtrid_length == WSREP_XID_GTRID_LEN && + xid->bqual_length == 0 && + !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN)); +} diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc new file mode 100644 index 00000000000..66f0f05c006 --- /dev/null +++ b/sql/wsrep_var.cc @@ -0,0 +1,505 @@ +/* Copyright 2008 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <mysqld.h> +#include <sql_class.h> +#include <sql_plugin.h> +#include <set_var.h> +#include <sql_acl.h> +#include "wsrep_priv.h" +#include <my_dir.h> +#include <cstdio> +#include <cstdlib> + +#define WSREP_START_POSITION_ZERO "00000000-0000-0000-0000-000000000000:-1" + +// trx history position to start with +const char* wsrep_start_position = WSREP_START_POSITION_ZERO; +const char* wsrep_provider = WSREP_NONE; +const char* wsrep_provider_options = (const char*)my_memdup("", 1, MYF(MY_WME)); +const char* wsrep_cluster_address = NULL; +const char* wsrep_cluster_name = "my_wsrep_cluster"; +const char* wsrep_node_name = glob_hostname; +static char node_address[256] = { 0, }; +const char* wsrep_node_address = node_address; +ulong wsrep_OSU_method_options; + +int wsrep_init_vars() +{ + global_system_variables.binlog_format=BINLOG_FORMAT_ROW; + return 0; +} + +bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) +{ + if (var_type == OPT_GLOBAL) { + // FIXME: this variable probably should be changed only per session + thd->variables.wsrep_on = global_system_variables.wsrep_on; + } + else { + } + +#ifdef REMOVED + if (thd->variables.wsrep_on) + thd->variables.option_bits |= (OPTION_BIN_LOG); + else + thd->variables.option_bits &= ~(OPTION_BIN_LOG); +#endif + return false; +} + +void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) +{ + if (var_type == OPT_GLOBAL) { + thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; + } + else { + } +} + +static int wsrep_start_position_verify (const char* start_str) +{ + size_t start_len; + wsrep_uuid_t uuid; + ssize_t uuid_len; + + start_len = strlen (start_str); + if (start_len < 34) + return 1; + + uuid_len = wsrep_uuid_scan (start_str, start_len, &uuid); + if (uuid_len < 0 || (start_len - uuid_len) < 2) + return 1; + + if (start_str[uuid_len] != ':') // separator should follow UUID + return 1; + + char* endptr; + wsrep_seqno_t const seqno __attribute__((unused)) // to avoid GCC warnings + (strtoll(&start_str[uuid_len + 1], &endptr, 10)); + + if (*endptr == '\0') return 0; // remaining string was seqno + + return 1; +} + +bool wsrep_start_position_check (sys_var *self, THD* thd, set_var* var) +{ + char buff[FN_REFLEN]; + String str(buff, sizeof(buff), system_charset_info), *res; + const char* start_str = NULL; + + if (!(res = var->value->val_str(&str))) goto err; + + start_str = res->c_ptr(); + + if (!start_str) goto err; + + if (!wsrep_start_position_verify(start_str)) return 0; + +err: + + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + start_str ? start_str : "NULL"); + return 1; +} + +void wsrep_set_local_position (const char* value) +{ + size_t value_len = strlen (value); + size_t uuid_len = wsrep_uuid_scan (value, value_len, &local_uuid); + + local_seqno = strtoll (value + uuid_len + 1, NULL, 10); + + XID xid; + wsrep_xid_init(&xid, &local_uuid, local_seqno); + wsrep_set_SE_checkpoint(&xid); + WSREP_INFO ("wsrep_start_position var submitted: '%s'", wsrep_start_position); +} + +bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type) +{ + // since this value passed wsrep_start_position_check, don't check anything + // here + wsrep_set_local_position (wsrep_start_position); + + if (wsrep) { + wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + } + + return 0; +} + +void wsrep_start_position_init (const char* val) +{ + if (NULL == val || wsrep_start_position_verify (val)) + { + WSREP_ERROR("Bad initial value for wsrep_start_position: %s", + (val ? val : "")); + return; + } + + wsrep_start_position = my_strdup(val, MYF(0)); + + wsrep_set_local_position (val); +} + +static bool refresh_provider_options() +{ + char* opts= wsrep->options_get(wsrep); + if (opts) + { + if (wsrep_provider_options) my_free((void *)wsrep_provider_options); + wsrep_provider_options = (char*)my_memdup(opts, strlen(opts) + 1, + MYF(MY_WME)); + } + else + { + WSREP_ERROR("Failed to get provider options"); + return true; + } + return false; +} + +static int wsrep_provider_verify (const char* provider_str) +{ + MY_STAT f_stat; + char path[FN_REFLEN]; + + if (!provider_str || strlen(provider_str)== 0) + return 1; + + if (!strcmp(provider_str, WSREP_NONE)) + return 0; + + if (!unpack_filename(path, provider_str)) + return 1; + + /* check that provider file exists */ + bzero(&f_stat, sizeof(MY_STAT)); + if (!my_stat(path, &f_stat, MYF(0))) + { + return 1; + } + return 0; +} + +bool wsrep_provider_check (sys_var *self, THD* thd, set_var* var) +{ + char buff[FN_REFLEN]; + String str(buff, sizeof(buff), system_charset_info), *res; + const char* provider_str = NULL; + + if (!(res = var->value->val_str(&str))) goto err; + + provider_str = res->c_ptr(); + + if (!provider_str) goto err; + + if (!wsrep_provider_verify(provider_str)) return 0; + +err: + + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + provider_str ? provider_str : "NULL"); + return 1; +} + +bool wsrep_provider_update (sys_var *self, THD* thd, enum_var_type type) +{ + bool rcode= false; + + bool wsrep_on_saved= thd->variables.wsrep_on; + thd->variables.wsrep_on= false; + + wsrep_stop_replication(thd); + wsrep_deinit(); + + char* tmp= strdup(wsrep_provider); // wsrep_init() rewrites provider + //when fails + if (wsrep_init()) + { + my_error(ER_CANT_OPEN_LIBRARY, MYF(0), tmp); + rcode = true; + } + free(tmp); + + // we sure don't want to use old address with new provider + wsrep_cluster_address_init(NULL); + wsrep_provider_options_init(NULL); + + thd->variables.wsrep_on= wsrep_on_saved; + + refresh_provider_options(); + + return rcode; +} + +void wsrep_provider_init (const char* value) +{ + if (NULL == value || wsrep_provider_verify (value)) + { + WSREP_ERROR("Bad initial value for wsrep_provider: %s", + (value ? value : "")); + return; + } + wsrep_provider = my_strdup(value, MYF(0)); +} + +bool wsrep_provider_options_check(sys_var *self, THD* thd, set_var* var) +{ + return 0; +} + +bool wsrep_provider_options_update(sys_var *self, THD* thd, enum_var_type type) +{ + wsrep_status_t ret= wsrep->options_set(wsrep, wsrep_provider_options); + if (ret != WSREP_OK) + { + WSREP_ERROR("Set options returned %d", ret); + return true; + } + return refresh_provider_options(); +} + +void wsrep_provider_options_init(const char* value) +{ + if (wsrep_provider_options && wsrep_provider_options != value) + my_free((void *)wsrep_provider_options); + wsrep_provider_options = (value) ? my_strdup(value, MYF(0)) : NULL; +} + +static int wsrep_cluster_address_verify (const char* cluster_address_str) +{ + /* There is no predefined address format, it depends on provider. */ + return 0; +} + +bool wsrep_cluster_address_check (sys_var *self, THD* thd, set_var* var) +{ + char buff[FN_REFLEN]; + String str(buff, sizeof(buff), system_charset_info), *res; + const char* cluster_address_str = NULL; + + if (!(res = var->value->val_str(&str))) goto err; + + cluster_address_str = res->c_ptr(); + + if (!wsrep_cluster_address_verify(cluster_address_str)) return 0; + + err: + + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + cluster_address_str ? cluster_address_str : "NULL"); + return 1 ; +} + +bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type) +{ + bool wsrep_on_saved= thd->variables.wsrep_on; + thd->variables.wsrep_on= false; + + wsrep_stop_replication(thd); + + if (wsrep_start_replication()) + { + wsrep_create_rollbacker(); + wsrep_create_appliers(wsrep_slave_threads); + } + + thd->variables.wsrep_on= wsrep_on_saved; + + return false; +} + +void wsrep_cluster_address_init (const char* value) +{ + if (wsrep_cluster_address && wsrep_cluster_address != value) + my_free ((void*)wsrep_cluster_address); + + wsrep_cluster_address = (value) ? my_strdup(value, MYF(0)) : NULL; +} + +bool wsrep_cluster_name_check (sys_var *self, THD* thd, set_var* var) +{ + char buff[FN_REFLEN]; + String str(buff, sizeof(buff), system_charset_info), *res; + const char* cluster_name_str = NULL; + + if (!(res = var->value->val_str(&str))) goto err; + + cluster_name_str = res->c_ptr(); + + if (!cluster_name_str || strlen(cluster_name_str) == 0) goto err; + + return 0; + + err: + + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + cluster_name_str ? cluster_name_str : "NULL"); + return 1; +} + +bool wsrep_cluster_name_update (sys_var *self, THD* thd, enum_var_type type) +{ + return 0; +} + +bool wsrep_node_name_check (sys_var *self, THD* thd, set_var* var) +{ + char buff[FN_REFLEN]; + String str(buff, sizeof(buff), system_charset_info), *res; + const char* node_name_str = NULL; + + if (!(res = var->value->val_str(&str))) goto err; + + node_name_str = res->c_ptr(); + + if (!node_name_str || strlen(node_name_str) == 0) goto err; + + return 0; + + err: + + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + node_name_str ? node_name_str : "NULL"); + return 1; +} + +bool wsrep_node_name_update (sys_var *self, THD* thd, enum_var_type type) +{ + return 0; +} + +// TODO: do something more elaborate, like checking connectivity +bool wsrep_node_address_check (sys_var *self, THD* thd, set_var* var) +{ + char buff[FN_REFLEN]; + String str(buff, sizeof(buff), system_charset_info), *res; + const char* node_address_str = NULL; + + if (!(res = var->value->val_str(&str))) goto err; + + node_address_str = res->c_ptr(); + + if (!node_address_str || strlen(node_address_str) == 0) goto err; + + return 0; + + err: + + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + node_address_str ? node_address_str : "NULL"); + return 1; +} + +bool wsrep_node_address_update (sys_var *self, THD* thd, enum_var_type type) +{ + return 0; +} + +void wsrep_node_address_init (const char* value) +{ + if (wsrep_node_address && strcmp(wsrep_node_address, value)) + my_free ((void*)wsrep_node_address); + + wsrep_node_address = (value) ? my_strdup(value, MYF(0)) : NULL; +} + +/* + * Status variables stuff below + */ +static inline void +wsrep_assign_to_mysql (SHOW_VAR* mysql, wsrep_stats_var* wsrep) +{ + mysql->name = wsrep->name; + switch (wsrep->type) { + case WSREP_VAR_INT64: + mysql->value = (char*) &wsrep->value._int64; + mysql->type = SHOW_LONGLONG; + break; + case WSREP_VAR_STRING: + mysql->value = (char*) &wsrep->value._string; + mysql->type = SHOW_CHAR_PTR; + break; + case WSREP_VAR_DOUBLE: + mysql->value = (char*) &wsrep->value._double; + mysql->type = SHOW_DOUBLE; + break; + } +} + +static wsrep_stats_var* wsrep_status_vars = NULL; + +#if DYNAMIC +// somehow this mysql status thing works only with statically allocated arrays. +static SHOW_VAR* mysql_status_vars = NULL; +static int mysql_status_len = -1; +#else +static SHOW_VAR mysql_status_vars[100 + 1]; +static const int mysql_status_len = 100; +#endif + +static void export_wsrep_status_to_mysql() +{ + int wsrep_status_len, i; + + if (wsrep_status_vars) wsrep->stats_free (wsrep, wsrep_status_vars); + + wsrep_status_vars = wsrep->stats_get (wsrep); + + if (!wsrep_status_vars) { + return; + } + + for (wsrep_status_len = 0; + wsrep_status_vars[wsrep_status_len].name != NULL; + wsrep_status_len++); + +#if DYNAMIC + if (wsrep_status_len != mysql_status_len) { + void* tmp = realloc (mysql_status_vars, + (wsrep_status_len + 1) * sizeof(SHOW_VAR)); + if (!tmp) { + + sql_print_error ("Out of memory for wsrep status variables." + "Number of variables: %d", wsrep_status_len); + return; + } + + mysql_status_len = wsrep_status_len; + mysql_status_vars = (SHOW_VAR*)tmp; + } + /* @TODO: fix this: */ +#else + if (mysql_status_len < wsrep_status_len) wsrep_status_len= mysql_status_len; +#endif + + for (i = 0; i < wsrep_status_len; i++) + wsrep_assign_to_mysql (mysql_status_vars + i, wsrep_status_vars + i); + + mysql_status_vars[wsrep_status_len].name = NullS; + mysql_status_vars[wsrep_status_len].value = NullS; + mysql_status_vars[wsrep_status_len].type = SHOW_LONG; +} + +int wsrep_show_status (THD *thd, SHOW_VAR *var, char *buff) +{ + export_wsrep_status_to_mysql(); + var->type= SHOW_ARRAY; + var->value= (char *) &mysql_status_vars; + return 0; +} diff --git a/support-files/wsrep.cnf.sh b/support-files/wsrep.cnf.sh new file mode 100644 index 00000000000..e8483910c0b --- /dev/null +++ b/support-files/wsrep.cnf.sh @@ -0,0 +1,125 @@ +# This file contains wsrep-related mysqld options. It should be included +# in the main MySQL configuration file. +# +# Options that need to be customized: +# - wsrep_provider +# - wsrep_cluster_address +# - wsrep_sst_auth +# The rest of defaults should work out of the box. + +## +## mysqld options _MANDATORY_ for correct opration of the cluster +## +[mysqld] + +# (This must be substituted by wsrep_format) +binlog_format=ROW + +# Currently only InnoDB storage engine is supported +default-storage-engine=innodb + +# to avoid issues with 'bulk mode inserts' using autoinc +innodb_autoinc_lock_mode=2 + +# This is a must for paralell applying +innodb_locks_unsafe_for_binlog=1 + +# Query Cache is not supported with wsrep +query_cache_size=0 +query_cache_type=0 + +# Override bind-address +# In some systems bind-address defaults to 127.0.0.1, and with mysqldump SST +# it will have (most likely) disastrous consequences on donor node +bind-address=0.0.0.0 + +## +## WSREP options +## + +# Full path to wsrep provider library or 'none' +wsrep_provider=none + +# Provider specific configuration options +#wsrep_provider_options= + +# Logical cluster name. Should be the same for all nodes. +wsrep_cluster_name="my_wsrep_cluster" + +# Group communication system handle +#wsrep_cluster_address="dummy://" + +# Human-readable node name (non-unique). Hostname by default. +#wsrep_node_name= + +# Base replication <address|hostname>[:port] of the node. +# The values supplied will be used as defaults for state transfer receiving, +# listening ports and so on. Default: address of the first network interface. +#wsrep_node_address= + +# Address for incoming client connections. Autodetect by default. +#wsrep_node_incoming_address= + +# How many threads will process writesets from other nodes +wsrep_slave_threads=1 + +# DBUG options for wsrep provider +#wsrep_dbug_option + +# Generate fake primary keys for non-PK tables (required for multi-master +# and parallel applying operation) +wsrep_certify_nonPK=1 + +# Maximum number of rows in write set +wsrep_max_ws_rows=131072 + +# Maximum size of write set +wsrep_max_ws_size=1073741824 + +# to enable debug level logging, set this to 1 +wsrep_debug=0 + +# convert locking sessions into transactions +wsrep_convert_LOCK_to_trx=0 + +# how many times to retry deadlocked autocommits +wsrep_retry_autocommit=1 + +# change auto_increment_increment and auto_increment_offset automatically +wsrep_auto_increment_control=1 + +# retry autoinc insert, which failed for duplicate key error +wsrep_drupal_282555_workaround=0 + +# enable "strictly synchronous" semantics for read operations +wsrep_causal_reads=0 + +# Command to call when node status or cluster membership changes. +# Will be passed all or some of the following options: +# --status - new status of this node +# --uuid - UUID of the cluster +# --primary - whether the component is primary or not ("yes"/"no") +# --members - comma-separated list of members +# --index - index of this node in the list +wsrep_notify_cmd= + +## +## WSREP State Transfer options +## + +# State Snapshot Transfer method +wsrep_sst_method=mysqldump + +# Address on THIS node to receive SST at. DON'T SET IT TO DONOR ADDRESS!!! +# (SST method dependent. Defaults to the first IP of the first interface) +#wsrep_sst_receive_address= + +# SST authentication string. This will be used to send SST to joining nodes. +# Depends on SST method. For mysqldump method it is root:<root password> +wsrep_sst_auth=root: + +# Desired SST donor name. +#wsrep_sst_donor= + +# Protocol version to use +# wsrep_protocol_version= diff --git a/support-files/wsrep_notify.sh b/support-files/wsrep_notify.sh new file mode 100644 index 00000000000..bdbe3d12a39 --- /dev/null +++ b/support-files/wsrep_notify.sh @@ -0,0 +1,102 @@ +#!/bin/sh -eu + +# This is a simple example of wsrep notification script (wsrep_notify_cmd). +# It will create 'wsrep' schema and two tables in it: 'membeship' and 'status' +# and fill them on every membership or node status change. +# +# Edit parameters below to specify the address and login to server. + +USER=root +PSWD=rootpass +HOST=127.0.0.1 +PORT=3306 + +SCHEMA="wsrep" +MEMB_TABLE="$SCHEMA.membership" +STATUS_TABLE="$SCHEMA.status" + +BEGIN=" +SET wsrep_on=0; +DROP SCHEMA IF EXISTS $SCHEMA; CREATE SCHEMA $SCHEMA; +CREATE TABLE $MEMB_TABLE ( + idx INT UNIQUE PRIMARY KEY, + uuid CHAR(40) UNIQUE, /* node UUID */ + name VARCHAR(32), /* node name */ + addr VARCHAR(256) /* node address */ +) ENGINE=MEMORY; +CREATE TABLE $STATUS_TABLE ( + size INT, /* component size */ + idx INT, /* this node index */ + status CHAR(16), /* this node status */ + uuid CHAR(40), /* cluster UUID */ + prim BOOLEAN /* if component is primary */ +) ENGINE=MEMORY; +BEGIN; +DELETE FROM $MEMB_TABLE; +DELETE FROM $STATUS_TABLE; +" +END="COMMIT;" + +configuration_change() +{ + echo "$BEGIN;" + + local idx=0 + + for NODE in $(echo $MEMBERS | sed s/,/\ /g) + do + echo "INSERT INTO $MEMB_TABLE VALUES ( $idx, " + # Don't forget to properly quote string values + echo "'$NODE'" | sed s/\\//\',\'/g + echo ");" + idx=$(( $idx + 1 )) + done + + echo "INSERT INTO $STATUS_TABLE VALUES($idx, $INDEX, '$STATUS', '$CLUSTER_UUID', $PRIMARY);" + + echo "$END" +} + +status_update() +{ + echo "SET wsrep_on=0; BEGIN; UPDATE $STATUS_TABLE SET status='$STATUS'; COMMIT;" +} + +COM=status_update # not a configuration change by default + +while [ $# -gt 0 ] +do + case $1 in + --status) + STATUS=$2 + shift + ;; + --uuid) + CLUSTER_UUID=$2 + shift + ;; + --primary) + [ "$2" = "yes" ] && PRIMARY="1" || PRIMARY="0" + COM=configuration_change + shift + ;; + --index) + INDEX=$2 + shift + ;; + --members) + MEMBERS=$2 + shift + ;; + esac + shift +done + +# Undefined means node is shutting down +if [ "$STATUS" != "Undefined" ] +then + $COM | mysql -B -u$USER -p$PSWD -h$HOST -P$PORT +fi + +exit 0 +# diff --git a/wsrep/CMakeLists.txt b/wsrep/CMakeLists.txt new file mode 100644 index 00000000000..11d0e34d1b0 --- /dev/null +++ b/wsrep/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) 2012, Codership Oy. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +INCLUDE_DIRECTORIES( "." ) + +SET(WSREP_SOURCES wsrep_uuid.c wsrep_loader.c wsrep_dummy.c) + +ADD_CONVENIENCE_LIBRARY(wsrep ${WSREP_SOURCES}) +DTRACE_INSTRUMENT(wsrep) + +#ADD_EXECUTABLE(listener wsrep_listener.c ${WSREP_SOURCES}) +#TARGET_LINK_LIBRARIES(listener ${LIBDL}) diff --git a/wsrep/Makefile.am b/wsrep/Makefile.am new file mode 100644 index 00000000000..40e4b501e86 --- /dev/null +++ b/wsrep/Makefile.am @@ -0,0 +1,2 @@ +noinst_LIBRARIES = libwsrep.a +libwsrep_a_SOURCES = wsrep_api.h wsrep_loader.c wsrep_dummy.c wsrep_uuid.c diff --git a/wsrep/wsrep_api.h b/wsrep/wsrep_api.h new file mode 100644 index 00000000000..2cd10afc7ff --- /dev/null +++ b/wsrep/wsrep_api.h @@ -0,0 +1,875 @@ +/* Copyright (C) 2009-2011 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef WSREP_H +#define WSREP_H + +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> +#include <unistd.h> +#include <time.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/*! + * wsrep replication API + */ + +#define WSREP_INTERFACE_VERSION "23" + +/*! + * Certain provider capabilities application may need to know + */ +#define WSREP_CAP_MULTI_MASTER ( 1ULL << 0 ) +#define WSREP_CAP_CERTIFICATION ( 1ULL << 1 ) +#define WSREP_CAP_PARALLEL_APPLYING ( 1ULL << 2 ) +#define WSREP_CAP_TRX_REPLAY ( 1ULL << 3 ) +#define WSREP_CAP_ISOLATION ( 1ULL << 4 ) +#define WSREP_CAP_PAUSE ( 1ULL << 5 ) +#define WSREP_CAP_CAUSAL_READS ( 1ULL << 6 ) +#define WSREP_CAP_CAUSAL_TRX ( 1ULL << 7 ) +#define WSREP_CAP_WRITE_SET_INCREMENTS ( 1ULL << 8 ) +#define WSREP_CAP_SESSION_LOCKS ( 1ULL << 9 ) +#define WSREP_CAP_DISTRIBUTED_LOCKS ( 1ULL << 10 ) +#define WSREP_CAP_CONSISTENCY_CHECK ( 1ULL << 11 ) + +/*! + * Write set replication flags + */ +#define WSREP_FLAG_PA_SAFE ( 1ULL << 0 ) + +/* Empty backend spec */ +#define WSREP_NONE "none" + +typedef uint64_t wsrep_trx_id_t; //!< application transaction ID +typedef uint64_t wsrep_conn_id_t; //!< application connection ID +typedef int64_t wsrep_seqno_t; //!< sequence number of a writeset, etc. + +/*! undefined seqno */ +#define WSREP_SEQNO_UNDEFINED (-1) + +/*! wsrep status codes */ +typedef enum wsrep_status { + WSREP_OK = 0, //!< success + WSREP_WARNING, //!< minor warning, error logged + WSREP_TRX_MISSING, //!< transaction is not known by wsrep + WSREP_TRX_FAIL, //!< transaction aborted, server can continue + WSREP_BF_ABORT, //!< trx was victim of brute force abort + WSREP_CONN_FAIL, //!< error in client connection, must abort + WSREP_NODE_FAIL, //!< error in node state, wsrep must reinit + WSREP_FATAL, //!< fatal error, server must abort + WSREP_NOT_IMPLEMENTED //!< feature not implemented +} wsrep_status_t; + +/*! + * @brief log severity levels, passed as first argument to log handler + */ +typedef enum wsrep_log_level +{ + WSREP_LOG_FATAL, //!< Unrecoverable error, application must quit. + WSREP_LOG_ERROR, //!< Operation failed, must be repeated. + WSREP_LOG_WARN, //!< Unexpected condition, but no operational failure. + WSREP_LOG_INFO, //!< Informational message. + WSREP_LOG_DEBUG //!< Debug message. Shows only of compiled with debug. +} wsrep_log_level_t; + +/*! + * @brief error log handler + * + * All messages from wsrep library are directed to this + * handler, if present. + * + * @param level log level + * @param message log message + */ +typedef void (*wsrep_log_cb_t)(wsrep_log_level_t, const char *); + +/*! + * UUID type - for all unique IDs + */ +typedef struct wsrep_uuid { + uint8_t uuid[16]; +} wsrep_uuid_t; + +/*! Undefined UUID */ +static const wsrep_uuid_t WSREP_UUID_UNDEFINED = {{0,}}; + +/*! + * Scan UUID from string + * @return length of UUID string representation or negative error code + */ +extern ssize_t +wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid); + +/*! + * Print UUID to string + * @return length of UUID string representation or negative error code + */ +extern ssize_t +wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len); + +#define WSREP_MEMBER_NAME_LEN 32 //!< maximum logical member name length +#define WSREP_INCOMING_LEN 256 //!< max Domain Name length + 0x00 + +/*! + * member status + */ +typedef enum wsrep_member_status { + WSREP_MEMBER_UNDEFINED, //!< undefined state + WSREP_MEMBER_JOINER, //!< incomplete state, requested state transfer + WSREP_MEMBER_DONOR, //!< complete state, donates state transfer + WSREP_MEMBER_JOINED, //!< complete state + WSREP_MEMBER_SYNCED, //!< complete state, synchronized with group + WSREP_MEMBER_ERROR, //!< this and above is provider-specific error code + WSREP_MEMBER_MAX +} wsrep_member_status_t; + +/*! + * static information about a group member (some fields are tentative yet) + */ +typedef struct wsrep_member_info { + wsrep_uuid_t id; //!< group-wide unique member ID + char name[WSREP_MEMBER_NAME_LEN]; //!< human-readable name + char incoming[WSREP_INCOMING_LEN]; //!< address for client requests +} wsrep_member_info_t; + +/*! + * group status + */ +typedef enum wsrep_view_status { + WSREP_VIEW_PRIMARY, //!< primary group configuration (quorum present) + WSREP_VIEW_NON_PRIMARY, //!< non-primary group configuration (quorum lost) + WSREP_VIEW_DISCONNECTED, //!< not connected to group, retrying. + WSREP_VIEW_MAX +} wsrep_view_status_t; + +/*! + * view of the group + */ +typedef struct wsrep_view_info { + wsrep_uuid_t uuid; //!< global state UUID + wsrep_seqno_t seqno; //!< global state seqno + wsrep_seqno_t view; //!< global view number + wsrep_view_status_t status; //!< view status + bool state_gap; //!< gap between global and local states + int my_idx; //!< index of this member in the view + int memb_num; //!< number of members in the view + int proto_ver; //!< application protocol agreed on in the view + wsrep_member_info_t members[1]; //!< array of member information +} wsrep_view_info_t; + +/*! + * Magic string to tell provider to engage into trivial (empty) state transfer. + * No data will be passed, but the node shall be considered JOINED. + * Should be passed in sst_req parameter of wsrep_view_cb_t. + */ +#define WSREP_STATE_TRANSFER_TRIVIAL "trivial" + +/*! + * Magic string to tell provider not to engage in state transfer at all. + * The member will stay in WSREP_MEMBER_UNDEFINED state but will keep on + * receiving all writesets. + * Should be passed in sst_req parameter of wsrep_view_cb_t. + */ +#define WSREP_STATE_TRANSFER_NONE "none" + +/*! + * @brief group view handler + * + * This handler is called in total order corresponding to the group + * configuration change. It is to provide a vital information about + * new group view. If view info indicates existence of discontinuity + * between group and member states, state transfer request message + * should be filled in by the callback implementation. + * + * @note Currently it is assumed that sst_req is allocated using + * malloc()/calloc()/realloc() and it will be freed by + * wsrep implementation. + * + * @param app_ctx application context + * @param recv_ctx receiver context + * @param view new view on the group + * @param state current state + * @param state_len lenght of current state + * @param sst_req location to store SST request + * @param sst_req_len location to store SST request length or error code + * value of 0 means no SST. + */ +typedef void (*wsrep_view_cb_t) (void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len, + void** sst_req, + ssize_t* sst_req_len); + +/*! + * @brief apply callback + * + * This handler is called from wsrep library to apply replicated write set + * Must support brute force applying for multi-master operation + * + * @param recv_ctx receiver context pointer provided by the application + * @param data data buffer containing the write set + * @param size data buffer size + * @param seqno global seqno part of the write set to be applied + * + * @return success code: + * @retval WSREP_OK + * @retval WSREP_NOT_IMPLEMENTED appl. does not support the write set format + * @retval WSREP_ERROR failed to apply the write set + */ +typedef enum wsrep_status (*wsrep_apply_cb_t) (void* recv_ctx, + const void* data, + size_t size, + wsrep_seqno_t seqno); + +/*! + * @brief commit callback + * + * This handler is called to commit the changes made by apply callback. + * + * @param recv_ctx receiver context pointer provided by the application + * @param seqno global seqno part of the write set to be committed + * @param commit true - commit writeset, false - rollback writeset + * + * @return success code: + * @retval WSREP_OK + * @retval WSREP_ERROR call failed + */ +typedef enum wsrep_status (*wsrep_commit_cb_t) (void* recv_ctx, + wsrep_seqno_t seqno, + bool commit); + +/*! + * @brief a callback to donate state snapshot + * + * This handler is called from wsrep library when it needs this node + * to deliver state to a new cluster member. + * No state changes will be committed for the duration of this call. + * Wsrep implementation may provide internal state to be transmitted + * to new cluster member for initial state. + * + * @param app_ctx application context + * @param recv_ctx receiver context + * @param msg state transfer request message + * @param msg_len state transfer request message length + * @param uuid current state uuid on this node + * @param seqno current state seqno on this node + * @param state current wsrep internal state buffer + * @param state_len current wsrep internal state buffer len + * @param bypass bypass snapshot transfer, only transfer uuid:seqno pair + * @return 0 for success or negative error code + */ +typedef int (*wsrep_sst_donate_cb_t) (void* app_ctx, + void* recv_ctx, + const void* msg, + size_t msg_len, + const wsrep_uuid_t* uuid, + wsrep_seqno_t seqno, + const char* state, + size_t state_len, + bool bypass); + +/*! + * @brief a callback to signal application that wsrep state is synced + * with cluster + * + * This callback is called after wsrep library has got in sync with + * rest of the cluster. + * + * @param app_ctx application context + */ +typedef void (*wsrep_synced_cb_t)(void* app_ctx); + + +/*! + * Initialization parameters for wsrep, used as arguments for wsrep_init() + */ +struct wsrep_init_args +{ + void* app_ctx; //!< Application context for callbacks + + /* Configuration parameters */ + const char* node_name; //!< Symbolic name of this node (e.g. hostname) + const char* node_address; //!< Address to be used by wsrep provider + const char* node_incoming; //!< Address for incoming client connections + const char* data_dir; //!< Directory where wsrep files are kept if any + const char* options; //!< Provider-specific configuration string + int proto_ver; //!< Max supported application protocol version + + /* Application initial state information. */ + const wsrep_uuid_t* state_uuid; //!< Application state sequence UUID + wsrep_seqno_t state_seqno; //!< Applicaiton state sequence number + const char* state; //!< Initial state for wsrep implementation + size_t state_len; //!< Length of state buffer + + /* Application callbacks */ + wsrep_log_cb_t logger_cb; //!< logging handler + wsrep_view_cb_t view_handler_cb; //!< group view change handler + + /* applier callbacks */ + wsrep_apply_cb_t apply_cb; //!< apply callback + wsrep_commit_cb_t commit_cb; //!< commit callback + + /* state snapshot transfer callbacks */ + wsrep_sst_donate_cb_t sst_donate_cb; //!< starting to donate + wsrep_synced_cb_t synced_cb; //!< synced with group +}; + +/*! Type of the stats variable value in struct wsrep_status_var */ +typedef enum wsrep_var_type +{ + WSREP_VAR_STRING, //!< pointer to null-terminated string + WSREP_VAR_INT64, //!< int64_t + WSREP_VAR_DOUBLE //!< double +} +wsrep_var_type_t; + +/*! Generalized stats variable representation */ +struct wsrep_stats_var +{ + const char* name; //!< variable name + wsrep_var_type_t type; //!< variable value type + union { + int64_t _int64; + double _double; + const char* _string; + } value; //!< variable value +}; + + +/*! Key part structure */ +typedef struct wsrep_key_part_ +{ + const void* buf; /*!< Buffer containing key part data */ + size_t buf_len; /*!< Length of buffer */ +} wsrep_key_part_t; + +/*! Key struct used to pass certification keys for transaction handling calls. + * A key consists of zero or more key parts. */ +typedef struct wsrep_key_ +{ + const wsrep_key_part_t* key_parts; /*!< Array of key parts */ + size_t key_parts_len; /*!< Length of key parts array */ +} wsrep_key_t; + +/*! Transaction handle struct passed for wsrep transaction handling calls */ +typedef struct wsrep_trx_handle_ +{ + wsrep_trx_id_t trx_id; //!< transaction ID + void* opaque; //!< opaque provider transaction context data +} wsrep_trx_handle_t; + +/*! + * @brief Helper method to reset trx handle state when trx id changes + * + * Instead of passing wsrep_trx_handle_t directly for wsrep calls, + * wrapping handle with this call offloads bookkeeping from + * application. + */ +static inline wsrep_trx_handle_t* wsrep_trx_handle_for_id( + wsrep_trx_handle_t* trx_handle, + wsrep_trx_id_t trx_id) +{ + if (trx_handle->trx_id != trx_id) + { + trx_handle->trx_id = trx_id; + trx_handle->opaque = NULL; + } + return trx_handle; +} + + +typedef struct wsrep_ wsrep_t; +/*! + * wsrep interface for dynamically loadable libraries + */ +struct wsrep_ { + + const char *version; //!< interface version string + + /*! + * @brief Initializes wsrep provider + * + * @param wsrep this wsrep handle + * @param args wsrep initialization parameters + */ + wsrep_status_t (*init) (wsrep_t* wsrep, + const struct wsrep_init_args* args); + + /*! + * @brief Returns provider capabilities flag bitmap + * + * @param wsrep this wsrep handle + */ + uint64_t (*capabilities) (wsrep_t* wsrep); + + /*! + * @brief Passes provider-specific configuration string to provider. + * + * @param wsrep this wsrep handle + * @param conf configuration string + * + * @retval WSREP_OK configuration string was parsed successfully + * @retval WSREP_WARNING could't not parse conf string, no action taken + */ + wsrep_status_t (*options_set) (wsrep_t* wsrep, const char* conf); + + /*! + * @brief Returns provider-specific string with current configuration values. + * + * @param wsrep this wsrep handle + * + * @return a dynamically allocated string with current configuration + * parameter values + */ + char* (*options_get) (wsrep_t* wsrep); + + /*! + * @brief Opens connection to cluster + * + * Returns when either node is ready to operate as a part of the clsuter + * or fails to reach operating status. + * + * @param wsrep this wsrep handle + * @param cluster_name unique symbolic cluster name + * @param cluster_url URL-like cluster address (backend://address) + * @param state_donor name of the node to be asked for state transfer. + */ + wsrep_status_t (*connect) (wsrep_t* wsrep, + const char* cluster_name, + const char* cluster_url, + const char* state_donor); + + /*! + * @brief Closes connection to cluster. + * + * If state_uuid and/or state_seqno is not NULL, will store final state + * in there. + * + * @param wsrep this wsrep handler + */ + wsrep_status_t (*disconnect)(wsrep_t* wsrep); + + /*! + * @brief start receiving replication events + * + * This function never returns + * + * @param wsrep this wsrep handle + * @param recv_ctx receiver context + */ + wsrep_status_t (*recv)(wsrep_t* wsrep, void* recv_ctx); + + /*! + * @brief Replicates/logs result of transaction to other nodes and allocates + * required resources. + * + * Must be called before transaction commit. Returns success code, which + * caller must check. + * In case of WSREP_OK, starts commit critical section, transaction can + * commit. Otherwise transaction must rollback. + * + * @param wsrep this wsrep handle + * @param trx_handle transaction which is committing + * @param conn_id connection ID + * @param app_data application specific applying data + * @param data_len the size of the applying data + * @param flags fine tuning the replication WSREP_FLAG_* + * @param seqno seqno part of the global transaction ID + * + * @retval WSREP_OK cluster-wide commit succeeded + * @retval WSREP_TRX_FAIL must rollback transaction + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*pre_commit)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + wsrep_trx_handle_t* trx_handle, + const void* app_data, + size_t data_len, + uint64_t flags, + wsrep_seqno_t* seqno); + + /*! + * @brief Releases resources after transaction commit. + * + * Ends commit critical section. + * + * @param wsrep this wsrep handle + * @param trx_handle transaction which is committing + * @retval WSREP_OK post_commit succeeded + */ + wsrep_status_t (*post_commit) (wsrep_t* wsrep, + wsrep_trx_handle_t* trx_handle); + + /*! + * @brief Releases resources after transaction rollback. + * + * @param wsrep this wsrep handle + * @param trx_handle transaction which is committing + * @retval WSREP_OK post_rollback succeeded + */ + wsrep_status_t (*post_rollback)(wsrep_t* wsrep, + wsrep_trx_handle_t* trx_handle); + + /*! + * @brief Replay trx as a slave write set + * + * If local trx has been aborted by brute force, and it has already + * replicated before this abort, we must try if we can apply it as + * slave trx. Note that slave nodes see only trx write sets and certification + * test based on write set content can be different to DBMS lock conflicts. + * + * @param wsrep this wsrep handle + * @param trx_handle transaction which is committing + * @param trx_ctx transaction context + * + * @retval WSREP_OK cluster commit succeeded + * @retval WSREP_TRX_FAIL must rollback transaction + * @retval WSREP_BF_ABORT brute force abort happened after trx replicated + * must rollback transaction and try to replay + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*replay_trx)(wsrep_t* wsrep, + wsrep_trx_handle_t* trx_handle, + void* trx_ctx); + + /*! + * @brief Abort pre_commit() call of another thread. + * + * It is possible, that some high-priority transaction needs to abort + * another transaction which is in pre_commit() call waiting for resources. + * + * The kill routine checks that abort is not attmpted against a transaction + * which is front of the caller (in total order). + * + * @param wsrep this wsrep handle + * @param bf_seqno seqno of brute force trx, running this cancel + * @param victim_trx transaction to be aborted, and which is committing + * + * @retval WSREP_OK abort secceded + * @retval WSREP_WARNING abort failed + */ + wsrep_status_t (*abort_pre_commit)(wsrep_t* wsrep, + wsrep_seqno_t bf_seqno, + wsrep_trx_id_t victim_trx); + + /*! + * @brief Appends a query in transaction's write set + * + * @param wsrep this wsrep handle + * @param trx_handle transaction handle + * @param query SQL statement string + * @param timeval time to use for time functions + * @param randseed seed for rand + */ + wsrep_status_t (*append_query)(wsrep_t* wsrep, + wsrep_trx_handle_t* trx_handle, + const char* query, + time_t timeval, + uint32_t randseed); + + /*! + * @brief Appends a row reference in transaction's write set + * + * @param wsrep this wsrep handle + * @param trx_handle transaction handle + * @param key array of keys + * @param key_len length of the array of keys + * @param shared boolean denoting if key corresponds to shared resource + */ + wsrep_status_t (*append_key)(wsrep_t* wsrep, + wsrep_trx_handle_t* trx_handle, + const wsrep_key_t* key, + size_t key_len, + bool shared); + /*! + * @brief Appends data in transaction's write set + * + * This method can be called any time before commit and it + * appends data block into transaction's write set. + * + * @param wsrep this wsrep handle + * @param trx_handle transaction handle + * @param data data buffer + * @param data_len data buffer length + */ + wsrep_status_t (*append_data)(wsrep_t* wsrep, + wsrep_trx_handle_t* trx_handle, + const void* data, + size_t data_len); + + + /*! + * @brief Get causal ordering for read operation + * + * This call will block until causal ordering with all possible + * preceding writes in the cluster is guaranteed. If pointer to + * seqno is non-null, the call stores the global transaction ID + * of the last transaction which is guaranteed to be ordered + * causally before this call. + * + * @param wsrep this wsrep handle + * @param seqno location to store global transaction ID + */ + wsrep_status_t (*causal_read)(wsrep_t* wsrep, wsrep_seqno_t* seqno); + + /*! + * @brief Clears allocated connection context. + * + * Whenever a new connection ID is passed to wsrep provider through + * any of the API calls, a connection context is allocated for this + * connection. This call is to explicitly notify provider fo connection + * closing. + * + * @param wsrep this wsrep handle + * @param conn_id connection ID + * @param query the 'set database' query + * @param query_len length of query (does not end with 0) + */ + wsrep_status_t (*free_connection)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id); + + /*! + * @brief Replicates a query and starts "total order isolation" section. + * + * Replicates the query and returns success code, which + * caller must check. Total order isolation continues + * until to_execute_end() is called. + * + * @param wsrep this wsrep handle + * @param conn_id connection ID + * @param key array of keys + * @param key_len lenght of the array of keys + * @param query query to be executed + * @param query_len length of the query string + * @param seqno seqno part of the action ID + * + * @retval WSREP_OK cluster commit succeeded + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*to_execute_start)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + const wsrep_key_t* key, + size_t key_len, + const void* query, + size_t query_len, + wsrep_seqno_t* seqno); + + /*! + * @brief Ends the total order isolation section. + * + * Marks the end of total order isolation. TO locks are freed + * and other transactions are free to commit from this point on. + * + * @param wsrep this wsrep handle + * @param conn_id connection ID + * + * @retval WSREP_OK cluster commit succeeded + * @retval WSREP_CONN_FAIL must close client connection + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*to_execute_end)(wsrep_t* wsrep, wsrep_conn_id_t conn_id); + + /*! + * @brief Signals to wsrep provider that state snapshot has been sent to + * joiner. + * + * @param wsrep this wsrep handle + * @param uuid sequence UUID (group UUID) + * @param seqno sequence number or negative error code of the operation + */ + wsrep_status_t (*sst_sent)(wsrep_t* wsrep, + const wsrep_uuid_t* uuid, + wsrep_seqno_t seqno); + + /*! + * @brief Signals to wsrep provider that new state snapshot has been received. + * May deadlock if called from sst_prepare_cb. + * + * @param wsrep this wsrep handle + * @param uuid sequence UUID (group UUID) + * @param seqno sequence number or negative error code of the operation + * @param state initial state provided by SST donor + * @param state_len length of state buffer + */ + wsrep_status_t (*sst_received)(wsrep_t* wsrep, + const wsrep_uuid_t* uuid, + wsrep_seqno_t seqno, + const char* state, + size_t state_len); + + + /*! + * @brief Generate request for consistent snapshot. + * + * If successfull, this call will generate internally SST request + * which in turn triggers calling SST donate callback on the nodes + * specified in donor_spec. If donor_spec is null, callback is + * called only locally. This call will block until sst_sent is called + * from callback. + * + * @param wsrep this wsrep handle + * @param msg context message for SST donate callback + * @param msg_len length of context message + * @param donor_spec list of snapshot donors + */ + wsrep_status_t (*snapshot)(wsrep_t* wsrep, + const void* msg, + size_t msg_len, + const char* donor_spec); + + /*! + * @brief Returns an array fo status variables. + * Array is terminated by Null variable name. + * + * @param wsrep this wsrep handle + * @return array of struct wsrep_status_var. + */ + struct wsrep_stats_var* (*stats_get) (wsrep_t* wsrep); + + /*! + * @brief Release resources that might be associated with the array. + * + * @param wsrep this wsrep handle. + */ + void (*stats_free) (wsrep_t* wsrep, struct wsrep_stats_var* var_array); + + /*! + * @brief Pauses writeset applying/committing. + * + * @return global sequence number of the paused state or negative error code. + */ + wsrep_seqno_t (*pause) (wsrep_t* wsrep); + + /*! + * @brief Resumes writeset applying/committing. + */ + wsrep_status_t (*resume) (wsrep_t* wsrep); + + /*! + * @brief Desynchronize from cluster + * + * Effectively turns off flow control for this node, allowing it + * to fall behind the cluster. + */ + wsrep_status_t (*desync) (wsrep_t* wsrep); + + /*! + * @brief Request to resynchronize with cluster. + * + * Effectively turns on flow control. Asynchronous - actual synchronization + * event to be deliverred via sync_cb. + */ + wsrep_status_t (*resync) (wsrep_t* wsrep); + + /*! + * @brief Acquire global named lock + * + * @param wsrep wsrep provider handle + * @param name lock name + * @param owner 64-bit owner ID + * @param tout timeout in nanoseconds. + * 0 - return immediately, -1 wait forever. + * @return wsrep status or negative error code + * @retval -EDEADLK lock was already acquired by this thread + * @retval -EBUSY lock was busy + */ + wsrep_status_t (*lock) (wsrep_t* wsrep, const char* name, int64_t owner, + int64_t tout); + + /*! + * @brief Release global named lock + * + * @param wsrep wsrep provider handle + * @param name lock name + * @param owner 64-bit owner ID + * @return wsrep status or negative error code + * @retval -EPERM lock does not belong to this owner + */ + wsrep_status_t (*unlock) (wsrep_t* wsrep, const char* name, int64_t owner); + + /*! + * @brief Check if global named lock is locked + * + * @param wsrep wsrep provider handle + * @param name lock name + * @param owner if not NULL will contain 64-bit owner ID + * @param node if not NULL will contain owner's node UUID + * @return true if lock is locked + */ + bool (*is_locked) (wsrep_t* wsrep, const char* name, int64_t* conn, + wsrep_uuid_t* node); + + /*! + * wsrep provider name + */ + const char* provider_name; + + /*! + * wsrep provider version + */ + const char* provider_version; + + /*! + * wsrep provider vendor name + */ + const char* provider_vendor; + + /*! + * @brief Frees allocated resources before unloading the library. + * @param wsrep this wsrep handle + */ + void (*free)(wsrep_t* wsrep); + + void *dlh; //!< reserved for future use + void *ctx; //!< reserved for implemetation private context +}; + +typedef int (*wsrep_loader_fun)(wsrep_t*); + +/*! + * + * @brief Loads wsrep library + * + * @param spec path to wsrep library. If NULL or WSREP_NONE initialises dummy + * pass-through implementation. + * @param hptr wsrep handle + * @param log_cb callback to handle loader messages. Otherwise writes to stderr. + * + * @return zero on success, errno on failure + */ +int wsrep_load(const char* spec, wsrep_t** hptr, wsrep_log_cb_t log_cb); + +/*! + * @brief Unloads wsrep library and frees associated resources + * + * @param hptr wsrep handler pointer + */ +void wsrep_unload(wsrep_t* hptr); + +#ifdef __cplusplus +} +#endif + +#endif /* WSREP_H */ diff --git a/wsrep/wsrep_dummy.c b/wsrep/wsrep_dummy.c new file mode 100644 index 00000000000..6d01ce14b4e --- /dev/null +++ b/wsrep/wsrep_dummy.c @@ -0,0 +1,368 @@ +/* Copyright (C) 2009-2010 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Dummy wsrep API implementation. */ + +#include <errno.h> + +#include "wsrep_api.h" + +/*! Dummy backend context. */ +typedef struct wsrep_dummy +{ + wsrep_log_cb_t log_fn; +} wsrep_dummy_t; + +/* Get pointer to wsrep_dummy context from wsrep_t pointer */ +#define WSREP_DUMMY(_p) ((wsrep_dummy_t *) (_p)->ctx) + +/* Trace function usage a-la DBUG */ +#define WSREP_DBUG_ENTER(_w) do { \ + if (WSREP_DUMMY(_w)) { \ + if (WSREP_DUMMY(_w)->log_fn) \ + WSREP_DUMMY(_w)->log_fn(WSREP_LOG_DEBUG, __FUNCTION__); \ + } \ + } while (0) + + +static void dummy_free(wsrep_t *w) +{ + WSREP_DBUG_ENTER(w); + free(w->ctx); + w->ctx = NULL; +} + +static wsrep_status_t dummy_init (wsrep_t* w, + const struct wsrep_init_args* args) +{ + WSREP_DUMMY(w)->log_fn = args->logger_cb; + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static uint64_t dummy_capabilities (wsrep_t* w __attribute__((unused))) +{ + return 0; +} + +static wsrep_status_t dummy_options_set( + wsrep_t* w, + const char* conf __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static char* dummy_options_get (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return NULL; +} + +static wsrep_status_t dummy_connect( + wsrep_t* w, + const char* name __attribute__((unused)), + const char* url __attribute__((unused)), + const char* donor __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_disconnect(wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_recv(wsrep_t* w, + void* recv_ctx __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_pre_commit( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused)), + wsrep_trx_handle_t* trx_handle __attribute__((unused)), + const void* query __attribute__((unused)), + const size_t query_len __attribute__((unused)), + uint64_t flags __attribute__((unused)), + wsrep_seqno_t* seqno __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_post_commit( + wsrep_t* w, + wsrep_trx_handle_t* trx_handle __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_post_rollback( + wsrep_t* w, + wsrep_trx_handle_t* trx_handle __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_replay_trx( + wsrep_t* w, + wsrep_trx_handle_t* trx_handle __attribute__((unused)), + void* trx_ctx __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_abort_pre_commit( + wsrep_t* w, + const wsrep_seqno_t bf_seqno __attribute__((unused)), + const wsrep_trx_id_t trx_id __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_append_query( + wsrep_t* w, + wsrep_trx_handle_t* trx_handle __attribute__((unused)), + const char* query __attribute__((unused)), + const time_t timeval __attribute__((unused)), + const uint32_t randseed __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_append_row_key( + wsrep_t* w, + wsrep_trx_handle_t* trx_handle __attribute__((unused)), + const wsrep_key_t* key __attribute__((unused)), + const size_t key_len __attribute__((unused)), + const bool shared __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_append_data( + wsrep_t* w, + wsrep_trx_handle_t* trx_handle __attribute__((unused)), + const void* data __attribute__((unused)), + size_t data_len __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_causal_read( + wsrep_t* w, + wsrep_seqno_t* seqno __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_free_connection( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_to_execute_start( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused)), + const wsrep_key_t* key __attribute__((unused)), + const size_t key_len __attribute__((unused)), + const void* query __attribute__((unused)), + const size_t query_len __attribute__((unused)), + wsrep_seqno_t* seqno __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_to_execute_end( + wsrep_t* w, + const wsrep_conn_id_t conn_id __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_sst_sent( + wsrep_t* w, + const wsrep_uuid_t* uuid __attribute__((unused)), + wsrep_seqno_t seqno __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_sst_received( + wsrep_t* w, + const wsrep_uuid_t* uuid __attribute__((unused)), + const wsrep_seqno_t seqno __attribute__((unused)), + const char* state __attribute__((unused)), + const size_t state_len __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_snapshot( + wsrep_t* w, + const void* msg __attribute__((unused)), + const size_t msg_len __attribute__((unused)), + const char* donor_spec __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static struct wsrep_stats_var dummy_stats[] = { + { NULL, WSREP_VAR_STRING, { 0 } } +}; + +static struct wsrep_stats_var* dummy_stats_get (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return dummy_stats; +} + +static void dummy_stats_free ( + wsrep_t* w, + struct wsrep_stats_var* stats __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); +} + +static wsrep_seqno_t dummy_pause (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return -ENOSYS; +} + +static wsrep_status_t dummy_resume (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_desync (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_NOT_IMPLEMENTED; +} + +static wsrep_status_t dummy_resync (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static wsrep_status_t dummy_lock (wsrep_t* w, + const char* s __attribute__((unused)), + int64_t o __attribute__((unused)), + int64_t t __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_NOT_IMPLEMENTED; +} + +static wsrep_status_t dummy_unlock (wsrep_t* w, + const char* s __attribute__((unused)), + int64_t o __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + +static bool dummy_is_locked (wsrep_t* w, + const char* s __attribute__((unused)), + int64_t* o __attribute__((unused)), + wsrep_uuid_t* t __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return false; +} + +static wsrep_t dummy_iface = { + WSREP_INTERFACE_VERSION, + &dummy_init, + &dummy_capabilities, + &dummy_options_set, + &dummy_options_get, + &dummy_connect, + &dummy_disconnect, + &dummy_recv, + &dummy_pre_commit, + &dummy_post_commit, + &dummy_post_rollback, + &dummy_replay_trx, + &dummy_abort_pre_commit, + &dummy_append_query, + &dummy_append_row_key, + &dummy_append_data, + &dummy_causal_read, + &dummy_free_connection, + &dummy_to_execute_start, + &dummy_to_execute_end, + &dummy_sst_sent, + &dummy_sst_received, + &dummy_snapshot, + &dummy_stats_get, + &dummy_stats_free, + &dummy_pause, + &dummy_resume, + &dummy_desync, + &dummy_resync, + &dummy_lock, + &dummy_unlock, + &dummy_is_locked, + WSREP_NONE, + WSREP_INTERFACE_VERSION, + "Codership Oy <info@codership.com>", + &dummy_free, + NULL, + NULL +}; + +int wsrep_dummy_loader(wsrep_t* w) +{ + if (!w) + return EINVAL; + + *w = dummy_iface; + + // allocate private context + if (!(w->ctx = malloc(sizeof(wsrep_dummy_t)))) + return ENOMEM; + + // initialize private context + WSREP_DUMMY(w)->log_fn = NULL; + + return 0; +} + diff --git a/wsrep/wsrep_loader.c b/wsrep/wsrep_loader.c new file mode 100644 index 00000000000..b4460658f80 --- /dev/null +++ b/wsrep/wsrep_loader.c @@ -0,0 +1,199 @@ +/* Copyright (C) 2009-2011 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file wsrep implementation loader */ + +#include <dlfcn.h> +#include <errno.h> +#include <string.h> +#include <stdio.h> + +#include "wsrep_api.h" + +// Logging stuff for the loader +static const char* log_levels[] = {"FATAL", "ERROR", "WARN", "INFO", "DEBUG"}; + +static void default_logger (wsrep_log_level_t lvl, const char* msg) +{ + fprintf (stderr, "wsrep loader: [%s] %s\n", log_levels[lvl], msg); +} + +static wsrep_log_cb_t logger = default_logger; + +/************************************************************************** + * Library loader + **************************************************************************/ + +static int verify(const wsrep_t *wh, const char *iface_ver) +{ + const size_t msg_len = 128; + char msg[msg_len]; + +#define VERIFY(_p) if (!(_p)) { \ + snprintf(msg, msg_len, "wsrep_load(): verify(): %s\n", # _p); \ + logger (WSREP_LOG_ERROR, msg); \ + return EINVAL; \ + } + + VERIFY(wh); + VERIFY(wh->version); + + if (strcmp(wh->version, iface_ver)) { + snprintf (msg, msg_len, + "provider interface version mismatch: need '%s', found '%s'", + iface_ver, wh->version); + logger (WSREP_LOG_ERROR, msg); + return EINVAL; + } + + VERIFY(wh->init); + VERIFY(wh->options_set); + VERIFY(wh->options_get); + VERIFY(wh->connect); + VERIFY(wh->disconnect); + VERIFY(wh->recv); + VERIFY(wh->pre_commit); + VERIFY(wh->post_commit); + VERIFY(wh->post_rollback); + VERIFY(wh->replay_trx); + VERIFY(wh->abort_pre_commit); + VERIFY(wh->append_query); + VERIFY(wh->append_key); + VERIFY(wh->free_connection); + VERIFY(wh->to_execute_start); + VERIFY(wh->to_execute_end); + VERIFY(wh->sst_sent); + VERIFY(wh->sst_received); + VERIFY(wh->stats_get); + VERIFY(wh->stats_free); + VERIFY(wh->pause); + VERIFY(wh->resume); + VERIFY(wh->desync); + VERIFY(wh->resync); + VERIFY(wh->lock); + VERIFY(wh->unlock); + VERIFY(wh->is_locked); + VERIFY(wh->provider_name); + VERIFY(wh->provider_version); + VERIFY(wh->provider_vendor); + VERIFY(wh->free); + return 0; +} + + +static wsrep_loader_fun wsrep_dlf(void *dlh, const char *sym) +{ + union { + wsrep_loader_fun dlfun; + void *obj; + } alias; + alias.obj = dlsym(dlh, sym); + return alias.dlfun; +} + +extern int wsrep_dummy_loader(wsrep_t *w); + +int wsrep_load(const char *spec, wsrep_t **hptr, wsrep_log_cb_t log_cb) +{ + int ret = 0; + void *dlh = NULL; + wsrep_loader_fun dlfun; + const size_t msg_len = 1024; + char msg[msg_len + 1]; + msg[msg_len] = 0; + + if (NULL != log_cb) + logger = log_cb; + + if (!(spec && hptr)) + return EINVAL; + + snprintf (msg, msg_len, + "wsrep_load(): loading provider library '%s'", spec); + logger (WSREP_LOG_INFO, msg); + + if (!(*hptr = malloc(sizeof(wsrep_t)))) { + logger (WSREP_LOG_FATAL, "wsrep_load(): out of memory"); + return ENOMEM; + } + + if (!spec || strcmp(spec, WSREP_NONE) == 0) { + if ((ret = wsrep_dummy_loader(*hptr)) != 0) { + free (*hptr); + *hptr = NULL; + } + return ret; + } + + if (!(dlh = dlopen(spec, RTLD_NOW | RTLD_LOCAL))) { + snprintf(msg, msg_len, "wsrep_load(): dlopen(): %s", dlerror()); + logger (WSREP_LOG_ERROR, msg); + ret = EINVAL; + goto out; + } + + if (!(dlfun = wsrep_dlf(dlh, "wsrep_loader"))) { + ret = EINVAL; + goto out; + } + + if ((ret = (*dlfun)(*hptr)) != 0) { + snprintf(msg, msg_len, "wsrep_load(): loader failed: %s", + strerror(ret)); + logger (WSREP_LOG_ERROR, msg); + goto out; + } + + if ((ret = verify(*hptr, WSREP_INTERFACE_VERSION)) != 0) { + snprintf (msg, msg_len, + "wsrep_load(): interface version mismatch: my version %s, " + "provider version %s", WSREP_INTERFACE_VERSION, + (*hptr)->version); + logger (WSREP_LOG_ERROR, msg); + goto out; + } + + (*hptr)->dlh = dlh; + +out: + if (ret != 0) { + if (dlh) dlclose(dlh); + free(*hptr); + *hptr = NULL; + } else { + snprintf (msg, msg_len, + "wsrep_load(): %s %s by %s loaded succesfully.", + (*hptr)->provider_name, (*hptr)->provider_version, + (*hptr)->provider_vendor); + logger (WSREP_LOG_INFO, msg); + } + + return ret; +} + +void wsrep_unload(wsrep_t *hptr) +{ + if (!hptr) { + logger (WSREP_LOG_WARN, "wsrep_unload(): null pointer."); + } else { + if (hptr->free) + hptr->free(hptr); + if (hptr->dlh) + dlclose(hptr->dlh); + free(hptr); + } +} + diff --git a/wsrep/wsrep_uuid.c b/wsrep/wsrep_uuid.c new file mode 100644 index 00000000000..c99240cc071 --- /dev/null +++ b/wsrep/wsrep_uuid.c @@ -0,0 +1,78 @@ +/* Copyright (C) 2009 Codership Oy <info@codersihp.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/*! @file Helper functions to deal with history UUID string representations */ + +#include <errno.h> +#include <ctype.h> +#include <stdio.h> + +#include "wsrep_api.h" + +/*! + * Read UUID from string + * @return length of UUID string representation or -EINVAL in case of error + */ +ssize_t +wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid) +{ + size_t uuid_len = 0; + size_t uuid_offt = 0; + + while (uuid_len + 1 < str_len) { + if ((4 == uuid_offt || 6 == uuid_offt || 8 == uuid_offt || + 10 == uuid_offt) && str[uuid_len] == '-') { + // skip dashes after 4th, 6th, 8th and 10th positions + uuid_len += 1; + continue; + } + if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) { + // got hex digit + sscanf (str + uuid_len, "%2hhx", uuid->uuid + uuid_offt); + uuid_len += 2; + uuid_offt += 1; + if (sizeof (uuid->uuid) == uuid_offt) + return uuid_len; + } + else { + break; + } + } + + *uuid = WSREP_UUID_UNDEFINED; + return -EINVAL; +} + +/*! + * Write UUID to string + * @return length of UUID string representation or -EMSGSIZE if string is too + * short + */ +ssize_t +wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len) +{ + if (str_len > 36) { + const unsigned char* u = uuid->uuid; + return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-" + "%02x%02x-%02x%02x%02x%02x%02x%02x", + u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7], + u[ 8], u[ 9], u[10], u[11], u[12], u[13], u[14], u[15]); + } + else { + return -EMSGSIZE; + } +} + |