summaryrefslogtreecommitdiff
path: root/sql/rpl_handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_handler.cc')
-rw-r--r--sql/rpl_handler.cc555
1 files changed, 0 insertions, 555 deletions
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
deleted file mode 100644
index 27e411ca6de..00000000000
--- a/sql/rpl_handler.cc
+++ /dev/null
@@ -1,555 +0,0 @@
-/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
-
-#include "mariadb.h"
-#include "sql_priv.h"
-#include "unireg.h"
-
-#include "rpl_mi.h"
-#include "sql_repl.h"
-#include "log_event.h"
-#include "rpl_filter.h"
-#include <my_dir.h>
-#include "rpl_handler.h"
-
-Trans_delegate *transaction_delegate;
-Binlog_storage_delegate *binlog_storage_delegate;
-#ifdef HAVE_REPLICATION
-Binlog_transmit_delegate *binlog_transmit_delegate;
-Binlog_relay_IO_delegate *binlog_relay_io_delegate;
-#endif /* HAVE_REPLICATION */
-
-/*
- structure to save transaction log filename and position
-*/
-typedef struct Trans_binlog_info {
- my_off_t log_pos;
- char log_file[FN_REFLEN];
-} Trans_binlog_info;
-
-int get_user_var_int(const char *name,
- long long int *value, int *null_value)
-{
- bool null_val;
- user_var_entry *entry=
- (user_var_entry*) my_hash_search(&current_thd->user_vars,
- (uchar*) name, strlen(name));
- if (!entry)
- return 1;
- *value= entry->val_int(&null_val);
- if (null_value)
- *null_value= null_val;
- return 0;
-}
-
-int get_user_var_real(const char *name,
- double *value, int *null_value)
-{
- bool null_val;
- user_var_entry *entry=
- (user_var_entry*) my_hash_search(&current_thd->user_vars,
- (uchar*) name, strlen(name));
- if (!entry)
- return 1;
- *value= entry->val_real(&null_val);
- if (null_value)
- *null_value= null_val;
- return 0;
-}
-
-int get_user_var_str(const char *name, char *value,
- size_t len, unsigned int precision, int *null_value)
-{
- String str;
- bool null_val;
- user_var_entry *entry=
- (user_var_entry*) my_hash_search(&current_thd->user_vars,
- (uchar*) name, strlen(name));
- if (!entry)
- return 1;
- entry->val_str(&null_val, &str, precision);
- strncpy(value, str.c_ptr(), len);
- if (null_value)
- *null_value= null_val;
- return 0;
-}
-
-int delegates_init()
-{
- static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
- static my_aligned_storage<sizeof(Binlog_storage_delegate),
- MY_ALIGNOF(long)> storage_mem;
-#ifdef HAVE_REPLICATION
- static my_aligned_storage<sizeof(Binlog_transmit_delegate),
- MY_ALIGNOF(long)> transmit_mem;
- static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
- MY_ALIGNOF(long)> relay_io_mem;
-#endif
-
- void *place_trans_mem= trans_mem.data;
- void *place_storage_mem= storage_mem.data;
-
- transaction_delegate= new (place_trans_mem) Trans_delegate;
-
- if (!transaction_delegate->is_inited())
- {
- sql_print_error("Initialization of transaction delegates failed. "
- "Please report a bug.");
- return 1;
- }
-
- binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
-
- if (!binlog_storage_delegate->is_inited())
- {
- sql_print_error("Initialization binlog storage delegates failed. "
- "Please report a bug.");
- return 1;
- }
-
-#ifdef HAVE_REPLICATION
- void *place_transmit_mem= transmit_mem.data;
- void *place_relay_io_mem= relay_io_mem.data;
-
- binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
-
- if (!binlog_transmit_delegate->is_inited())
- {
- sql_print_error("Initialization of binlog transmit delegates failed. "
- "Please report a bug.");
- return 1;
- }
-
- binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
-
- if (!binlog_relay_io_delegate->is_inited())
- {
- sql_print_error("Initialization binlog relay IO delegates failed. "
- "Please report a bug.");
- return 1;
- }
-#endif
-
- return 0;
-}
-
-void delegates_destroy()
-{
- if (transaction_delegate)
- transaction_delegate->~Trans_delegate();
- transaction_delegate= 0;
- if (binlog_storage_delegate)
- binlog_storage_delegate->~Binlog_storage_delegate();
- binlog_storage_delegate= 0;
-#ifdef HAVE_REPLICATION
- if (binlog_transmit_delegate)
- binlog_transmit_delegate->~Binlog_transmit_delegate();
- binlog_transmit_delegate= 0;
- if (binlog_relay_io_delegate)
- binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
- binlog_relay_io_delegate= 0;
-#endif /* HAVE_REPLICATION */
-}
-
-/*
- This macro is used by almost all the Delegate methods to iterate
- over all the observers running given callback function of the
- delegate.
- */
-#define FOREACH_OBSERVER(r, f, do_lock, args) \
- param.server_id= thd->variables.server_id; \
- read_lock(); \
- Observer_info_iterator iter= observer_info_iter(); \
- Observer_info *info= iter++; \
- for (; info; info= iter++) \
- { \
- if (((Observer *)info->observer)->f \
- && ((Observer *)info->observer)->f args) \
- { \
- r= 1; \
- sql_print_error("Run function '" #f "' failed"); \
- break; \
- } \
- } \
- unlock();
-
-
-int Trans_delegate::after_commit(THD *thd, bool all)
-{
- Trans_param param;
- Trans_binlog_info *log_info;
- bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
- int ret= 0;
-
- param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
-
- log_info= thd->semisync_info;
-
- param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
- param.log_pos= log_info ? log_info->log_pos : 0;
-
- FOREACH_OBSERVER(ret, after_commit, false, (&param));
-
- /*
- This is the end of a real transaction or autocommit statement, we
- can mark the memory unused.
- */
- if (is_real_trans && log_info)
- {
- log_info->log_file[0]= 0;
- log_info->log_pos= 0;
- }
- return ret;
-}
-
-int Trans_delegate::after_rollback(THD *thd, bool all)
-{
- Trans_param param;
- Trans_binlog_info *log_info;
- bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
- int ret= 0;
-
- param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
-
- log_info= thd->semisync_info;
-
- param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
- param.log_pos= log_info ? log_info->log_pos : 0;
-
- FOREACH_OBSERVER(ret, after_rollback, false, (&param));
-
- /*
- This is the end of a real transaction or autocommit statement, we
- can mark the memory unused.
- */
- if (is_real_trans && log_info)
- {
- log_info->log_file[0]= 0;
- log_info->log_pos= 0;
- }
- return ret;
-}
-
-int Binlog_storage_delegate::after_flush(THD *thd,
- const char *log_file,
- my_off_t log_pos,
- bool synced,
- bool first_in_group,
- bool last_in_group)
-{
- Binlog_storage_param param;
- Trans_binlog_info *log_info;
- uint32 flags=0;
- int ret= 0;
-
- if (synced)
- flags |= BINLOG_STORAGE_IS_SYNCED;
- if (first_in_group)
- flags|= BINLOG_GROUP_COMMIT_LEADER;
- if (last_in_group)
- flags|= BINLOG_GROUP_COMMIT_TRAILER;
-
- if (!(log_info= thd->semisync_info))
- {
- if(!(log_info=
- (Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
- return 1;
- thd->semisync_info= log_info;
- }
-
- strmake_buf(log_info->log_file, log_file+dirname_length(log_file));
- log_info->log_pos = log_pos;
-
- FOREACH_OBSERVER(ret, after_flush, false,
- (&param, log_info->log_file, log_info->log_pos, flags));
- return ret;
-}
-
-int Binlog_storage_delegate::after_sync(THD *thd,
- const char *log_file,
- my_off_t log_pos,
- bool first_in_group,
- bool last_in_group)
-{
- Binlog_storage_param param;
- uint32 flags=0;
-
- if (first_in_group)
- flags|= BINLOG_GROUP_COMMIT_LEADER;
- if (last_in_group)
- flags|= BINLOG_GROUP_COMMIT_TRAILER;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_sync, false,
- (&param, log_file+dirname_length(log_file), log_pos, flags));
-
- return ret;
-}
-
-#ifdef HAVE_REPLICATION
-int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
- const char *log_file,
- my_off_t log_pos)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, transmit_start, true, (&param, log_file, log_pos));
- return ret;
-}
-
-int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, transmit_stop, false, (&param));
- return ret;
-}
-
-int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
- String *packet)
-{
- /* NOTE2ME: Maximum extra header size for each observer, I hope 32
- bytes should be enough for each Observer to reserve their extra
- header. If later found this is not enough, we can increase this
- /HEZX
- */
-#define RESERVE_HEADER_SIZE 32
- unsigned char header[RESERVE_HEADER_SIZE];
- ulong hlen;
- Binlog_transmit_param param;
- param.flags= flags;
- param.server_id= thd->variables.server_id;
-
- int ret= 0;
- read_lock();
- Observer_info_iterator iter= observer_info_iter();
- Observer_info *info= iter++;
- for (; info; info= iter++)
- {
- hlen= 0;
- if (((Observer *)info->observer)->reserve_header
- && ((Observer *)info->observer)->reserve_header(&param,
- header,
- RESERVE_HEADER_SIZE,
- &hlen))
- {
- ret= 1;
- break;
- }
- if (hlen == 0)
- continue;
- if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
- {
- ret= 1;
- break;
- }
- }
- unlock();
- return ret;
-}
-
-int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
- String *packet,
- const char *log_file,
- my_off_t log_pos)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, before_send_event, false,
- (&param, (uchar *)packet->c_ptr(),
- packet->length(),
- log_file+dirname_length(log_file), log_pos));
- return ret;
-}
-
-int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
- String *packet)
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_send_event, false,
- (&param, packet->c_ptr(), packet->length()));
- return ret;
-}
-
-int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
-
-{
- Binlog_transmit_param param;
- param.flags= flags;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_reset_master, false, (&param));
- return ret;
-}
-
-void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
- Master_info *mi)
-{
- param->mysql= mi->mysql;
- param->user= mi->user;
- param->host= mi->host;
- param->port= mi->port;
- param->master_log_name= mi->master_log_name;
- param->master_log_pos= mi->master_log_pos;
-}
-
-int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, thread_start, true, (&param));
- return ret;
-}
-
-
-int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
-{
-
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, thread_stop, false, (&param));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
- Master_info *mi,
- ushort flags)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, before_request_transmit, false, (&param, (uint32)flags));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
- const char *packet, ulong len,
- const char **event_buf,
- ulong *event_len)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_read_event, false,
- (&param, packet, len, event_buf, event_len));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
- const char *event_buf,
- ulong event_len,
- bool synced)
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- uint32 flags=0;
- if (synced)
- flags |= BINLOG_STORAGE_IS_SYNCED;
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_queue_event, false,
- (&param, event_buf, event_len, flags));
- return ret;
-}
-
-int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
-
-{
- Binlog_relay_IO_param param;
- init_param(&param, mi);
-
- int ret= 0;
- FOREACH_OBSERVER(ret, after_reset_slave, false, (&param));
- return ret;
-}
-#endif /* HAVE_REPLICATION */
-
-int register_trans_observer(Trans_observer *observer, void *p)
-{
- return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_trans_observer(Trans_observer *observer, void *p)
-{
- return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-
-int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
-{
- return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
-{
- return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-
-#ifdef HAVE_REPLICATION
-int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-
-int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
-}
-
-int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
-}
-#else
-int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return 0;
-}
-
-int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
-{
- return 0;
-}
-
-int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return 0;
-}
-
-int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
-{
- return 0;
-}
-#endif /* HAVE_REPLICATION */