summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2000-10-01 21:28:21 -0600
committerunknown <sasha@mysql.sashanet.com>2000-10-01 21:28:21 -0600
commit425252771bcb784274c28dd68ca31ec1aeba487e (patch)
tree466e8f523c9a860c721a78cc88462a999b93c856 /sql
parentb595ea8c2d29f7281bb50b47ccd436ada6a52df9 (diff)
downloadmariadb-git-425252771bcb784274c28dd68ca31ec1aeba487e.tar.gz
added sql_repl.* officially - fixing an oops - forgot to actually add
it earlier
Diffstat (limited to 'sql')
-rw-r--r--sql/sql_repl.cc528
-rw-r--r--sql/sql_repl.h21
2 files changed, 549 insertions, 0 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
new file mode 100644
index 00000000000..45d225e698f
--- /dev/null
+++ b/sql/sql_repl.cc
@@ -0,0 +1,528 @@
+/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+// Sasha Pachev <sasha@mysql.com> is currently in charge of this file
+// Do not mess with it without his permission!
+
+#include "mysql_priv.h"
+#include "sql_repl.h"
+#include "sql_acl.h"
+#include "log_event.h"
+
+extern const char* any_db;
+extern pthread_handler_decl(handle_slave,arg);
+
+static int send_file(THD *thd)
+{
+ NET* net = &thd->net;
+ int fd = -1,bytes, error = 1;
+ char fname[FN_REFLEN+1];
+ char buf[IO_SIZE*15];
+ const char *errmsg = 0;
+ int old_timeout;
+ DBUG_ENTER("send_file");
+
+ // the client might be slow loading the data, give him wait_timeout to do
+ // the job
+ old_timeout = thd->net.timeout;
+ thd->net.timeout = thd->inactive_timeout;
+
+ // we need net_flush here because the client will not know it needs to send
+ // us the file name until it has processed the load event entry
+ if (net_flush(net) || my_net_read(net) == packet_error)
+ {
+ errmsg = "Failed reading file name";
+ goto err;
+ }
+
+ fn_format(fname, (char*)net->read_pos + 1, "", "", 4);
+ // this is needed to make replicate-ignore-db
+ if (!strcmp(fname,"/dev/null"))
+ goto end;
+ // TODO: work on the well-known system that does not have a /dev/null :-)
+
+ if ((fd = my_open(fname, O_RDONLY, MYF(MY_WME))) < 0)
+ {
+ errmsg = "Failed on my_open()";
+ goto err;
+ }
+
+ while ((bytes = (int) my_read(fd, (byte*) buf, sizeof(buf),
+ MYF(MY_WME))) > 0)
+ {
+ if (my_net_write(net, buf, bytes))
+ {
+ errmsg = "Failed on my_net_write()";
+ goto err;
+ }
+ }
+
+ end:
+ if (my_net_write(net, "", 0) || net_flush(net) ||
+ (my_net_read(net) == packet_error))
+ {
+ errmsg = "failed negotiating file transfer close";
+ goto err;
+ }
+ error = 0;
+
+ err:
+ thd->net.timeout = old_timeout;
+ if(fd >= 0)
+ (void) my_close(fd, MYF(MY_WME));
+ if (errmsg)
+ {
+ sql_print_error("failed in send_file() : %s", errmsg);
+ DBUG_PRINT("error", (errmsg));
+ }
+ DBUG_RETURN(error);
+}
+
+void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
+{
+ LOG_INFO linfo;
+ char *log_file_name = linfo.log_file_name;
+ char search_file_name[FN_REFLEN];
+ FILE* log = NULL;
+ String* packet = &thd->packet;
+ int error;
+ const char *errmsg = "Unknown error";
+ NET* net = &thd->net;
+
+ DBUG_ENTER("mysql_binlog_send");
+
+ if(!mysql_bin_log.is_open())
+ {
+ errmsg = "Binary log is not open";
+ goto err;
+ }
+
+ if(log_ident[0])
+ mysql_bin_log.make_log_name(search_file_name, log_ident);
+ else
+ search_file_name[0] = 0;
+
+ if(mysql_bin_log.find_first_log(&linfo, search_file_name))
+ {
+ errmsg = "Could not find first log";
+ goto err;
+ }
+ log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME));
+
+ if(!log)
+ {
+ errmsg = "Could not open log file";
+ goto err;
+ }
+
+ if(my_fseek(log, pos, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR )
+ {
+ errmsg = "Error on fseek()";
+ goto err;
+ }
+
+
+ packet->length(0);
+ packet->append("\0", 1);
+ // we need to start a packet with something other than 255
+ // to distiquish it from error
+
+ while(!net->error && net->vio != 0 && !thd->killed)
+ {
+ while(!(error = Log_event::read_log_event(log, packet)))
+ {
+ if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
+ {
+ errmsg = "Failed on my_net_write()";
+ goto err;
+ }
+ DBUG_PRINT("info", ("log event code %d",(*packet)[LOG_EVENT_OFFSET+1] ));
+ if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ {
+ if(send_file(thd))
+ {
+ errmsg = "failed in send_file()";
+ goto err;
+ }
+ }
+ packet->length(0);
+ packet->append("\0",1);
+ }
+ if(error != LOG_READ_EOF)
+ {
+ errmsg = "error reading log event";
+ goto err;
+ }
+
+ if(!(flags & BINLOG_DUMP_NON_BLOCK) && mysql_bin_log.is_active(log_file_name))
+ // block until there is more data in the log
+ // unless non-blocking mode requested
+ {
+ if(net_flush(net))
+ {
+ errmsg = "failed on net_flush()";
+ goto err;
+ }
+
+ // we may have missed the update broadcast from the log
+ // that has just happened, let's try to catch it if it did
+ // if we did not miss anything, we just wait for other threads
+ // to signal us
+ {
+ pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
+ clearerr(log);
+
+ // tell the kill thread how to wake us up
+ pthread_mutex_lock(&thd->mysys_var->mutex);
+ thd->mysys_var->current_mutex = log_lock;
+ thd->mysys_var->current_cond = &COND_binlog_update;
+ const char* proc_info = thd->proc_info;
+ thd->proc_info = "Waiting for update";
+ pthread_mutex_unlock(&thd->mysys_var->mutex);
+
+ bool read_packet = 0, fatal_error = 0;
+
+ pthread_mutex_lock(log_lock); // no one will update the log while we are reading
+ // now, but we'll be quick and just read one record
+
+
+ switch(Log_event::read_log_event(log, packet))
+ {
+ case 0:
+ read_packet = 1; // we read successfully, so we'll need to send it to the
+ // slave
+ break;
+ case LOG_READ_EOF:
+ pthread_cond_wait(&COND_binlog_update, log_lock);
+ break;
+
+ default:
+ fatal_error = 1;
+ break;
+ }
+
+ pthread_mutex_unlock(log_lock);
+
+ pthread_mutex_lock(&thd->mysys_var->mutex);
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ thd->proc_info= proc_info;
+ pthread_mutex_unlock(&thd->mysys_var->mutex);
+
+ if(read_packet)
+ {
+ if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
+ {
+ errmsg = "Failed on my_net_write()";
+ goto err;
+ }
+
+ if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ {
+ if(send_file(thd))
+ {
+ errmsg = "failed in send_file()";
+ goto err;
+ }
+ }
+ packet->length(0);
+ packet->append("\0",1);
+ // no need to net_flush because we will get to flush later when
+ // we hit EOF pretty quick
+ }
+
+ if(fatal_error)
+ {
+ errmsg = "error reading log entry";
+ goto err;
+ }
+
+ clearerr(log);
+ }
+ }
+ else
+ {
+ bool loop_breaker = 0; // need this to break out of the for loop from switch
+
+ switch(mysql_bin_log.find_next_log(&linfo))
+ {
+ case LOG_INFO_EOF:
+ loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
+ break;
+ case 0:
+ break;
+ default:
+ errmsg = "could not find next log";
+ goto err;
+ }
+
+ if(loop_breaker)
+ break;
+
+ (void) my_fclose(log, MYF(MY_WME));
+ log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME));
+ if(!log)
+ goto err;
+ // fake Rotate_log event just in case it did not make it to the log
+ // otherwise the slave make get confused about the offset
+ {
+ char header[LOG_EVENT_HEADER_LEN];
+ memset(header, 0, 4); // when does not matter
+ header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
+ char* p = strrchr(log_file_name, FN_LIBCHAR); // find the last slash
+ if(p)
+ p++;
+ else
+ p = log_file_name;
+
+ uint ident_len = (uint) strlen(p);
+ ulong event_len = ident_len + sizeof(header);
+ int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ packet->append(header, sizeof(header));
+ packet->append(p,ident_len);
+ if(my_net_write(net, (char*)packet->ptr(), packet->length()))
+ {
+ errmsg = "failed on my_net_write()";
+ goto err;
+ }
+ packet->length(0);
+ packet->append("\0",1);
+ }
+ }
+ }
+
+ (void)my_fclose(log, MYF(MY_WME));
+
+ send_eof(&thd->net);
+ DBUG_VOID_RETURN;
+ err:
+ if(log)
+ (void) my_fclose(log, MYF(MY_WME));
+ send_error(&thd->net, 0, errmsg);
+ DBUG_VOID_RETURN;
+}
+
+int start_slave(THD* thd , bool net_report)
+{
+ if(!thd) thd = current_thd;
+ NET* net = &thd->net;
+ const char* err = 0;
+ if (check_access(thd, PROCESS_ACL, any_db))
+ return 1;
+ pthread_mutex_lock(&LOCK_slave);
+ if(!slave_running)
+ if(glob_mi.inited && glob_mi.host)
+ {
+ pthread_t hThread;
+ if(pthread_create(&hThread, &connection_attrib, handle_slave, 0))
+ {
+ err = "cannot create slave thread";
+ }
+ }
+ else
+ err = "Master host not set or master info not initialized";
+ else
+ err = "Slave already running";
+
+ pthread_mutex_unlock(&LOCK_slave);
+ if(err)
+ {
+ if(net_report) send_error(net, 0, err);
+ return 1;
+ }
+ else if(net_report)
+ send_ok(net);
+
+ return 0;
+}
+
+int stop_slave(THD* thd, bool net_report )
+{
+ if(!thd) thd = current_thd;
+ NET* net = &thd->net;
+ const char* err = 0;
+
+ if (check_access(thd, PROCESS_ACL, any_db))
+ return 1;
+
+ pthread_mutex_lock(&LOCK_slave);
+ if (slave_running)
+ {
+ abort_slave = 1;
+ thr_alarm_kill(slave_real_id);
+ // do not abort the slave in the middle of a query, so we do not set
+ // thd->killed for the slave thread
+ thd->proc_info = "waiting for slave to die";
+ pthread_cond_wait(&COND_slave_stopped, &LOCK_slave);
+ }
+ else
+ err = "Slave is not running";
+
+ pthread_mutex_unlock(&LOCK_slave);
+ thd->proc_info = 0;
+
+ if(err)
+ {
+ if(net_report) send_error(net, 0, err);
+ return 1;
+ }
+ else if(net_report)
+ send_ok(net);
+
+ return 0;
+}
+
+void reset_slave()
+{
+ MY_STAT stat_area;
+ char fname[FN_REFLEN];
+ bool slave_was_running = slave_running;
+
+ if(slave_running)
+ stop_slave(0,0);
+
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
+ if(my_stat(fname, &stat_area, MYF(0)))
+ if(my_delete(fname, MYF(MY_WME)))
+ return;
+
+ if(slave_was_running)
+ start_slave(0,0);
+}
+
+void kill_zombie_dump_threads(uint32 slave_server_id)
+{
+ pthread_mutex_lock(&LOCK_thread_count);
+ pthread_mutex_unlock(&LOCK_thread_count);
+}
+
+int change_master(THD* thd)
+{
+ bool slave_was_running;
+ // kill slave thread
+ pthread_mutex_lock(&LOCK_slave);
+ if((slave_was_running = slave_running))
+ {
+ abort_slave = 1;
+ thr_alarm_kill(slave_real_id);
+ thd->proc_info = "waiting for slave to die";
+ pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
+ }
+ pthread_mutex_unlock(&LOCK_slave);
+ thd->proc_info = "changing master";
+ LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
+
+ if(!glob_mi.inited)
+ init_master_info(&glob_mi);
+
+ pthread_mutex_lock(&glob_mi.lock);
+ if((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
+ {
+ // if we change host or port, we must reset the postion
+ glob_mi.log_file_name[0] = 0;
+ glob_mi.pos = 0;
+ }
+
+ if(lex_mi->log_file_name)
+ strmake(glob_mi.log_file_name, lex_mi->log_file_name,
+ sizeof(glob_mi.log_file_name));
+ if(lex_mi->pos)
+ glob_mi.pos = lex_mi->pos;
+
+ if(lex_mi->host)
+ strmake(glob_mi.host, lex_mi->host, sizeof(glob_mi.host));
+ if(lex_mi->user)
+ strmake(glob_mi.user, lex_mi->user, sizeof(glob_mi.user));
+ if(lex_mi->password)
+ strmake(glob_mi.password, lex_mi->password, sizeof(glob_mi.password));
+ if(lex_mi->port)
+ glob_mi.port = lex_mi->port;
+ if(lex_mi->connect_retry)
+ glob_mi.connect_retry = lex_mi->connect_retry;
+
+ flush_master_info(&glob_mi);
+ pthread_mutex_unlock(&glob_mi.lock);
+ thd->proc_info = "starting slave";
+ if(slave_was_running)
+ start_slave(0,0);
+ thd->proc_info = 0;
+
+ send_ok(&thd->net);
+ return 0;
+}
+
+void reset_master()
+{
+ if(!mysql_bin_log.is_open())
+ {
+ my_error(ER_FLUSH_MASTER_BINLOG_CLOSED, MYF(ME_BELL+ME_WAITTANG));
+ return;
+ }
+
+ LOG_INFO linfo;
+ if (mysql_bin_log.find_first_log(&linfo, ""))
+ return;
+
+ for(;;)
+ {
+ my_delete(linfo.log_file_name, MYF(MY_WME));
+ if (mysql_bin_log.find_next_log(&linfo))
+ break;
+ }
+ mysql_bin_log.close(1); // exiting close
+ my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME));
+ mysql_bin_log.open(opt_bin_logname,LOG_BIN);
+
+}
+
+int show_binlog_info(THD* thd)
+{
+ DBUG_ENTER("show_binlog_info");
+ List<Item> field_list;
+ field_list.push_back(new Item_empty_string("File", FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Position",20));
+ field_list.push_back(new Item_empty_string("Binlog_do_db",20));
+ field_list.push_back(new Item_empty_string("Binlog_ignore_db",20));
+
+ if(send_fields(thd, field_list, 1))
+ DBUG_RETURN(-1);
+ String* packet = &thd->packet;
+ packet->length(0);
+
+ if(mysql_bin_log.is_open())
+ {
+ LOG_INFO li;
+ mysql_bin_log.get_current_log(&li);
+ net_store_data(packet, li.log_file_name);
+ net_store_data(packet, (longlong)li.pos);
+ net_store_data(packet, &binlog_do_db);
+ net_store_data(packet, &binlog_ignore_db);
+ }
+ else
+ {
+ net_store_null(packet);
+ net_store_null(packet);
+ net_store_null(packet);
+ net_store_null(packet);
+ }
+
+ if(my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
+ DBUG_RETURN(-1);
+
+ send_eof(&thd->net);
+ DBUG_RETURN(0);
+}
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
new file mode 100644
index 00000000000..9c8d43bda54
--- /dev/null
+++ b/sql/sql_repl.h
@@ -0,0 +1,21 @@
+#ifndef SQL_REPL_H
+#define SQL_REPL_H
+
+extern bool slave_running;
+extern volatile bool abort_slave;
+extern char* master_host;
+extern pthread_t slave_real_id;
+extern MASTER_INFO glob_mi;
+extern my_string opt_bin_logname, master_info_file;
+extern I_List<i_string> binlog_do_db, binlog_ignore_db;
+
+int start_slave(THD* thd = 0, bool net_report = 1);
+int stop_slave(THD* thd = 0, bool net_report = 1);
+int change_master(THD* thd);
+void reset_slave();
+void reset_master();
+
+extern int init_master_info(MASTER_INFO* mi);
+void kill_zombie_dump_threads(uint32 slave_server_id);
+
+#endif