summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeppo Jaakola <seppo.jaakola@codership.com>2012-04-26 13:09:06 +0300
committerSeppo Jaakola <seppo.jaakola@codership.com>2012-04-26 13:09:06 +0300
commitf96fd3f40f37c0080e71e45f85e53bd156aa27f5 (patch)
tree7aa68fd52aed378a26ef8ae804e78fdbc0e99bd5
parent2fc1ec43560b453b4694adbc1aac11f3f23b1761 (diff)
downloadmariadb-git-f96fd3f40f37c0080e71e45f85e53bd156aa27f5.tar.gz
Added wsrep specific files
-rwxr-xr-xBUILD/compile-amd64-debug-wsrep11
-rwxr-xr-xBUILD/compile-amd64-wsrep9
-rwxr-xr-xBUILD/compile-pentium-debug-wsrep12
-rwxr-xr-xBUILD/compile-pentium-wsrep11
-rwxr-xr-xBUILD/compile-pentium64-wsrep28
-rw-r--r--Docs/README-wsrep466
-rw-r--r--cmake/wsrep.cmake59
-rw-r--r--scripts/wsrep_sst_mysqldump.sh133
-rw-r--r--scripts/wsrep_sst_rsync.sh214
-rw-r--r--sql/wsrep_check_opts.cc392
-rw-r--r--sql/wsrep_hton.cc419
-rw-r--r--sql/wsrep_mysqld.cc1060
-rw-r--r--sql/wsrep_mysqld.h297
-rw-r--r--sql/wsrep_notify.cc107
-rw-r--r--sql/wsrep_priv.h231
-rw-r--r--sql/wsrep_sst.cc932
-rw-r--r--sql/wsrep_utils.cc447
-rw-r--r--sql/wsrep_var.cc505
-rw-r--r--support-files/wsrep.cnf.sh125
-rw-r--r--support-files/wsrep_notify.sh102
-rw-r--r--wsrep/CMakeLists.txt24
-rw-r--r--wsrep/Makefile.am2
-rw-r--r--wsrep/wsrep_api.h875
-rw-r--r--wsrep/wsrep_dummy.c368
-rw-r--r--wsrep/wsrep_loader.c199
-rw-r--r--wsrep/wsrep_uuid.c78
26 files changed, 7106 insertions, 0 deletions
diff --git a/BUILD/compile-amd64-debug-wsrep b/BUILD/compile-amd64-debug-wsrep
new file mode 100755
index 00000000000..995a8afcca9
--- /dev/null
+++ b/BUILD/compile-amd64-debug-wsrep
@@ -0,0 +1,11 @@
+#! /bin/sh
+
+path=`dirname $0`
+. "$path/SETUP.sh"
+
+extra_flags="$amd64_cflags $debug_cflags -g -O0 $wsrep_cflags"
+c_warnings="$c_warnings $debug_extra_warnings"
+cxx_warnings="$cxx_warnings $debug_extra_warnings"
+extra_configs="$amd64_configs $debug_configs $wsrep_configs --with-wsrep"
+
+. "$path/FINISH.sh"
diff --git a/BUILD/compile-amd64-wsrep b/BUILD/compile-amd64-wsrep
new file mode 100755
index 00000000000..57dfbdd6da2
--- /dev/null
+++ b/BUILD/compile-amd64-wsrep
@@ -0,0 +1,9 @@
+#! /bin/sh
+
+path=`dirname $0`
+. "$path/SETUP.sh"
+
+extra_flags="$amd64_cflags $fast_cflags -g $wsrep_cflags"
+extra_configs="$amd64_configs $wsrep_configs --with-wsrep"
+
+. "$path/FINISH.sh"
diff --git a/BUILD/compile-pentium-debug-wsrep b/BUILD/compile-pentium-debug-wsrep
new file mode 100755
index 00000000000..ee68e3fd0c1
--- /dev/null
+++ b/BUILD/compile-pentium-debug-wsrep
@@ -0,0 +1,12 @@
+#! /bin/sh -x
+
+path=`dirname $0`
+set -- "$@" --with-debug=full
+. "$path/SETUP.sh"
+
+extra_flags="$pentium_cflags $debug_cflags -g -O0 $wsrep_cflags"
+c_warnings="$c_warnings $debug_extra_warnings"
+cxx_warnings="$cxx_warnings $debug_extra_warnings"
+extra_configs="$pentium_configs $debug_configs $wsrep_configs --with-wsrep"
+
+. "$path/FINISH.sh"
diff --git a/BUILD/compile-pentium-wsrep b/BUILD/compile-pentium-wsrep
new file mode 100755
index 00000000000..eeb14310e4e
--- /dev/null
+++ b/BUILD/compile-pentium-wsrep
@@ -0,0 +1,11 @@
+#! /bin/sh
+
+path=`dirname $0`
+. "$path/SETUP.sh"
+
+extra_flags="$pentium_cflags $fast_cflags $wsrep_cflags"
+extra_configs="$pentium_configs $wsrep_configs --with-wsrep"
+
+#strip=yes
+
+. "$path/FINISH.sh"
diff --git a/BUILD/compile-pentium64-wsrep b/BUILD/compile-pentium64-wsrep
new file mode 100755
index 00000000000..0bccb34e753
--- /dev/null
+++ b/BUILD/compile-pentium64-wsrep
@@ -0,0 +1,28 @@
+#! /bin/sh
+
+# Copyright (C) 2006, 2007 MySQL AB
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Library General Public
+# License as published by the Free Software Foundation; version 2
+# of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Library General Public License for more details.
+#
+# You should have received a copy of the GNU Library General Public
+# License along with this library; if not, write to the Free
+# Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+# MA 02111-1307, USA
+
+path=`dirname $0`
+. "$path/SETUP.sh"
+
+extra_flags="$pentium64_cflags $fast_cflags -g $wsrep_cflags"
+extra_configs="$pentium_configs $static_link $wsrep_configs --with-wsrep"
+CC="$CC --pipe"
+strip=yes
+
+. "$path/FINISH.sh"
diff --git a/Docs/README-wsrep b/Docs/README-wsrep
new file mode 100644
index 00000000000..ea6a62621ed
--- /dev/null
+++ b/Docs/README-wsrep
@@ -0,0 +1,466 @@
+Codership Oy
+http://www.codership.com
+<info@codership.com>
+
+DISCLAIMER
+
+THIS SOFTWARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
+EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
+IN NO EVENT SHALL CODERSHIP OY BE HELD LIABLE TO ANY PARTY FOR ANY DAMAGES
+RESULTING DIRECTLY OR INDIRECTLY FROM THE USE OF THIS SOFTWARE.
+
+Trademark Information.
+
+MySQL is a trademark or registered trademark of Oracle and/or its affiliates.
+Other trademarks are the property of their respective owners.
+
+Licensing Information.
+
+Please see file COPYING that came with this distribution
+
+Source code can be found at
+wsrep API: https://launchpad.net/wsrep
+MySQL patch: https://launchpad.net/codership-mysql
+
+
+ABOUT THIS DOCUMENT
+
+This document covers installation and configuration issues specific to this
+wsrep-patched MySQL distribution by Codership. It does not cover the use or
+administration of MySQL server per se. The reader is assumed to know how to
+install, configure, administer and use standard MySQL server version 5.1.xx.
+
+
+ MYSQL-5.5.x/wsrep-23.x
+
+CONTENTS:
+=========
+1. WHAT IS WSREP PATCH FOR MYSQL
+2. INSTALLATION
+3. FIRST TIME SETUP
+ 3.1 CONFIGURATION FILES
+ 3.2 DATABASE PRIVILEGES
+ 3.3 CHECK AND CORRECT FIREWALL SETTINGS
+ 3.4 SELINUX
+ 3.5 APPARMOR
+ 3.6 CONNECT TO CLUSTER
+4. UPGRADING FROM MySQL 5.1.x
+5. CONFIGURATION OPTIONS
+ 5.1 MANDATORY MYSQL OPTIONS
+ 5.2 WSREP OPTIONS
+6. ONLINE SCHEMA UPGRADE
+ 6.1 TOTAL ORDER ISOLATION (TOI)
+ 6.2 ROLLING SCHEMA UPGRADE (RSU)
+7. LIMITATIONS
+
+
+1. WHAT IS WSREP PATCH FOR MYSQL/INNODB
+
+Wsrep API developed by Codership Oy is a modern generic (database-agnostic)
+replication API for transactional databases with a goal to make database
+replication/logging subsystem completely modular and pluggable. It is developed
+with flexibility and completeness in mind to satisfy broad range of modern
+replication scenarios. It is equally suitable for synchronous and asynchronous,
+master-slave and multi-master replication.
+
+wsrep stands for Write Set REPlication.
+
+Wsrep patch for MySQL/InnoDB allows MySQL server to load and use various wsrep
+API implementations ("wsrep providers") with different qualities of service.
+Without wsrep provider MySQL-wsrep server will function like a regular
+standalone server.
+
+
+2. INSTALLATION
+
+In the examples below mysql authentication options are omitted for brevity.
+
+2.1 Download and install mysql-wsrep package.
+
+Download binary package for your Linux distribution from
+https://launchpad.net/codership-mysql/
+
+2.1.1 On Debian and Debian-derived distributions.
+
+Upgrade from mysql-server-5.0 to mysql-wsrep is not supported yet, please
+upgrade to mysql-server-5.1 first.
+
+If you're installing over an existing mysql installation, mysql-server-wsrep
+will conflict with mysql-server-5.1 package, so remove it first:
+
+$ sudo apt-get remove mysql-server-5.1 mysql-server-core-5.1
+
+mysql-server-wsrep requires psmisc and mysql-client-5.1.47 (or later).
+MySQL 5.1 packages can be found from backports repositories.
+For further information about configuring and using Debian or Ubuntu
+backports, see:
+
+* http://backports.debian.org
+
+* https://help.ubuntu.com/community/UbuntuBackports
+
+For example, installation of required packages on Debian Lenny:
+
+$ sudo apt-get install psmisc
+$ sudo apt-get -t lenny-backports install mysql-client-5.1
+
+Now you should be able to install mysql-wsrep package:
+
+$ sudo dpkg -i <mysql-server-wsrep DEB>
+
+2.1.2 On CentOS and similar RPM-based distributions.
+
+If you're migrating from existing MySQL installation, there are two variants:
+
+ a) If you're already using official MySQL-server-community 5.1.x RPM from
+ Oracle:
+
+ # rpm -e mysql-server
+
+ b) If you're upgrading from the stock mysql-5.0.77 on CentOS:
+
+ 1) Make sure that the following packages are not installed:
+ # rpm --nodeps --allmatches -e mysql-server mysql-test mysql-bench
+
+ 2) Install *official* MySQL-shared-compat-5.1.x from
+ http://dev.mysql.com/downloads/mysql/5.1.html
+
+Actual installation:
+
+ # rpm -Uvh <MySQL-server-wsrep RPM>
+
+ If this fails due to unsatisfied dependencies, install missing packages
+ (e.g. yum install perl-DBI) and retry.
+
+Additional packages to consider (if not yet installed):
+ * galera (multi-master replication provider, https://launchpad.net/galera)
+ * MySQL-client-community (for connecting to server and mysqldump-based SST)
+ * rsync (for rsync-based SST)
+
+2.2 Upgrade system tables.
+
+If you're upgrading a previous MySQL installation, it might be advisable to
+upgrade system tables. To do that start mysqld and run mysql_upgrade command.
+Consult MySQL documentation in case of errors. Normally they are not critical
+and can be ignored unless specific functionality is needed.
+
+
+3. FIRST TIME SETUP
+
+Unless you're upgrading an already installed mysql-wsrep package, you will need
+to set up a few things to prepare server for operation.
+
+3.1 CONFIGURATION FILES
+
+* Make sure system-wide my.cnf does not bind mysqld to 127.0.0.1. That is, if
+ you have the following line in [mysqld] section, comment it out:
+
+ #bind-address = 127.0.0.1
+
+* Make sure system-wide my.cnf contains "!includedir /etc/mysql/conf.d/" line.
+
+* Edit /etc/mysql/conf.d/wsrep.cnf and set wsrep_provider option by specifying
+ a path to provider library. If you don't have a provider, leave it as it is.
+
+* When a new node joins the cluster it'll have to receive a state snapshot from
+ one of the peers. This requires a privileged MySQL account with access from
+ the rest of the cluster. Edit /etc/mysql/conf.d/wsrep.cnf and set mysql
+ login/password pair for SST, for example:
+
+ wsrep_sst_auth=wsrep_sst:wspass
+
+* See CONFIGURATION section below about other configuration parameters that you
+ might want to change at this point.
+
+3.2 DATABASE PRIVILEGES
+
+Restart MySQL server and connect to it as root to grant privileges to SST
+account (empty users confuse MySQL authentication matching rules, we need to
+delete them too):
+
+$ mysql -e "SET wsrep_on=OFF; DELETE FROM mysql.user WHERE user='';"
+$ mysql -e "SET wsrep_on=OFF; GRANT ALL ON *.* TO wsrep_sst@'%' IDENTIFIED BY 'wspass'";
+
+3.3 CHECK AND CORRECT FIREWALL SETTINGS.
+
+MySQL-wsrep server needs to be accessible from other cluster members through
+its client listening socket and through wsrep provider socket. See your
+distribution and wsrep provider documentation for details. For example on
+CentOS you might need to do something along these lines:
+
+# iptables --insert RH-Firewall-1-INPUT 1 --proto tcp --source <my IP>/24 --destination <my IP>/32 --dport 3306 -j ACCEPT
+# iptables --insert RH-Firewall-1-INPUT 1 --proto tcp --source <my IP>/24 --destination <my IP>/32 --dport 4567 -j ACCEPT
+
+If there is a NAT firewall between the nodes, it must be configured to allow
+direct connections between the nodes (e.g. via port forwarding).
+
+3.4 SELINUX
+
+If you have SELinux enabled, it may block mysqld from doing required operations.
+You'll need to either disable it or configure to allow mysqld to run external
+programs and open listen sockets at unprivileged ports (i.e. things that
+an unprivileged user can do). See SELinux documentation about it.
+
+To quickly disable SELinux:
+1) run 'setenforce 0' as root.
+2) set 'SELINUX=permissive' in /etc/selinux/config
+
+3.5 APPARMOR
+
+AppArmor automatically comes with Ubuntu and may also prevent mysqld to from
+opening additional ports or run scripts. See AppArmor documentation about its
+configuration. To disable AppArmor for mysqld:
+
+$ cd /etc/apparmor.d/disable/
+$ sudo ln -s /etc/apparmor.d/usr.sbin.mysqld
+$ sudo service apparmor restart
+
+
+3.6 CONNECT TO CLUSTER
+
+Now you're ready to connect to cluster by setting wsrep_cluster_address variable
+and monitor status of wsrep provider:
+
+mysql> SET GLOBAL wsrep_cluster_address='<cluster address string>';
+mysql> SHOW STATUS LIKE 'wsrep%';
+
+
+4 UPGRADING FROM MySQL 5.1.x
+
+!!! THESE INSTRUCTIONS ARE PRELIMINARY AND INCOMPLETE !!!
+
+1) BEFORE UPGRADE (while running 5.1.x):
+ - comment out 'wsrep_provider' setting from configuration files
+ (my.cnf and/or wsrep.cnf)
+ - If performing a rolling upgrade on a running cluster, set
+ wsrep_sst_method=mysqldump.
+ You might also need to configure wsrep_sst_receive_address and
+ wsrep_sst_auth appropriately. Mysqldump is the only way to transfer data
+ from 5.1.x to 5.5.x reliably.
+ - remove innodb_plugin settings from configuration files.
+
+2) Perform upgrade as usual:
+ http://dev.mysql.com/doc/refman/5.5/en/upgrading-from-previous-series.html
+ Don't forget to run 'mysql_upgrade' command.
+
+3) AFTER UPGRADING individual node:
+ - uncomment 'wsrep_provider' line in configuration file.
+ - restart the server and join the cluster.
+
+4) AFTER UPGRADING the whole cluster:
+ - revert to usual wsrep SST settings if not 'mysqldump'.
+
+
+5. CONFIGURATION OPTIONS
+
+5.1 MANDATORY MYSQL OPTIONS
+
+binlog_format=ROW
+ This option is required to use row-level replication as opposed to
+ statement-level. For performance and consistency considerations don't change
+ that. As a side effect, binlog, if turned on, can be ROW only. In future this
+ option won't have special meaning.
+
+innodb_autoinc_lock_mode=2
+ This is a required parameter. Without it INSERTs into tables with
+ AUTO_INCREMENT column may fail.
+ autoinc lock modes 0 and 1 can cause unresolved deadlock, and make
+ system unresponsive.
+
+innodb_locks_unsafe_for_binlog=1
+ This option is required for parallel applying.
+
+5.2 WSREP OPTIONS
+
+All options are optional except for wsrep_provider, wsrep_cluster_address, and
+wsrep_sst_auth.
+
+wsrep_provider=none
+ A full path to the library that implements WSREP interface. If none is
+ specified, the server behaves like a regular mysqld.
+
+wsrep_provider_options=
+ Provider-specific option string. Check wsrep provider documentation or
+ http://www.codership.com/wiki
+
+wsrep_cluster_address=
+ Provider-specific cluster address string. This is used to connect a node to
+ the desired cluster. This option can be given either on mysqld startup or set
+ during runtime. See wsrep provider documentation for possible values.
+
+wsrep_cluster_name="my_wsrep_cluster"
+ Logical cluster name, must be the same for all nodes of the cluster.
+
+wsrep_node_name=
+ Human readable node name (for easier log reading only). Defaults to hostname.
+
+wsrep_slave_threads=1
+ Number of threads dedicated to processing of writesets from other nodes.
+ For best performance should be few per CPU core.
+
+wsrep_dbug_option
+ Options for the built-in DBUG library (independent from what MySQL uses).
+ Empty by default. Not currently in use.
+
+wsrep_debug=0
+ Enable debug-level logging.
+
+wsrep_convert_LOCK_to_trx=0
+ Implicitly convert locking sessions into transactions inside mysqld. By
+ itself it does not mean support for locking sessions, but it prevents the
+ database from going into logically inconsistent state. Note however, that
+ loading large database dump with LOCK statements might result in abnormally
+ large transactions and cause an out-of-memory condition
+
+wsrep_retry_autocommit=1
+ Retry autocommit queries and single statement transactions should they fail
+ certification test. This is analogous to rescheduling an autocommit query
+ should it go into deadlock with other transactions in the database lock
+ manager.
+
+wsrep_auto_increment_control=1
+ Automatically adjust auto_increment_increment and auto_increment_offset
+ variables based on the number of nodes in the cluster. Significantly reduces
+ certification conflict rate for INSERTS.
+
+wsrep_drupal_282555_workaround=1
+ MySQL seems to have an obscure bug when INSERT into table with
+ AUTO_INCREMENT column with NULL value for that column can fail with a
+ duplicate key error. When this option is on, it retries such INSERTs.
+ Required for stable Drupal operation. Documented at:
+ http://bugs.mysql.com/bug.php?id=41984
+ http://drupal.org/node/282555
+
+wsrep_causal_reads=0
+ Enforce strict READ COMMITTED semantics on reads and transactions. May
+ result in additional latencies. It is a session variable.
+
+wsrep_OSU_method=TOI
+ Online Schema Upgrade (OSU) can be performed with two alternative methods:
+ Total Order Isolation (TOI) runs DDL statement in all cluster nodes in
+ same total order sequence locking the affected table for the duration of the
+ operation. This may result in the whole cluster being blocked for the
+ duration of the operation.
+ Rolling Schema Upgrade (RSU) executes the DDL statement only locally, thus
+ blocking only one cluster node. During the DDL processing, the node
+ is not replicating and may be unable to process replication events (due to
+ table lock). Once DDL operation is complete, the node will catch up and sync
+ with the cluster to become fully operational again. The DDL statement or
+ its effects are not replicated, so it is user's responsibility to manually
+ perform this operation on each of the nodes.
+
+wsrep_forced_binlog_format=none
+ Force every transaction to use given binlog format. When this variable is
+ set to something else than NONE, all transactions will use the given forced
+ format, regardless of what the client session has specified in binlog_format.
+ Valid choices for wsrep_forced_binlog_format are: ROW, STATEMENT, MIXED and
+ special value NONE, meaning that there is no forced binlog format in effect.
+ This variable was intruduced to support STATEMENT format replication during
+ rolling schema upgrade processing. However, in most cases ROW replication
+ is valid for asymmetrict schema replication.
+
+State snapshot transfer options.
+
+When a new node joins the cluster it has to synchronize its initial state with
+the other cluster members by transferring state snapshot from one of them.
+The options below govern how this happens and should be set up before attempting
+to join or start a cluster.
+
+wsrep_sst_method=mysqldump
+ What method to use to copy database state to a newly joined node. Supported
+ methods:
+ - mysqldump: slow (except for small datasets) but most tested.
+ - rsync: much faster on large datasets.
+ - rsync_wan: same as rsync but with deltaxfer to minimize network traffic.
+
+wsrep_sst_receive_address=
+ Address (hostname:port) at which this node wants to receive state snapshot.
+ Defaults to mysqld bind address, and if that is not specified (0.0.0.0) -
+ to the first IP of eth0 + mysqld bind port.
+ NOTE: check that your firewall allows connections to this address from other
+ cluster nodes.
+
+wsrep_sst_auth=
+ Authentication information needed for state transfer. Depends on the state
+ transfer method. For mysqldump-based SST it is
+ <mysql_root_user>:<mysql_root_password>
+ and should be the same on all nodes - it is used to authenticate with both
+ state snapshot receiver and state snapshot donor.
+
+wsrep_sst_donor=
+ A name of the node which should serve as state snapshot donor. This allows
+ to control which node will serve state snapshot request. By default the
+ most suitable node is chosen by wsrep provider. This is the same as given in
+ wsrep_node_name.
+
+
+6. ONLINE SCHEMA UPGRADE
+
+ Schema upgrades mean any data definition statements (DDL statemnents) run
+ for the database. They change the database structure and are non-
+ transactional.
+
+ Release 22.3 brings a new method for performing schema upgrades. User can
+ now choose whether to use the traditional total order isolation or new
+ rolling schema upgrade method. The OSU method choice is done by global
+ parameter: 'wsrep_OSU_method'.
+
+6.1 Total Order Isolation (TOI)
+
+ With earlier releases, DDL processing happened always by Total Order
+ Isolation (TOI) method. With TOI, the DDL was scheduled to be processed in
+ same transaction seqeuncing 'slot' in each cluster node.
+ The processing is secured by locking the affected table from any other use.
+ With TOI method, the whole cluster has part of the database locked for the
+ duration of the DDL processing.
+
+6.2 Rolling Schema Upgrade (RSU)
+
+ Rolling schema upgrade is new DDL processing method, where DDL will be
+ processed locally for the node. The node is disconnected of the replication
+ for the duration of the DDL processing, so that there is only DDL statement
+ processing in the node and it does not block the rest of the cluster. When
+ the DDL processing is complete, the node applies delayed replication events
+ and synchronizes back with the cluster.
+ The DDL can then be executed cluster-wide by running the same DDL statement
+ for each node in turn. When this rolling schema upgrade proceeds, part of
+ the cluster will have old schema structure and part of the cluster will have
+ new schema structure.
+
+
+7. LIMITATIONS
+
+1) Currently replication works only with InnoDB storage engine. Any writes to
+ tables of other types, including system (mysql.*) tables are not replicated.
+ However, DDL statements are replicated in statement level, and changes
+ to mysql.* tables will get replicated that way.
+ So, you can safely issue: CREATE USER...,
+ but issuing: INSERT INTO mysql.user..., will not be replicated.
+
+2) DELETE operation is unsupported on tables without primary key. Also rows in
+ tables without primary key may appear in different order on different nodes.
+ As a result SELECT...LIMIT... may return slightly different sets.
+
+3) Unsupported queries:
+ * LOCK/UNLOCK TABLES cannot be supported in multi-master setups.
+ * lock functions (GET_LOCK(), RELEASE_LOCK()... )
+
+4) Query log cannot be directed to table. If you enable query logging,
+ you must forward the log to a file:
+ log_output = FILE
+ Use general_log and general_log_file to choose query logging and the
+ log file name
+
+5) Maximum allowed transaction size is defined by wsrep_max_ws_rows and
+ wsrep_max_ws_size. Anything bigger (e.g. huge LOAD DATA) will be rejected.
+
+6) Due to cluster level optimistic concurrency control, transaction issuing
+ COMMIT may still be aborted at that stage. There can be two transactions.
+ writing to same rows and committing in separate cluster nodes, and only one
+ of the them can successfully commit. The failing one will be aborted.
+ For cluster level aborts, MySQL/galera cluster gives back deadlock error.
+ code (Error: 1213 SQLSTATE: 40001 (ER_LOCK_DEADLOCK)).
+
+7) XA transactions can not be supported due to possible rollback on commit.
+
diff --git a/cmake/wsrep.cmake b/cmake/wsrep.cmake
new file mode 100644
index 00000000000..1fd747cddd6
--- /dev/null
+++ b/cmake/wsrep.cmake
@@ -0,0 +1,59 @@
+# Copyright (c) 2011, Codership Oy <info@codership.com>.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+# We need to generate a proper spec file even without --with-wsrep flag,
+# so WSREP_VERSION is produced regardless
+
+# Set the patch version
+SET(WSREP_PATCH_VERSION "5")
+
+# Obtain patch revision number
+SET(WSREP_PATCH_REVNO $ENV{WSREP_REV})
+IF(NOT WSREP_PATCH_REVNO)
+ EXECUTE_PROCESS(
+ COMMAND bzr revno
+ OUTPUT_VARIABLE WSREP_PATCH_REVNO
+ RESULT_VARIABLE RESULT
+ )
+STRING(REGEX REPLACE "(\r?\n)+$" "" WSREP_PATCH_REVNO "${WSREP_PATCH_REVNO}")
+#FILE(WRITE "wsrep_config" "Debug: WSREP_PATCH_REVNO result: ${RESULT}\n")
+ENDIF()
+IF(NOT WSREP_PATCH_REVNO)
+ SET(WSREP_PATCH_REVNO "XXXX")
+ENDIF()
+
+# Obtain wsrep API version
+EXECUTE_PROCESS(
+ COMMAND sh -c "grep WSREP_INTERFACE_VERSION ${MySQL_SOURCE_DIR}/wsrep/wsrep_api.h | cut -d '\"' -f 2"
+ OUTPUT_VARIABLE WSREP_API_VERSION
+ RESULT_VARIABLE RESULT
+)
+#FILE(WRITE "wsrep_config" "Debug: WSREP_API_VERSION result: ${RESULT}\n")
+STRING(REGEX REPLACE "(\r?\n)+$" "" WSREP_API_VERSION "${WSREP_API_VERSION}")
+
+SET(WSREP_VERSION
+ "${WSREP_API_VERSION}.${WSREP_PATCH_VERSION}.r${WSREP_PATCH_REVNO}"
+)
+
+OPTION(WITH_WSREP "WSREP replication API (to use, e.g. Galera Replication library)" OFF)
+IF (WITH_WSREP)
+ SET(WSREP_C_FLAGS "-DWITH_WSREP -DWSREP_PROC_INFO -DMYSQL_MAX_VARIABLE_VALUE_LEN=2048")
+ SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${WSREP_C_FLAGS}")
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${WSREP_C_FLAGS}")
+ SET(COMPILATION_COMMENT "${COMPILATION_COMMENT}, wsrep_${WSREP_VERSION}")
+ SET(WITH_EMBEDDED_SERVER OFF)
+ENDIF()
+
+#
diff --git a/scripts/wsrep_sst_mysqldump.sh b/scripts/wsrep_sst_mysqldump.sh
new file mode 100644
index 00000000000..8106850e918
--- /dev/null
+++ b/scripts/wsrep_sst_mysqldump.sh
@@ -0,0 +1,133 @@
+#!/bin/sh -e
+# Copyright (C) 2009 Codership Oy
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING. If not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston
+# MA 02110-1301 USA.
+
+# This is a reference script for mysqldump-based state snapshot tansfer
+
+USER=$1
+PSWD=$2
+HOST=$3
+PORT=$4
+LOCAL_HOST="127.0.0.1"
+LOCAL_PORT=$5
+UUID=$6
+SEQNO=$7
+BYPASS=$8
+
+EINVAL=22
+
+err()
+{
+ echo "SST error: $*" >&2
+}
+
+local_ip()
+{
+ PATH=$PATH:/usr/sbin:/usr/bin:/sbin:/bin
+
+ [ "$1" = "127.0.0.1" ] && return 0
+ [ "$1" = "localhost" ] && return 0
+ [ "$1" = "$(hostname -s)" ] && return 0
+ [ "$1" = "$(hostname -f)" ] && return 0
+ [ "$1" = "$(hostname -d)" ] && return 0
+
+ # Now if ip program is not found in the path, we can't return 0 since
+ # it would block any address. Thankfully grep should fail in this case
+ ip route get "$1" | grep local >/dev/null && return 0
+
+ return 1
+}
+
+if test -z "$USER"; then err "USER cannot be nil"; exit $EINVAL; fi
+if test -z "$HOST"; then err "HOST cannot be nil"; exit $EINVAL; fi
+if test -z "$PORT"; then err "PORT cannot be nil"; exit $EINVAL; fi
+if test -z "$LOCAL_PORT"; then err "LOCAL_PORT cannot be nil"; exit $EINVAL; fi
+if test -z "$UUID"; then err "UUID cannot be nil"; exit $EINVAL; fi
+if test -z "$SEQNO"; then err "SEQNO cannot be nil"; exit $EINVAL; fi
+
+if local_ip $HOST && [ "$PORT" = "$LOCAL_PORT" ]
+then
+ err "destination address '$HOST:$PORT' matches source address."
+ exit $EINVAL
+fi
+
+# Check client version
+if ! mysql --version | grep 'Distrib 5.5' >/dev/null
+then
+ mysql --version >&2
+ err "this procedure requires MySQL client version 5.5.x"
+ exit $EINVAL
+fi
+
+AUTH="-u$USER"
+if test -n "$PSWD"; then AUTH="$AUTH -p$PSWD"; fi
+
+STOP_WSREP="SET wsrep_on=OFF;"
+
+# NOTE: we don't use --routines here because we're dumping mysql.proc table
+#MYSQLDUMP="@bindir@/mysqldump $AUTH -h$LOCAL_HOST -P$LOCAL_PORT \
+MYSQLDUMP="mysqldump $AUTH -h$LOCAL_HOST -P$LOCAL_PORT \
+--add-drop-database --add-drop-table --skip-add-locks --create-options \
+--disable-keys --extended-insert --skip-lock-tables --quick --set-charset \
+--skip-comments --flush-privileges --all-databases"
+
+# mysqldump cannot restore CSV tables, fix this issue
+CSV_TABLES_FIX="
+set sql_mode='';
+
+USE mysql;
+
+SET @str = IF (@@have_csv = 'YES', 'CREATE TABLE IF NOT EXISTS general_log (event_time TIMESTAMP NOT NULL, user_host MEDIUMTEXT NOT NULL, thread_id INTEGER NOT NULL, server_id INTEGER UNSIGNED NOT NULL, command_type VARCHAR(64) NOT NULL,argument MEDIUMTEXT NOT NULL) engine=CSV CHARACTER SET utf8 comment=\"General log\"', 'SET @dummy = 0');
+
+PREPARE stmt FROM @str;
+EXECUTE stmt;
+DROP PREPARE stmt;
+
+SET @str = IF (@@have_csv = 'YES', 'CREATE TABLE IF NOT EXISTS slow_log (start_time TIMESTAMP NOT NULL, user_host MEDIUMTEXT NOT NULL, query_time TIME NOT NULL, lock_time TIME NOT NULL, rows_sent INTEGER NOT NULL, rows_examined INTEGER NOT NULL, db VARCHAR(512) NOT NULL, last_insert_id INTEGER NOT NULL, insert_id INTEGER NOT NULL, server_id INTEGER UNSIGNED NOT NULL, sql_text MEDIUMTEXT NOT NULL) engine=CSV CHARACTER SET utf8 comment=\"Slow log\"', 'SET @dummy = 0');
+
+PREPARE stmt FROM @str;
+EXECUTE stmt;
+DROP PREPARE stmt;"
+
+SET_START_POSITION="SET GLOBAL wsrep_start_position='$UUID:$SEQNO';"
+
+#MYSQL="@bindir@/mysql -u'$USER' -p'$PSWD' -h'$HOST' -P'$PORT'"
+MYSQL="mysql $AUTH -h$HOST -P$PORT --disable-reconnect --connect_timeout=10"
+
+# need to disable logging when loading the dump
+# reason is that dump contains ALTER TABLE for log tables, and
+# this causes an error if logging is enabled
+GENERAL_LOG_OPT=`$MYSQL --skip-column-names -e"$STOP_WSREP SELECT @@GENERAL_LOG"`
+SLOW_LOG_OPT=`$MYSQL --skip-column-names -e"$STOP_WSREP SELECT @@SLOW_QUERY_LOG"`
+$MYSQL -e"$STOP_WSREP SET GLOBAL GENERAL_LOG=OFF"
+$MYSQL -e"$STOP_WSREP SET GLOBAL SLOW_QUERY_LOG=OFF"
+
+# commands to restore log settings
+RESTORE_GENERAL_LOG="SET GLOBAL GENERAL_LOG=$GENERAL_LOG_OPT;"
+RESTORE_SLOW_QUERY_LOG="SET GLOBAL SLOW_QUERY_LOG=$SLOW_LOG_OPT;"
+
+if [ $BYPASS -eq 0 ]
+then
+ (echo $STOP_WSREP && $MYSQLDUMP && echo $CSV_TABLES_FIX \
+ && echo $RESTORE_GENERAL_LOG && echo $RESTORE_SLOW_QUERY_LOG \
+ && echo $SET_START_POSITION \
+ || echo "SST failed to complete;") | $MYSQL
+else
+ echo "Bypassing state dump." >&2
+ echo $SET_START_POSITION | $MYSQL
+fi
+
+#
diff --git a/scripts/wsrep_sst_rsync.sh b/scripts/wsrep_sst_rsync.sh
new file mode 100644
index 00000000000..c0b54159eda
--- /dev/null
+++ b/scripts/wsrep_sst_rsync.sh
@@ -0,0 +1,214 @@
+#!/bin/bash -ue
+
+# Copyright (C) 2010 Codership Oy
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING. If not, write to the
+# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston
+# MA 02110-1301 USA.
+
+# This is a reference script for rsync-based state snapshot tansfer
+
+RSYNC_PID=
+RSYNC_CONF=
+
+cleanup_joiner()
+{
+ echo "Joiner cleanup:" >&2
+set -x
+ local PID=$(cat "$RSYNC_PID" 2>/dev/null || echo 0)
+ [ "0" != "$PID" ] && kill $PID && sleep 0.5 && kill -9 $PID || :
+set +x
+ rm -rf "$RSYNC_CONF"
+ rm -rf "$MAGIC_FILE"
+ rm -rf "$RSYNC_PID"
+}
+
+check_pid()
+{
+ local pid_file=$1
+ [ -r $pid_file ] && ps -p $(cat $pid_file) >/dev/null 2>&1
+}
+
+check_pid_and_port()
+{
+ local pid_file=$1
+ local rsync_pid=$(cat $pid_file)
+ local rsync_port=$2
+
+ check_pid $pid_file && \
+ netstat -anpt 2>/dev/null | \
+ grep LISTEN | grep \:$rsync_port | grep $rsync_pid/rsync >/dev/null
+}
+
+ROLE=$1
+ADDR=$2
+AUTH=$3
+DATA=$4
+CONF=$5
+
+MAGIC_FILE="$DATA/rsync_sst_complete"
+rm -rf "$MAGIC_FILE"
+
+if [ "$ROLE" = "donor" ]
+then
+ UUID=$6
+ SEQNO=$7
+ BYPASS=$8
+
+ if [ $BYPASS -eq 0 ]
+ then
+
+ FLUSHED="$DATA/tables_flushed"
+ rm -rf "$FLUSHED"
+
+ # Use deltaxfer only for WAN
+ inv=$(basename $0)
+ [ "$inv" = "wsrep_sst_rsync_wan" ] && WHOLE_FILE_OPT="" \
+ || WHOLE_FILE_OPT="--whole-file"
+
+ echo "flush tables"
+
+ # wait for tables flushed and state ID written to the file
+ while [ ! -r "$FLUSHED" ] && ! grep -q ':' "$FLUSHED" >/dev/null 2>&1
+ do
+ sleep 0.2
+ done
+
+ STATE="$(cat $FLUSHED)"
+ rm -rf "$FLUSHED"
+
+ sync
+
+ # Old filter - include everything except selected
+ # FILTER=(--exclude '*.err' --exclude '*.pid' --exclude '*.sock' \
+ # --exclude '*.conf' --exclude core --exclude 'galera.*' \
+ # --exclude grastate.txt --exclude '*.pem' \
+ # --exclude '*.[0-9][0-9][0-9][0-9][0-9][0-9]' --exclude '*.index')
+
+ # New filter - exclude everything except dirs (schemas) and innodb files
+ FILTER=(-f '+ /ibdata*' -f '+ /ib_logfile*' -f '+ */' -f '-! */*')
+
+ RC=0
+ rsync --archive --no-times --ignore-times --inplace --delete --quiet \
+ $WHOLE_FILE_OPT "${FILTER[@]}" "$DATA" rsync://$ADDR || RC=$?
+
+ [ $RC -ne 0 ] && echo "rsync returned code $RC:" >> /dev/stderr
+
+ case $RC in
+ 0) RC=0 # Success
+ ;;
+ 12) RC=71 # EPROTO
+ echo "rsync server on the other end has incompatible protocol. " \
+ "Make sure you have the same version of rsync on all nodes."\
+ >> /dev/stderr
+ ;;
+ 22) RC=12 # ENOMEM
+ ;;
+ *) RC=255 # unknown error
+ ;;
+ esac
+
+ [ $RC -ne 0 ] && exit $RC
+
+ else # BYPASS
+ STATE="$UUID:$SEQNO"
+ fi
+
+ echo "continue" # now server can resume updating data
+
+ echo "$STATE" > "$MAGIC_FILE"
+ rsync -aqc "$MAGIC_FILE" rsync://$ADDR
+
+ echo "done $STATE"
+
+elif [ "$ROLE" = "joiner" ]
+then
+ MYSQLD_PID=$6
+
+ MODULE="rsync_sst"
+
+ RSYNC_PID="$DATA/$MODULE.pid"
+
+ if check_pid $RSYNC_PID
+ then
+ echo "rsync daemon already running."
+ exit 114 # EALREADY
+ fi
+ rm -rf "$RSYNC_PID"
+
+ RSYNC_PORT=$(echo $ADDR | awk -F ':' '{ print $2 }')
+ if [ -z "$RSYNC_PORT" ]
+ then
+ RSYNC_PORT=4444
+ ADDR="$(echo $ADDR | awk -F ':' '{ print $1 }'):$RSYNC_PORT"
+ fi
+
+ trap "exit 32" HUP PIPE
+ trap "exit 3" INT TERM
+ trap cleanup_joiner EXIT
+
+ MYUID=$(id -u)
+ MYGID=$(id -g)
+ RSYNC_CONF="$DATA/$MODULE.conf"
+
+cat << EOF > "$RSYNC_CONF"
+pid file = $RSYNC_PID
+use chroot = no
+[$MODULE]
+ path = $DATA
+ read only = no
+ timeout = 300
+ uid = $MYUID
+ gid = $MYGID
+EOF
+
+# rm -rf "$DATA"/ib_logfile* # we don't want old logs around
+
+ # listen at all interfaces (for firewalled setups)
+ rsync --daemon --port $RSYNC_PORT --config "$RSYNC_CONF"
+
+ until check_pid_and_port $RSYNC_PID $RSYNC_PORT
+ do
+ sleep 0.2
+ done
+
+ echo "ready $ADDR/$MODULE"
+
+ # wait for SST to complete by monitoring magic file
+ while [ ! -r "$MAGIC_FILE" ] && check_pid "$RSYNC_PID" && \
+ ps -p $MYSQLD_PID >/dev/null
+ do
+ sleep 1
+ done
+
+ if ! ps -p $MYSQLD_PID >/dev/null
+ then
+ echo "Parent mysqld process (PID:$MYSQLD_PID) terminated unexpectedly." >&2
+ exit 32
+ fi
+
+ if [ -r "$MAGIC_FILE" ]
+ then
+ cat "$MAGIC_FILE" # output UUID:seqno
+ else
+ # this message should cause joiner to abort
+ echo "rsync process ended without creating '$MAGIC_FILE'"
+ fi
+
+# cleanup_joiner
+else
+ echo "Unrecognized role: $ROLE"
+ exit 22 # EINVAL
+fi
+
+exit 0
diff --git a/sql/wsrep_check_opts.cc b/sql/wsrep_check_opts.cc
new file mode 100644
index 00000000000..5764be39093
--- /dev/null
+++ b/sql/wsrep_check_opts.cc
@@ -0,0 +1,392 @@
+/* Copyright 2011 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+//#include <mysqld.h>
+#include <sql_class.h>
+//#include <sql_plugin.h>
+//#include <set_var.h>
+
+#include "wsrep_mysqld.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <ctype.h>
+
+/* This file is about checking for correctness of mysql configuration options */
+
+struct opt
+{
+ const char* const name;
+ const char* value;
+};
+
+/* A list of options to check.
+ * At first we assume default values and then see if they are changed on CLI or
+ * in my.cnf */
+static struct opt opts[] =
+{
+ { "wsrep_slave_threads", "1" }, // mysqld.cc
+ { "bind_address", "0.0.0.0" }, // mysqld.cc
+ { "wsrep_sst_method","mysqldump" }, // mysqld.cc
+ { "wsrep_sst_receive_address","AUTO"}, // mysqld.cc
+ { "binlog_format", "ROW" }, // mysqld.cc
+ { "wsrep_provider", "none" }, // mysqld.cc
+ { "query_cache_type", "0" }, // mysqld.cc
+ { "query_cache_size", "0" }, // mysqld.cc
+ { "locked_in_memory", "0" }, // mysqld.cc
+ { "wsrep_cluster_address", "0" }, // mysqld.cc
+ { "locks_unsafe_for_binlog", "0" }, // ha_innodb.cc
+ { "autoinc_lock_mode", "1" }, // ha_innodb.cc
+ { 0, 0 }
+};
+
+enum
+{
+ WSREP_SLAVE_THREADS,
+ BIND_ADDRESS,
+ WSREP_SST_METHOD,
+ WSREP_SST_RECEIVE_ADDRESS,
+ BINLOG_FORMAT,
+ WSREP_PROVIDER,
+ QUERY_CACHE_TYPE,
+ QUERY_CACHE_SIZE,
+ LOCKED_IN_MEMORY,
+ WSREP_CLUSTER_ADDRESS,
+ LOCKS_UNSAFE_FOR_BINLOG,
+ AUTOINC_LOCK_MODE
+};
+
+
+/* A class to make a copy of argv[] vector */
+struct argv_copy
+{
+ int const argc_;
+ char** argv_;
+
+ argv_copy (int const argc, const char* const argv[]) :
+ argc_ (argc),
+ argv_ (reinterpret_cast<char**>(calloc(argc_, sizeof(char*))))
+ {
+ if (argv_)
+ {
+ for (int i = 0; i < argc_; ++i)
+ {
+ argv_[i] = strdup(argv[i]);
+
+ if (!argv_[i])
+ {
+ argv_free (); // free whatever bee allocated
+ return;
+ }
+ }
+ }
+ }
+
+ ~argv_copy () { argv_free (); }
+
+private:
+ argv_copy (const argv_copy&);
+ argv_copy& operator= (const argv_copy&);
+
+ void argv_free()
+ {
+ if (argv_)
+ {
+ for (int i = 0; (i < argc_) && argv_[i] ; ++i) free (argv_[i]);
+ free (argv_);
+ argv_ = 0;
+ }
+ }
+};
+
+/* a short corresponding to '--' byte sequence */
+static short const long_opt_prefix ('-' + ('-' << 8));
+
+/* Normalizes long options to have '_' instead of '-' */
+static int
+normalize_opts (argv_copy& a)
+{
+ if (a.argv_)
+ {
+ for (int i = 0; i < a.argc_; ++i)
+ {
+ char* ptr = a.argv_[i];
+ if (long_opt_prefix == *(short*)ptr) // long option
+ {
+ ptr += 2;
+ const char* end = strchr(ptr, '=');
+
+ if (!end) end = ptr + strlen(ptr);
+
+ for (; ptr != end; ++ptr) if ('-' == *ptr) *ptr = '_';
+ }
+ }
+
+ return 0;
+ }
+
+ return EINVAL;
+}
+
+/* Find required options in the argument list and change their values */
+static int
+find_opts (argv_copy& a, struct opt* const opts)
+{
+ for (int i = 0; i < a.argc_; ++i)
+ {
+ char* ptr = a.argv_[i] + 2; // we're interested only in long options
+
+ struct opt* opt = opts;
+ for (; 0 != opt->name; ++opt)
+ {
+ if (!strstr(ptr, opt->name)) continue; // try next option
+
+ /* 1. try to find value after the '=' */
+ opt->value = strchr(ptr, '=') + 1;
+
+ /* 2. if no '=', try next element in the argument vector */
+ if (reinterpret_cast<void*>(1) == opt->value)
+ {
+ /* also check that the next element is not an option itself */
+ if (i + 1 < a.argc_ && *(a.argv_[i + 1]) != '-')
+ {
+ ++i;
+ opt->value = a.argv_[i];
+ }
+ else opt->value = ""; // no value supplied (like boolean opt)
+ }
+
+ break; // option found, break inner loop
+ }
+ }
+
+ return 0;
+}
+
+/* Parses string for an integer. Returns 0 on success. */
+int get_long_long (const struct opt& opt, long long* const val, int const base)
+{
+ const char* const str = opt.value;
+
+ if ('\0' != *str)
+ {
+ char* endptr;
+
+ *val = strtoll (str, &endptr, base);
+
+ if ('k' == *endptr || 'K' == *endptr)
+ {
+ *val *= 1024L;
+ endptr++;
+ }
+ else if ('m' == *endptr || 'M' == *endptr)
+ {
+ *val *= 1024L * 1024L;
+ endptr++;
+ }
+ else if ('g' == *endptr || 'G' == *endptr)
+ {
+ *val *= 1024L * 1024L * 1024L;
+ endptr++;
+ }
+
+ if ('\0' == *endptr) return 0; // the whole string was a valid integer
+ }
+
+ WSREP_ERROR ("Bad value for *%s: '%s'. Should be integer.",
+ opt.name, opt.value);
+
+ return EINVAL;
+}
+
+/* This is flimzy coz hell knows how mysql interprets boolean strings...
+ * and, no, I'm not going to become versed in how mysql handles options -
+ * I'd rather sing.
+
+ Aha, http://dev.mysql.com/doc/refman/5.1/en/dynamic-system-variables.html:
+ Variables that have a type of “boolean” can be set to 0, 1, ON or OFF. (If you
+ set them on the command line or in an option file, use the numeric values.)
+
+ So it is '0' for FALSE, '1' or empty string for TRUE
+
+ */
+int get_bool (const struct opt& opt, bool* const val)
+{
+ const char* str = opt.value;
+
+ while (isspace(*str)) ++str; // skip initial whitespaces
+
+ ssize_t str_len = strlen(str);
+ switch (str_len)
+ {
+ case 0:
+ *val = true;
+ return 0;
+ case 1:
+ if ('0' == *str || '1' == *str)
+ {
+ *val = ('1' == *str);
+ return 0;
+ }
+ }
+
+ WSREP_ERROR ("Bad value for *%s: '%s'. Should be '0', '1' or empty string.",
+ opt.name, opt.value);
+
+ return EINVAL;
+}
+
+static int
+check_opts (int const argc, const char* const argv[], struct opt opts[])
+{
+ /* First, make a copy of argv to be able to manipulate it */
+ argv_copy a(argc, argv);
+
+ if (!a.argv_)
+ {
+ WSREP_ERROR ("Could not copy argv vector: not enough memory.");
+ return ENOMEM;
+ }
+
+ int err = normalize_opts (a);
+ if (err)
+ {
+ WSREP_ERROR ("Failed to normalize options.");
+ return err;
+ }
+
+ err = find_opts (a, opts);
+ if (err)
+ {
+ WSREP_ERROR ("Failed to parse options.");
+ return err;
+ }
+
+ /* At this point we have updated default values in our option list to
+ what has been specified on the command line / my.cnf */
+
+ long long slave_threads;
+ err = get_long_long (opts[WSREP_SLAVE_THREADS], &slave_threads, 10);
+ if (err) return err;
+
+ int rcode = 0;
+
+ if (slave_threads > 1)
+ /* Need to check AUTOINC_LOCK_MODE and LOCKS_UNSAFE_FOR_BINLOG */
+ {
+ long long autoinc_lock_mode;
+ err = get_long_long (opts[AUTOINC_LOCK_MODE], &autoinc_lock_mode, 10);
+ if (err) return err;
+
+ bool locks_unsafe_for_binlog;
+ err = get_bool (opts[LOCKS_UNSAFE_FOR_BINLOG],&locks_unsafe_for_binlog);
+ if (err) return err;
+
+ if (autoinc_lock_mode != 2)
+ {
+ WSREP_ERROR ("Parallel applying (wsrep_slave_threads > 1) requires"
+ " innodb_autoinc_lock_mode = 2.");
+ rcode = EINVAL;
+ }
+ }
+
+ long long query_cache_size, query_cache_type;
+ if ((err = get_long_long (opts[QUERY_CACHE_SIZE], &query_cache_size, 10)))
+ return err;
+ if ((err = get_long_long (opts[QUERY_CACHE_TYPE], &query_cache_type, 10)))
+ return err;
+
+ if (0 != query_cache_size && 0 != query_cache_type)
+ {
+ WSREP_ERROR ("Query cache is not supported (size=%lld type=%lld)",
+ query_cache_size, query_cache_type);
+ rcode = EINVAL;
+ }
+
+ bool locked_in_memory;
+ err = get_bool (opts[LOCKED_IN_MEMORY], &locked_in_memory);
+ if (err) { WSREP_ERROR("get_bool error: %s", strerror(err)); return err; }
+ if (locked_in_memory)
+ {
+ WSREP_ERROR ("Memory locking is not supported (locked_in_memory=%s)",
+ locked_in_memory ? "ON" : "OFF");
+ rcode = EINVAL;
+ }
+
+ if (!strcasecmp(opts[WSREP_SST_METHOD].value,"mysqldump"))
+ {
+ if (!strcasecmp(opts[BIND_ADDRESS].value, "127.0.0.1") ||
+ !strcasecmp(opts[BIND_ADDRESS].value, "localhost"))
+ {
+ WSREP_ERROR ("wsrep_sst_method is set to 'mysqldump' yet "
+ "mysqld bind_address is set to '%s', which makes it "
+ "impossible to receive state transfer from another "
+ "node, since mysqld won't accept such connections. "
+ "If you wish to use mysqldump state transfer method, "
+ "set bind_address to allow mysql client connections "
+ "from other cluster members (e.g. 0.0.0.0).",
+ opts[BIND_ADDRESS].value);
+ rcode = EINVAL;
+ }
+ }
+ else
+ {
+ // non-mysqldump SST requires wsrep_cluster_address on startup
+ if (strlen(opts[WSREP_CLUSTER_ADDRESS].value) == 0)
+ {
+ WSREP_ERROR ("%s SST method requires wsrep_cluster_address to be "
+ "configured on startup.",opts[WSREP_SST_METHOD].value);
+ rcode = EINVAL;
+ }
+ }
+
+ if (strcasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value, "AUTO"))
+ {
+ if (!strncasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value,
+ "127.0.0.1", strlen("127.0.0.1")) ||
+ !strncasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value,
+ "localhost", strlen("localhost")))
+ {
+ WSREP_WARN ("wsrep_sst_receive_address is set to '%s' which "
+ "makes it impossible for another host to reach this "
+ "one. Please set it to the address which this node "
+ "can be connected at by other cluster members.",
+ opts[WSREP_SST_RECEIVE_ADDRESS].value);
+// rcode = EINVAL;
+ }
+ }
+
+ if (strcasecmp(opts[WSREP_PROVIDER].value, "none"))
+ {
+ if (strcasecmp(opts[BINLOG_FORMAT].value, "ROW"))
+ {
+ WSREP_ERROR ("Only binlog_format = 'ROW' is currently supported. "
+ "Configured value: '%s'. Please adjust your "
+ "configuration.", opts[BINLOG_FORMAT].value);
+
+ rcode = EINVAL;
+ }
+ }
+
+ return rcode;
+}
+
+int
+wsrep_check_opts (int const argc, char* const* const argv)
+{
+ return check_opts (argc, argv, opts);
+}
+
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
new file mode 100644
index 00000000000..9986d7c79cd
--- /dev/null
+++ b/sql/wsrep_hton.cc
@@ -0,0 +1,419 @@
+/* Copyright 2008 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mysqld.h>
+#include "sql_base.h"
+#include <sql_class.h>
+#include "wsrep_mysqld.h"
+#include "wsrep_priv.h"
+#include <cstdio>
+#include <cstdlib>
+
+extern handlerton *binlog_hton;
+extern int binlog_close_connection(handlerton *hton, THD *thd);
+extern ulonglong thd_to_trx_id(THD *thd);
+
+extern "C" int thd_binlog_format(const MYSQL_THD thd);
+// todo: share interface with ha_innodb.c
+
+enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
+
+/*
+ a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct.
+ Is called by THD::transactions.cleanup()
+*/
+void wsrep_cleanup_transaction(THD *thd)
+{
+ if (thd->thread_id == 0) return;
+ if (thd->wsrep_exec_mode == LOCAL_COMMIT)
+ {
+ if (thd->variables.wsrep_on &&
+ thd->wsrep_conflict_state != MUST_REPLAY)
+ {
+ if (thd->wsrep_seqno_changed)
+ {
+ if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle))
+ {
+ DBUG_PRINT("wsrep", ("set committed fail"));
+ WSREP_WARN("set committed fail: %llu %d",
+ (long long)thd->real_id, thd->stmt_da->status());
+ }
+ }
+ //else
+ //WSREP_DEBUG("no trx handle for %s", thd->query());
+ thd_binlog_trx_reset(thd);
+ thd->wsrep_seqno_changed = false;
+ }
+ thd->wsrep_exec_mode= LOCAL_STATE;
+ }
+}
+
+/*
+ wsrep hton
+*/
+handlerton *wsrep_hton;
+
+void wsrep_register_hton(THD* thd, bool all)
+{
+ THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
+ for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
+ {
+ if (i->ht()->db_type == DB_TYPE_INNODB)
+ {
+ trans_register_ha(thd, all, wsrep_hton);
+ thd->ha_data[wsrep_hton->slot].ha_info[all].set_trx_read_write();
+ break;
+ }
+ }
+}
+
+/*
+ wsrep exploits binlog's caches even if binlogging itself is not
+ activated. In such case connection close needs calling
+ actual binlog's method.
+ Todo: split binlog hton from its caches to use ones by wsrep
+ without referring to binlog's stuff.
+*/
+static int
+wsrep_close_connection(handlerton* hton, THD* thd)
+{
+ DBUG_ENTER("wsrep_close_connection");
+ if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ binlog_hton->close_connection (binlog_hton, thd);
+ DBUG_RETURN(0);
+}
+
+/*
+ prepare/wsrep_run_wsrep_commit can fail in two ways
+ - certification test or an equivalent. As a result,
+ the current transaction just rolls back
+ Error codes:
+ WSREP_TRX_ROLLBACK, WSREP_TRX_ERROR
+ - a post-certification failure makes this server unable to
+ commit its own WS and therefore the server must abort
+*/
+static int wsrep_prepare(handlerton *hton, THD *thd, bool all)
+{
+#ifndef DBUG_OFF
+ //wsrep_seqno_t old = thd->wsrep_trx_seqno;
+#endif
+ DBUG_ENTER("wsrep_prepare");
+ if ((all ||
+ !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
+ (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd)))
+ {
+ switch (wsrep_run_wsrep_commit(thd, hton, all))
+ {
+ case WSREP_TRX_OK:
+ // DBUG_ASSERT(thd->wsrep_trx_seqno > old ||
+ // thd->wsrep_exec_mode == REPL_RECV ||
+ // thd->wsrep_exec_mode == TOTAL_ORDER);
+ break;
+ case WSREP_TRX_ROLLBACK:
+ case WSREP_TRX_ERROR:
+ DBUG_RETURN(1);
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv)
+{
+ if (!wsrep_emulate_bin_log) return 0;
+ int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv);
+ return rcode;
+}
+static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
+{
+ if (!wsrep_emulate_bin_log) return 0;
+ int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv);
+ return rcode;
+}
+
+static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
+{
+ DBUG_ENTER("wsrep_rollback");
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
+ (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
+ {
+ if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle))
+ {
+ DBUG_PRINT("wsrep", ("setting rollback fail"));
+ WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
+ (long long)thd->real_id, thd->query());
+ }
+ }
+
+ int rcode = 0;
+ if (!wsrep_emulate_bin_log)
+ {
+ if (all) thd_binlog_trx_reset(thd);
+ }
+
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ DBUG_RETURN(rcode);
+}
+
+int wsrep_commit(handlerton *hton, THD *thd, bool all)
+{
+ DBUG_ENTER("wsrep_commit");
+
+ DBUG_RETURN(0);
+}
+
+extern my_bool opt_log_slave_updates;
+enum wsrep_trx_status
+wsrep_run_wsrep_commit(
+ THD *thd, handlerton *hton, bool all)
+{
+ int rcode = -1;
+ uint data_len = 0;
+ uchar *rbr_data = NULL;
+ IO_CACHE *cache;
+ int replay_round= 0;
+
+ if (thd->stmt_da->is_error()) {
+ WSREP_ERROR("commit issue, error: %d %s",
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ }
+
+ DBUG_ENTER("wsrep_run_wsrep_commit");
+ if (thd->slave_thread && !opt_log_slave_updates) {
+ DBUG_RETURN(WSREP_TRX_OK);
+ }
+ if (thd->wsrep_exec_mode == REPL_RECV) {
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ if (wsrep_debug)
+ WSREP_INFO("WSREP: must abort for BF");
+ DBUG_PRINT("wsrep", ("BF apply commit fail"));
+ thd->wsrep_conflict_state = NO_CONFLICT;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ //
+ // TODO: test all calls of the rollback.
+ // rollback must happen automagically innobase_rollback(hton, thd, 1);
+ //
+ DBUG_RETURN(WSREP_TRX_ERROR);
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+ if (thd->wsrep_exec_mode != LOCAL_STATE) {
+ DBUG_RETURN(WSREP_TRX_OK);
+ }
+ if (thd->wsrep_consistency_check) {
+ WSREP_DEBUG("commit for consistency check: %s", thd->query());
+ DBUG_RETURN(WSREP_TRX_OK);
+ }
+
+ DBUG_PRINT("wsrep", ("replicating commit"));
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ DBUG_PRINT("wsrep", ("replicate commit fail"));
+ thd->wsrep_conflict_state = ABORTED;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ if (wsrep_debug) {
+ WSREP_INFO("innobase_commit, abort %s",
+ (thd->query()) ? thd->query() : "void");
+ }
+ DBUG_RETURN(WSREP_TRX_ROLLBACK);
+ }
+
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+
+ while (wsrep_replaying > 0 &&
+ thd->wsrep_conflict_state == NO_CONFLICT &&
+ thd->killed == NOT_KILLED &&
+ !shutdown_in_progress) {
+
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep waiting on replaying");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_replaying;
+ thd->mysys_var->current_cond= &COND_wsrep_replaying;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ // Using timedwait is a hack to avoid deadlock in case if BF victim
+ // misses the signal.
+ struct timespec wtime = {0, 1000000};
+ mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
+ &wtime);
+ if (replay_round++ % 100000 == 0)
+ WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)",
+ wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round);
+
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ }
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ DBUG_PRINT("wsrep", ("replicate commit fail"));
+ thd->wsrep_conflict_state = ABORTED;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ WSREP_DEBUG("innobase_commit abort after replaying wait %s",
+ (thd->query()) ? thd->query() : "void");
+ DBUG_RETURN(WSREP_TRX_ROLLBACK);
+ } thd->wsrep_query_state = QUERY_COMMITTING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ cache = get_trans_log(thd);
+ rcode = 0;
+ if (cache) {
+ thd->binlog_flush_pending_rows_event(true);
+ rcode = wsrep_write_cache(cache, &rbr_data, &data_len);
+ if (rcode) {
+ WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode);
+ if (data_len) my_free(rbr_data);
+ DBUG_RETURN(WSREP_TRX_ROLLBACK);
+ }
+ }
+ if (data_len == 0)
+ {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_exec_mode = LOCAL_COMMIT;
+ WSREP_DEBUG("empty rbr buffer, query: %s", thd->query());
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ DBUG_RETURN(WSREP_TRX_OK);
+ }
+ if (!rcode) {
+ rcode = wsrep->pre_commit(
+ wsrep,
+ (wsrep_conn_id_t)thd->thread_id,
+ &thd->wsrep_trx_handle,
+ rbr_data,
+ data_len,
+ (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL,
+ &thd->wsrep_trx_seqno);
+ if (rcode == WSREP_TRX_MISSING) {
+ rcode = WSREP_OK;
+ } else if (rcode == WSREP_BF_ABORT) {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state = MUST_REPLAY;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying++;
+ WSREP_DEBUG("replaying increased: %d, thd: %lu",
+ wsrep_replaying, thd->thread_id);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+ thd->wsrep_seqno_changed = true;
+ } else {
+ WSREP_ERROR("I/O error reading from thd's binlog iocache: "
+ "errno=%d, io cache code=%d", my_errno, cache->error);
+ if (data_len) my_free(rbr_data);
+ DBUG_ASSERT(0); // failure like this can not normally happen
+ DBUG_RETURN(WSREP_TRX_ERROR);
+ }
+
+ if (data_len) {
+ my_free(rbr_data);
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ switch(rcode) {
+ case 0:
+ thd->wsrep_exec_mode = LOCAL_COMMIT;
+ /* Override XID iff it was generated by mysql */
+ if (thd->transaction.xid_state.xid.get_my_xid())
+ {
+ wsrep_xid_init(&thd->transaction.xid_state.xid,
+ wsrep_cluster_uuid(),
+ thd->wsrep_trx_seqno);
+ }
+ DBUG_PRINT("wsrep", ("replicating commit success"));
+
+ break;
+ case WSREP_TRX_FAIL:
+ case WSREP_BF_ABORT:
+ WSREP_DEBUG("commit failed for reason: %d", rcode);
+ DBUG_PRINT("wsrep", ("replicating commit fail"));
+
+ thd->wsrep_query_state= QUERY_EXEC;
+
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ thd->wsrep_conflict_state= ABORTED;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ DBUG_RETURN(WSREP_TRX_ROLLBACK);
+
+ case WSREP_CONN_FAIL:
+ WSREP_ERROR("connection failure");
+ DBUG_RETURN(WSREP_TRX_ERROR);
+ default:
+ WSREP_ERROR("unknown connection failure");
+ DBUG_RETURN(WSREP_TRX_ERROR);
+ }
+
+ thd->wsrep_query_state= QUERY_EXEC;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ DBUG_RETURN(WSREP_TRX_OK);
+}
+
+
+static int wsrep_hton_init(void *p)
+{
+ wsrep_hton= (handlerton *)p;
+ //wsrep_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
+ wsrep_hton->state= SHOW_OPTION_YES;
+ wsrep_hton->db_type=DB_TYPE_WSREP;
+ wsrep_hton->savepoint_offset= sizeof(my_off_t);
+ wsrep_hton->close_connection= wsrep_close_connection;
+ wsrep_hton->savepoint_set= wsrep_savepoint_set;
+ wsrep_hton->savepoint_rollback= wsrep_savepoint_rollback;
+ wsrep_hton->commit= wsrep_commit;
+ wsrep_hton->rollback= wsrep_rollback;
+ wsrep_hton->prepare= wsrep_prepare;
+ wsrep_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; // todo: fix flags
+ wsrep_hton->slot= 0;
+ return 0;
+}
+
+
+struct st_mysql_storage_engine wsrep_storage_engine=
+{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+
+
+mysql_declare_plugin(wsrep)
+{
+ MYSQL_STORAGE_ENGINE_PLUGIN,
+ &wsrep_storage_engine,
+ "wsrep",
+ "Codership Oy",
+ "A pseudo storage engine to represent transactions in multi-master synchornous replication",
+ PLUGIN_LICENSE_GPL,
+ wsrep_hton_init, /* Plugin Init */
+ NULL, /* Plugin Deinit */
+ 0x0100 /* 1.0 */,
+ NULL, /* status variables */
+ NULL, /* system variables */
+ NULL, /* config options */
+ 0, /* flags */
+}
+mysql_declare_plugin_end;
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
new file mode 100644
index 00000000000..5a5250652b8
--- /dev/null
+++ b/sql/wsrep_mysqld.cc
@@ -0,0 +1,1060 @@
+/* Copyright 2008 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mysqld.h>
+#include <sql_class.h>
+#include <sql_parse.h>
+#include "wsrep_priv.h"
+#include <cstdio>
+#include <cstdlib>
+#include "log_event.h"
+
+wsrep_t *wsrep = NULL;
+my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
+
+/*
+ * Begin configuration options and their default values
+ */
+
+const char* wsrep_data_home_dir = NULL;
+
+#define WSREP_NODE_INCOMING_AUTO "AUTO"
+const char* wsrep_node_incoming_address = WSREP_NODE_INCOMING_AUTO;
+const char* wsrep_dbug_option = "";
+
+long wsrep_slave_threads = 1; // # of slave action appliers wanted
+my_bool wsrep_debug = 0; // enable debug level logging
+my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx
+ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx
+my_bool wsrep_auto_increment_control = 1; // control auto increment variables
+my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
+my_bool wsrep_incremental_data_collection = 0; // incremental data collection
+long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size
+long wsrep_max_ws_rows = 65536; // max number of rows in ws
+int wsrep_to_isolation = 0; // # of active TO isolation threads
+my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
+long wsrep_max_protocol_version = 1; // maximum protocol version to use
+ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
+my_bool wsrep_recovery = 0; // recovery
+
+/*
+ * End configuration options
+ */
+
+static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
+const wsrep_uuid_t* wsrep_cluster_uuid()
+{
+ return &cluster_uuid;
+}
+static char cluster_uuid_str[40]= { 0, };
+static const char* cluster_status_str[WSREP_VIEW_MAX] =
+{
+ "Primary",
+ "non-Primary",
+ "Disconnected"
+};
+
+static char provider_name[256]= { 0, };
+static char provider_version[256]= { 0, };
+static char provider_vendor[256]= { 0, };
+
+/*
+ * wsrep status variables
+ */
+my_bool wsrep_connected = FALSE;
+my_bool wsrep_ready = FALSE; // node can accept queries
+const char* wsrep_cluster_state_uuid = cluster_uuid_str;
+long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
+const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
+long wsrep_cluster_size = 0;
+long wsrep_local_index = -1;
+const char* wsrep_provider_name = provider_name;
+const char* wsrep_provider_version = provider_version;
+const char* wsrep_provider_vendor = provider_vendor;
+/* End wsrep status variables */
+
+
+wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
+wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
+wsp::node_status local_status;
+long wsrep_protocol_version = 1;
+
+// action execute callback
+extern wsrep_status_t wsrep_apply_cb(void *ctx,
+ const void* buf, size_t buf_len,
+ wsrep_seqno_t global_seqno);
+
+extern wsrep_status_t wsrep_commit_cb (void *ctx,
+ wsrep_seqno_t global_seqno,
+ bool commit);
+
+static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
+ switch (level) {
+ case WSREP_LOG_INFO:
+ sql_print_information("WSREP: %s", msg);
+ break;
+ case WSREP_LOG_WARN:
+ sql_print_warning("WSREP: %s", msg);
+ break;
+ case WSREP_LOG_ERROR:
+ case WSREP_LOG_FATAL:
+ sql_print_error("WSREP: %s", msg);
+ break;
+ case WSREP_LOG_DEBUG:
+ if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
+ default:
+ break;
+ }
+}
+
+static void wsrep_log_states (wsrep_log_level_t level,
+ wsrep_uuid_t* group_uuid,
+ wsrep_seqno_t group_seqno,
+ wsrep_uuid_t* node_uuid,
+ wsrep_seqno_t node_seqno)
+{
+ char uuid_str[37];
+ char msg[256];
+
+ wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
+ snprintf (msg, 255, "WSREP: Group state: %s:%lld",
+ uuid_str, (long long)group_seqno);
+ wsrep_log_cb (level, msg);
+
+ wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
+ snprintf (msg, 255, "WSREP: Local state: %s:%lld",
+ uuid_str, (long long)node_seqno);
+ wsrep_log_cb (level, msg);
+}
+
+static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg)
+{
+ XID* xid= reinterpret_cast<XID*>(arg);
+ handlerton* hton= plugin_data(plugin, handlerton *);
+ if (hton->db_type == DB_TYPE_INNODB)
+ {
+ const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid));
+ char uuid_str[40] = {0, };
+ wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
+ WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld",
+ uuid_str, (long long)wsrep_xid_seqno(xid));
+ hton->wsrep_set_checkpoint(hton, xid);
+ }
+ return FALSE;
+}
+
+void wsrep_set_SE_checkpoint(XID* xid)
+{
+ plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
+}
+
+static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg)
+{
+ XID* xid= reinterpret_cast<XID*>(arg);
+ handlerton* hton= plugin_data(plugin, handlerton *);
+ if (hton->db_type == DB_TYPE_INNODB)
+ {
+ hton->wsrep_get_checkpoint(hton, xid);
+ const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid));
+ char uuid_str[40] = {0, };
+ wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
+ WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld",
+ uuid_str, (long long)wsrep_xid_seqno(xid));
+
+ }
+ return FALSE;
+}
+
+void wsrep_get_SE_checkpoint(XID* xid)
+{
+ plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
+}
+
+static void wsrep_view_handler_cb (void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view,
+ const char* state,
+ size_t state_len,
+ void** sst_req,
+ ssize_t* sst_req_len)
+{
+ wsrep_member_status_t new_status= local_status.get();
+
+ if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t)))
+ {
+ cluster_uuid= view->uuid;
+ wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
+ sizeof(cluster_uuid_str));
+ }
+
+ wsrep_cluster_conf_id= view->view;
+ wsrep_cluster_status= cluster_status_str[view->status];
+ wsrep_cluster_size= view->memb_num;
+ wsrep_local_index= view->my_idx;
+
+ WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
+ "number of nodes: %ld, my index: %ld, protocol version %d",
+ wsrep_cluster_state_uuid, (long long)view->seqno,
+ (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
+ wsrep_cluster_size, wsrep_local_index, view->proto_ver);
+
+ /* Proceed further only if view is PRIMARY */
+ if (WSREP_VIEW_PRIMARY != view->status) {
+ wsrep_ready= FALSE;
+ new_status= WSREP_MEMBER_UNDEFINED;
+ /* Always record local_uuid and local_seqno in non-prim since this
+ * may lead to re-initializing provider and start position is
+ * determined according to these variables */
+ // WRONG! local_uuid should be the last primary configuration uuid we were
+ // a member of. local_seqno should be updated in commit calls.
+ // local_uuid= cluster_uuid;
+ // local_seqno= view->first - 1;
+ goto out;
+ }
+
+ switch (view->proto_ver)
+ {
+ case 0:
+ case 1:
+ // version change
+ if (view->proto_ver != wsrep_protocol_version)
+ {
+ my_bool wsrep_ready_saved= wsrep_ready;
+ wsrep_ready= FALSE;
+ WSREP_INFO("closing client connections for "
+ "protocol change %ld -> %d",
+ wsrep_protocol_version, view->proto_ver);
+ wsrep_close_client_connections(TRUE);
+ wsrep_protocol_version= view->proto_ver;
+ wsrep_ready= wsrep_ready_saved;
+ }
+ break;
+ default:
+ WSREP_ERROR("Unsupported application protocol version: %d",
+ view->proto_ver);
+ unireg_abort(1);
+ }
+
+ if (view->state_gap)
+ {
+ WSREP_WARN("Gap in state sequence. Need state transfer.");
+
+ /* After that wsrep will call wsrep_sst_prepare. */
+ /* keep ready flag 0 until we receive the snapshot */
+ wsrep_ready= FALSE;
+
+ /* Close client connections to ensure that they don't interfere
+ * with SST */
+ WSREP_DEBUG("[debug]: closing client connections for PRIM");
+ wsrep_close_client_connections(TRUE);
+
+ *sst_req_len= wsrep_sst_prepare (sst_req);
+
+ if (*sst_req_len < 0)
+ {
+ int err = *sst_req_len;
+ WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err));
+ new_status= WSREP_MEMBER_UNDEFINED;
+ }
+ else
+ {
+ new_status= WSREP_MEMBER_JOINER;
+ }
+ }
+ else
+ {
+ /*
+ * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
+ * before.
+ */
+ if (!memcmp (&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)))
+ {
+ if (wsrep_init_first())
+ {
+ wsrep_SE_init_grab();
+ // Signal init thread to continue
+ wsrep_sst_complete (&cluster_uuid, view->seqno, false);
+ // and wait for SE initialization
+ wsrep_SE_init_wait();
+ }
+ else
+ {
+ local_uuid= cluster_uuid;
+ local_seqno= view->seqno;
+ }
+ /* Init storage engine XIDs from first view */
+ XID xid;
+ wsrep_xid_init(&xid, &local_uuid, local_seqno);
+ wsrep_set_SE_checkpoint(&xid);
+ new_status= WSREP_MEMBER_JOINED;
+ }
+ else // just some sanity check
+ {
+ if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
+ {
+ WSREP_ERROR("Undetected state gap. Can't continue.");
+ wsrep_log_states (WSREP_LOG_FATAL, &cluster_uuid, view->seqno,
+ &local_uuid, -1);
+ abort();
+ }
+ }
+ }
+
+ if (wsrep_auto_increment_control)
+ {
+ global_system_variables.auto_increment_offset= view->my_idx + 1;
+ global_system_variables.auto_increment_increment= view->memb_num;
+ }
+
+out:
+
+ local_status.set(new_status, view);
+}
+
+// Wait until wsrep has reached ready state
+void wsrep_ready_wait ()
+{
+ if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
+ while (!wsrep_ready)
+ {
+ WSREP_INFO("Waiting to reach ready state");
+ mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
+ }
+ WSREP_INFO("ready state reached");
+ mysql_mutex_unlock (&LOCK_wsrep_ready);
+}
+
+static void wsrep_synced_cb(void* app_ctx)
+{
+ WSREP_INFO("Synchronized with group, ready for connections");
+ if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
+ if (!wsrep_ready)
+ {
+ wsrep_ready= TRUE;
+ mysql_cond_signal (&COND_wsrep_ready);
+ }
+ local_status.set(WSREP_MEMBER_SYNCED);
+ mysql_mutex_unlock (&LOCK_wsrep_ready);
+}
+
+static void wsrep_init_position()
+{
+ /* read XIDs from storage engines */
+ XID xid;
+ memset(&xid, 0, sizeof(xid));
+ xid.formatID= -1;
+ wsrep_get_SE_checkpoint(&xid);
+
+ if (xid.formatID == -1)
+ {
+ WSREP_INFO("Read nil XID from storage engines, skipping position init");
+ return;
+ }
+ else if (!wsrep_is_wsrep_xid(&xid))
+ {
+ WSREP_WARN("Read non-wsrep XID from storage engines, skipping position init");
+ return;
+ }
+
+ const wsrep_uuid_t* uuid= wsrep_xid_uuid(&xid);
+ const wsrep_seqno_t seqno= wsrep_xid_seqno(&xid);
+
+ char uuid_str[40] = {0, };
+ wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
+ WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
+
+
+ if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
+ local_seqno == WSREP_SEQNO_UNDEFINED)
+ {
+ // Initial state
+ local_uuid= *uuid;
+ local_seqno= seqno;
+ }
+ else if (memcmp(&local_uuid, uuid, sizeof(local_uuid)) ||
+ local_seqno != seqno)
+ {
+ WSREP_WARN("Initial position was provided by configuration or SST, "
+ "avoiding override");
+ }
+}
+
+
+int wsrep_init()
+{
+ int rcode= -1;
+
+ wsrep_ready= FALSE;
+ assert(wsrep_provider);
+
+ wsrep_init_position();
+
+ if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
+ {
+ if (strcasecmp(wsrep_provider, WSREP_NONE))
+ {
+ WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
+ wsrep_provider, strerror(rcode), rcode);
+ strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
+ (void) wsrep_init();
+ return rcode;
+ }
+ else /* this is for recursive call above */
+ {
+ WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
+ strerror(rcode), rcode);
+ unireg_abort(1);
+ }
+ }
+
+ if (strlen(wsrep_provider)== 0 ||
+ !strcmp(wsrep_provider, WSREP_NONE))
+ {
+ // enable normal operation in case no provider is specified
+ wsrep_ready= TRUE;
+ global_system_variables.wsrep_on = 0;
+ }
+ else
+ {
+ global_system_variables.wsrep_on = 1;
+ strncpy(provider_name,
+ wsrep->provider_name, sizeof(provider_name) - 1);
+ strncpy(provider_version,
+ wsrep->provider_version, sizeof(provider_version) - 1);
+ strncpy(provider_vendor,
+ wsrep->provider_vendor, sizeof(provider_vendor) - 1);
+ }
+
+ struct wsrep_init_args wsrep_args;
+
+ if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
+ wsrep_data_home_dir = mysql_real_data_home;
+
+ if (strcmp (wsrep_provider, WSREP_NONE) &&
+ (!wsrep_node_incoming_address ||
+ !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) {
+ static char inc_addr[256];
+ size_t inc_addr_max = sizeof (inc_addr);
+ size_t ret = default_address (inc_addr, inc_addr_max);
+ if (ret > 0 && ret < inc_addr_max) {
+ wsrep_node_incoming_address = inc_addr;
+ }
+ else {
+ wsrep_node_incoming_address = NULL;
+ }
+ }
+
+ wsrep_args.data_dir = wsrep_data_home_dir;
+ wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
+ wsrep_args.node_address = (wsrep_node_address) ? wsrep_node_address : "";
+ wsrep_args.node_incoming = wsrep_node_incoming_address;
+ wsrep_args.options = (wsrep_provider_options) ?
+ wsrep_provider_options : "";
+ wsrep_args.proto_ver = wsrep_max_protocol_version;
+
+ wsrep_args.state_uuid = &local_uuid;
+ wsrep_args.state_seqno = local_seqno;
+
+ wsrep_args.logger_cb = wsrep_log_cb;
+ wsrep_args.view_handler_cb = wsrep_view_handler_cb;
+ wsrep_args.apply_cb = wsrep_apply_cb;
+ wsrep_args.commit_cb = wsrep_commit_cb;
+ wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
+ wsrep_args.synced_cb = wsrep_synced_cb;
+
+ rcode = wsrep->init(wsrep, &wsrep_args);
+
+ if (rcode)
+ {
+ DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
+ WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
+ free(wsrep);
+ wsrep = NULL;
+ }
+
+ return rcode;
+}
+
+extern "C" int wsrep_on(void *);
+
+void wsrep_init_startup (bool first)
+{
+ if (wsrep_init()) unireg_abort(1);
+
+ wsrep_thr_lock_init(wsrep_thd_is_brute_force, wsrep_abort_thd,
+ wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on);
+
+ /* Skip replication start if no cluster address */
+ if (!wsrep_cluster_address || strlen(wsrep_cluster_address) == 0) return;
+
+ if (first) wsrep_sst_grab(); // do it so we can wait for SST below
+
+ if (!wsrep_start_replication()) unireg_abort(1);
+
+ wsrep_create_rollbacker();
+ wsrep_create_appliers(1);
+
+ if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
+}
+
+
+void wsrep_deinit()
+{
+ wsrep_unload(wsrep);
+ wsrep= 0;
+ provider_name[0]= '\0';
+ provider_version[0]= '\0';
+ provider_vendor[0]= '\0';
+}
+
+void wsrep_recover()
+{
+ XID xid;
+ memset(&xid, 0, sizeof(xid));
+ xid.formatID= -1;
+ wsrep_get_SE_checkpoint(&xid);
+ char uuid_str[40];
+ wsrep_uuid_print(wsrep_xid_uuid(&xid), uuid_str, sizeof(uuid_str));
+ WSREP_INFO("Recovered position: %s:%lld", uuid_str,
+ (long long)wsrep_xid_seqno(&xid));
+}
+
+
+void wsrep_stop_replication(THD *thd)
+{
+ WSREP_INFO("Stop replication");
+ if (!wsrep)
+ {
+ WSREP_INFO("Provider was not loaded, in stop replication");
+ return;
+ }
+
+ /* disconnect from group first to get wsrep_ready == FALSE */
+ WSREP_DEBUG("Provider disconnect");
+ wsrep->disconnect(wsrep);
+
+ wsrep_connected= FALSE;
+
+ wsrep_close_client_connections(TRUE);
+
+ /* wait until appliers have stopped */
+ wsrep_wait_appliers_close(thd);
+
+ return;
+}
+
+
+bool wsrep_start_replication()
+{
+ wsrep_status_t rcode;
+
+ /*
+ if provider is trivial, don't even try to connect,
+ but resume local node operation
+ */
+ if (strlen(wsrep_provider)== 0 ||
+ !strcmp(wsrep_provider, WSREP_NONE))
+ {
+ // enable normal operation in case no provider is specified
+ wsrep_ready = TRUE;
+ return true;
+ }
+
+ if (!wsrep_cluster_address || strlen(wsrep_cluster_address)== 0)
+ {
+ // if provider is non-trivial, but no address is specified, wait for address
+ wsrep_ready = FALSE;
+ return true;
+ }
+
+ WSREP_INFO("Start replication");
+
+ if ((rcode = wsrep->connect(wsrep,
+ wsrep_cluster_name,
+ wsrep_cluster_address,
+ wsrep_sst_donor)))
+ {
+ if (-ESOCKTNOSUPPORT == rcode)
+ {
+ DBUG_PRINT("wsrep",("unrecognized cluster address: '%s', rcode: %d",
+ wsrep_cluster_address, rcode));
+ WSREP_ERROR("unrecognized cluster address: '%s', rcode: %d",
+ wsrep_cluster_address, rcode);
+ }
+ else
+ {
+ DBUG_PRINT("wsrep",("wsrep->connect() failed: %d", rcode));
+ WSREP_ERROR("wsrep::connect() failed: %d", rcode);
+ }
+
+ return false;
+ }
+ else
+ {
+ wsrep_connected= TRUE;
+
+ uint64_t caps = wsrep->capabilities (wsrep);
+
+ wsrep_incremental_data_collection =
+ (caps & WSREP_CAP_WRITE_SET_INCREMENTS);
+
+ char* opts= wsrep->options_get(wsrep);
+ if (opts)
+ {
+ wsrep_provider_options_init(opts);
+ free(opts);
+ }
+ else
+ {
+ WSREP_WARN("Failed to get wsrep options");
+ }
+ }
+
+ return true;
+}
+
+bool
+wsrep_causal_wait (THD* thd)
+{
+ if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on &&
+ !thd->in_active_multi_stmt_transaction())
+ {
+ // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
+ // TODO: modify to check if thd has locked any rows.
+ wsrep_seqno_t seqno;
+ wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno);
+
+ if (unlikely(WSREP_OK != ret))
+ {
+ const char* msg;
+ int err;
+
+ // Possibly relevant error codes:
+ // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
+ // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
+ // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
+
+ switch (ret)
+ {
+ case WSREP_NOT_IMPLEMENTED:
+ msg= "consistent reads by wsrep backend. "
+ "Please unset wsrep_causal_reads variable.";
+ err= ER_NOT_SUPPORTED_YET;
+ break;
+ default:
+ msg= "Causal wait failed.";
+ err= ER_ERROR_ON_READ;
+ }
+
+ my_error(err, MYF(0), msg);
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+bool wsrep_prepare_key_for_isolation(const char* db,
+ const char* table,
+ wsrep_key_part_t* key,
+ size_t* key_len)
+{
+ if (*key_len < 2) return false;
+
+ switch (wsrep_protocol_version)
+ {
+ case 0:
+ *key_len= 0;
+ break;
+ case 1:
+ {
+ *key_len= 0;
+ if (db)
+ {
+ // sql_print_information("%s.%s", db, table);
+ if (db)
+ {
+ key[*key_len].buf= db;
+ key[*key_len].buf_len= strlen(db);
+ ++(*key_len);
+ if (table)
+ {
+ key[*key_len].buf= table;
+ key[*key_len].buf_len= strlen(table);
+ ++(*key_len);
+ }
+ }
+ }
+ break;
+ }
+ default:
+ return false;
+ }
+
+ return true;
+}
+
+bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_key_part_t* key,
+ size_t* key_len)
+{
+ if (*key_len < 3) return false;
+
+ *key_len= 0;
+ switch (wsrep_protocol_version)
+ {
+ case 0:
+ {
+ key[*key_len].buf = cache_key;
+ key[*key_len].buf_len = cache_key_len;
+ ++(*key_len);
+ break;
+ }
+ case 1:
+ {
+ key[*key_len].buf = cache_key;
+ key[*key_len].buf_len = strlen( (char*)cache_key );
+ ++(*key_len);
+ key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1;
+ key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) );
+ ++(*key_len);
+ break;
+ }
+ default:
+ return false;
+ }
+
+ key[*key_len].buf = row_id;
+ key[*key_len].buf_len = row_id_len;
+ ++(*key_len);
+
+ return true;
+}
+
+/*
+ * Construct Query_log_Event from thd query and serialize it
+ * into buffer.
+ *
+ * Return 0 in case of success, 1 in case of error.
+ */
+int wsrep_to_buf_helper(
+ THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len)
+{
+ IO_CACHE tmp_io_cache;
+ if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
+ 65536, MYF(MY_WME)))
+ return 1;
+ Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
+ int ret(0);
+ if (ev.write(&tmp_io_cache)) ret= 1;
+ if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1;
+ close_cached_file(&tmp_io_cache);
+ return ret;
+}
+
+#include "sql_show.h"
+static int
+create_view_query(THD *thd, uchar** buf, uint* buf_len)
+{
+ LEX *lex= thd->lex;
+ SELECT_LEX *select_lex= &lex->select_lex;
+ TABLE_LIST *first_table= select_lex->table_list.first;
+ TABLE_LIST *views = first_table;
+
+ String buff;
+ const LEX_STRING command[3]=
+ {{ C_STRING_WITH_LEN("CREATE ") },
+ { C_STRING_WITH_LEN("ALTER ") },
+ { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
+
+ buff.append(command[thd->lex->create_view_mode].str,
+ command[thd->lex->create_view_mode].length);
+
+ if (!lex->definer)
+ {
+ /*
+ DEFINER-clause is missing; we have to create default definer in
+ persistent arena to be PS/SP friendly.
+ If this is an ALTER VIEW then the current user should be set as
+ the definer.
+ */
+
+ if (!(lex->definer= create_default_definer(thd)))
+ {
+ WSREP_WARN("view default definer issue");
+ }
+ }
+
+ views->algorithm = lex->create_view_algorithm;
+ views->definer.user = lex->definer->user;
+ views->definer.host = lex->definer->host;
+ views->view_suid = lex->create_view_suid;
+ views->with_check = lex->create_view_check;
+
+ view_store_options(thd, views, &buff);
+ buff.append(STRING_WITH_LEN("VIEW "));
+ /* Test if user supplied a db (ie: we did not use thd->db) */
+ if (views->db && views->db[0] &&
+ (thd->db == NULL || strcmp(views->db, thd->db)))
+ {
+ append_identifier(thd, &buff, views->db,
+ views->db_length);
+ buff.append('.');
+ }
+ append_identifier(thd, &buff, views->table_name,
+ views->table_name_length);
+ if (lex->view_list.elements)
+ {
+ List_iterator_fast<LEX_STRING> names(lex->view_list);
+ LEX_STRING *name;
+ int i;
+
+ for (i= 0; (name= names++); i++)
+ {
+ buff.append(i ? ", " : "(");
+ append_identifier(thd, &buff, name->str, name->length);
+ }
+ buff.append(')');
+ }
+ buff.append(STRING_WITH_LEN(" AS "));
+ //buff.append(views->source.str, views->source.length);
+ buff.append(thd->lex->create_view_select.str,
+ thd->lex->create_view_select.length);
+ //int errcode= query_error_code(thd, TRUE);
+ //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
+ // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
+ return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
+}
+
+static int wsrep_TOI_begin(THD *thd, char *db_, char *table_)
+{
+ wsrep_status_t ret(WSREP_WARNING);
+ uchar* buf(0);
+ uint buf_len(0);
+ int buf_err;
+
+ wsrep_key_part_t wkey_part[2];
+ wsrep_key_t wkey = {wkey_part, 2};
+ WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ thd->wsrep_exec_mode, thd->query() );
+ switch (thd->lex->sql_command)
+ {
+ case SQLCOM_CREATE_VIEW:
+ buf_err= create_view_query(thd, &buf, &buf_len);
+ break;
+ case SQLCOM_CREATE_PROCEDURE:
+ case SQLCOM_CREATE_SPFUNCTION:
+ buf_err= wsrep_create_sp(thd, &buf, &buf_len);
+ break;
+ case SQLCOM_CREATE_TRIGGER:
+ buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
+ break;
+ case SQLCOM_CREATE_EVENT:
+ buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
+ break;
+ default:
+ buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf,
+ &buf_len);
+ break;
+ }
+
+ if (!buf_err &&
+ wsrep_prepare_key_for_isolation(db_, table_, wkey_part,
+ &wkey.key_parts_len) &&
+ WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
+ &wkey, 1,
+ buf, buf_len,
+ &thd->wsrep_trx_seqno)))
+ {
+ thd->wsrep_exec_mode= TOTAL_ORDER;
+ wsrep_to_isolation++;
+ if (buf) my_free(buf);
+ WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno,
+ thd->wsrep_exec_mode);
+ }
+ else {
+ /* jump to error handler in mysql_execute_command() */
+ WSREP_WARN("TO isolation failed for: %d, sql: %s. Check wsrep "
+ "connection state and retry the query.",
+ ret, (thd->query()) ? thd->query() : "void");
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
+ "your wsrep connection state and retry the query.");
+ if (buf) my_free(buf);
+ return -1;
+ }
+ return 0;
+}
+
+static void wsrep_TOI_end(THD *thd) {
+ wsrep_status_t ret;
+ wsrep_to_isolation--;
+ WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void")
+ if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
+ WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno);
+ }
+ else {
+ WSREP_WARN("TO isolation end failed for: %d, sql: %s",
+ ret, (thd->query()) ? thd->query() : "void");
+ }
+}
+
+static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
+{
+ wsrep_status_t ret(WSREP_WARNING);
+ WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ thd->wsrep_exec_mode, thd->query() );
+
+ ret = wsrep->desync(wsrep);
+ if (ret != WSREP_OK)
+ {
+ WSREP_WARN("desync failed %d for %s", ret, thd->query());
+ return(ret);
+ }
+ wsrep_seqno_t seqno = wsrep->pause(wsrep);
+ if (seqno == WSREP_SEQNO_UNDEFINED)
+ {
+ WSREP_WARN("pause failed %lld for %s", (long long)seqno, thd->query());
+ return(1);
+ }
+ WSREP_DEBUG("paused at %lld", (long long)seqno);
+ thd->variables.wsrep_on = 0;
+ return 0;
+}
+
+static void wsrep_RSU_end(THD *thd)
+{
+ wsrep_status_t ret(WSREP_WARNING);
+ WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ thd->wsrep_exec_mode, thd->query() );
+
+ ret = wsrep->resume(wsrep);
+ if (ret != WSREP_OK)
+ {
+ WSREP_WARN("resume failed %d for %s", ret, thd->query());
+ }
+ ret = wsrep->resync(wsrep);
+ if (ret != WSREP_OK)
+ {
+ WSREP_WARN("resync failed %d for %s", ret, thd->query());
+ return;
+ }
+ thd->variables.wsrep_on = 1;
+ return;
+}
+
+int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_)
+{
+ int ret= 0;
+ if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
+ {
+ switch (wsrep_OSU_method_options) {
+ case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_); break;
+ case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break;
+ }
+ if (!ret)
+ {
+ thd->wsrep_exec_mode= TOTAL_ORDER;
+ }
+ }
+ return ret;
+}
+
+void wsrep_to_isolation_end(THD *thd) {
+ if (thd->wsrep_exec_mode==TOTAL_ORDER)
+ {
+ switch(wsrep_OSU_method_options)
+ {
+ case WSREP_OSU_TOI: return wsrep_TOI_end(thd);
+ case WSREP_OSU_RSU: return wsrep_RSU_end(thd);
+ }
+ }
+}
+
+#define WSREP_MDL_LOG(severity, msg, req, gra) \
+ WSREP_##severity( \
+ "%s\n" \
+ "request: (%lu \tseqno %lld \tmode %d \tQstate \t%d cmd %d %d \t%s)\n" \
+ "granted: (%lu \tseqno %lld \tmode %d \tQstate \t%d cmd %d %d \t%s)", \
+ msg, \
+ req->thread_id, (long long)req->wsrep_trx_seqno, \
+ req->wsrep_exec_mode, req->wsrep_query_state, \
+ req->command, req->lex->sql_command, req->query(), \
+ gra->thread_id, (long long)gra->wsrep_trx_seqno, \
+ gra->wsrep_exec_mode, gra->wsrep_query_state, \
+ gra->command, gra->lex->sql_command, gra->query());
+
+bool
+wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
+ MDL_ticket *ticket
+) {
+ if (!WSREP_ON) return FALSE;
+
+ THD *request_thd = requestor_ctx->get_thd();
+ THD *granted_thd = ticket->get_ctx()->get_thd();
+
+ mysql_mutex_lock(&request_thd->LOCK_wsrep_thd);
+ if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
+ request_thd->wsrep_exec_mode == REPL_RECV)
+ {
+ mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
+ WSREP_MDL_LOG(DEBUG, "MDL conflict", request_thd, granted_thd);
+
+ mysql_mutex_lock(&granted_thd->LOCK_wsrep_thd);
+ if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
+ granted_thd->wsrep_exec_mode == REPL_RECV)
+ {
+ WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", request_thd, granted_thd);
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ return TRUE;
+ }
+ else if (granted_thd->lex->sql_command == SQLCOM_FLUSH)
+ {
+ WSREP_DEBUG("mdl granted over FLUSH BF");
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ return TRUE;
+ }
+ else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
+ {
+ WSREP_DEBUG("DROP caused BF abort");
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
+ return FALSE;
+ }
+ else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
+ {
+ WSREP_DEBUG("mdl granted, but commiting thd abort scheduled");
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
+ return FALSE;
+ }
+ else
+ {
+ WSREP_MDL_LOG(INFO, "MDL conflict -> BF abort", request_thd, granted_thd);
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
+ return FALSE;
+ }
+ }
+ else
+ {
+ mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
+ }
+ return FALSE;
+}
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
new file mode 100644
index 00000000000..90498c266ee
--- /dev/null
+++ b/sql/wsrep_mysqld.h
@@ -0,0 +1,297 @@
+/* Copyright 2008 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef WSREP_MYSQLD_H
+#define WSREP_MYSQLD_H
+
+#include "mysqld.h"
+typedef struct st_mysql_show_var SHOW_VAR;
+//#include <mysql.h>
+#include <sql_priv.h>
+#include "../wsrep/wsrep_api.h"
+//#include <sql_class.h>
+
+class set_var;
+class THD;
+
+// Global wsrep parameters
+extern wsrep_t* wsrep;
+
+// MySQL wsrep options
+extern const char* wsrep_provider;
+extern const char* wsrep_provider_options;
+extern const char* wsrep_cluster_name;
+extern const char* wsrep_cluster_address;
+extern const char* wsrep_node_name;
+extern const char* wsrep_node_address;
+extern const char* wsrep_node_incoming_address;
+extern const char* wsrep_data_home_dir;
+extern const char* wsrep_dbug_option;
+extern long wsrep_slave_threads;
+extern my_bool wsrep_debug;
+extern my_bool wsrep_convert_LOCK_to_trx;
+extern ulong wsrep_retry_autocommit;
+extern my_bool wsrep_auto_increment_control;
+extern my_bool wsrep_drupal_282555_workaround;
+extern my_bool wsrep_incremental_data_collection;
+extern const char* wsrep_sst_method;
+extern const char* wsrep_sst_receive_address;
+extern char* wsrep_sst_auth;
+extern const char* wsrep_sst_donor;
+extern const char* wsrep_start_position;
+extern long long wsrep_max_ws_size;
+extern long wsrep_max_ws_rows;
+extern const char* wsrep_notify_cmd;
+extern my_bool wsrep_certify_nonPK;
+extern long wsrep_max_protocol_version;
+extern long wsrep_protocol_version;
+extern ulong wsrep_forced_binlog_format;
+extern ulong wsrep_OSU_method_options;
+extern my_bool wsrep_recovery;
+
+enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU };
+
+// MySQL status variables
+extern my_bool wsrep_connected;
+extern my_bool wsrep_ready;
+extern const char* wsrep_cluster_state_uuid;
+extern long long wsrep_cluster_conf_id;
+extern const char* wsrep_cluster_status;
+extern long wsrep_cluster_size;
+extern long wsrep_local_index;
+extern const char* wsrep_provider_name;
+extern const char* wsrep_provider_version;
+extern const char* wsrep_provider_vendor;
+extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff);
+
+#define WSREP_SST_ADDRESS_AUTO "AUTO"
+// MySQL variables funcs
+
+#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var)
+#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type)
+#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
+#define INIT_ARGS (const char* opt)
+
+extern int wsrep_init_vars();
+
+extern bool wsrep_on_update UPDATE_ARGS;
+extern void wsrep_causal_reads_update UPDATE_ARGS;
+extern bool wsrep_start_position_check CHECK_ARGS;
+extern bool wsrep_start_position_update UPDATE_ARGS;
+extern void wsrep_start_position_init INIT_ARGS;
+
+extern bool wsrep_provider_check CHECK_ARGS;
+extern bool wsrep_provider_update UPDATE_ARGS;
+extern void wsrep_provider_init INIT_ARGS;
+
+extern bool wsrep_provider_options_check CHECK_ARGS;
+extern bool wsrep_provider_options_update UPDATE_ARGS;
+extern void wsrep_provider_options_init INIT_ARGS;
+
+extern bool wsrep_cluster_address_check CHECK_ARGS;
+extern bool wsrep_cluster_address_update UPDATE_ARGS;
+extern void wsrep_cluster_address_init INIT_ARGS;
+
+extern bool wsrep_cluster_name_check CHECK_ARGS;
+extern bool wsrep_cluster_name_update UPDATE_ARGS;
+
+extern bool wsrep_node_name_check CHECK_ARGS;
+extern bool wsrep_node_name_update UPDATE_ARGS;
+
+extern bool wsrep_node_address_check CHECK_ARGS;
+extern bool wsrep_node_address_update UPDATE_ARGS;
+extern void wsrep_node_address_init INIT_ARGS;
+
+extern bool wsrep_sst_method_check CHECK_ARGS;
+extern bool wsrep_sst_method_update UPDATE_ARGS;
+extern void wsrep_sst_method_init INIT_ARGS;
+
+extern bool wsrep_sst_receive_address_check CHECK_ARGS;
+extern bool wsrep_sst_receive_address_update UPDATE_ARGS;
+
+extern bool wsrep_sst_auth_check CHECK_ARGS;
+extern bool wsrep_sst_auth_update UPDATE_ARGS;
+extern void wsrep_sst_auth_init INIT_ARGS;
+
+extern bool wsrep_sst_donor_check CHECK_ARGS;
+extern bool wsrep_sst_donor_update UPDATE_ARGS;
+
+
+extern bool wsrep_init_first(); // initialize wsrep before storage
+ // engines or after
+extern int wsrep_init();
+extern void wsrep_deinit();
+extern void wsrep_recover();
+
+/* wsrep initialization sequence at startup
+ * @param first wsrep_init_first() value */
+extern void wsrep_init_startup(bool first);
+
+extern void wsrep_close_client_connections(my_bool wait_to_end);
+extern void wsrep_close_applier(THD *thd);
+extern void wsrep_wait_appliers_close(THD *thd);
+extern void wsrep_create_appliers(long threads = wsrep_slave_threads);
+extern void wsrep_create_rollbacker();
+extern void wsrep_kill_mysql(THD *thd);
+
+/* new defines */
+extern void wsrep_stop_replication(THD *thd);
+extern bool wsrep_start_replication();
+extern bool wsrep_causal_wait(THD* thd);
+extern int wsrep_check_opts (int argc, char* const* argv);
+extern void wsrep_prepend_PATH (const char* path);
+
+/* Other global variables */
+extern wsrep_seqno_t wsrep_locked_seqno;
+
+#define WSREP_ON \
+ (global_system_variables.wsrep_on)
+
+#define WSREP(thd) \
+ (WSREP_ON && (thd && thd->variables.wsrep_on))
+
+#define WSREP_EMULATE_BINLOG(thd) \
+ (WSREP(thd) && wsrep_emulate_bin_log)
+
+// MySQL logging functions don't seem to understand long long length modifer.
+// This is a workaround. It also prefixes all messages with "WSREP"
+#define WSREP_LOG(fun, ...) \
+ { \
+ char msg[256] = {'\0'}; \
+ snprintf(msg, sizeof(msg) - 1, ## __VA_ARGS__); \
+ fun("WSREP: %s", msg); \
+ }
+
+#define WSREP_DEBUG(...) \
+ if (wsrep_debug) WSREP_LOG(sql_print_information, ##__VA_ARGS__)
+#define WSREP_INFO(...) WSREP_LOG(sql_print_information, ##__VA_ARGS__)
+#define WSREP_WARN(...) WSREP_LOG(sql_print_warning, ##__VA_ARGS__)
+#define WSREP_ERROR(...) WSREP_LOG(sql_print_error, ##__VA_ARGS__)
+
+/*! Synchronizes applier thread start with init thread */
+extern void wsrep_sst_grab();
+/*! Init thread waits for SST completion */
+extern bool wsrep_sst_wait();
+/*! Signals wsrep that initialization is complete, writesets can be applied */
+extern void wsrep_sst_continue();
+
+extern void wsrep_SE_init_grab(); /*! grab init critical section */
+extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */
+extern void wsrep_SE_init_done(); /*! signal that SE init is complte */
+extern void wsrep_SE_initialized(); /*! mark SE initialization complete */
+
+extern void wsrep_ready_wait();
+
+enum wsrep_trx_status {
+ WSREP_TRX_OK,
+ WSREP_TRX_ROLLBACK,
+ WSREP_TRX_ERROR,
+ };
+
+extern enum wsrep_trx_status
+wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
+class Ha_trx_info;
+struct THD_TRANS;
+void wsrep_register_hton(THD* thd, bool all);
+
+/*!
+ * @param db Database string
+ * @param table Table string
+ * @param key Array of wsrep_key_t
+ * @param key_len In: number of elements in key array, Out: number of
+ * elements populated
+ *
+ * @return true if preparation was successful, otherwise false.
+ */
+bool wsrep_prepare_key_for_isolation(const char* db,
+ const char* table,
+ wsrep_key_part_t* key,
+ size_t *key_len);
+
+void wsrep_replication_process(THD *thd);
+void wsrep_rollback_process(THD *thd);
+void wsrep_brute_force_killer(THD *thd);
+int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id);
+extern "C" bool wsrep_consistency_check(void *thd_ptr);
+extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
+extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
+ my_bool signal);
+extern "C" int wsrep_thd_in_locking_session(void *thd_ptr);
+void *wsrep_prepare_bf_thd(THD *thd);
+void wsrep_return_from_bf_mode(void *shadow, THD *thd);
+
+/* this is visible for client build so that innodb plugin gets this */
+typedef struct wsrep_aborting_thd {
+ struct wsrep_aborting_thd *next;
+ THD *aborting_thd;
+} *wsrep_aborting_thd_t;
+
+extern mysql_mutex_t LOCK_wsrep_ready;
+extern mysql_cond_t COND_wsrep_ready;
+extern mysql_mutex_t LOCK_wsrep_sst;
+extern mysql_cond_t COND_wsrep_sst;
+extern mysql_mutex_t LOCK_wsrep_sst_init;
+extern mysql_cond_t COND_wsrep_sst_init;
+extern mysql_mutex_t LOCK_wsrep_rollback;
+extern mysql_cond_t COND_wsrep_rollback;
+extern int wsrep_replaying;
+extern mysql_mutex_t LOCK_wsrep_replaying;
+extern mysql_cond_t COND_wsrep_replaying;
+extern wsrep_aborting_thd_t wsrep_aborting_thd;
+extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug;
+extern my_bool wsrep_convert_LOCK_to_trx;
+extern ulong wsrep_retry_autocommit;
+extern my_bool wsrep_emulate_bin_log;
+extern my_bool wsrep_auto_increment_control;
+extern my_bool wsrep_drupal_282555_workaround;
+extern long long wsrep_max_ws_size;
+extern long wsrep_max_ws_rows;
+extern int wsrep_to_isolation;
+extern my_bool wsrep_certify_nonPK;
+
+extern PSI_mutex_key key_LOCK_wsrep_ready;
+extern PSI_mutex_key key_COND_wsrep_ready;
+extern PSI_mutex_key key_LOCK_wsrep_sst;
+extern PSI_cond_key key_COND_wsrep_sst;
+extern PSI_mutex_key key_LOCK_wsrep_sst_init;
+extern PSI_cond_key key_COND_wsrep_sst_init;
+extern PSI_mutex_key key_LOCK_wsrep_sst_thread;
+extern PSI_cond_key key_COND_wsrep_sst_thread;
+extern PSI_mutex_key key_LOCK_wsrep_rollback;
+extern PSI_cond_key key_COND_wsrep_rollback;
+extern PSI_mutex_key key_LOCK_wsrep_replaying;
+extern PSI_cond_key key_COND_wsrep_replaying;
+
+int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_);
+void wsrep_to_isolation_end(THD *thd);
+
+void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*);
+void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*);
+int wsrep_to_buf_helper(
+ THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len);
+int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len);
+int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len);
+int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len);
+
+const wsrep_uuid_t* wsrep_cluster_uuid();
+struct xid_t;
+void wsrep_set_SE_checkpoint(xid_t*);
+
+void wsrep_xid_init(xid_t*, const wsrep_uuid_t*, wsrep_seqno_t);
+const wsrep_uuid_t* wsrep_xid_uuid(const xid_t*);
+wsrep_seqno_t wsrep_xid_seqno(const xid_t*);
+extern "C" int wsrep_is_wsrep_xid(const void* xid);
+
+#endif /* WSREP_MYSQLD_H */
diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc
new file mode 100644
index 00000000000..ff997d01183
--- /dev/null
+++ b/sql/wsrep_notify.cc
@@ -0,0 +1,107 @@
+/* Copyright 2010 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mysqld.h>
+#include "wsrep_priv.h"
+
+const char* wsrep_notify_cmd="";
+
+static const char* _status_str(wsrep_member_status_t status)
+{
+ switch (status)
+ {
+ case WSREP_MEMBER_UNDEFINED: return "Undefined";
+ case WSREP_MEMBER_JOINER: return "Joiner";
+ case WSREP_MEMBER_DONOR: return "Donor";
+ case WSREP_MEMBER_JOINED: return "Joined";
+ case WSREP_MEMBER_SYNCED: return "Synced";
+ default: return "Error(?)";
+ }
+}
+
+void wsrep_notify_status (wsrep_member_status_t status,
+ const wsrep_view_info_t* view)
+{
+ if (!wsrep_notify_cmd || 0 == strlen(wsrep_notify_cmd))
+ {
+ WSREP_INFO("wsrep_notify_cmd is not defined, skipping notification.");
+ return;
+ }
+
+ char cmd_buf[1 << 16]; // this can be long
+ long cmd_len = sizeof(cmd_buf) - 1;
+ char* cmd_ptr = cmd_buf;
+ long cmd_off = 0;
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, "%s",
+ wsrep_notify_cmd);
+
+ if (status >= WSREP_MEMBER_UNDEFINED && status < WSREP_MEMBER_ERROR)
+ {
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --status %s",
+ _status_str(status));
+ }
+ else
+ {
+ /* here we preserve provider error codes */
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --status 'Error(%d)'", status);
+ }
+
+ if (0 != view)
+ {
+ char uuid_str[40];
+
+ wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str));
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --uuid %s", uuid_str);
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --primary %s", view->view >= 0 ? "yes" : "no");
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --index %d", view->my_idx);
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --members");
+
+ for (int i = 0; i < view->memb_num; i++)
+ {
+ wsrep_uuid_print (&view->members[i].id, uuid_str, sizeof(uuid_str));
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ "%c%s/%s/%s", i > 0 ? ',' : ' ',
+ uuid_str, view->members[i].name,
+ view->members[i].incoming);
+ }
+ }
+
+ if (cmd_off == cmd_len)
+ {
+ WSREP_ERROR("Notification buffer too short (%ld). Aborting notification.",
+ cmd_len);
+ return;
+ }
+
+ wsp::process p(cmd_ptr, "r");
+
+ p.wait();
+ int err = p.error();
+
+ if (err)
+ {
+ WSREP_ERROR("Notification command failed: %d (%s): \"%s\"",
+ err, strerror(err), cmd_ptr);
+ }
+}
+
diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h
new file mode 100644
index 00000000000..4db8abf68de
--- /dev/null
+++ b/sql/wsrep_priv.h
@@ -0,0 +1,231 @@
+/* Copyright 2010 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+//! @file declares symbols private to wsrep integration layer
+
+#ifndef WSREP_PRIV_H
+#define WSREP_PRIV_H
+
+#include "wsrep_mysqld.h"
+#include "../wsrep/wsrep_api.h"
+
+#include <log.h>
+#include <pthread.h>
+#include <cstdio>
+
+extern ssize_t wsrep_sst_prepare (void** msg);
+extern int wsrep_sst_donate_cb (void* app_ctx,
+ void* recv_ctx,
+ const void* msg, size_t msg_len,
+ const wsrep_uuid_t* current_uuid,
+ wsrep_seqno_t current_seqno,
+ const char* state, size_t state_len,
+ bool bypass);
+
+extern size_t default_ip (char* buf, size_t buf_len);
+extern size_t default_address(char* buf, size_t buf_len);
+
+extern wsrep_uuid_t local_uuid;
+extern wsrep_seqno_t local_seqno;
+
+/*! SST thread signals init thread about sst completion */
+extern void wsrep_sst_complete(wsrep_uuid_t* uuid, wsrep_seqno_t seqno, bool);
+
+extern void wsrep_notify_status (wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0);
+
+namespace wsp {
+class node_status
+{
+public:
+ node_status() : status(WSREP_MEMBER_UNDEFINED) {}
+ void set(wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0)
+ {
+ if (status != new_status || 0 != view)
+ {
+ wsrep_notify_status(new_status, view);
+ status = new_status;
+ }
+ }
+ wsrep_member_status_t get() const { return status; }
+private:
+ wsrep_member_status_t status;
+};
+} /* namespace wsp */
+
+extern wsp::node_status local_status;
+
+namespace wsp {
+/* A small class to run external programs. */
+class process
+{
+private:
+ const char* const str_;
+ FILE* io_;
+ int err_;
+ pid_t pid_;
+
+public:
+/*! @arg type is a pointer to a null-terminated string which must contain
+ either the letter 'r' for reading or the letter 'w' for writing.
+ */
+ process (const char* cmd, const char* type);
+ ~process ();
+
+ FILE* pipe () { return io_; }
+ int error() { return err_; }
+ int wait ();
+ const char* cmd() { return str_; }
+};
+#ifdef REMOVED
+class lock
+{
+ pthread_mutex_t* const mtx_;
+
+public:
+
+ lock (pthread_mutex_t* mtx) : mtx_(mtx)
+ {
+ int err = pthread_mutex_lock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex lock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ virtual ~lock ()
+ {
+ int err = pthread_mutex_unlock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ inline void wait (pthread_cond_t* cond)
+ {
+ pthread_cond_wait (cond, mtx_);
+ }
+
+private:
+
+ lock (const lock&);
+ lock& operator=(const lock&);
+
+};
+
+class monitor
+{
+ int mutable refcnt;
+ pthread_mutex_t mutable mtx;
+ pthread_cond_t mutable cond;
+
+public:
+
+ monitor() : refcnt(0)
+ {
+ pthread_mutex_init (&mtx, NULL);
+ pthread_cond_init (&cond, NULL);
+ }
+
+ ~monitor()
+ {
+ pthread_mutex_destroy (&mtx);
+ pthread_cond_destroy (&cond);
+ }
+
+ void enter() const
+ {
+ lock l(&mtx);
+
+ while (refcnt)
+ {
+ l.wait(&cond);
+ }
+ refcnt++;
+ }
+
+ void leave() const
+ {
+ lock l(&mtx);
+
+ refcnt--;
+ if (refcnt == 0)
+ {
+ pthread_cond_signal (&cond);
+ }
+ }
+
+private:
+
+ monitor (const monitor&);
+ monitor& operator= (const monitor&);
+};
+
+class critical
+{
+ const monitor& mon;
+
+public:
+
+ critical(const monitor& m) : mon(m) { mon.enter(); }
+
+ ~critical() { mon.leave(); }
+
+private:
+
+ critical (const critical&);
+ critical& operator= (const critical&);
+};
+#endif
+
+class thd
+{
+ class thd_init
+ {
+ public:
+ thd_init() { my_thread_init(); }
+ ~thd_init() { my_thread_end(); }
+ }
+ init;
+
+ thd (const thd&);
+ thd& operator= (const thd&);
+
+public:
+
+ thd();
+ ~thd();
+ THD* const ptr;
+};
+
+class string
+{
+public:
+ string() : string_(0) {}
+ void set(char* str) { if (string_) free (string_); string_ = str; }
+ ~string() { set (0); }
+private:
+ char* string_;
+};
+
+} // namespace wsrep
+#endif /* WSREP_PRIV_H */
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
new file mode 100644
index 00000000000..4ce7fe84b0f
--- /dev/null
+++ b/sql/wsrep_sst.cc
@@ -0,0 +1,932 @@
+/* Copyright 2008-2011 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mysqld.h>
+#include <sql_class.h>
+#include <set_var.h>
+#include <sql_acl.h>
+#include <sql_reload.h>
+#include <sql_parse.h>
+#include "wsrep_priv.h"
+#include <cstdio>
+#include <cstdlib>
+
+extern const char wsrep_defaults_file[];
+
+#define WSREP_SST_MYSQLDUMP "mysqldump"
+#define WSREP_SST_DEFAULT WSREP_SST_MYSQLDUMP
+#define WSREP_SST_ADDRESS_AUTO "AUTO"
+#define WSREP_SST_AUTH_MASK "********"
+
+const char* wsrep_sst_method = WSREP_SST_DEFAULT;
+const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
+const char* wsrep_sst_donor = "";
+ char* wsrep_sst_auth = NULL;
+
+// container for real auth string
+static const char* sst_auth_real = NULL;
+
+static const char *sst_methods[] = {
+ "mysqldump",
+ "rsync",
+ "rsync_wan",
+ "xtrabackup",
+ NULL
+};
+
+bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
+{
+ char buff[FN_REFLEN];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+ const char* c_str = NULL;
+
+ if ((res = var->value->val_str(&str))) {
+ c_str = res->c_ptr();
+ int i = 0;
+
+ while (sst_methods[i] && strcasecmp(sst_methods[i], c_str)) i++;
+ if (!sst_methods[i]) {
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL");
+ return 1;
+ }
+ }
+ return 0;
+}
+
+bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ return 0;
+}
+
+static bool sst_receive_address_check (const char* str)
+{
+ if (!strncasecmp(str, "127.0.0.1", strlen("127.0.0.1")) ||
+ !strncasecmp(str, "localhost", strlen("localhost")))
+ {
+ return 1;
+ }
+
+ return 0;
+}
+
+bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var)
+{
+ const char* c_str = var->value->str_value.c_ptr();
+
+ if (sst_receive_address_check (c_str))
+ {
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL");
+ return 1;
+ }
+
+ return 0;
+}
+
+bool wsrep_sst_receive_address_update (sys_var *self, THD* thd,
+ enum_var_type type)
+{
+ return 0;
+}
+
+bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var)
+{
+ return 0;
+}
+static bool sst_auth_real_set (const char* value)
+{
+ const char* v = strdup (value);
+
+ if (v)
+ {
+ if (sst_auth_real) free (const_cast<char*>(sst_auth_real));
+ sst_auth_real = v;
+
+ if (strlen(sst_auth_real))
+ {
+ if (wsrep_sst_auth)
+ {
+ my_free ((void*)wsrep_sst_auth);
+ wsrep_sst_auth = my_strdup(WSREP_SST_AUTH_MASK, MYF(0));
+ //strncpy (wsrep_sst_auth, WSREP_SST_AUTH_MASK,
+ // sizeof(wsrep_sst_auth) - 1);
+ }
+ else
+ wsrep_sst_auth = my_strdup (WSREP_SST_AUTH_MASK, MYF(0));
+ }
+ return 0;
+ }
+
+ return 1;
+}
+
+bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ return sst_auth_real_set (wsrep_sst_auth);
+}
+
+void wsrep_sst_auth_init (const char* value)
+{
+ if (wsrep_sst_auth == value) wsrep_sst_auth = NULL;
+ if (value) sst_auth_real_set (value);
+}
+
+bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var)
+{
+ return 0;
+}
+
+bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ return 0;
+}
+
+static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
+
+bool wsrep_init_first()
+{
+ return (wsrep_provider != NULL
+ && strcmp (wsrep_provider, WSREP_NONE)
+ && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
+}
+
+static bool sst_complete = false;
+static bool sst_needed = false;
+
+void wsrep_sst_grab ()
+{
+ WSREP_INFO("wsrep_sst_grab()");
+ if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
+ sst_complete = false;
+ mysql_mutex_unlock (&LOCK_wsrep_sst);
+}
+
+// Wait for end of SST
+bool wsrep_sst_wait ()
+{
+ if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
+ while (!sst_complete)
+ {
+ WSREP_INFO("Waiting for SST to complete.");
+ mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst);
+ }
+
+ if (local_seqno >= 0)
+ {
+ WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
+ }
+ else
+ {
+ WSREP_ERROR("SST failed: %d (%s)",
+ int(-local_seqno), strerror(-local_seqno));
+ }
+
+ mysql_mutex_unlock (&LOCK_wsrep_sst);
+
+ return (local_seqno >= 0);
+}
+
+// Signal end of SST
+void wsrep_sst_complete (wsrep_uuid_t* sst_uuid,
+ wsrep_seqno_t sst_seqno,
+ bool needed)
+{
+ if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
+ if (!sst_complete)
+ {
+ sst_complete = true;
+ sst_needed = needed;
+ local_uuid = *sst_uuid;
+ local_seqno = sst_seqno;
+ mysql_cond_signal (&COND_wsrep_sst);
+ }
+ else
+ {
+ WSREP_WARN("Nobody is waiting for SST.");
+ }
+ mysql_mutex_unlock (&LOCK_wsrep_sst);
+}
+
+// Let applier threads to continue
+void wsrep_sst_continue ()
+{
+ if (sst_needed)
+ {
+ WSREP_INFO("Signalling provider to continue.");
+ wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
+ }
+}
+
+struct sst_thread_arg
+{
+ const char* cmd;
+ int err;
+ char* ret_str;
+ mysql_mutex_t lock;
+ mysql_cond_t cond;
+
+ sst_thread_arg (const char* c) : cmd(c), err(-1), ret_str(0)
+ {
+ mysql_mutex_init(key_LOCK_wsrep_sst_thread,
+ &lock, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
+ }
+
+ ~sst_thread_arg()
+ {
+ mysql_cond_destroy (&cond);
+ mysql_mutex_unlock (&lock);
+ mysql_mutex_destroy (&lock);
+ }
+};
+
+static int sst_scan_uuid_seqno (const char* str,
+ wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
+{
+ int offt = wsrep_uuid_scan (str, strlen(str), uuid);
+ if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
+ {
+ *seqno = strtoll (str + offt + 1, NULL, 10);
+ if (*seqno != LLONG_MAX || errno != ERANGE)
+ {
+ return 0;
+ }
+ }
+
+ WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
+ return EINVAL;
+}
+
+// get rid of trailing \n
+static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
+{
+ char* ret= fgets (buf, buf_len, stream);
+
+ if (ret)
+ {
+ size_t len = strlen(ret);
+ if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0';
+ }
+
+ return ret;
+}
+
+static void* sst_joiner_thread (void* a)
+{
+ sst_thread_arg* arg= (sst_thread_arg*) a;
+ int err= 1;
+
+ {
+ const char magic[] = "ready";
+ const size_t magic_len = sizeof(magic) - 1;
+ const size_t out_len = 512;
+ char out[out_len];
+
+ WSREP_INFO("Running: '%s'", arg->cmd);
+
+ wsp::process proc (arg->cmd, "r");
+
+ if (proc.pipe() && !proc.error())
+ {
+ const char* tmp= my_fgets (out, out_len, proc.pipe());
+
+ if (!tmp || strlen(tmp) < (magic_len + 2) ||
+ strncasecmp (tmp, magic, magic_len))
+ {
+ WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
+ magic, arg->cmd, tmp);
+ proc.wait();
+ if (proc.error()) err = proc.error();
+ }
+ else
+ {
+ err = 0;
+ }
+ }
+ else
+ {
+ err = proc.error();
+ WSREP_ERROR("Failed to execute: %s : %d (%s)",
+ arg->cmd, err, strerror(err));
+ }
+
+ // signal sst_prepare thread with ret code,
+ // it will go on sending SST request
+ mysql_mutex_lock (&arg->lock);
+ if (!err)
+ {
+ arg->ret_str = strdup (out + magic_len + 1);
+ if (!arg->ret_str) err = ENOMEM;
+ }
+ arg->err = -err;
+ mysql_cond_signal (&arg->cond);
+ mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
+
+ if (err) return NULL; /* lp:808417 - return immediately, don't signal
+ * initializer thread to ensure single thread of
+ * shutdown. */
+
+ wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
+ wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
+
+ // in case of successfull receiver start, wait for SST completion/end
+ char* tmp = my_fgets (out, out_len, proc.pipe());
+
+ proc.wait();
+ err= EINVAL;
+
+ if (!tmp)
+ {
+ WSREP_ERROR("Failed to read uuid:seqno from joiner script.");
+ if (proc.error()) err = proc.error();
+ }
+ else
+ {
+ err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
+ }
+
+ if (err)
+ {
+ ret_uuid= WSREP_UUID_UNDEFINED;
+ ret_seqno= -err;
+ }
+
+ // Tell initializer thread that SST is complete
+ wsrep_sst_complete (&ret_uuid, ret_seqno, true);
+ }
+
+ return NULL;
+}
+
+static ssize_t sst_prepare_other (const char* method,
+ const char* addr_in,
+ const char** addr_out)
+{
+ ssize_t cmd_len= 1024;
+ char cmd_str[cmd_len];
+ const char* sst_dir= mysql_real_data_home;
+
+ int ret= snprintf (cmd_str, cmd_len,
+ "wsrep_sst_%s 'joiner' '%s' '%s' '%s' '%s' '%d' 2>sst.err",
+ method, addr_in, (sst_auth_real) ? sst_auth_real : "",
+ sst_dir, wsrep_defaults_file, (int)getpid());
+
+ if (ret < 0 || ret >= cmd_len)
+ {
+ WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret);
+ return (ret < 0 ? ret : -EMSGSIZE);
+ }
+
+ pthread_t tmp;
+ sst_thread_arg arg(cmd_str);
+ mysql_mutex_lock (&arg.lock);
+ pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
+ mysql_cond_wait (&arg.cond, &arg.lock);
+
+ *addr_out= arg.ret_str;
+
+ if (!arg.err)
+ ret = strlen(*addr_out);
+ else
+ {
+ assert (arg.err < 0);
+ ret = arg.err;
+ }
+
+ pthread_detach (tmp);
+
+ return ret;
+}
+
+//extern ulong my_bind_addr;
+extern uint mysqld_port;
+
+/*! Just tells donor where ti sent mysqldump */
+static ssize_t sst_prepare_mysqldump (const char* addr_in,
+ const char** addr_out)
+{
+ ssize_t ret = strlen (addr_in);
+
+ if (!strrchr(addr_in, ':'))
+ {
+ ssize_t s = ret + 7;
+ char* tmp = (char*) malloc (s);
+
+ if (tmp)
+ {
+ ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port);
+
+ if (ret > 0 && ret < s)
+ {
+ *addr_out= tmp;
+ return ret;
+ }
+ if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
+ free (tmp);
+ }
+ else {
+ ret= -ENOMEM;
+ }
+
+ sql_print_error ("WSREP: Could not prepare state transfer request: "
+ "adding default port failed: %zd.", ret);
+ }
+ else {
+ *addr_out= addr_in;
+ }
+
+ return ret;
+}
+
+static bool SE_initialized = false;
+
+ssize_t wsrep_sst_prepare (void** msg)
+{
+ const ssize_t ip_max= 256;
+ char ip_buf[ip_max];
+ const char* addr_in= NULL;
+ const char* addr_out= NULL;
+
+ // Figure out SST address. Common for all SST methods
+ if (wsrep_sst_receive_address &&
+ strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
+ {
+ addr_in= wsrep_sst_receive_address;
+ }
+ else if (wsrep_node_address && strlen(wsrep_node_address))
+ {
+ const char* const colon= strchr (wsrep_node_address, ':');
+ if (colon)
+ {
+ ptrdiff_t const len= colon - wsrep_node_address;
+ strncpy (ip_buf, wsrep_node_address, len);
+ ip_buf[len]= '\0';
+ addr_in= ip_buf;
+ }
+ else
+ {
+ addr_in= wsrep_node_address;
+ }
+ }
+ else
+ {
+ ssize_t ret= default_ip (ip_buf, ip_max);
+
+ if (ret && ret < ip_max)
+ {
+ addr_in= ip_buf;
+ }
+ else
+ {
+ WSREP_ERROR("Could not prepare state transfer request: "
+ "failed to guess address to accept state transfer at. "
+ "wsrep_sst_receive_address must be set manually.");
+ unireg_abort(1);
+ }
+ }
+
+ ssize_t addr_len= -ENOSYS;
+ if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP))
+ {
+ addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
+ if (addr_len < 0) unireg_abort(1);
+ }
+ else
+ {
+ /*! A heuristic workaround until we learn how to stop and start engines */
+ if (SE_initialized)
+ {
+ // we already did SST at initializaiton, now engines are running
+ // sql_print_information() is here because the message is too long
+ // for WSREP_INFO.
+ sql_print_information ("WSREP: "
+ "You have configured '%s' state snapshot transfer method "
+ "which cannot be performed on a running server. "
+ "Wsrep provider won't be able to fall back to it "
+ "if other means of state transfer are unavailable. "
+ "In that case you will need to restart the server.",
+ wsrep_sst_method);
+ *msg = 0;
+ return 0;
+ }
+
+ addr_len = sst_prepare_other (wsrep_sst_method, addr_in, &addr_out);
+ if (addr_len < 0)
+ {
+ WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
+ wsrep_sst_method);
+ unireg_abort(1);
+ }
+ }
+
+ size_t const method_len(strlen(wsrep_sst_method));
+ size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/);
+
+ *msg = malloc (msg_len);
+ if (NULL != *msg) {
+ char* const method_ptr(reinterpret_cast<char*>(*msg));
+ strcpy (method_ptr, wsrep_sst_method);
+ char* const addr_ptr(method_ptr + method_len + 1);
+ strcpy (addr_ptr, addr_out);
+
+ WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr);
+ }
+ else {
+ WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
+ msg_len);
+ unireg_abort(1);
+ }
+
+ if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
+
+ return msg_len;
+}
+
+// helper method for donors
+static int sst_run_shell (const char* cmd_str, int max_tries)
+{
+ int ret = 0;
+
+ for (int tries=1; tries <= max_tries; tries++)
+ {
+ wsp::process proc (cmd_str, "r");
+
+ if (NULL != proc.pipe())
+ {
+ proc.wait();
+ }
+
+ if ((ret = proc.error()))
+ {
+ WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
+ tries, max_tries, proc.cmd(), ret, strerror(ret));
+ sleep (1);
+ }
+ else
+ {
+ WSREP_DEBUG("SST script successfully completed.");
+ break;
+ }
+ }
+
+ return -ret;
+}
+
+static int sst_mysqldump_check_addr (const char* user, const char* pswd,
+ const char* host, const char* port)
+{
+ return 0;
+}
+
+static int sst_donate_mysqldump (const char* addr,
+ const wsrep_uuid_t* uuid,
+ const char* uuid_str,
+ wsrep_seqno_t seqno,
+ bool bypass)
+{
+ size_t host_len;
+ const char* port = strchr (addr, ':');
+
+ if (port)
+ {
+ port += 1;
+ host_len = port - addr;
+ }
+ else
+ {
+ port = "";
+ host_len = strlen (addr) + 1;
+ }
+
+ char host[host_len];
+
+ strncpy (host, addr, host_len - 1);
+ host[host_len - 1] = '\0';
+
+ const char* auth = sst_auth_real;
+ const char* pswd = (auth) ? strchr (auth, ':') : NULL;
+ size_t user_len;
+
+ if (pswd)
+ {
+ pswd += 1;
+ user_len = pswd - auth;
+ }
+ else
+ {
+ pswd = "";
+ user_len = (auth) ? strlen (auth) + 1 : 1;
+ }
+
+ char user[user_len];
+
+ strncpy (user, (auth) ? auth : "", user_len - 1);
+ user[user_len - 1] = '\0';
+
+ int ret = sst_mysqldump_check_addr (user, pswd, host, port);
+ if (!ret)
+ {
+ size_t cmd_len= 1024;
+ char cmd_str[cmd_len];
+
+ snprintf (cmd_str, cmd_len,
+ "wsrep_sst_mysqldump '%s' '%s' '%s' '%s' '%u' '%s' '%lld' '%d'",
+ user, pswd, host, port, mysqld_port, uuid_str, (long long)seqno,
+ bypass);
+
+ WSREP_DEBUG("Running: '%s'", cmd_str);
+
+ ret= sst_run_shell (cmd_str, 3);
+ }
+
+ wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno);
+
+ return ret;
+}
+
+wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
+
+static int run_sql_command(THD *thd, const char *query)
+{
+ thd->set_query((char *)query, strlen(query));
+
+ Parser_state ps;
+ if (ps.init(thd, thd->query(), thd->query_length()))
+ {
+ WSREP_ERROR("SST query: %s failed", query);
+ return -1;
+ }
+
+ mysql_parse(thd, thd->query(), thd->query_length(), &ps);
+ if (thd->is_error())
+ {
+ int const err= thd->stmt_da->sql_errno();
+ WSREP_WARN ("error executing '%s': %d (%s)%s",
+ query, err, thd->stmt_da->message(),
+ err == ER_UNKNOWN_SYSTEM_VARIABLE ?
+ ". Was mysqld built with --with-innodb-disallow-writes ?" : "");
+ thd->clear_error();
+ return -1;
+ }
+ return 0;
+}
+
+static int sst_flush_tables(THD* thd)
+{
+ WSREP_INFO("Flushing tables for SST...");
+
+ int err;
+ int not_used;
+ if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
+ {
+ WSREP_ERROR("Failed to flush and lock tables");
+ err = -1;
+ }
+ else
+ {
+ /* make sure logs are flushed after global read lock acquired */
+ err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG,
+ (TABLE_LIST*) 0, &not_used);
+ }
+
+ if (err)
+ {
+ WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err));
+ }
+ else
+ {
+ WSREP_INFO("Tables flushed.");
+ const char base_name[]= "tables_flushed";
+ ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
+ char real_name[full_len];
+ sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
+ char tmp_name[full_len + 4];
+ sprintf(tmp_name, "%s.tmp", real_name);
+
+ FILE* file= fopen(tmp_name, "w+");
+ if (0 == file)
+ {
+ err= errno;
+ WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
+ }
+ else
+ {
+ fprintf(file, "%s:%lld\n",
+ wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno);
+ fsync(fileno(file));
+ fclose(file);
+ if (rename(tmp_name, real_name) == -1)
+ {
+ err= errno;
+ WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
+ tmp_name, real_name, err,strerror(err));
+ }
+ }
+ }
+
+ return err;
+}
+
+static void sst_disallow_writes (THD* thd, bool yes)
+{
+ char query_str[64] = { 0, };
+ ssize_t const query_max = sizeof(query_str) - 1;
+ snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
+ yes ? 1 : 0);
+
+ if (run_sql_command(thd, query_str))
+ {
+ WSREP_ERROR("Failed to disallow InnoDB writes");
+ }
+}
+
+static void* sst_donor_thread (void* a)
+{
+ sst_thread_arg* arg= (sst_thread_arg*)a;
+
+ WSREP_INFO("Running: '%s'", arg->cmd);
+
+ int err= 1;
+ bool locked= false;
+
+ const char* out= NULL;
+ const size_t out_len= 128;
+ char out_buf[out_len];
+
+ wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED;
+ wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST
+
+ wsp::thd thd;
+ wsp::process proc(arg->cmd, "r");
+
+ err= proc.error();
+
+/* Inform server about SST script startup and release TO isolation */
+ mysql_mutex_lock (&arg->lock);
+ arg->err = -err;
+ mysql_cond_signal (&arg->cond);
+ mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
+
+ if (proc.pipe() && !err)
+ {
+wait_signal:
+ out= my_fgets (out_buf, out_len, proc.pipe());
+
+ if (out)
+ {
+ const char magic_flush[]= "flush tables";
+ const char magic_cont[]= "continue";
+ const char magic_done[]= "done";
+
+ if (!strcasecmp (out, magic_flush))
+ {
+ err= sst_flush_tables (thd.ptr);
+ if (!err)
+ {
+ sst_disallow_writes (thd.ptr, true);
+ locked= true;
+ goto wait_signal;
+ }
+ }
+ else if (!strcasecmp (out, magic_cont))
+ {
+ if (locked)
+ {
+ sst_disallow_writes (thd.ptr, false);
+ thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
+ locked= false;
+ }
+ err= 0;
+ goto wait_signal;
+ }
+ else if (!strncasecmp (out, magic_done, strlen(magic_done)))
+ {
+ err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
+ &ret_uuid, &ret_seqno);
+ }
+ else
+ {
+ WSREP_WARN("Received unknown signal: '%s'", out);
+ }
+ }
+ else
+ {
+ WSREP_ERROR("Failed to read from: %s", proc.cmd());
+ }
+ if (err && proc.error()) err= proc.error();
+ }
+ else
+ {
+ WSREP_ERROR("Failed to execute: %s : %d (%s)",
+ proc.cmd(), err, strerror(err));
+ }
+
+ if (locked) // don't forget to unlock server before return
+ {
+ sst_disallow_writes (thd.ptr, false);
+ thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
+ }
+
+ // signal to donor that SST is over
+ wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno);
+ proc.wait();
+
+ return NULL;
+}
+
+static int sst_donate_other (const char* method,
+ const char* addr,
+ const char* uuid,
+ wsrep_seqno_t seqno,
+ bool bypass)
+{
+ ssize_t cmd_len = 4096;
+ char cmd_str[cmd_len];
+
+ int ret= snprintf (cmd_str, cmd_len,
+ "wsrep_sst_%s 'donor' '%s' '%s' '%s' '%s' '%s' '%lld' '%d'"
+ ,
+ method, addr, sst_auth_real, mysql_real_data_home,
+ wsrep_defaults_file, uuid, (long long) seqno, bypass);
+
+ if (ret < 0 || ret >= cmd_len)
+ {
+ WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret);
+ return (ret < 0 ? ret : -EMSGSIZE);
+ }
+
+ pthread_t tmp;
+ sst_thread_arg arg(cmd_str);
+ mysql_mutex_lock (&arg.lock);
+ pthread_create (&tmp, NULL, sst_donor_thread, &arg);
+ mysql_cond_wait (&arg.cond, &arg.lock);
+
+ WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
+ return arg.err;
+}
+
+int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
+ const void* msg, size_t msg_len,
+ const wsrep_uuid_t* current_uuid,
+ wsrep_seqno_t current_seqno,
+ const char* state, size_t state_len,
+ bool bypass)
+{
+ /* This will be reset when sync callback is called.
+ * Should we set wsrep_ready to FALSE here too? */
+// wsrep_notify_status(WSREP_MEMBER_DONOR);
+ local_status.set(WSREP_MEMBER_DONOR);
+
+ const char* method = (char*)msg;
+ size_t method_len = strlen (method);
+ const char* data = method + method_len + 1;
+
+ char uuid_str[37];
+ wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str));
+
+ int ret;
+ if (!strcmp (WSREP_SST_MYSQLDUMP, method))
+ {
+ ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno,
+ bypass);
+ }
+ else
+ {
+ ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass);
+ }
+
+ return (ret > 0 ? 0 : ret);
+}
+
+void wsrep_SE_init_grab()
+{
+ if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort();
+}
+
+void wsrep_SE_init_wait()
+{
+ mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
+ mysql_mutex_unlock (&LOCK_wsrep_sst_init);
+}
+
+void wsrep_SE_init_done()
+{
+ mysql_cond_signal (&COND_wsrep_sst_init);
+ mysql_mutex_unlock (&LOCK_wsrep_sst_init);
+}
+
+void wsrep_SE_initialized()
+{
+ SE_initialized = true;
+}
diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc
new file mode 100644
index 00000000000..f39353eda44
--- /dev/null
+++ b/sql/wsrep_utils.cc
@@ -0,0 +1,447 @@
+/* Copyright 2010 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+//! @file declares symbols private to wsrep integration layer
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag
+#endif
+
+#include <spawn.h> // posix_spawn()
+#include <unistd.h> // pipe()
+#include <errno.h> // errno
+#include <string.h> // strerror()
+#include <sys/wait.h> // waitpid()
+
+#include <sql_class.h>
+#include "wsrep_priv.h"
+
+extern char** environ; // environment variables
+
+static wsp::string wsrep_PATH;
+
+void
+wsrep_prepend_PATH (const char* path)
+{
+ int count = 0;
+
+ while (environ[count])
+ {
+ if (strncmp (environ[count], "PATH=", 5))
+ {
+ count++;
+ continue;
+ }
+
+ char* const old_path (environ[count]);
+
+ if (strstr (old_path, path)) return; // path already there
+
+ size_t const new_path_len(strlen(old_path) + strlen(":") +
+ strlen(path) + 1);
+
+ char* const new_path (reinterpret_cast<char*>(malloc(new_path_len)));
+
+ if (new_path)
+ {
+ snprintf (new_path, new_path_len, "PATH=%s:%s", path,
+ old_path + strlen("PATH="));
+
+ wsrep_PATH.set (new_path);
+ environ[count] = new_path;
+ }
+ else
+ {
+ WSREP_ERROR ("Failed to allocate 'PATH' environment variable "
+ "buffer of size %zu.", new_path_len);
+ }
+
+ return;
+ }
+
+ WSREP_ERROR ("Failed to find 'PATH' environment variable. "
+ "State snapshot transfer may not be working.");
+}
+
+namespace wsp
+{
+
+#define PIPE_READ 0
+#define PIPE_WRITE 1
+#define STDIN_FD 0
+#define STDOUT_FD 1
+
+#ifndef POSIX_SPAWN_USEVFORK
+# define POSIX_SPAWN_USEVFORK 0
+#endif
+
+process::process (const char* cmd, const char* type)
+ : str_(cmd ? strdup(cmd) : strdup("")), io_(NULL), err_(EINVAL), pid_(0)
+{
+ if (0 == str_)
+ {
+ WSREP_ERROR ("Can't allocate command line of size: %zu", strlen(cmd));
+ err_ = ENOMEM;
+ return;
+ }
+
+ if (0 == strlen(str_))
+ {
+ WSREP_ERROR ("Can't start a process: null or empty command line.");
+ return;
+ }
+
+ if (NULL == type || (strcmp (type, "w") && strcmp(type, "r")))
+ {
+ WSREP_ERROR ("type argument should be either \"r\" or \"w\".");
+ return;
+ }
+
+ int pipe_fds[2] = { -1, };
+ if (::pipe(pipe_fds))
+ {
+ err_ = errno;
+ WSREP_ERROR ("pipe() failed: %d (%s)", err_, strerror(err_));
+ return;
+ }
+
+ // which end of pipe will be returned to parent
+ int const parent_end (strcmp(type,"w") ? PIPE_READ : PIPE_WRITE);
+ int const child_end (parent_end == PIPE_READ ? PIPE_WRITE : PIPE_READ);
+ int const close_fd (parent_end == PIPE_READ ? STDOUT_FD : STDIN_FD);
+
+ char* const pargv[4] = { strdup("sh"), strdup("-c"), strdup(str_), NULL };
+ if (!(pargv[0] && pargv[1] && pargv[2]))
+ {
+ err_ = ENOMEM;
+ WSREP_ERROR ("Failed to allocate pargv[] array.");
+ goto cleanup_pipe;
+ }
+
+ posix_spawnattr_t attr;
+ err_ = posix_spawnattr_init (&attr);
+ if (err_)
+ {
+ WSREP_ERROR ("posix_spawnattr_init() failed: %d (%s)",
+ err_, strerror(err_));
+ goto cleanup_pipe;
+ }
+
+ err_ = posix_spawnattr_setflags (&attr, POSIX_SPAWN_SETSIGDEF |
+ POSIX_SPAWN_USEVFORK);
+ if (err_)
+ {
+ WSREP_ERROR ("posix_spawnattr_setflags() failed: %d (%s)",
+ err_, strerror(err_));
+ goto cleanup_attr;
+ }
+
+ posix_spawn_file_actions_t fact;
+ err_ = posix_spawn_file_actions_init (&fact);
+ if (err_)
+ {
+ WSREP_ERROR ("posix_spawn_file_actions_init() failed: %d (%s)",
+ err_, strerror(err_));
+ goto cleanup_attr;
+ }
+
+ // close child's stdout|stdin depending on what we returning
+ err_ = posix_spawn_file_actions_addclose (&fact, close_fd);
+ if (err_)
+ {
+ WSREP_ERROR ("posix_spawn_file_actions_addclose() failed: %d (%s)",
+ err_, strerror(err_));
+ goto cleanup_fact;
+ }
+
+ // substitute our pipe descriptor in place of the closed one
+ err_ = posix_spawn_file_actions_adddup2 (&fact,
+ pipe_fds[child_end], close_fd);
+ if (err_)
+ {
+ WSREP_ERROR ("posix_spawn_file_actions_addup2() failed: %d (%s)",
+ err_, strerror(err_));
+ goto cleanup_fact;
+ }
+
+ err_ = posix_spawnp (&pid_, pargv[0], &fact, &attr, pargv, environ);
+ if (err_)
+ {
+ WSREP_ERROR ("posix_spawnp(%s) failed: %d (%s)",
+ pargv[2], err_, strerror(err_));
+ pid_ = 0; // just to make sure it was not messed up in the call
+ goto cleanup_fact;
+ }
+
+ io_ = fdopen (pipe_fds[parent_end], type);
+
+ if (io_)
+ {
+ pipe_fds[parent_end] = -1; // skip close on cleanup
+ }
+ else
+ {
+ err_ = errno;
+ WSREP_ERROR ("fdopen() failed: %d (%s)", err_, strerror(err_));
+ }
+
+cleanup_fact:
+ int err; // to preserve err_ code
+ err = posix_spawn_file_actions_destroy (&fact);
+ if (err)
+ {
+ WSREP_ERROR ("posix_spawn_file_actions_destroy() failed: %d (%s)\n",
+ err, strerror(err));
+ }
+
+cleanup_attr:
+ err = posix_spawnattr_destroy (&attr);
+ if (err)
+ {
+ WSREP_ERROR ("posix_spawnattr_destroy() failed: %d (%s)",
+ err, strerror(err));
+ }
+
+cleanup_pipe:
+ if (pipe_fds[0] >= 0) close (pipe_fds[0]);
+ if (pipe_fds[1] >= 0) close (pipe_fds[1]);
+
+ free (pargv[0]);
+ free (pargv[1]);
+ free (pargv[2]);
+}
+
+process::~process ()
+{
+ if (io_)
+ {
+ assert (pid_);
+ assert (str_);
+
+ WSREP_WARN("Closing pipe to child process: %s, PID(%ld) "
+ "which might still be running.", str_, (long)pid_);
+
+ if (fclose (io_) == -1)
+ {
+ err_ = errno;
+ WSREP_ERROR("fclose() failed: %d (%s)", err_, strerror(err_));
+ }
+ }
+
+ if (str_) free (const_cast<char*>(str_));
+}
+
+int
+process::wait ()
+{
+ if (pid_)
+ {
+ int status;
+ if (-1 == waitpid(pid_, &status, 0))
+ {
+ err_ = errno; assert (err_);
+ WSREP_ERROR("Waiting for process failed: %s, PID(%ld): %d (%s)",
+ str_, (long)pid_, err_, strerror (err_));
+ }
+ else
+ { // command completed, check exit status
+ if (WIFEXITED (status)) {
+ err_ = WEXITSTATUS (status);
+ }
+ else { // command didn't complete with exit()
+ WSREP_ERROR("Process was aborted.");
+ err_ = errno ? errno : ECHILD;
+ }
+
+ if (err_) {
+ switch (err_) /* Translate error codes to more meaningful */
+ {
+ case 126: err_ = EACCES; break; /* Permission denied */
+ case 127: err_ = ENOENT; break; /* No such file or directory */
+ }
+ WSREP_ERROR("Process completed with error: %s: %d (%s)",
+ str_, err_, strerror(err_));
+ }
+
+ pid_ = 0;
+ if (io_) fclose (io_);
+ io_ = NULL;
+ }
+ }
+ else {
+ assert (NULL == io_);
+ WSREP_ERROR("Command did not run: %s", str_);
+ }
+
+ return err_;
+}
+
+thd::thd () : init(), ptr(new THD)
+{
+ if (ptr)
+ {
+ ptr->thread_stack= (char*) &ptr;
+ ptr->store_globals();
+ ptr->variables.option_bits&= ~OPTION_BIN_LOG; // disable binlog
+ ptr->security_ctx->master_access= ~(ulong)0;
+ lex_start(ptr);
+ }
+}
+
+thd::~thd ()
+{
+ if (ptr)
+ {
+ delete ptr;
+ my_pthread_setspecific_ptr (THR_THD, 0);
+ }
+}
+
+} // namespace wsp
+
+extern ulong my_bind_addr;
+extern uint mysqld_port;
+
+size_t default_ip (char* buf, size_t buf_len)
+{
+ size_t ip_len = 0;
+
+ if (htonl(INADDR_NONE) == my_bind_addr) {
+ WSREP_ERROR("Networking not configured, cannot receive state transfer.");
+ return 0;
+ }
+
+ if (htonl(INADDR_ANY) == my_bind_addr) {
+ // binds to all interfaces, try to find the address of the first one
+#if (TARGET_OS_LINUX == 1)
+ const char cmd[] = "/sbin/ifconfig | "
+ "grep -m1 -1 -E '^[a-z]?eth[0-9]' | tail -n 1 | "
+ "awk '{ print $2 }' | awk -F : '{ print $2 }'";
+#elif defined(__sun__)
+ const char cmd[] = "/sbin/ifconfig -a | "
+ "grep -m1 -1 -E 'net[0-9]:' | tail -n 1 | awk '{ print $2 }'";
+#else
+ char *cmd;
+#error "OS not supported"
+#endif
+ wsp::process proc (cmd, "r");
+
+ if (NULL != proc.pipe()) {
+ char* ret;
+
+ ret = fgets (buf, buf_len, proc.pipe());
+
+ if (proc.wait()) return 0;
+
+ if (NULL == ret) {
+ WSREP_ERROR("Failed to read output of: '%s'", cmd);
+ return 0;
+ }
+ }
+ else {
+ WSREP_ERROR("Failed to execute: '%s'", cmd);
+ return 0;
+ }
+
+ // clear possible \n at the end of ip string left by fgets()
+ ip_len = strlen (buf);
+ if (ip_len > 0 && '\n' == buf[ip_len - 1]) {
+ ip_len--;
+ buf[ip_len] = '\0';
+ }
+
+ if (INADDR_NONE == inet_addr(buf)) {
+ if (strlen(buf) != 0) {
+ WSREP_WARN("Shell command returned invalid address: '%s'", buf);
+ }
+ return 0;
+ }
+ }
+ else {
+ uint8_t* b = (uint8_t*)&my_bind_addr;
+ ip_len = snprintf (buf, buf_len,
+ "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]);
+ }
+
+ return ip_len;
+}
+
+size_t default_address(char* buf, size_t buf_len)
+{
+ size_t addr_len = default_ip (buf, buf_len);
+
+ if (addr_len && addr_len < buf_len) {
+ addr_len += snprintf (buf + addr_len, buf_len - addr_len,
+ ":%u", mysqld_port);
+ }
+
+ return addr_len;
+}
+
+/*
+ * WSREPXid
+ */
+
+#define WSREP_XID_PREFIX "WSREPXid"
+#define WSREP_XID_PREFIX_LEN MYSQL_XID_PREFIX_LEN
+#define WSREP_XID_UUID_OFFSET 8
+#define WSREP_XID_SEQNO_OFFSET (WSREP_XID_UUID_OFFSET + sizeof(wsrep_uuid_t))
+#define WSREP_XID_GTRID_LEN (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t))
+
+void wsrep_xid_init(XID* xid, const wsrep_uuid_t* uuid, wsrep_seqno_t seqno)
+{
+ xid->formatID= 1;
+ xid->gtrid_length= WSREP_XID_GTRID_LEN;
+ xid->bqual_length= 0;
+ memset(xid->data, 0, sizeof(xid->data));
+ memcpy(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN);
+ memcpy(xid->data + WSREP_XID_UUID_OFFSET, uuid, sizeof(wsrep_uuid_t));
+ memcpy(xid->data + WSREP_XID_SEQNO_OFFSET, &seqno, sizeof(wsrep_seqno_t));
+}
+
+const wsrep_uuid_t* wsrep_xid_uuid(const XID* xid)
+{
+ if (wsrep_is_wsrep_xid(xid))
+ return reinterpret_cast<const wsrep_uuid_t*>(xid->data
+ + WSREP_XID_UUID_OFFSET);
+ else
+ return &WSREP_UUID_UNDEFINED;
+}
+
+wsrep_seqno_t wsrep_xid_seqno(const XID* xid)
+{
+
+ if (wsrep_is_wsrep_xid(xid))
+ {
+ wsrep_seqno_t seqno;
+ memcpy(&seqno, xid->data + WSREP_XID_SEQNO_OFFSET, sizeof(wsrep_seqno_t));
+ return seqno;
+ }
+ else
+ {
+ return WSREP_SEQNO_UNDEFINED;
+ }
+}
+
+extern "C"
+int wsrep_is_wsrep_xid(const void* xid_ptr)
+{
+ const XID* xid= reinterpret_cast<const XID*>(xid_ptr);
+ return (xid->formatID == 1 &&
+ xid->gtrid_length == WSREP_XID_GTRID_LEN &&
+ xid->bqual_length == 0 &&
+ !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN));
+}
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
new file mode 100644
index 00000000000..66f0f05c006
--- /dev/null
+++ b/sql/wsrep_var.cc
@@ -0,0 +1,505 @@
+/* Copyright 2008 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mysqld.h>
+#include <sql_class.h>
+#include <sql_plugin.h>
+#include <set_var.h>
+#include <sql_acl.h>
+#include "wsrep_priv.h"
+#include <my_dir.h>
+#include <cstdio>
+#include <cstdlib>
+
+#define WSREP_START_POSITION_ZERO "00000000-0000-0000-0000-000000000000:-1"
+
+// trx history position to start with
+const char* wsrep_start_position = WSREP_START_POSITION_ZERO;
+const char* wsrep_provider = WSREP_NONE;
+const char* wsrep_provider_options = (const char*)my_memdup("", 1, MYF(MY_WME));
+const char* wsrep_cluster_address = NULL;
+const char* wsrep_cluster_name = "my_wsrep_cluster";
+const char* wsrep_node_name = glob_hostname;
+static char node_address[256] = { 0, };
+const char* wsrep_node_address = node_address;
+ulong wsrep_OSU_method_options;
+
+int wsrep_init_vars()
+{
+ global_system_variables.binlog_format=BINLOG_FORMAT_ROW;
+ return 0;
+}
+
+bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type)
+{
+ if (var_type == OPT_GLOBAL) {
+ // FIXME: this variable probably should be changed only per session
+ thd->variables.wsrep_on = global_system_variables.wsrep_on;
+ }
+ else {
+ }
+
+#ifdef REMOVED
+ if (thd->variables.wsrep_on)
+ thd->variables.option_bits |= (OPTION_BIN_LOG);
+ else
+ thd->variables.option_bits &= ~(OPTION_BIN_LOG);
+#endif
+ return false;
+}
+
+void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type)
+{
+ if (var_type == OPT_GLOBAL) {
+ thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads;
+ }
+ else {
+ }
+}
+
+static int wsrep_start_position_verify (const char* start_str)
+{
+ size_t start_len;
+ wsrep_uuid_t uuid;
+ ssize_t uuid_len;
+
+ start_len = strlen (start_str);
+ if (start_len < 34)
+ return 1;
+
+ uuid_len = wsrep_uuid_scan (start_str, start_len, &uuid);
+ if (uuid_len < 0 || (start_len - uuid_len) < 2)
+ return 1;
+
+ if (start_str[uuid_len] != ':') // separator should follow UUID
+ return 1;
+
+ char* endptr;
+ wsrep_seqno_t const seqno __attribute__((unused)) // to avoid GCC warnings
+ (strtoll(&start_str[uuid_len + 1], &endptr, 10));
+
+ if (*endptr == '\0') return 0; // remaining string was seqno
+
+ return 1;
+}
+
+bool wsrep_start_position_check (sys_var *self, THD* thd, set_var* var)
+{
+ char buff[FN_REFLEN];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+ const char* start_str = NULL;
+
+ if (!(res = var->value->val_str(&str))) goto err;
+
+ start_str = res->c_ptr();
+
+ if (!start_str) goto err;
+
+ if (!wsrep_start_position_verify(start_str)) return 0;
+
+err:
+
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
+ start_str ? start_str : "NULL");
+ return 1;
+}
+
+void wsrep_set_local_position (const char* value)
+{
+ size_t value_len = strlen (value);
+ size_t uuid_len = wsrep_uuid_scan (value, value_len, &local_uuid);
+
+ local_seqno = strtoll (value + uuid_len + 1, NULL, 10);
+
+ XID xid;
+ wsrep_xid_init(&xid, &local_uuid, local_seqno);
+ wsrep_set_SE_checkpoint(&xid);
+ WSREP_INFO ("wsrep_start_position var submitted: '%s'", wsrep_start_position);
+}
+
+bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ // since this value passed wsrep_start_position_check, don't check anything
+ // here
+ wsrep_set_local_position (wsrep_start_position);
+
+ if (wsrep) {
+ wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
+ }
+
+ return 0;
+}
+
+void wsrep_start_position_init (const char* val)
+{
+ if (NULL == val || wsrep_start_position_verify (val))
+ {
+ WSREP_ERROR("Bad initial value for wsrep_start_position: %s",
+ (val ? val : ""));
+ return;
+ }
+
+ wsrep_start_position = my_strdup(val, MYF(0));
+
+ wsrep_set_local_position (val);
+}
+
+static bool refresh_provider_options()
+{
+ char* opts= wsrep->options_get(wsrep);
+ if (opts)
+ {
+ if (wsrep_provider_options) my_free((void *)wsrep_provider_options);
+ wsrep_provider_options = (char*)my_memdup(opts, strlen(opts) + 1,
+ MYF(MY_WME));
+ }
+ else
+ {
+ WSREP_ERROR("Failed to get provider options");
+ return true;
+ }
+ return false;
+}
+
+static int wsrep_provider_verify (const char* provider_str)
+{
+ MY_STAT f_stat;
+ char path[FN_REFLEN];
+
+ if (!provider_str || strlen(provider_str)== 0)
+ return 1;
+
+ if (!strcmp(provider_str, WSREP_NONE))
+ return 0;
+
+ if (!unpack_filename(path, provider_str))
+ return 1;
+
+ /* check that provider file exists */
+ bzero(&f_stat, sizeof(MY_STAT));
+ if (!my_stat(path, &f_stat, MYF(0)))
+ {
+ return 1;
+ }
+ return 0;
+}
+
+bool wsrep_provider_check (sys_var *self, THD* thd, set_var* var)
+{
+ char buff[FN_REFLEN];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+ const char* provider_str = NULL;
+
+ if (!(res = var->value->val_str(&str))) goto err;
+
+ provider_str = res->c_ptr();
+
+ if (!provider_str) goto err;
+
+ if (!wsrep_provider_verify(provider_str)) return 0;
+
+err:
+
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
+ provider_str ? provider_str : "NULL");
+ return 1;
+}
+
+bool wsrep_provider_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ bool rcode= false;
+
+ bool wsrep_on_saved= thd->variables.wsrep_on;
+ thd->variables.wsrep_on= false;
+
+ wsrep_stop_replication(thd);
+ wsrep_deinit();
+
+ char* tmp= strdup(wsrep_provider); // wsrep_init() rewrites provider
+ //when fails
+ if (wsrep_init())
+ {
+ my_error(ER_CANT_OPEN_LIBRARY, MYF(0), tmp);
+ rcode = true;
+ }
+ free(tmp);
+
+ // we sure don't want to use old address with new provider
+ wsrep_cluster_address_init(NULL);
+ wsrep_provider_options_init(NULL);
+
+ thd->variables.wsrep_on= wsrep_on_saved;
+
+ refresh_provider_options();
+
+ return rcode;
+}
+
+void wsrep_provider_init (const char* value)
+{
+ if (NULL == value || wsrep_provider_verify (value))
+ {
+ WSREP_ERROR("Bad initial value for wsrep_provider: %s",
+ (value ? value : ""));
+ return;
+ }
+ wsrep_provider = my_strdup(value, MYF(0));
+}
+
+bool wsrep_provider_options_check(sys_var *self, THD* thd, set_var* var)
+{
+ return 0;
+}
+
+bool wsrep_provider_options_update(sys_var *self, THD* thd, enum_var_type type)
+{
+ wsrep_status_t ret= wsrep->options_set(wsrep, wsrep_provider_options);
+ if (ret != WSREP_OK)
+ {
+ WSREP_ERROR("Set options returned %d", ret);
+ return true;
+ }
+ return refresh_provider_options();
+}
+
+void wsrep_provider_options_init(const char* value)
+{
+ if (wsrep_provider_options && wsrep_provider_options != value)
+ my_free((void *)wsrep_provider_options);
+ wsrep_provider_options = (value) ? my_strdup(value, MYF(0)) : NULL;
+}
+
+static int wsrep_cluster_address_verify (const char* cluster_address_str)
+{
+ /* There is no predefined address format, it depends on provider. */
+ return 0;
+}
+
+bool wsrep_cluster_address_check (sys_var *self, THD* thd, set_var* var)
+{
+ char buff[FN_REFLEN];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+ const char* cluster_address_str = NULL;
+
+ if (!(res = var->value->val_str(&str))) goto err;
+
+ cluster_address_str = res->c_ptr();
+
+ if (!wsrep_cluster_address_verify(cluster_address_str)) return 0;
+
+ err:
+
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
+ cluster_address_str ? cluster_address_str : "NULL");
+ return 1 ;
+}
+
+bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ bool wsrep_on_saved= thd->variables.wsrep_on;
+ thd->variables.wsrep_on= false;
+
+ wsrep_stop_replication(thd);
+
+ if (wsrep_start_replication())
+ {
+ wsrep_create_rollbacker();
+ wsrep_create_appliers(wsrep_slave_threads);
+ }
+
+ thd->variables.wsrep_on= wsrep_on_saved;
+
+ return false;
+}
+
+void wsrep_cluster_address_init (const char* value)
+{
+ if (wsrep_cluster_address && wsrep_cluster_address != value)
+ my_free ((void*)wsrep_cluster_address);
+
+ wsrep_cluster_address = (value) ? my_strdup(value, MYF(0)) : NULL;
+}
+
+bool wsrep_cluster_name_check (sys_var *self, THD* thd, set_var* var)
+{
+ char buff[FN_REFLEN];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+ const char* cluster_name_str = NULL;
+
+ if (!(res = var->value->val_str(&str))) goto err;
+
+ cluster_name_str = res->c_ptr();
+
+ if (!cluster_name_str || strlen(cluster_name_str) == 0) goto err;
+
+ return 0;
+
+ err:
+
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
+ cluster_name_str ? cluster_name_str : "NULL");
+ return 1;
+}
+
+bool wsrep_cluster_name_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ return 0;
+}
+
+bool wsrep_node_name_check (sys_var *self, THD* thd, set_var* var)
+{
+ char buff[FN_REFLEN];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+ const char* node_name_str = NULL;
+
+ if (!(res = var->value->val_str(&str))) goto err;
+
+ node_name_str = res->c_ptr();
+
+ if (!node_name_str || strlen(node_name_str) == 0) goto err;
+
+ return 0;
+
+ err:
+
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
+ node_name_str ? node_name_str : "NULL");
+ return 1;
+}
+
+bool wsrep_node_name_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ return 0;
+}
+
+// TODO: do something more elaborate, like checking connectivity
+bool wsrep_node_address_check (sys_var *self, THD* thd, set_var* var)
+{
+ char buff[FN_REFLEN];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+ const char* node_address_str = NULL;
+
+ if (!(res = var->value->val_str(&str))) goto err;
+
+ node_address_str = res->c_ptr();
+
+ if (!node_address_str || strlen(node_address_str) == 0) goto err;
+
+ return 0;
+
+ err:
+
+ my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str,
+ node_address_str ? node_address_str : "NULL");
+ return 1;
+}
+
+bool wsrep_node_address_update (sys_var *self, THD* thd, enum_var_type type)
+{
+ return 0;
+}
+
+void wsrep_node_address_init (const char* value)
+{
+ if (wsrep_node_address && strcmp(wsrep_node_address, value))
+ my_free ((void*)wsrep_node_address);
+
+ wsrep_node_address = (value) ? my_strdup(value, MYF(0)) : NULL;
+}
+
+/*
+ * Status variables stuff below
+ */
+static inline void
+wsrep_assign_to_mysql (SHOW_VAR* mysql, wsrep_stats_var* wsrep)
+{
+ mysql->name = wsrep->name;
+ switch (wsrep->type) {
+ case WSREP_VAR_INT64:
+ mysql->value = (char*) &wsrep->value._int64;
+ mysql->type = SHOW_LONGLONG;
+ break;
+ case WSREP_VAR_STRING:
+ mysql->value = (char*) &wsrep->value._string;
+ mysql->type = SHOW_CHAR_PTR;
+ break;
+ case WSREP_VAR_DOUBLE:
+ mysql->value = (char*) &wsrep->value._double;
+ mysql->type = SHOW_DOUBLE;
+ break;
+ }
+}
+
+static wsrep_stats_var* wsrep_status_vars = NULL;
+
+#if DYNAMIC
+// somehow this mysql status thing works only with statically allocated arrays.
+static SHOW_VAR* mysql_status_vars = NULL;
+static int mysql_status_len = -1;
+#else
+static SHOW_VAR mysql_status_vars[100 + 1];
+static const int mysql_status_len = 100;
+#endif
+
+static void export_wsrep_status_to_mysql()
+{
+ int wsrep_status_len, i;
+
+ if (wsrep_status_vars) wsrep->stats_free (wsrep, wsrep_status_vars);
+
+ wsrep_status_vars = wsrep->stats_get (wsrep);
+
+ if (!wsrep_status_vars) {
+ return;
+ }
+
+ for (wsrep_status_len = 0;
+ wsrep_status_vars[wsrep_status_len].name != NULL;
+ wsrep_status_len++);
+
+#if DYNAMIC
+ if (wsrep_status_len != mysql_status_len) {
+ void* tmp = realloc (mysql_status_vars,
+ (wsrep_status_len + 1) * sizeof(SHOW_VAR));
+ if (!tmp) {
+
+ sql_print_error ("Out of memory for wsrep status variables."
+ "Number of variables: %d", wsrep_status_len);
+ return;
+ }
+
+ mysql_status_len = wsrep_status_len;
+ mysql_status_vars = (SHOW_VAR*)tmp;
+ }
+ /* @TODO: fix this: */
+#else
+ if (mysql_status_len < wsrep_status_len) wsrep_status_len= mysql_status_len;
+#endif
+
+ for (i = 0; i < wsrep_status_len; i++)
+ wsrep_assign_to_mysql (mysql_status_vars + i, wsrep_status_vars + i);
+
+ mysql_status_vars[wsrep_status_len].name = NullS;
+ mysql_status_vars[wsrep_status_len].value = NullS;
+ mysql_status_vars[wsrep_status_len].type = SHOW_LONG;
+}
+
+int wsrep_show_status (THD *thd, SHOW_VAR *var, char *buff)
+{
+ export_wsrep_status_to_mysql();
+ var->type= SHOW_ARRAY;
+ var->value= (char *) &mysql_status_vars;
+ return 0;
+}
diff --git a/support-files/wsrep.cnf.sh b/support-files/wsrep.cnf.sh
new file mode 100644
index 00000000000..e8483910c0b
--- /dev/null
+++ b/support-files/wsrep.cnf.sh
@@ -0,0 +1,125 @@
+# This file contains wsrep-related mysqld options. It should be included
+# in the main MySQL configuration file.
+#
+# Options that need to be customized:
+# - wsrep_provider
+# - wsrep_cluster_address
+# - wsrep_sst_auth
+# The rest of defaults should work out of the box.
+
+##
+## mysqld options _MANDATORY_ for correct opration of the cluster
+##
+[mysqld]
+
+# (This must be substituted by wsrep_format)
+binlog_format=ROW
+
+# Currently only InnoDB storage engine is supported
+default-storage-engine=innodb
+
+# to avoid issues with 'bulk mode inserts' using autoinc
+innodb_autoinc_lock_mode=2
+
+# This is a must for paralell applying
+innodb_locks_unsafe_for_binlog=1
+
+# Query Cache is not supported with wsrep
+query_cache_size=0
+query_cache_type=0
+
+# Override bind-address
+# In some systems bind-address defaults to 127.0.0.1, and with mysqldump SST
+# it will have (most likely) disastrous consequences on donor node
+bind-address=0.0.0.0
+
+##
+## WSREP options
+##
+
+# Full path to wsrep provider library or 'none'
+wsrep_provider=none
+
+# Provider specific configuration options
+#wsrep_provider_options=
+
+# Logical cluster name. Should be the same for all nodes.
+wsrep_cluster_name="my_wsrep_cluster"
+
+# Group communication system handle
+#wsrep_cluster_address="dummy://"
+
+# Human-readable node name (non-unique). Hostname by default.
+#wsrep_node_name=
+
+# Base replication <address|hostname>[:port] of the node.
+# The values supplied will be used as defaults for state transfer receiving,
+# listening ports and so on. Default: address of the first network interface.
+#wsrep_node_address=
+
+# Address for incoming client connections. Autodetect by default.
+#wsrep_node_incoming_address=
+
+# How many threads will process writesets from other nodes
+wsrep_slave_threads=1
+
+# DBUG options for wsrep provider
+#wsrep_dbug_option
+
+# Generate fake primary keys for non-PK tables (required for multi-master
+# and parallel applying operation)
+wsrep_certify_nonPK=1
+
+# Maximum number of rows in write set
+wsrep_max_ws_rows=131072
+
+# Maximum size of write set
+wsrep_max_ws_size=1073741824
+
+# to enable debug level logging, set this to 1
+wsrep_debug=0
+
+# convert locking sessions into transactions
+wsrep_convert_LOCK_to_trx=0
+
+# how many times to retry deadlocked autocommits
+wsrep_retry_autocommit=1
+
+# change auto_increment_increment and auto_increment_offset automatically
+wsrep_auto_increment_control=1
+
+# retry autoinc insert, which failed for duplicate key error
+wsrep_drupal_282555_workaround=0
+
+# enable "strictly synchronous" semantics for read operations
+wsrep_causal_reads=0
+
+# Command to call when node status or cluster membership changes.
+# Will be passed all or some of the following options:
+# --status - new status of this node
+# --uuid - UUID of the cluster
+# --primary - whether the component is primary or not ("yes"/"no")
+# --members - comma-separated list of members
+# --index - index of this node in the list
+wsrep_notify_cmd=
+
+##
+## WSREP State Transfer options
+##
+
+# State Snapshot Transfer method
+wsrep_sst_method=mysqldump
+
+# Address on THIS node to receive SST at. DON'T SET IT TO DONOR ADDRESS!!!
+# (SST method dependent. Defaults to the first IP of the first interface)
+#wsrep_sst_receive_address=
+
+# SST authentication string. This will be used to send SST to joining nodes.
+# Depends on SST method. For mysqldump method it is root:<root password>
+wsrep_sst_auth=root:
+
+# Desired SST donor name.
+#wsrep_sst_donor=
+
+# Protocol version to use
+# wsrep_protocol_version=
diff --git a/support-files/wsrep_notify.sh b/support-files/wsrep_notify.sh
new file mode 100644
index 00000000000..bdbe3d12a39
--- /dev/null
+++ b/support-files/wsrep_notify.sh
@@ -0,0 +1,102 @@
+#!/bin/sh -eu
+
+# This is a simple example of wsrep notification script (wsrep_notify_cmd).
+# It will create 'wsrep' schema and two tables in it: 'membeship' and 'status'
+# and fill them on every membership or node status change.
+#
+# Edit parameters below to specify the address and login to server.
+
+USER=root
+PSWD=rootpass
+HOST=127.0.0.1
+PORT=3306
+
+SCHEMA="wsrep"
+MEMB_TABLE="$SCHEMA.membership"
+STATUS_TABLE="$SCHEMA.status"
+
+BEGIN="
+SET wsrep_on=0;
+DROP SCHEMA IF EXISTS $SCHEMA; CREATE SCHEMA $SCHEMA;
+CREATE TABLE $MEMB_TABLE (
+ idx INT UNIQUE PRIMARY KEY,
+ uuid CHAR(40) UNIQUE, /* node UUID */
+ name VARCHAR(32), /* node name */
+ addr VARCHAR(256) /* node address */
+) ENGINE=MEMORY;
+CREATE TABLE $STATUS_TABLE (
+ size INT, /* component size */
+ idx INT, /* this node index */
+ status CHAR(16), /* this node status */
+ uuid CHAR(40), /* cluster UUID */
+ prim BOOLEAN /* if component is primary */
+) ENGINE=MEMORY;
+BEGIN;
+DELETE FROM $MEMB_TABLE;
+DELETE FROM $STATUS_TABLE;
+"
+END="COMMIT;"
+
+configuration_change()
+{
+ echo "$BEGIN;"
+
+ local idx=0
+
+ for NODE in $(echo $MEMBERS | sed s/,/\ /g)
+ do
+ echo "INSERT INTO $MEMB_TABLE VALUES ( $idx, "
+ # Don't forget to properly quote string values
+ echo "'$NODE'" | sed s/\\//\',\'/g
+ echo ");"
+ idx=$(( $idx + 1 ))
+ done
+
+ echo "INSERT INTO $STATUS_TABLE VALUES($idx, $INDEX, '$STATUS', '$CLUSTER_UUID', $PRIMARY);"
+
+ echo "$END"
+}
+
+status_update()
+{
+ echo "SET wsrep_on=0; BEGIN; UPDATE $STATUS_TABLE SET status='$STATUS'; COMMIT;"
+}
+
+COM=status_update # not a configuration change by default
+
+while [ $# -gt 0 ]
+do
+ case $1 in
+ --status)
+ STATUS=$2
+ shift
+ ;;
+ --uuid)
+ CLUSTER_UUID=$2
+ shift
+ ;;
+ --primary)
+ [ "$2" = "yes" ] && PRIMARY="1" || PRIMARY="0"
+ COM=configuration_change
+ shift
+ ;;
+ --index)
+ INDEX=$2
+ shift
+ ;;
+ --members)
+ MEMBERS=$2
+ shift
+ ;;
+ esac
+ shift
+done
+
+# Undefined means node is shutting down
+if [ "$STATUS" != "Undefined" ]
+then
+ $COM | mysql -B -u$USER -p$PSWD -h$HOST -P$PORT
+fi
+
+exit 0
+#
diff --git a/wsrep/CMakeLists.txt b/wsrep/CMakeLists.txt
new file mode 100644
index 00000000000..11d0e34d1b0
--- /dev/null
+++ b/wsrep/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright (c) 2012, Codership Oy. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+INCLUDE_DIRECTORIES( "." )
+
+SET(WSREP_SOURCES wsrep_uuid.c wsrep_loader.c wsrep_dummy.c)
+
+ADD_CONVENIENCE_LIBRARY(wsrep ${WSREP_SOURCES})
+DTRACE_INSTRUMENT(wsrep)
+
+#ADD_EXECUTABLE(listener wsrep_listener.c ${WSREP_SOURCES})
+#TARGET_LINK_LIBRARIES(listener ${LIBDL})
diff --git a/wsrep/Makefile.am b/wsrep/Makefile.am
new file mode 100644
index 00000000000..40e4b501e86
--- /dev/null
+++ b/wsrep/Makefile.am
@@ -0,0 +1,2 @@
+noinst_LIBRARIES = libwsrep.a
+libwsrep_a_SOURCES = wsrep_api.h wsrep_loader.c wsrep_dummy.c wsrep_uuid.c
diff --git a/wsrep/wsrep_api.h b/wsrep/wsrep_api.h
new file mode 100644
index 00000000000..2cd10afc7ff
--- /dev/null
+++ b/wsrep/wsrep_api.h
@@ -0,0 +1,875 @@
+/* Copyright (C) 2009-2011 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef WSREP_H
+#define WSREP_H
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <time.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*!
+ * wsrep replication API
+ */
+
+#define WSREP_INTERFACE_VERSION "23"
+
+/*!
+ * Certain provider capabilities application may need to know
+ */
+#define WSREP_CAP_MULTI_MASTER ( 1ULL << 0 )
+#define WSREP_CAP_CERTIFICATION ( 1ULL << 1 )
+#define WSREP_CAP_PARALLEL_APPLYING ( 1ULL << 2 )
+#define WSREP_CAP_TRX_REPLAY ( 1ULL << 3 )
+#define WSREP_CAP_ISOLATION ( 1ULL << 4 )
+#define WSREP_CAP_PAUSE ( 1ULL << 5 )
+#define WSREP_CAP_CAUSAL_READS ( 1ULL << 6 )
+#define WSREP_CAP_CAUSAL_TRX ( 1ULL << 7 )
+#define WSREP_CAP_WRITE_SET_INCREMENTS ( 1ULL << 8 )
+#define WSREP_CAP_SESSION_LOCKS ( 1ULL << 9 )
+#define WSREP_CAP_DISTRIBUTED_LOCKS ( 1ULL << 10 )
+#define WSREP_CAP_CONSISTENCY_CHECK ( 1ULL << 11 )
+
+/*!
+ * Write set replication flags
+ */
+#define WSREP_FLAG_PA_SAFE ( 1ULL << 0 )
+
+/* Empty backend spec */
+#define WSREP_NONE "none"
+
+typedef uint64_t wsrep_trx_id_t; //!< application transaction ID
+typedef uint64_t wsrep_conn_id_t; //!< application connection ID
+typedef int64_t wsrep_seqno_t; //!< sequence number of a writeset, etc.
+
+/*! undefined seqno */
+#define WSREP_SEQNO_UNDEFINED (-1)
+
+/*! wsrep status codes */
+typedef enum wsrep_status {
+ WSREP_OK = 0, //!< success
+ WSREP_WARNING, //!< minor warning, error logged
+ WSREP_TRX_MISSING, //!< transaction is not known by wsrep
+ WSREP_TRX_FAIL, //!< transaction aborted, server can continue
+ WSREP_BF_ABORT, //!< trx was victim of brute force abort
+ WSREP_CONN_FAIL, //!< error in client connection, must abort
+ WSREP_NODE_FAIL, //!< error in node state, wsrep must reinit
+ WSREP_FATAL, //!< fatal error, server must abort
+ WSREP_NOT_IMPLEMENTED //!< feature not implemented
+} wsrep_status_t;
+
+/*!
+ * @brief log severity levels, passed as first argument to log handler
+ */
+typedef enum wsrep_log_level
+{
+ WSREP_LOG_FATAL, //!< Unrecoverable error, application must quit.
+ WSREP_LOG_ERROR, //!< Operation failed, must be repeated.
+ WSREP_LOG_WARN, //!< Unexpected condition, but no operational failure.
+ WSREP_LOG_INFO, //!< Informational message.
+ WSREP_LOG_DEBUG //!< Debug message. Shows only of compiled with debug.
+} wsrep_log_level_t;
+
+/*!
+ * @brief error log handler
+ *
+ * All messages from wsrep library are directed to this
+ * handler, if present.
+ *
+ * @param level log level
+ * @param message log message
+ */
+typedef void (*wsrep_log_cb_t)(wsrep_log_level_t, const char *);
+
+/*!
+ * UUID type - for all unique IDs
+ */
+typedef struct wsrep_uuid {
+ uint8_t uuid[16];
+} wsrep_uuid_t;
+
+/*! Undefined UUID */
+static const wsrep_uuid_t WSREP_UUID_UNDEFINED = {{0,}};
+
+/*!
+ * Scan UUID from string
+ * @return length of UUID string representation or negative error code
+ */
+extern ssize_t
+wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid);
+
+/*!
+ * Print UUID to string
+ * @return length of UUID string representation or negative error code
+ */
+extern ssize_t
+wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len);
+
+#define WSREP_MEMBER_NAME_LEN 32 //!< maximum logical member name length
+#define WSREP_INCOMING_LEN 256 //!< max Domain Name length + 0x00
+
+/*!
+ * member status
+ */
+typedef enum wsrep_member_status {
+ WSREP_MEMBER_UNDEFINED, //!< undefined state
+ WSREP_MEMBER_JOINER, //!< incomplete state, requested state transfer
+ WSREP_MEMBER_DONOR, //!< complete state, donates state transfer
+ WSREP_MEMBER_JOINED, //!< complete state
+ WSREP_MEMBER_SYNCED, //!< complete state, synchronized with group
+ WSREP_MEMBER_ERROR, //!< this and above is provider-specific error code
+ WSREP_MEMBER_MAX
+} wsrep_member_status_t;
+
+/*!
+ * static information about a group member (some fields are tentative yet)
+ */
+typedef struct wsrep_member_info {
+ wsrep_uuid_t id; //!< group-wide unique member ID
+ char name[WSREP_MEMBER_NAME_LEN]; //!< human-readable name
+ char incoming[WSREP_INCOMING_LEN]; //!< address for client requests
+} wsrep_member_info_t;
+
+/*!
+ * group status
+ */
+typedef enum wsrep_view_status {
+ WSREP_VIEW_PRIMARY, //!< primary group configuration (quorum present)
+ WSREP_VIEW_NON_PRIMARY, //!< non-primary group configuration (quorum lost)
+ WSREP_VIEW_DISCONNECTED, //!< not connected to group, retrying.
+ WSREP_VIEW_MAX
+} wsrep_view_status_t;
+
+/*!
+ * view of the group
+ */
+typedef struct wsrep_view_info {
+ wsrep_uuid_t uuid; //!< global state UUID
+ wsrep_seqno_t seqno; //!< global state seqno
+ wsrep_seqno_t view; //!< global view number
+ wsrep_view_status_t status; //!< view status
+ bool state_gap; //!< gap between global and local states
+ int my_idx; //!< index of this member in the view
+ int memb_num; //!< number of members in the view
+ int proto_ver; //!< application protocol agreed on in the view
+ wsrep_member_info_t members[1]; //!< array of member information
+} wsrep_view_info_t;
+
+/*!
+ * Magic string to tell provider to engage into trivial (empty) state transfer.
+ * No data will be passed, but the node shall be considered JOINED.
+ * Should be passed in sst_req parameter of wsrep_view_cb_t.
+ */
+#define WSREP_STATE_TRANSFER_TRIVIAL "trivial"
+
+/*!
+ * Magic string to tell provider not to engage in state transfer at all.
+ * The member will stay in WSREP_MEMBER_UNDEFINED state but will keep on
+ * receiving all writesets.
+ * Should be passed in sst_req parameter of wsrep_view_cb_t.
+ */
+#define WSREP_STATE_TRANSFER_NONE "none"
+
+/*!
+ * @brief group view handler
+ *
+ * This handler is called in total order corresponding to the group
+ * configuration change. It is to provide a vital information about
+ * new group view. If view info indicates existence of discontinuity
+ * between group and member states, state transfer request message
+ * should be filled in by the callback implementation.
+ *
+ * @note Currently it is assumed that sst_req is allocated using
+ * malloc()/calloc()/realloc() and it will be freed by
+ * wsrep implementation.
+ *
+ * @param app_ctx application context
+ * @param recv_ctx receiver context
+ * @param view new view on the group
+ * @param state current state
+ * @param state_len lenght of current state
+ * @param sst_req location to store SST request
+ * @param sst_req_len location to store SST request length or error code
+ * value of 0 means no SST.
+ */
+typedef void (*wsrep_view_cb_t) (void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view,
+ const char* state,
+ size_t state_len,
+ void** sst_req,
+ ssize_t* sst_req_len);
+
+/*!
+ * @brief apply callback
+ *
+ * This handler is called from wsrep library to apply replicated write set
+ * Must support brute force applying for multi-master operation
+ *
+ * @param recv_ctx receiver context pointer provided by the application
+ * @param data data buffer containing the write set
+ * @param size data buffer size
+ * @param seqno global seqno part of the write set to be applied
+ *
+ * @return success code:
+ * @retval WSREP_OK
+ * @retval WSREP_NOT_IMPLEMENTED appl. does not support the write set format
+ * @retval WSREP_ERROR failed to apply the write set
+ */
+typedef enum wsrep_status (*wsrep_apply_cb_t) (void* recv_ctx,
+ const void* data,
+ size_t size,
+ wsrep_seqno_t seqno);
+
+/*!
+ * @brief commit callback
+ *
+ * This handler is called to commit the changes made by apply callback.
+ *
+ * @param recv_ctx receiver context pointer provided by the application
+ * @param seqno global seqno part of the write set to be committed
+ * @param commit true - commit writeset, false - rollback writeset
+ *
+ * @return success code:
+ * @retval WSREP_OK
+ * @retval WSREP_ERROR call failed
+ */
+typedef enum wsrep_status (*wsrep_commit_cb_t) (void* recv_ctx,
+ wsrep_seqno_t seqno,
+ bool commit);
+
+/*!
+ * @brief a callback to donate state snapshot
+ *
+ * This handler is called from wsrep library when it needs this node
+ * to deliver state to a new cluster member.
+ * No state changes will be committed for the duration of this call.
+ * Wsrep implementation may provide internal state to be transmitted
+ * to new cluster member for initial state.
+ *
+ * @param app_ctx application context
+ * @param recv_ctx receiver context
+ * @param msg state transfer request message
+ * @param msg_len state transfer request message length
+ * @param uuid current state uuid on this node
+ * @param seqno current state seqno on this node
+ * @param state current wsrep internal state buffer
+ * @param state_len current wsrep internal state buffer len
+ * @param bypass bypass snapshot transfer, only transfer uuid:seqno pair
+ * @return 0 for success or negative error code
+ */
+typedef int (*wsrep_sst_donate_cb_t) (void* app_ctx,
+ void* recv_ctx,
+ const void* msg,
+ size_t msg_len,
+ const wsrep_uuid_t* uuid,
+ wsrep_seqno_t seqno,
+ const char* state,
+ size_t state_len,
+ bool bypass);
+
+/*!
+ * @brief a callback to signal application that wsrep state is synced
+ * with cluster
+ *
+ * This callback is called after wsrep library has got in sync with
+ * rest of the cluster.
+ *
+ * @param app_ctx application context
+ */
+typedef void (*wsrep_synced_cb_t)(void* app_ctx);
+
+
+/*!
+ * Initialization parameters for wsrep, used as arguments for wsrep_init()
+ */
+struct wsrep_init_args
+{
+ void* app_ctx; //!< Application context for callbacks
+
+ /* Configuration parameters */
+ const char* node_name; //!< Symbolic name of this node (e.g. hostname)
+ const char* node_address; //!< Address to be used by wsrep provider
+ const char* node_incoming; //!< Address for incoming client connections
+ const char* data_dir; //!< Directory where wsrep files are kept if any
+ const char* options; //!< Provider-specific configuration string
+ int proto_ver; //!< Max supported application protocol version
+
+ /* Application initial state information. */
+ const wsrep_uuid_t* state_uuid; //!< Application state sequence UUID
+ wsrep_seqno_t state_seqno; //!< Applicaiton state sequence number
+ const char* state; //!< Initial state for wsrep implementation
+ size_t state_len; //!< Length of state buffer
+
+ /* Application callbacks */
+ wsrep_log_cb_t logger_cb; //!< logging handler
+ wsrep_view_cb_t view_handler_cb; //!< group view change handler
+
+ /* applier callbacks */
+ wsrep_apply_cb_t apply_cb; //!< apply callback
+ wsrep_commit_cb_t commit_cb; //!< commit callback
+
+ /* state snapshot transfer callbacks */
+ wsrep_sst_donate_cb_t sst_donate_cb; //!< starting to donate
+ wsrep_synced_cb_t synced_cb; //!< synced with group
+};
+
+/*! Type of the stats variable value in struct wsrep_status_var */
+typedef enum wsrep_var_type
+{
+ WSREP_VAR_STRING, //!< pointer to null-terminated string
+ WSREP_VAR_INT64, //!< int64_t
+ WSREP_VAR_DOUBLE //!< double
+}
+wsrep_var_type_t;
+
+/*! Generalized stats variable representation */
+struct wsrep_stats_var
+{
+ const char* name; //!< variable name
+ wsrep_var_type_t type; //!< variable value type
+ union {
+ int64_t _int64;
+ double _double;
+ const char* _string;
+ } value; //!< variable value
+};
+
+
+/*! Key part structure */
+typedef struct wsrep_key_part_
+{
+ const void* buf; /*!< Buffer containing key part data */
+ size_t buf_len; /*!< Length of buffer */
+} wsrep_key_part_t;
+
+/*! Key struct used to pass certification keys for transaction handling calls.
+ * A key consists of zero or more key parts. */
+typedef struct wsrep_key_
+{
+ const wsrep_key_part_t* key_parts; /*!< Array of key parts */
+ size_t key_parts_len; /*!< Length of key parts array */
+} wsrep_key_t;
+
+/*! Transaction handle struct passed for wsrep transaction handling calls */
+typedef struct wsrep_trx_handle_
+{
+ wsrep_trx_id_t trx_id; //!< transaction ID
+ void* opaque; //!< opaque provider transaction context data
+} wsrep_trx_handle_t;
+
+/*!
+ * @brief Helper method to reset trx handle state when trx id changes
+ *
+ * Instead of passing wsrep_trx_handle_t directly for wsrep calls,
+ * wrapping handle with this call offloads bookkeeping from
+ * application.
+ */
+static inline wsrep_trx_handle_t* wsrep_trx_handle_for_id(
+ wsrep_trx_handle_t* trx_handle,
+ wsrep_trx_id_t trx_id)
+{
+ if (trx_handle->trx_id != trx_id)
+ {
+ trx_handle->trx_id = trx_id;
+ trx_handle->opaque = NULL;
+ }
+ return trx_handle;
+}
+
+
+typedef struct wsrep_ wsrep_t;
+/*!
+ * wsrep interface for dynamically loadable libraries
+ */
+struct wsrep_ {
+
+ const char *version; //!< interface version string
+
+ /*!
+ * @brief Initializes wsrep provider
+ *
+ * @param wsrep this wsrep handle
+ * @param args wsrep initialization parameters
+ */
+ wsrep_status_t (*init) (wsrep_t* wsrep,
+ const struct wsrep_init_args* args);
+
+ /*!
+ * @brief Returns provider capabilities flag bitmap
+ *
+ * @param wsrep this wsrep handle
+ */
+ uint64_t (*capabilities) (wsrep_t* wsrep);
+
+ /*!
+ * @brief Passes provider-specific configuration string to provider.
+ *
+ * @param wsrep this wsrep handle
+ * @param conf configuration string
+ *
+ * @retval WSREP_OK configuration string was parsed successfully
+ * @retval WSREP_WARNING could't not parse conf string, no action taken
+ */
+ wsrep_status_t (*options_set) (wsrep_t* wsrep, const char* conf);
+
+ /*!
+ * @brief Returns provider-specific string with current configuration values.
+ *
+ * @param wsrep this wsrep handle
+ *
+ * @return a dynamically allocated string with current configuration
+ * parameter values
+ */
+ char* (*options_get) (wsrep_t* wsrep);
+
+ /*!
+ * @brief Opens connection to cluster
+ *
+ * Returns when either node is ready to operate as a part of the clsuter
+ * or fails to reach operating status.
+ *
+ * @param wsrep this wsrep handle
+ * @param cluster_name unique symbolic cluster name
+ * @param cluster_url URL-like cluster address (backend://address)
+ * @param state_donor name of the node to be asked for state transfer.
+ */
+ wsrep_status_t (*connect) (wsrep_t* wsrep,
+ const char* cluster_name,
+ const char* cluster_url,
+ const char* state_donor);
+
+ /*!
+ * @brief Closes connection to cluster.
+ *
+ * If state_uuid and/or state_seqno is not NULL, will store final state
+ * in there.
+ *
+ * @param wsrep this wsrep handler
+ */
+ wsrep_status_t (*disconnect)(wsrep_t* wsrep);
+
+ /*!
+ * @brief start receiving replication events
+ *
+ * This function never returns
+ *
+ * @param wsrep this wsrep handle
+ * @param recv_ctx receiver context
+ */
+ wsrep_status_t (*recv)(wsrep_t* wsrep, void* recv_ctx);
+
+ /*!
+ * @brief Replicates/logs result of transaction to other nodes and allocates
+ * required resources.
+ *
+ * Must be called before transaction commit. Returns success code, which
+ * caller must check.
+ * In case of WSREP_OK, starts commit critical section, transaction can
+ * commit. Otherwise transaction must rollback.
+ *
+ * @param wsrep this wsrep handle
+ * @param trx_handle transaction which is committing
+ * @param conn_id connection ID
+ * @param app_data application specific applying data
+ * @param data_len the size of the applying data
+ * @param flags fine tuning the replication WSREP_FLAG_*
+ * @param seqno seqno part of the global transaction ID
+ *
+ * @retval WSREP_OK cluster-wide commit succeeded
+ * @retval WSREP_TRX_FAIL must rollback transaction
+ * @retval WSREP_CONN_FAIL must close client connection
+ * @retval WSREP_NODE_FAIL must close all connections and reinit
+ */
+ wsrep_status_t (*pre_commit)(wsrep_t* wsrep,
+ wsrep_conn_id_t conn_id,
+ wsrep_trx_handle_t* trx_handle,
+ const void* app_data,
+ size_t data_len,
+ uint64_t flags,
+ wsrep_seqno_t* seqno);
+
+ /*!
+ * @brief Releases resources after transaction commit.
+ *
+ * Ends commit critical section.
+ *
+ * @param wsrep this wsrep handle
+ * @param trx_handle transaction which is committing
+ * @retval WSREP_OK post_commit succeeded
+ */
+ wsrep_status_t (*post_commit) (wsrep_t* wsrep,
+ wsrep_trx_handle_t* trx_handle);
+
+ /*!
+ * @brief Releases resources after transaction rollback.
+ *
+ * @param wsrep this wsrep handle
+ * @param trx_handle transaction which is committing
+ * @retval WSREP_OK post_rollback succeeded
+ */
+ wsrep_status_t (*post_rollback)(wsrep_t* wsrep,
+ wsrep_trx_handle_t* trx_handle);
+
+ /*!
+ * @brief Replay trx as a slave write set
+ *
+ * If local trx has been aborted by brute force, and it has already
+ * replicated before this abort, we must try if we can apply it as
+ * slave trx. Note that slave nodes see only trx write sets and certification
+ * test based on write set content can be different to DBMS lock conflicts.
+ *
+ * @param wsrep this wsrep handle
+ * @param trx_handle transaction which is committing
+ * @param trx_ctx transaction context
+ *
+ * @retval WSREP_OK cluster commit succeeded
+ * @retval WSREP_TRX_FAIL must rollback transaction
+ * @retval WSREP_BF_ABORT brute force abort happened after trx replicated
+ * must rollback transaction and try to replay
+ * @retval WSREP_CONN_FAIL must close client connection
+ * @retval WSREP_NODE_FAIL must close all connections and reinit
+ */
+ wsrep_status_t (*replay_trx)(wsrep_t* wsrep,
+ wsrep_trx_handle_t* trx_handle,
+ void* trx_ctx);
+
+ /*!
+ * @brief Abort pre_commit() call of another thread.
+ *
+ * It is possible, that some high-priority transaction needs to abort
+ * another transaction which is in pre_commit() call waiting for resources.
+ *
+ * The kill routine checks that abort is not attmpted against a transaction
+ * which is front of the caller (in total order).
+ *
+ * @param wsrep this wsrep handle
+ * @param bf_seqno seqno of brute force trx, running this cancel
+ * @param victim_trx transaction to be aborted, and which is committing
+ *
+ * @retval WSREP_OK abort secceded
+ * @retval WSREP_WARNING abort failed
+ */
+ wsrep_status_t (*abort_pre_commit)(wsrep_t* wsrep,
+ wsrep_seqno_t bf_seqno,
+ wsrep_trx_id_t victim_trx);
+
+ /*!
+ * @brief Appends a query in transaction's write set
+ *
+ * @param wsrep this wsrep handle
+ * @param trx_handle transaction handle
+ * @param query SQL statement string
+ * @param timeval time to use for time functions
+ * @param randseed seed for rand
+ */
+ wsrep_status_t (*append_query)(wsrep_t* wsrep,
+ wsrep_trx_handle_t* trx_handle,
+ const char* query,
+ time_t timeval,
+ uint32_t randseed);
+
+ /*!
+ * @brief Appends a row reference in transaction's write set
+ *
+ * @param wsrep this wsrep handle
+ * @param trx_handle transaction handle
+ * @param key array of keys
+ * @param key_len length of the array of keys
+ * @param shared boolean denoting if key corresponds to shared resource
+ */
+ wsrep_status_t (*append_key)(wsrep_t* wsrep,
+ wsrep_trx_handle_t* trx_handle,
+ const wsrep_key_t* key,
+ size_t key_len,
+ bool shared);
+ /*!
+ * @brief Appends data in transaction's write set
+ *
+ * This method can be called any time before commit and it
+ * appends data block into transaction's write set.
+ *
+ * @param wsrep this wsrep handle
+ * @param trx_handle transaction handle
+ * @param data data buffer
+ * @param data_len data buffer length
+ */
+ wsrep_status_t (*append_data)(wsrep_t* wsrep,
+ wsrep_trx_handle_t* trx_handle,
+ const void* data,
+ size_t data_len);
+
+
+ /*!
+ * @brief Get causal ordering for read operation
+ *
+ * This call will block until causal ordering with all possible
+ * preceding writes in the cluster is guaranteed. If pointer to
+ * seqno is non-null, the call stores the global transaction ID
+ * of the last transaction which is guaranteed to be ordered
+ * causally before this call.
+ *
+ * @param wsrep this wsrep handle
+ * @param seqno location to store global transaction ID
+ */
+ wsrep_status_t (*causal_read)(wsrep_t* wsrep, wsrep_seqno_t* seqno);
+
+ /*!
+ * @brief Clears allocated connection context.
+ *
+ * Whenever a new connection ID is passed to wsrep provider through
+ * any of the API calls, a connection context is allocated for this
+ * connection. This call is to explicitly notify provider fo connection
+ * closing.
+ *
+ * @param wsrep this wsrep handle
+ * @param conn_id connection ID
+ * @param query the 'set database' query
+ * @param query_len length of query (does not end with 0)
+ */
+ wsrep_status_t (*free_connection)(wsrep_t* wsrep,
+ wsrep_conn_id_t conn_id);
+
+ /*!
+ * @brief Replicates a query and starts "total order isolation" section.
+ *
+ * Replicates the query and returns success code, which
+ * caller must check. Total order isolation continues
+ * until to_execute_end() is called.
+ *
+ * @param wsrep this wsrep handle
+ * @param conn_id connection ID
+ * @param key array of keys
+ * @param key_len lenght of the array of keys
+ * @param query query to be executed
+ * @param query_len length of the query string
+ * @param seqno seqno part of the action ID
+ *
+ * @retval WSREP_OK cluster commit succeeded
+ * @retval WSREP_CONN_FAIL must close client connection
+ * @retval WSREP_NODE_FAIL must close all connections and reinit
+ */
+ wsrep_status_t (*to_execute_start)(wsrep_t* wsrep,
+ wsrep_conn_id_t conn_id,
+ const wsrep_key_t* key,
+ size_t key_len,
+ const void* query,
+ size_t query_len,
+ wsrep_seqno_t* seqno);
+
+ /*!
+ * @brief Ends the total order isolation section.
+ *
+ * Marks the end of total order isolation. TO locks are freed
+ * and other transactions are free to commit from this point on.
+ *
+ * @param wsrep this wsrep handle
+ * @param conn_id connection ID
+ *
+ * @retval WSREP_OK cluster commit succeeded
+ * @retval WSREP_CONN_FAIL must close client connection
+ * @retval WSREP_NODE_FAIL must close all connections and reinit
+ */
+ wsrep_status_t (*to_execute_end)(wsrep_t* wsrep, wsrep_conn_id_t conn_id);
+
+ /*!
+ * @brief Signals to wsrep provider that state snapshot has been sent to
+ * joiner.
+ *
+ * @param wsrep this wsrep handle
+ * @param uuid sequence UUID (group UUID)
+ * @param seqno sequence number or negative error code of the operation
+ */
+ wsrep_status_t (*sst_sent)(wsrep_t* wsrep,
+ const wsrep_uuid_t* uuid,
+ wsrep_seqno_t seqno);
+
+ /*!
+ * @brief Signals to wsrep provider that new state snapshot has been received.
+ * May deadlock if called from sst_prepare_cb.
+ *
+ * @param wsrep this wsrep handle
+ * @param uuid sequence UUID (group UUID)
+ * @param seqno sequence number or negative error code of the operation
+ * @param state initial state provided by SST donor
+ * @param state_len length of state buffer
+ */
+ wsrep_status_t (*sst_received)(wsrep_t* wsrep,
+ const wsrep_uuid_t* uuid,
+ wsrep_seqno_t seqno,
+ const char* state,
+ size_t state_len);
+
+
+ /*!
+ * @brief Generate request for consistent snapshot.
+ *
+ * If successfull, this call will generate internally SST request
+ * which in turn triggers calling SST donate callback on the nodes
+ * specified in donor_spec. If donor_spec is null, callback is
+ * called only locally. This call will block until sst_sent is called
+ * from callback.
+ *
+ * @param wsrep this wsrep handle
+ * @param msg context message for SST donate callback
+ * @param msg_len length of context message
+ * @param donor_spec list of snapshot donors
+ */
+ wsrep_status_t (*snapshot)(wsrep_t* wsrep,
+ const void* msg,
+ size_t msg_len,
+ const char* donor_spec);
+
+ /*!
+ * @brief Returns an array fo status variables.
+ * Array is terminated by Null variable name.
+ *
+ * @param wsrep this wsrep handle
+ * @return array of struct wsrep_status_var.
+ */
+ struct wsrep_stats_var* (*stats_get) (wsrep_t* wsrep);
+
+ /*!
+ * @brief Release resources that might be associated with the array.
+ *
+ * @param wsrep this wsrep handle.
+ */
+ void (*stats_free) (wsrep_t* wsrep, struct wsrep_stats_var* var_array);
+
+ /*!
+ * @brief Pauses writeset applying/committing.
+ *
+ * @return global sequence number of the paused state or negative error code.
+ */
+ wsrep_seqno_t (*pause) (wsrep_t* wsrep);
+
+ /*!
+ * @brief Resumes writeset applying/committing.
+ */
+ wsrep_status_t (*resume) (wsrep_t* wsrep);
+
+ /*!
+ * @brief Desynchronize from cluster
+ *
+ * Effectively turns off flow control for this node, allowing it
+ * to fall behind the cluster.
+ */
+ wsrep_status_t (*desync) (wsrep_t* wsrep);
+
+ /*!
+ * @brief Request to resynchronize with cluster.
+ *
+ * Effectively turns on flow control. Asynchronous - actual synchronization
+ * event to be deliverred via sync_cb.
+ */
+ wsrep_status_t (*resync) (wsrep_t* wsrep);
+
+ /*!
+ * @brief Acquire global named lock
+ *
+ * @param wsrep wsrep provider handle
+ * @param name lock name
+ * @param owner 64-bit owner ID
+ * @param tout timeout in nanoseconds.
+ * 0 - return immediately, -1 wait forever.
+ * @return wsrep status or negative error code
+ * @retval -EDEADLK lock was already acquired by this thread
+ * @retval -EBUSY lock was busy
+ */
+ wsrep_status_t (*lock) (wsrep_t* wsrep, const char* name, int64_t owner,
+ int64_t tout);
+
+ /*!
+ * @brief Release global named lock
+ *
+ * @param wsrep wsrep provider handle
+ * @param name lock name
+ * @param owner 64-bit owner ID
+ * @return wsrep status or negative error code
+ * @retval -EPERM lock does not belong to this owner
+ */
+ wsrep_status_t (*unlock) (wsrep_t* wsrep, const char* name, int64_t owner);
+
+ /*!
+ * @brief Check if global named lock is locked
+ *
+ * @param wsrep wsrep provider handle
+ * @param name lock name
+ * @param owner if not NULL will contain 64-bit owner ID
+ * @param node if not NULL will contain owner's node UUID
+ * @return true if lock is locked
+ */
+ bool (*is_locked) (wsrep_t* wsrep, const char* name, int64_t* conn,
+ wsrep_uuid_t* node);
+
+ /*!
+ * wsrep provider name
+ */
+ const char* provider_name;
+
+ /*!
+ * wsrep provider version
+ */
+ const char* provider_version;
+
+ /*!
+ * wsrep provider vendor name
+ */
+ const char* provider_vendor;
+
+ /*!
+ * @brief Frees allocated resources before unloading the library.
+ * @param wsrep this wsrep handle
+ */
+ void (*free)(wsrep_t* wsrep);
+
+ void *dlh; //!< reserved for future use
+ void *ctx; //!< reserved for implemetation private context
+};
+
+typedef int (*wsrep_loader_fun)(wsrep_t*);
+
+/*!
+ *
+ * @brief Loads wsrep library
+ *
+ * @param spec path to wsrep library. If NULL or WSREP_NONE initialises dummy
+ * pass-through implementation.
+ * @param hptr wsrep handle
+ * @param log_cb callback to handle loader messages. Otherwise writes to stderr.
+ *
+ * @return zero on success, errno on failure
+ */
+int wsrep_load(const char* spec, wsrep_t** hptr, wsrep_log_cb_t log_cb);
+
+/*!
+ * @brief Unloads wsrep library and frees associated resources
+ *
+ * @param hptr wsrep handler pointer
+ */
+void wsrep_unload(wsrep_t* hptr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* WSREP_H */
diff --git a/wsrep/wsrep_dummy.c b/wsrep/wsrep_dummy.c
new file mode 100644
index 00000000000..6d01ce14b4e
--- /dev/null
+++ b/wsrep/wsrep_dummy.c
@@ -0,0 +1,368 @@
+/* Copyright (C) 2009-2010 Codership Oy <info@codersihp.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/*! @file Dummy wsrep API implementation. */
+
+#include <errno.h>
+
+#include "wsrep_api.h"
+
+/*! Dummy backend context. */
+typedef struct wsrep_dummy
+{
+ wsrep_log_cb_t log_fn;
+} wsrep_dummy_t;
+
+/* Get pointer to wsrep_dummy context from wsrep_t pointer */
+#define WSREP_DUMMY(_p) ((wsrep_dummy_t *) (_p)->ctx)
+
+/* Trace function usage a-la DBUG */
+#define WSREP_DBUG_ENTER(_w) do { \
+ if (WSREP_DUMMY(_w)) { \
+ if (WSREP_DUMMY(_w)->log_fn) \
+ WSREP_DUMMY(_w)->log_fn(WSREP_LOG_DEBUG, __FUNCTION__); \
+ } \
+ } while (0)
+
+
+static void dummy_free(wsrep_t *w)
+{
+ WSREP_DBUG_ENTER(w);
+ free(w->ctx);
+ w->ctx = NULL;
+}
+
+static wsrep_status_t dummy_init (wsrep_t* w,
+ const struct wsrep_init_args* args)
+{
+ WSREP_DUMMY(w)->log_fn = args->logger_cb;
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static uint64_t dummy_capabilities (wsrep_t* w __attribute__((unused)))
+{
+ return 0;
+}
+
+static wsrep_status_t dummy_options_set(
+ wsrep_t* w,
+ const char* conf __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static char* dummy_options_get (wsrep_t* w)
+{
+ WSREP_DBUG_ENTER(w);
+ return NULL;
+}
+
+static wsrep_status_t dummy_connect(
+ wsrep_t* w,
+ const char* name __attribute__((unused)),
+ const char* url __attribute__((unused)),
+ const char* donor __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_disconnect(wsrep_t* w)
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_recv(wsrep_t* w,
+ void* recv_ctx __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_pre_commit(
+ wsrep_t* w,
+ const wsrep_conn_id_t conn_id __attribute__((unused)),
+ wsrep_trx_handle_t* trx_handle __attribute__((unused)),
+ const void* query __attribute__((unused)),
+ const size_t query_len __attribute__((unused)),
+ uint64_t flags __attribute__((unused)),
+ wsrep_seqno_t* seqno __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_post_commit(
+ wsrep_t* w,
+ wsrep_trx_handle_t* trx_handle __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_post_rollback(
+ wsrep_t* w,
+ wsrep_trx_handle_t* trx_handle __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_replay_trx(
+ wsrep_t* w,
+ wsrep_trx_handle_t* trx_handle __attribute__((unused)),
+ void* trx_ctx __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_abort_pre_commit(
+ wsrep_t* w,
+ const wsrep_seqno_t bf_seqno __attribute__((unused)),
+ const wsrep_trx_id_t trx_id __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_append_query(
+ wsrep_t* w,
+ wsrep_trx_handle_t* trx_handle __attribute__((unused)),
+ const char* query __attribute__((unused)),
+ const time_t timeval __attribute__((unused)),
+ const uint32_t randseed __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_append_row_key(
+ wsrep_t* w,
+ wsrep_trx_handle_t* trx_handle __attribute__((unused)),
+ const wsrep_key_t* key __attribute__((unused)),
+ const size_t key_len __attribute__((unused)),
+ const bool shared __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_append_data(
+ wsrep_t* w,
+ wsrep_trx_handle_t* trx_handle __attribute__((unused)),
+ const void* data __attribute__((unused)),
+ size_t data_len __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_causal_read(
+ wsrep_t* w,
+ wsrep_seqno_t* seqno __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_free_connection(
+ wsrep_t* w,
+ const wsrep_conn_id_t conn_id __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_to_execute_start(
+ wsrep_t* w,
+ const wsrep_conn_id_t conn_id __attribute__((unused)),
+ const wsrep_key_t* key __attribute__((unused)),
+ const size_t key_len __attribute__((unused)),
+ const void* query __attribute__((unused)),
+ const size_t query_len __attribute__((unused)),
+ wsrep_seqno_t* seqno __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_to_execute_end(
+ wsrep_t* w,
+ const wsrep_conn_id_t conn_id __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_sst_sent(
+ wsrep_t* w,
+ const wsrep_uuid_t* uuid __attribute__((unused)),
+ wsrep_seqno_t seqno __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_sst_received(
+ wsrep_t* w,
+ const wsrep_uuid_t* uuid __attribute__((unused)),
+ const wsrep_seqno_t seqno __attribute__((unused)),
+ const char* state __attribute__((unused)),
+ const size_t state_len __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_snapshot(
+ wsrep_t* w,
+ const void* msg __attribute__((unused)),
+ const size_t msg_len __attribute__((unused)),
+ const char* donor_spec __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static struct wsrep_stats_var dummy_stats[] = {
+ { NULL, WSREP_VAR_STRING, { 0 } }
+};
+
+static struct wsrep_stats_var* dummy_stats_get (wsrep_t* w)
+{
+ WSREP_DBUG_ENTER(w);
+ return dummy_stats;
+}
+
+static void dummy_stats_free (
+ wsrep_t* w,
+ struct wsrep_stats_var* stats __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+}
+
+static wsrep_seqno_t dummy_pause (wsrep_t* w)
+{
+ WSREP_DBUG_ENTER(w);
+ return -ENOSYS;
+}
+
+static wsrep_status_t dummy_resume (wsrep_t* w)
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_desync (wsrep_t* w)
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_NOT_IMPLEMENTED;
+}
+
+static wsrep_status_t dummy_resync (wsrep_t* w)
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static wsrep_status_t dummy_lock (wsrep_t* w,
+ const char* s __attribute__((unused)),
+ int64_t o __attribute__((unused)),
+ int64_t t __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_NOT_IMPLEMENTED;
+}
+
+static wsrep_status_t dummy_unlock (wsrep_t* w,
+ const char* s __attribute__((unused)),
+ int64_t o __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return WSREP_OK;
+}
+
+static bool dummy_is_locked (wsrep_t* w,
+ const char* s __attribute__((unused)),
+ int64_t* o __attribute__((unused)),
+ wsrep_uuid_t* t __attribute__((unused)))
+{
+ WSREP_DBUG_ENTER(w);
+ return false;
+}
+
+static wsrep_t dummy_iface = {
+ WSREP_INTERFACE_VERSION,
+ &dummy_init,
+ &dummy_capabilities,
+ &dummy_options_set,
+ &dummy_options_get,
+ &dummy_connect,
+ &dummy_disconnect,
+ &dummy_recv,
+ &dummy_pre_commit,
+ &dummy_post_commit,
+ &dummy_post_rollback,
+ &dummy_replay_trx,
+ &dummy_abort_pre_commit,
+ &dummy_append_query,
+ &dummy_append_row_key,
+ &dummy_append_data,
+ &dummy_causal_read,
+ &dummy_free_connection,
+ &dummy_to_execute_start,
+ &dummy_to_execute_end,
+ &dummy_sst_sent,
+ &dummy_sst_received,
+ &dummy_snapshot,
+ &dummy_stats_get,
+ &dummy_stats_free,
+ &dummy_pause,
+ &dummy_resume,
+ &dummy_desync,
+ &dummy_resync,
+ &dummy_lock,
+ &dummy_unlock,
+ &dummy_is_locked,
+ WSREP_NONE,
+ WSREP_INTERFACE_VERSION,
+ "Codership Oy <info@codership.com>",
+ &dummy_free,
+ NULL,
+ NULL
+};
+
+int wsrep_dummy_loader(wsrep_t* w)
+{
+ if (!w)
+ return EINVAL;
+
+ *w = dummy_iface;
+
+ // allocate private context
+ if (!(w->ctx = malloc(sizeof(wsrep_dummy_t))))
+ return ENOMEM;
+
+ // initialize private context
+ WSREP_DUMMY(w)->log_fn = NULL;
+
+ return 0;
+}
+
diff --git a/wsrep/wsrep_loader.c b/wsrep/wsrep_loader.c
new file mode 100644
index 00000000000..b4460658f80
--- /dev/null
+++ b/wsrep/wsrep_loader.c
@@ -0,0 +1,199 @@
+/* Copyright (C) 2009-2011 Codership Oy <info@codersihp.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/*! @file wsrep implementation loader */
+
+#include <dlfcn.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "wsrep_api.h"
+
+// Logging stuff for the loader
+static const char* log_levels[] = {"FATAL", "ERROR", "WARN", "INFO", "DEBUG"};
+
+static void default_logger (wsrep_log_level_t lvl, const char* msg)
+{
+ fprintf (stderr, "wsrep loader: [%s] %s\n", log_levels[lvl], msg);
+}
+
+static wsrep_log_cb_t logger = default_logger;
+
+/**************************************************************************
+ * Library loader
+ **************************************************************************/
+
+static int verify(const wsrep_t *wh, const char *iface_ver)
+{
+ const size_t msg_len = 128;
+ char msg[msg_len];
+
+#define VERIFY(_p) if (!(_p)) { \
+ snprintf(msg, msg_len, "wsrep_load(): verify(): %s\n", # _p); \
+ logger (WSREP_LOG_ERROR, msg); \
+ return EINVAL; \
+ }
+
+ VERIFY(wh);
+ VERIFY(wh->version);
+
+ if (strcmp(wh->version, iface_ver)) {
+ snprintf (msg, msg_len,
+ "provider interface version mismatch: need '%s', found '%s'",
+ iface_ver, wh->version);
+ logger (WSREP_LOG_ERROR, msg);
+ return EINVAL;
+ }
+
+ VERIFY(wh->init);
+ VERIFY(wh->options_set);
+ VERIFY(wh->options_get);
+ VERIFY(wh->connect);
+ VERIFY(wh->disconnect);
+ VERIFY(wh->recv);
+ VERIFY(wh->pre_commit);
+ VERIFY(wh->post_commit);
+ VERIFY(wh->post_rollback);
+ VERIFY(wh->replay_trx);
+ VERIFY(wh->abort_pre_commit);
+ VERIFY(wh->append_query);
+ VERIFY(wh->append_key);
+ VERIFY(wh->free_connection);
+ VERIFY(wh->to_execute_start);
+ VERIFY(wh->to_execute_end);
+ VERIFY(wh->sst_sent);
+ VERIFY(wh->sst_received);
+ VERIFY(wh->stats_get);
+ VERIFY(wh->stats_free);
+ VERIFY(wh->pause);
+ VERIFY(wh->resume);
+ VERIFY(wh->desync);
+ VERIFY(wh->resync);
+ VERIFY(wh->lock);
+ VERIFY(wh->unlock);
+ VERIFY(wh->is_locked);
+ VERIFY(wh->provider_name);
+ VERIFY(wh->provider_version);
+ VERIFY(wh->provider_vendor);
+ VERIFY(wh->free);
+ return 0;
+}
+
+
+static wsrep_loader_fun wsrep_dlf(void *dlh, const char *sym)
+{
+ union {
+ wsrep_loader_fun dlfun;
+ void *obj;
+ } alias;
+ alias.obj = dlsym(dlh, sym);
+ return alias.dlfun;
+}
+
+extern int wsrep_dummy_loader(wsrep_t *w);
+
+int wsrep_load(const char *spec, wsrep_t **hptr, wsrep_log_cb_t log_cb)
+{
+ int ret = 0;
+ void *dlh = NULL;
+ wsrep_loader_fun dlfun;
+ const size_t msg_len = 1024;
+ char msg[msg_len + 1];
+ msg[msg_len] = 0;
+
+ if (NULL != log_cb)
+ logger = log_cb;
+
+ if (!(spec && hptr))
+ return EINVAL;
+
+ snprintf (msg, msg_len,
+ "wsrep_load(): loading provider library '%s'", spec);
+ logger (WSREP_LOG_INFO, msg);
+
+ if (!(*hptr = malloc(sizeof(wsrep_t)))) {
+ logger (WSREP_LOG_FATAL, "wsrep_load(): out of memory");
+ return ENOMEM;
+ }
+
+ if (!spec || strcmp(spec, WSREP_NONE) == 0) {
+ if ((ret = wsrep_dummy_loader(*hptr)) != 0) {
+ free (*hptr);
+ *hptr = NULL;
+ }
+ return ret;
+ }
+
+ if (!(dlh = dlopen(spec, RTLD_NOW | RTLD_LOCAL))) {
+ snprintf(msg, msg_len, "wsrep_load(): dlopen(): %s", dlerror());
+ logger (WSREP_LOG_ERROR, msg);
+ ret = EINVAL;
+ goto out;
+ }
+
+ if (!(dlfun = wsrep_dlf(dlh, "wsrep_loader"))) {
+ ret = EINVAL;
+ goto out;
+ }
+
+ if ((ret = (*dlfun)(*hptr)) != 0) {
+ snprintf(msg, msg_len, "wsrep_load(): loader failed: %s",
+ strerror(ret));
+ logger (WSREP_LOG_ERROR, msg);
+ goto out;
+ }
+
+ if ((ret = verify(*hptr, WSREP_INTERFACE_VERSION)) != 0) {
+ snprintf (msg, msg_len,
+ "wsrep_load(): interface version mismatch: my version %s, "
+ "provider version %s", WSREP_INTERFACE_VERSION,
+ (*hptr)->version);
+ logger (WSREP_LOG_ERROR, msg);
+ goto out;
+ }
+
+ (*hptr)->dlh = dlh;
+
+out:
+ if (ret != 0) {
+ if (dlh) dlclose(dlh);
+ free(*hptr);
+ *hptr = NULL;
+ } else {
+ snprintf (msg, msg_len,
+ "wsrep_load(): %s %s by %s loaded succesfully.",
+ (*hptr)->provider_name, (*hptr)->provider_version,
+ (*hptr)->provider_vendor);
+ logger (WSREP_LOG_INFO, msg);
+ }
+
+ return ret;
+}
+
+void wsrep_unload(wsrep_t *hptr)
+{
+ if (!hptr) {
+ logger (WSREP_LOG_WARN, "wsrep_unload(): null pointer.");
+ } else {
+ if (hptr->free)
+ hptr->free(hptr);
+ if (hptr->dlh)
+ dlclose(hptr->dlh);
+ free(hptr);
+ }
+}
+
diff --git a/wsrep/wsrep_uuid.c b/wsrep/wsrep_uuid.c
new file mode 100644
index 00000000000..c99240cc071
--- /dev/null
+++ b/wsrep/wsrep_uuid.c
@@ -0,0 +1,78 @@
+/* Copyright (C) 2009 Codership Oy <info@codersihp.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/*! @file Helper functions to deal with history UUID string representations */
+
+#include <errno.h>
+#include <ctype.h>
+#include <stdio.h>
+
+#include "wsrep_api.h"
+
+/*!
+ * Read UUID from string
+ * @return length of UUID string representation or -EINVAL in case of error
+ */
+ssize_t
+wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid)
+{
+ size_t uuid_len = 0;
+ size_t uuid_offt = 0;
+
+ while (uuid_len + 1 < str_len) {
+ if ((4 == uuid_offt || 6 == uuid_offt || 8 == uuid_offt ||
+ 10 == uuid_offt) && str[uuid_len] == '-') {
+ // skip dashes after 4th, 6th, 8th and 10th positions
+ uuid_len += 1;
+ continue;
+ }
+ if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) {
+ // got hex digit
+ sscanf (str + uuid_len, "%2hhx", uuid->uuid + uuid_offt);
+ uuid_len += 2;
+ uuid_offt += 1;
+ if (sizeof (uuid->uuid) == uuid_offt)
+ return uuid_len;
+ }
+ else {
+ break;
+ }
+ }
+
+ *uuid = WSREP_UUID_UNDEFINED;
+ return -EINVAL;
+}
+
+/*!
+ * Write UUID to string
+ * @return length of UUID string representation or -EMSGSIZE if string is too
+ * short
+ */
+ssize_t
+wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len)
+{
+ if (str_len > 36) {
+ const unsigned char* u = uuid->uuid;
+ return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-"
+ "%02x%02x-%02x%02x%02x%02x%02x%02x",
+ u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7],
+ u[ 8], u[ 9], u[10], u[11], u[12], u[13], u[14], u[15]);
+ }
+ else {
+ return -EMSGSIZE;
+ }
+}
+