summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rwxr-xr-xsql/CMakeLists.txt2
-rw-r--r--sql/Makefile.am6
-rw-r--r--sql/authors.h5
-rw-r--r--sql/client_settings.h6
-rw-r--r--sql/contributors.h5
-rw-r--r--sql/field.h5
-rw-r--r--sql/gstream.h5
-rw-r--r--sql/ha_ndbcluster.h5
-rw-r--r--sql/ha_ndbcluster_binlog.h5
-rw-r--r--sql/ha_ndbcluster_cond.h5
-rw-r--r--sql/ha_ndbcluster_tables.h5
-rw-r--r--sql/ha_partition.h5
-rw-r--r--sql/handler.cc30
-rw-r--r--sql/handler.h11
-rw-r--r--sql/item.h5
-rw-r--r--sql/item_cmpfunc.h5
-rw-r--r--sql/item_func.h4
-rw-r--r--sql/item_geofunc.h4
-rw-r--r--sql/item_row.h5
-rw-r--r--sql/item_strfunc.h4
-rw-r--r--sql/item_subselect.h5
-rw-r--r--sql/item_sum.h5
-rw-r--r--sql/item_timefunc.h5
-rw-r--r--sql/item_xmlfunc.h4
-rw-r--r--sql/lex.h8
-rw-r--r--sql/log.cc105
-rw-r--r--sql/log.h34
-rw-r--r--sql/log_event.cc111
-rw-r--r--sql/log_event.h65
-rw-r--r--sql/log_event_old.cc32
-rw-r--r--sql/message.h7
-rw-r--r--sql/mysql_priv.h99
-rw-r--r--sql/mysqld.cc109
-rw-r--r--sql/mysqld_suffix.h4
-rw-r--r--sql/nt_servc.h5
-rw-r--r--sql/partition_element.h5
-rw-r--r--sql/partition_info.h5
-rw-r--r--sql/procedure.h5
-rw-r--r--sql/protocol.h4
-rw-r--r--sql/repl_failsafe.h4
-rw-r--r--sql/replication.h490
-rw-r--r--sql/rpl_handler.cc493
-rw-r--r--sql/rpl_handler.h213
-rw-r--r--sql/rpl_mi.cc138
-rw-r--r--sql/rpl_mi.h15
-rw-r--r--sql/rpl_record.cc78
-rw-r--r--sql/rpl_record.h7
-rw-r--r--sql/rpl_rli.cc18
-rw-r--r--sql/rpl_rli.h25
-rw-r--r--sql/scheduler.h5
-rw-r--r--sql/set_var.cc28
-rw-r--r--sql/set_var.h26
-rw-r--r--sql/share/errmsg-utf8.txt6
-rw-r--r--sql/share/errmsg.txt6
-rw-r--r--sql/slave.cc623
-rw-r--r--sql/slave.h13
-rw-r--r--sql/sp_head.cc1
-rw-r--r--sql/sql_acl.h4
-rw-r--r--sql/sql_analyse.h5
-rw-r--r--sql/sql_array.h4
-rw-r--r--sql/sql_binlog.cc2
-rw-r--r--sql/sql_class.cc39
-rw-r--r--sql/sql_class.h28
-rw-r--r--sql/sql_crypt.h5
-rw-r--r--sql/sql_lex.h12
-rw-r--r--sql/sql_map.h5
-rw-r--r--sql/sql_parse.cc6
-rw-r--r--sql/sql_partition.cc4
-rw-r--r--sql/sql_partition.h4
-rw-r--r--sql/sql_plugin.cc17
-rw-r--r--sql/sql_plugin.h8
-rw-r--r--sql/sql_repl.cc483
-rw-r--r--sql/sql_repl.h4
-rw-r--r--sql/sql_select.h4
-rw-r--r--sql/sql_servers.h5
-rw-r--r--sql/sql_sort.h5
-rw-r--r--sql/sql_string.h5
-rw-r--r--sql/sql_trigger.h4
-rw-r--r--sql/sql_udf.h4
-rw-r--r--sql/sql_view.h4
-rw-r--r--sql/sql_yacc.yy79
-rw-r--r--sql/structs.h5
-rw-r--r--sql/table.h4
-rw-r--r--sql/tzfile.h5
-rw-r--r--sql/tztime.h4
-rw-r--r--sql/unireg.h5
86 files changed, 3243 insertions, 453 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index e75ce9080ca..3e350144f74 100755
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -76,7 +76,7 @@ SET (SQL_SOURCE
rpl_rli.cc rpl_mi.cc sql_servers.cc
sql_connect.cc scheduler.cc
sql_profile.cc event_parse_data.cc
- sql_signal.cc
+ sql_signal.cc rpl_handler.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h
diff --git a/sql/Makefile.am b/sql/Makefile.am
index b24d9369f6f..569bb70965e 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -111,7 +111,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_plugin.h authors.h event_parse_data.h \
event_data_objects.h event_scheduler.h \
sql_partition.h partition_info.h partition_element.h \
- contributors.h sql_servers.h sql_signal.h
+ contributors.h sql_servers.h sql_signal.h \
+ rpl_handler.h replication.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -156,7 +157,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
event_queue.cc event_db_repository.cc events.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc \
- sql_servers.cc event_parse_data.cc sql_signal.cc
+ sql_servers.cc event_parse_data.cc sql_signal.cc \
+ rpl_handler.cc
nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c
diff --git a/sql/authors.h b/sql/authors.h
index bb5156742e5..ec46e368f5f 100644
--- a/sql/authors.h
+++ b/sql/authors.h
@@ -1,3 +1,6 @@
+#ifndef AUTHORS_INCLUDED
+#define AUTHORS_INCLUDED
+
/* Copyright (C) 2005-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -151,3 +154,5 @@ struct show_table_authors_st show_table_authors[]= {
"SHA1(), AES_ENCRYPT(), AES_DECRYPT(), bug fixing" },
{NULL, NULL, NULL}
};
+
+#endif /* AUTHORS_INCLUDED */
diff --git a/sql/client_settings.h b/sql/client_settings.h
index 4f06c15a29e..fd50bfdbb88 100644
--- a/sql/client_settings.h
+++ b/sql/client_settings.h
@@ -14,6 +14,12 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#ifndef CLIENT_SETTINGS_INCLUDED
+#define CLIENT_SETTINGS_INCLUDED
+#else
+#error You have already included an client_settings.h and it should not be included twice
+#endif /* CLIENT_SETTINGS_INCLUDED */
+
#include <thr_alarm.h>
#define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | \
diff --git a/sql/contributors.h b/sql/contributors.h
index 87001e29d88..6cf8bb88e3b 100644
--- a/sql/contributors.h
+++ b/sql/contributors.h
@@ -1,3 +1,6 @@
+#ifndef CONTRIBUTORS_INCLUDED
+#define CONTRIBUTORS_INCLUDED
+
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -37,3 +40,5 @@ struct show_table_contributors_st show_table_contributors[]= {
{"Mark Shuttleworth", "London, UK.", "EFF contribution for UC2006 Auction"},
{NULL, NULL, NULL}
};
+
+#endif /* CONTRIBUTORS_INCLUDED */
diff --git a/sql/field.h b/sql/field.h
index d8863ef43e6..8bf641a8649 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -1,3 +1,6 @@
+#ifndef FIELD_INCLUDED
+#define FIELD_INCLUDED
+
/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
@@ -2189,3 +2192,5 @@ int set_field_to_null_with_conversions(Field *field, bool no_conversions);
#define f_no_default(x) (x & FIELDFLAG_NO_DEFAULT)
#define f_bit_as_char(x) ((x) & FIELDFLAG_TREAT_BIT_AS_CHAR)
#define f_is_hex_escape(x) ((x) & FIELDFLAG_HEX_ESCAPE)
+
+#endif /* FIELD_INCLUDED */
diff --git a/sql/gstream.h b/sql/gstream.h
index 1ef90ad5bf0..ea7158ee1a3 100644
--- a/sql/gstream.h
+++ b/sql/gstream.h
@@ -1,3 +1,6 @@
+#ifndef GSTREAM_INCLUDED
+#define GSTREAM_INCLUDED
+
/* Copyright (C) 2000-2004 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -73,3 +76,5 @@ protected:
char *m_err_msg;
CHARSET_INFO *m_charset;
};
+
+#endif /* GSTREAM_INCLUDED */
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index 9106fd60731..ac3e7329136 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -1,3 +1,6 @@
+#ifndef HA_NDBCLUSTER_INCLUDED
+#define HA_NDBCLUSTER_INCLUDED
+
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -582,3 +585,5 @@ static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
extern int ndbcluster_terminating;
extern int ndb_util_thread_running;
extern pthread_cond_t COND_ndb_util_ready;
+
+#endif /* HA_NDBCLUSTER_INCLUDED */
diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h
index 1cad643e5ec..d80dfe9ee74 100644
--- a/sql/ha_ndbcluster_binlog.h
+++ b/sql/ha_ndbcluster_binlog.h
@@ -1,3 +1,6 @@
+#ifndef HA_NDBCLUSTER_BINLOG_INCLUDED
+#define HA_NDBCLUSTER_BINLOG_INCLUDED
+
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -225,3 +228,5 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
{ thd_set_ha_data(thd, ndbcluster_hton, thd_ndb); }
Ndb* check_ndb_in_thd(THD* thd);
+
+#endif /* HA_NDBCLUSTER_BINLOG_INCLUDED */
diff --git a/sql/ha_ndbcluster_cond.h b/sql/ha_ndbcluster_cond.h
index 4401a93c9e1..4ccc7e062ec 100644
--- a/sql/ha_ndbcluster_cond.h
+++ b/sql/ha_ndbcluster_cond.h
@@ -1,3 +1,6 @@
+#ifndef HA_NDBCLUSTER_COND_INCLUDED
+#define HA_NDBCLUSTER_COND_INCLUDED
+
/* Copyright (C) 2000-2007 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -486,3 +489,5 @@ private:
Ndb_cond_stack *m_cond_stack;
};
+
+#endif /* HA_NDBCLUSTER_COND_INCLUDED */
diff --git a/sql/ha_ndbcluster_tables.h b/sql/ha_ndbcluster_tables.h
index c6bc8f577f8..ba2e8ec251b 100644
--- a/sql/ha_ndbcluster_tables.h
+++ b/sql/ha_ndbcluster_tables.h
@@ -1,3 +1,6 @@
+#ifndef HA_NDBCLUSTER_TABLES_INCLUDED
+#define HA_NDBCLUSTER_TABLES_INCLUDED
+
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -21,3 +24,5 @@
#define OLD_NDB_APPLY_TABLE "apply_status"
#define NDB_SCHEMA_TABLE "ndb_schema"
#define OLD_NDB_SCHEMA_TABLE "schema"
+
+#endif /* HA_NDBCLUSTER_TABLES_INCLUDED */
diff --git a/sql/ha_partition.h b/sql/ha_partition.h
index fdf1a17d42b..c62318daf84 100644
--- a/sql/ha_partition.h
+++ b/sql/ha_partition.h
@@ -1,3 +1,6 @@
+#ifndef HA_PARTITION_INCLUDED
+#define HA_PARTITION_INCLUDED
+
/* Copyright 2005-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
@@ -1102,3 +1105,5 @@ public:
virtual void append_create_info(String *packet)
*/
};
+
+#endif /* HA_PARTITION_INCLUDED */
diff --git a/sql/handler.cc b/sql/handler.cc
index 865fc083e3a..bc8c627a6ae 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -24,6 +24,7 @@
#endif
#include "mysql_priv.h"
+#include "rpl_handler.h"
#include "rpl_filter.h"
#include <myisampack.h>
#include <errno.h>
@@ -222,6 +223,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type,
return NULL;
}
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+
switch (database_type) {
#ifndef NO_HASH
case DB_TYPE_HASH:
@@ -423,7 +426,13 @@ int ha_finalize_handlerton(st_plugin_int *plugin)
reuse an array slot. Otherwise the number of uninstall/install
cycles would be limited.
*/
- hton2plugin[hton->slot]= NULL;
+ if (hton->slot != HA_SLOT_UNDEF)
+ {
+ /* Make sure we are not unpluging another plugin */
+ DBUG_ASSERT(hton2plugin[hton->slot] == plugin);
+ DBUG_ASSERT(hton->slot < MAX_HA);
+ hton2plugin[hton->slot]= NULL;
+ }
my_free((uchar*)hton, MYF(0));
@@ -440,6 +449,15 @@ int ha_initialize_handlerton(st_plugin_int *plugin)
hton= (handlerton *)my_malloc(sizeof(handlerton),
MYF(MY_WME | MY_ZEROFILL));
+
+ if (hton == NULL)
+ {
+ sql_print_error("Unable to allocate memory for plugin '%s' handlerton.",
+ plugin->name.str);
+ goto err_no_hton_memory;
+ }
+
+ hton->slot= HA_SLOT_UNDEF;
/* Historical Requirement */
plugin->data= hton; // shortcut for the future
if (plugin->plugin->init && plugin->plugin->init(hton))
@@ -550,6 +568,7 @@ err_deinit:
err:
my_free((uchar*) hton, MYF(0));
+err_no_hton_memory:
plugin->data= NULL;
DBUG_RETURN(1);
}
@@ -1200,6 +1219,7 @@ int ha_commit_trans(THD *thd, bool all)
if (cookie)
tc_log->unlog(cookie, xid);
DBUG_EXECUTE_IF("crash_commit_after", abort(););
+ RUN_HOOK(transaction, after_commit, (thd, FALSE));
end:
if (rw_trans)
start_waiting_global_read_lock(thd);
@@ -1347,6 +1367,7 @@ int ha_rollback_trans(THD *thd, bool all)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
DBUG_RETURN(error);
}
@@ -1381,7 +1402,14 @@ int ha_autocommit_or_rollback(THD *thd, int error)
thd->variables.tx_isolation=thd->session_tx_isolation;
}
+ else
#endif
+ {
+ if (!error)
+ RUN_HOOK(transaction, after_commit, (thd, FALSE));
+ else
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+ }
DBUG_RETURN(error);
}
diff --git a/sql/handler.h b/sql/handler.h
index fe8f7c437ff..632c262b59a 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1,3 +1,6 @@
+#ifndef HANDLER_INCLUDED
+#define HANDLER_INCLUDED
+
/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
@@ -214,6 +217,13 @@
#define MAX_HA 15
/*
+ Use this instead of 0 as the initial value for the slot number of
+ handlerton, so that we can distinguish uninitialized slot number
+ from slot 0.
+*/
+#define HA_SLOT_UNDEF ((uint)-1)
+
+/*
Parameters for open() (in register form->filestat)
HA_GET_INFO does an implicit HA_ABORT_IF_LOCKED
*/
@@ -2073,3 +2083,4 @@ int ha_binlog_end(THD *thd);
#define ha_binlog_wait(a) do {} while (0)
#define ha_binlog_end(a) do {} while (0)
#endif
+#endif /* HANDLER_INCLUDED */
diff --git a/sql/item.h b/sql/item.h
index a2cff3ab3a9..b44e84f4b15 100644
--- a/sql/item.h
+++ b/sql/item.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_INCLUDED
+#define ITEM_INCLUDED
+
/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
@@ -3126,3 +3129,5 @@ extern Cached_item *new_Cached_item(THD *thd, Item *item);
extern Item_result item_cmp_type(Item_result a,Item_result b);
extern void resolve_const_item(THD *thd, Item **ref, Item *cmp_item);
extern int stored_field_cmp_to_item(Field *field, Item *item);
+
+#endif /* ITEM_INCLUDED */
diff --git a/sql/item_cmpfunc.h b/sql/item_cmpfunc.h
index c2227fa04e0..3462bed94a2 100644
--- a/sql/item_cmpfunc.h
+++ b/sql/item_cmpfunc.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_CMPFUNC_INCLUDED
+#define ITEM_CMPFUNC_INCLUDED
+
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -1721,3 +1724,5 @@ inline Item *and_conds(Item *a, Item *b)
}
Item *and_expressions(Item *a, Item *b, Item **org_item);
+
+#endif /* ITEM_CMPFUNC_INCLUDED */
diff --git a/sql/item_func.h b/sql/item_func.h
index fdbbff89e60..628878bcaed 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_FUNC_INCLUDED
+#define ITEM_FUNC_INCLUDED
+
/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
@@ -1718,3 +1721,4 @@ public:
bool check_partition_func_processor(uchar *int_arg) {return FALSE;}
};
+#endif /* ITEM_FUNC_INCLUDED */
diff --git a/sql/item_geofunc.h b/sql/item_geofunc.h
index edbe104e307..9a55ea7d5b1 100644
--- a/sql/item_geofunc.h
+++ b/sql/item_geofunc.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_GEOFUNC_INCLUDED
+#define ITEM_GEOFUNC_INCLUDED
+
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -386,3 +389,4 @@ public:
#endif
+#endif /* ITEM_GEOFUNC_INCLUDED */
diff --git a/sql/item_row.h b/sql/item_row.h
index 67441f49603..7c08c5888e0 100644
--- a/sql/item_row.h
+++ b/sql/item_row.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_ROW_INCLUDED
+#define ITEM_ROW_INCLUDED
+
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -77,3 +80,5 @@ public:
bool null_inside() { return with_null; };
void bring_value();
};
+
+#endif /* ITEM_ROW_INCLUDED */
diff --git a/sql/item_strfunc.h b/sql/item_strfunc.h
index 87d8c7bd438..2233679e40c 100644
--- a/sql/item_strfunc.h
+++ b/sql/item_strfunc.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_STRFUNC_INCLUDED
+#define ITEM_STRFUNC_INCLUDED
+
/* Copyright (C) 2000-2003 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -847,3 +850,4 @@ public:
String *val_str(String *);
};
+#endif /* ITEM_STRFUNC_INCLUDED */
diff --git a/sql/item_subselect.h b/sql/item_subselect.h
index d4aa621c083..ea59521aab1 100644
--- a/sql/item_subselect.h
+++ b/sql/item_subselect.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_SUBSELECT_INCLUDED
+#define ITEM_SUBSELECT_INCLUDED
+
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -579,4 +582,4 @@ inline bool Item_subselect::is_uncacheable() const
return engine->uncacheable();
}
-
+#endif /* ITEM_SUBSELECT_INCLUDED */
diff --git a/sql/item_sum.h b/sql/item_sum.h
index 1dd1c5b5dd5..62addf4e033 100644
--- a/sql/item_sum.h
+++ b/sql/item_sum.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_SUM_INCLUDED
+#define ITEM_SUM_INCLUDED
+
/* Copyright (C) 2000-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -1413,3 +1416,5 @@ public:
virtual bool change_context_processor(uchar *cntx)
{ context= (Name_resolution_context *)cntx; return FALSE; }
};
+
+#endif /* ITEM_SUM_INCLUDED */
diff --git a/sql/item_timefunc.h b/sql/item_timefunc.h
index 9e3c2e8c89f..7f1b2ed3a53 100644
--- a/sql/item_timefunc.h
+++ b/sql/item_timefunc.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_TIMEFUNC_INCLUDED
+#define ITEM_TIMEFUNC_INCLUDED
+
/* Copyright (C) 2000-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -1026,3 +1029,5 @@ public:
const char *func_name() const { return "last_day"; }
bool get_date(MYSQL_TIME *res, uint fuzzy_date);
};
+
+#endif /* ITEM_TIMEFUNC_INCLUDED */
diff --git a/sql/item_xmlfunc.h b/sql/item_xmlfunc.h
index dadbb5ccf42..6373bde0aab 100644
--- a/sql/item_xmlfunc.h
+++ b/sql/item_xmlfunc.h
@@ -1,3 +1,6 @@
+#ifndef ITEM_XMLFUNC_INCLUDED
+#define ITEM_XMLFUNC_INCLUDED
+
/* Copyright (C) 2000-2005 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -61,3 +64,4 @@ public:
String *val_str(String *);
};
+#endif /* ITEM_XMLFUNC_INCLUDED */
diff --git a/sql/lex.h b/sql/lex.h
index b4945bbfd1a..cd0c042159f 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -1,3 +1,6 @@
+#ifndef LEX_INCLUDED
+#define LEX_INCLUDED
+
/* Copyright (C) 2000-2002 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -248,6 +251,7 @@ static SYMBOL symbols[] = {
{ "IDENTIFIED", SYM(IDENTIFIED_SYM)},
{ "IF", SYM(IF)},
{ "IGNORE", SYM(IGNORE_SYM)},
+ { "IGNORE_SERVER_IDS", SYM(IGNORE_SERVER_IDS_SYM)},
{ "IMPORT", SYM(IMPORT)},
{ "IN", SYM(IN_SYM)},
{ "INDEX", SYM(INDEX_SYM)},
@@ -327,6 +331,7 @@ static SYMBOL symbols[] = {
{ "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)},
{ "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)},
{ "MASTER_USER", SYM(MASTER_USER_SYM)},
+ { "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)},
{ "MATCH", SYM(MATCH)},
{ "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)},
{ "MAX_QUERIES_PER_HOUR", SYM(MAX_QUERIES_PER_HOUR)},
@@ -435,6 +440,7 @@ static SYMBOL symbols[] = {
{ "REDUNDANT", SYM(REDUNDANT_SYM)},
{ "REFERENCES", SYM(REFERENCES)},
{ "REGEXP", SYM(REGEXP)},
+ { "RELAYLOG", SYM(RELAYLOG_SYM)},
{ "RELAY_LOG_FILE", SYM(RELAY_LOG_FILE_SYM)},
{ "RELAY_LOG_POS", SYM(RELAY_LOG_POS_SYM)},
{ "RELAY_THREAD", SYM(RELAY_THREAD)},
@@ -649,3 +655,5 @@ static SYMBOL sql_functions[] = {
{ "VAR_POP", SYM(VARIANCE_SYM)},
{ "VAR_SAMP", SYM(VAR_SAMP_SYM)},
};
+
+#endif /* LEX_INCLUDED */
diff --git a/sql/log.cc b/sql/log.cc
index aff5330d93f..4f5230b82cc 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -38,6 +38,7 @@
#endif
#include <mysql/plugin.h>
+#include "rpl_handler.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -49,8 +50,7 @@
LOGGER logger;
-MYSQL_BIN_LOG mysql_bin_log;
-ulong sync_binlog_counter= 0;
+MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period);
static bool test_if_number(const char *str,
long *res, bool allow_wildcards);
@@ -969,6 +969,7 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
uint user_host_len= 0;
ulonglong query_utime, lock_utime;
+ DBUG_ASSERT(thd->enable_slow_log);
/*
Print the message to the buffer if we have slow log enabled
*/
@@ -2421,10 +2422,11 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
-MYSQL_BIN_LOG::MYSQL_BIN_LOG()
+MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE), m_table_map_version(0),
- is_relay_log(0),
+ sync_period_ptr(sync_period),
+ is_relay_log(0), signal_cnt(0),
description_event_for_exec(0), description_event_for_queue(0)
{
/*
@@ -3654,6 +3656,8 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
}
bytes_written+= ev->data_written;
DBUG_PRINT("info",("max_size: %lu",max_size));
+ if (flush_and_sync(0))
+ goto err;
if ((uint) my_b_append_tell(&log_file) > max_size)
new_file_without_locking();
@@ -3684,6 +3688,8 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
bytes_written += len;
} while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
DBUG_PRINT("info",("max_size: %lu",max_size));
+ if (flush_and_sync(0))
+ goto err;
if ((uint) my_b_append_tell(&log_file) > max_size)
new_file_without_locking();
@@ -3693,17 +3699,21 @@ err:
DBUG_RETURN(error);
}
-
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
int err=0, fd=log_file.file;
+ if (synced)
+ *synced= 0;
safe_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file))
return 1;
- if (++sync_binlog_counter >= sync_binlog_period && sync_binlog_period)
+ uint sync_period= get_sync_period();
+ if (sync_period && ++sync_counter >= sync_period)
{
- sync_binlog_counter= 0;
+ sync_counter= 0;
err=my_sync(fd, MYF(MY_WME));
+ if (synced)
+ *synced= 1;
}
return err;
}
@@ -3994,7 +4004,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (file == &log_file)
{
- error= flush_and_sync();
+ error= flush_and_sync(0);
if (!error)
{
signal_update();
@@ -4180,8 +4190,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (file == &log_file) // we are writing to the real log (disk)
{
- if (flush_and_sync())
+ bool synced= 0;
+ if (flush_and_sync(&synced))
goto err;
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, file->pos_in_file, synced))) {
+ sql_print_error("Failed to run 'after_flush' hooks");
+ goto err;
+ }
+
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
@@ -4436,7 +4454,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
DBUG_ASSERT(carry == 0);
if (sync_log)
- flush_and_sync();
+ return flush_and_sync(0);
return 0; // All OK
}
@@ -4483,7 +4501,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
ev.write(&log_file);
if (lock)
{
- if (!error && !(error= flush_and_sync()))
+ if (!error && !(error= flush_and_sync(0)))
{
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
@@ -4571,7 +4589,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
if (incident && write_incident(thd, FALSE))
goto err;
- if (flush_and_sync())
+ bool synced= 0;
+ if (flush_and_sync(&synced))
goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
if (cache->error) // Error on read
@@ -4580,6 +4599,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
write_error=1; // Don't give more errors
goto err;
}
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, log_file.pos_in_file, synced)))
+ {
+ sql_print_error("Failed to run 'after_flush' hooks");
+ write_error=1;
+ goto err;
+ }
+
signal_update();
}
@@ -4616,12 +4644,9 @@ err:
/**
- Wait until we get a signal that the binary log has been updated.
+ Wait until we get a signal that the relay log has been updated.
@param thd Thread variable
- @param is_slave If 0, the caller is the Binlog_dump thread from master;
- if 1, the caller is the SQL thread from the slave. This
- influences only thd->proc_info.
@note
One must have a lock on LOCK_log before calling this function.
@@ -4629,22 +4654,53 @@ err:
THD::enter_cond() (see NOTES in sql_class.h).
*/
-void MYSQL_BIN_LOG::wait_for_update(THD* thd, bool is_slave)
+void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
{
const char *old_msg;
- DBUG_ENTER("wait_for_update");
+ DBUG_ENTER("wait_for_update_relay_log");
old_msg= thd->enter_cond(&update_cond, &LOCK_log,
- is_slave ?
- "Has read all relay log; waiting for the slave I/O "
- "thread to update it" :
- "Has sent all binlog to slave; waiting for binlog "
- "to be updated");
+ "Slave has read all relay log; "
+ "waiting for the slave I/O "
+ "thread to update it" );
pthread_cond_wait(&update_cond, &LOCK_log);
thd->exit_cond(old_msg);
DBUG_VOID_RETURN;
}
+/**
+ Wait until we get a signal that the binary log has been updated.
+ Applies to master only.
+
+ NOTES
+ @param[in] thd a THD struct
+ @param[in] timeout a pointer to a timespec;
+ NULL means to wait w/o timeout.
+ @retval 0 if got signalled on update
+ @retval non-0 if wait timeout elapsed
+ @note
+ LOCK_log must be taken before calling this function.
+ LOCK_log is being released while the thread is waiting.
+ LOCK_log is released by the caller.
+*/
+
+int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
+ const struct timespec *timeout)
+{
+ int ret= 0;
+ const char* old_msg = thd->proc_info;
+ DBUG_ENTER("wait_for_update_bin_log");
+ old_msg= thd->enter_cond(&update_cond, &LOCK_log,
+ "Master has sent all binlog to slave; "
+ "waiting for binlog to be updated");
+ if (!timeout)
+ pthread_cond_wait(&update_cond, &LOCK_log);
+ else
+ ret= pthread_cond_timedwait(&update_cond, &LOCK_log,
+ const_cast<struct timespec *>(timeout));
+ DBUG_RETURN(ret);
+}
+
/**
Close the log file.
@@ -4857,6 +4913,7 @@ bool flush_error_log()
void MYSQL_BIN_LOG::signal_update()
{
DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
+ signal_cnt++;
pthread_cond_broadcast(&update_cond);
DBUG_VOID_RETURN;
}
diff --git a/sql/log.h b/sql/log.h
index d306d6f7182..a31be6dcce6 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -269,6 +269,18 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
ulonglong m_table_map_version;
+ /* pointer to the sync period variable, for binlog this will be
+ sync_binlog_period, for relay log this will be
+ sync_relay_log_period
+ */
+ uint *sync_period_ptr;
+ uint sync_counter;
+
+ inline uint get_sync_period()
+ {
+ return *sync_period_ptr;
+ }
+
int write_to_file(IO_CACHE *cache);
/*
This is used to start writing to a new log file. The difference from
@@ -284,7 +296,7 @@ public:
/* This is relay log */
bool is_relay_log;
-
+ ulong signal_cnt; // update of the counter is checked by heartbeat
/*
These describe the log's format. This is used only for relay logs.
_for_exec is used by the SQL thread, _for_queue by the I/O thread. It's
@@ -296,7 +308,7 @@ public:
Format_description_log_event *description_event_for_exec,
*description_event_for_queue;
- MYSQL_BIN_LOG();
+ MYSQL_BIN_LOG(uint *sync_period);
/*
note that there's no destructor ~MYSQL_BIN_LOG() !
The reason is that we don't want it to be automatically called
@@ -339,7 +351,8 @@ public:
}
void set_max_size(ulong max_size_arg);
void signal_update();
- void wait_for_update(THD* thd, bool master_or_slave);
+ void wait_for_update_relay_log(THD* thd);
+ int wait_for_update_bin_log(THD* thd, const struct timespec * timeout);
void set_need_start_event() { need_start_event = 1; }
void init(bool no_auto_events_arg, ulong max_size);
void init_pthread_objects();
@@ -378,7 +391,20 @@ public:
bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo, bool need_update_threads);
void rotate_and_purge(uint flags);
- bool flush_and_sync();
+ /**
+ Flush binlog cache and synchronize to disk.
+
+ This function flushes events in binlog cache to binary log file,
+ it will do synchronizing according to the setting of system
+ variable 'sync_binlog'. If file is synchronized, @c synced will
+ be set to 1, otherwise 0.
+
+ @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
+
+ @retval 0 Success
+ @retval other Failure
+ */
+ bool flush_and_sync(bool *synced);
int purge_logs(const char *to_log, bool included,
bool need_mutex, bool need_update_threads,
ulonglong *decrease_log_space);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f445cff4758..10a1ee431b4 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -2394,13 +2394,29 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
charset_database_number= thd_arg->variables.collation_database->number;
/*
- If we don't use flags2 for anything else than options contained in
- thd_arg->options, it would be more efficient to flags2=thd_arg->options
- (OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
- But it's likely that we don't want to use 32 bits for 3 bits; in the future
- we will probably want to reclaim the 29 bits. So we need the &.
+ We only replicate over the bits of flags2 that we need: the rest
+ are masked out by "& OPTIONS_WRITTEN_TO_BINLOG".
+
+ We also force AUTOCOMMIT=1. Rationale (cf. BUG#29288): After
+ fixing BUG#26395, we always write BEGIN and COMMIT around all
+ transactions (even single statements in autocommit mode). This is
+ so that replication from non-transactional to transactional table
+ and error recovery from XA to non-XA table should work as
+ expected. The BEGIN/COMMIT are added in log.cc. However, there is
+ one exception: MyISAM bypasses log.cc and writes directly to the
+ binlog. So if autocommit is off, master has MyISAM, and slave has
+ a transactional engine, then the slave will just see one long
+ never-ending transaction. The only way to bypass explicit
+ BEGIN/COMMIT in the binlog is by using a non-transactional table.
+ So setting AUTOCOMMIT=1 will make this work as expected.
+
+ Note: explicitly replicate AUTOCOMMIT=1 from master. We do not
+ assume AUTOCOMMIT=1 on slave; the slave still reads the state of
+ the autocommit flag as written by the master to the binlog. This
+ behavior may change after WL#4162 has been implemented.
*/
- flags2= (uint32) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
+ flags2= (uint32) (thd_arg->options &
+ (OPTIONS_WRITTEN_TO_BIN_LOG & ~OPTION_NOT_AUTOCOMMIT));
DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256);
DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256);
DBUG_ASSERT(thd_arg->variables.collation_server->number < 256*256);
@@ -3261,6 +3277,21 @@ Default database: '%s'. Query: '%s'",
*/
} /* End of if (db_ok(... */
+ {/**
+ The following failure injecion works in cooperation with tests
+ setting @@global.debug= 'd,stop_slave_middle_group'.
+ The sql thread receives the killed status and will proceed
+ to shutdown trying to finish incomplete events group.
+ */
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ if (strcmp("COMMIT", query) != 0 &&
+ strcmp("BEGIN", query) != 0)
+ {
+ if (thd->transaction.all.modified_non_trans_table)
+ const_cast<Relay_log_info*>(rli)->abort_slave= 1;
+ };);
+ }
+
end:
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -3632,6 +3663,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[UPDATE_ROWS_EVENT-1]=
post_header_len[DELETE_ROWS_EVENT-1]= 6;);
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
+ post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
// Sanity-check that all post header lengths are initialized.
IF_DBUG({
@@ -7469,8 +7501,16 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->transaction.stmt.modified_non_trans_table= TRUE;
} // row processing loop
- DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
- const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+ {/**
+ The following failure injecion works in cooperation with tests
+ setting @@global.debug= 'd,stop_slave_middle_group'.
+ The sql thread receives the killed status and will proceed
+ to shutdown trying to finish incomplete events group.
+ */
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ if (thd->transaction.all.modified_non_trans_table)
+ const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
+ }
if ((error= do_after_row_operations(rli, error)) &&
ignored_error_code(convert_handler_error(error, thd, table)))
@@ -7508,32 +7548,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->is_slave_error= 1;
DBUG_RETURN(error);
}
- /*
- This code would ideally be placed in do_update_pos() instead, but
- since we have no access to table there, we do the setting of
- last_event_start_time here instead.
- */
- else if (table && (table->s->primary_key == MAX_KEY) &&
- !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
- {
- /*
- ------------ Temporary fix until WL#2975 is implemented ---------
-
- This event is not the last one (no STMT_END_F). If we stop now
- (in case of terminate_slave_thread()), how will we restart? We
- have to restart from Table_map_log_event, but as this table is
- not transactional, the rows already inserted will still be
- present, and idempotency is not guaranteed (no PK) so we risk
- that repeating leads to double insert. So we desperately try to
- continue, hope we'll eventually leave this buggy situation (by
- executing the final Rows_log_event). If we are in a hopeless
- wait (reached end of last relay log and nothing gets appended
- there), we timeout after one minute, and notify DBA about the
- problem. When WL#2975 is implemented, just remove the member
- Relay_log_info::last_event_start_time and all its occurrences.
- */
- const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
- }
if (get_flags(STMT_END_F))
if (error= rows_event_stmt_cleanup(rli, thd))
@@ -8445,13 +8459,17 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
auto_afree_ptr<char> key(NULL);
/* fill table->record[0] with default values */
-
+ bool abort_on_warnings= (rli->sql_thd->variables.sql_mode &
+ (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
if ((error= prepare_record(table, m_width,
- TRUE /* check if columns have def. values */)))
+ table->file->ht->db_type != DB_TYPE_NDBCLUSTER,
+ abort_on_warnings, m_curr_row == m_rows_buf)))
DBUG_RETURN(error);
/* unpack row into table->record[0] */
- error= unpack_current_row(rli); // TODO: how to handle errors?
+ if ((error= unpack_current_row(rli, abort_on_warnings)))
+ DBUG_RETURN(error);
+
if (m_curr_row == m_rows_buf)
{
/* this is the first row to be inserted, we estimate the rows with
@@ -9248,8 +9266,12 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
store_record(m_table,record[1]);
+ bool abort_on_warnings= (rli->sql_thd->variables.sql_mode &
+ (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
m_curr_row= m_curr_row_end;
- error= unpack_current_row(rli); // this also updates m_curr_row_end
+ /* this also updates m_curr_row_end */
+ if ((error= unpack_current_row(rli, abort_on_warnings)))
+ return error;
/*
Now we have the right row to update. The old row (the one we're
@@ -9428,3 +9450,16 @@ st_print_event_info::st_print_event_info()
open_cached_file(&body_cache, NULL, NULL, 0, flags);
}
#endif
+
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
+{
+ uint8 header_size= description_event->common_header_len;
+ ident_len = event_len - header_size;
+ set_if_smaller(ident_len,FN_REFLEN-1);
+ log_ident= buf + header_size;
+}
+#endif
diff --git a/sql/log_event.h b/sql/log_event.h
index 31d4a7480c2..cd5e659c910 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -250,6 +250,7 @@ struct sql_ex_info
#define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1)
#define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN)
#define INCIDENT_HEADER_LEN 2
+#define HEARTBEAT_HEADER_LEN 0
/*
Max number of possible extra bytes in a replication event compared to a
packet (i.e. a query) sent from client to master;
@@ -575,6 +576,12 @@ enum Log_event_type
INCIDENT_EVENT= 26,
/*
+ Heartbeat event to be send by master at its idle time
+ to ensure master's online status to slave
+ */
+ HEARTBEAT_LOG_EVENT= 27,
+
+ /*
Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
*/
@@ -689,6 +696,20 @@ typedef struct st_print_event_info
} PRINT_EVENT_INFO;
#endif
+/**
+ the struct aggregates two paramenters that identify an event
+ uniquely in scope of communication of a particular master and slave couple.
+ I.e there can not be 2 events from the same staying connected master which
+ have the same coordinates.
+ @note
+ Such identifier is not yet unique generally as the event originating master
+ is resetable. Also the crashed master can be replaced with some other.
+*/
+struct event_coordinates
+{
+ char * file_name; // binlog file name (directories stripped)
+ my_off_t pos; // event's position in the binlog file
+};
/**
@class Log_event
@@ -3541,12 +3562,16 @@ protected:
int write_row(const Relay_log_info *const, const bool);
// Unpack the current row into m_table->record[0]
- int unpack_current_row(const Relay_log_info *const rli)
+ int unpack_current_row(const Relay_log_info *const rli,
+ const bool abort_on_warning= TRUE)
{
DBUG_ASSERT(m_table);
+
+ bool first_row= (m_curr_row == m_rows_buf);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, &m_cols,
- &m_curr_row_end, &m_master_reclength);
+ &m_curr_row_end, &m_master_reclength,
+ abort_on_warning, first_row);
if (m_curr_row_end > m_rows_end)
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
@@ -3916,6 +3941,42 @@ static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache,
reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE);
}
+#ifndef MYSQL_CLIENT
+/*****************************************************************************
+
+ Heartbeat Log Event class
+
+ Replication event to ensure to slave that master is alive.
+ The event is originated by master's dump thread and sent straight to
+ slave without being logged. Slave itself does not store it in relay log
+ but rather uses a data for immediate checks and throws away the event.
+
+ Two members of the class log_ident and Log_event::log_pos comprise
+ @see the event_coordinates instance. The coordinates that a heartbeat
+ instance carries correspond to the last event master has sent from
+ its binlog.
+
+ ****************************************************************************/
+class Heartbeat_log_event: public Log_event
+{
+public:
+ Heartbeat_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* description_event);
+ Log_event_type get_type_code() { return HEARTBEAT_LOG_EVENT; }
+ bool is_valid() const
+ {
+ return (log_ident != NULL &&
+ log_pos >= BIN_LOG_HEADER_SIZE);
+ }
+ const char * get_log_ident() { return log_ident; }
+ uint get_ident_len() { return ident_len; }
+
+private:
+ const char* log_ident;
+ uint ident_len;
+};
+#endif
+
/**
@} (end of group Replication)
*/
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index f1e5c43b04d..e6888af9cfc 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -226,7 +226,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
row_start= row_end;
}
- DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
error= do_after_row_operations(table, error);
if (!ev->cache_stmt)
@@ -269,34 +269,6 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
DBUG_RETURN(error);
}
- /*
- This code would ideally be placed in do_update_pos() instead, but
- since we have no access to table there, we do the setting of
- last_event_start_time here instead.
- */
- if (table && (table->s->primary_key == MAX_KEY) &&
- !ev->cache_stmt &&
- ev->get_flags(Old_rows_log_event::STMT_END_F) == Old_rows_log_event::RLE_NO_FLAGS)
- {
- /*
- ------------ Temporary fix until WL#2975 is implemented ---------
-
- This event is not the last one (no STMT_END_F). If we stop now
- (in case of terminate_slave_thread()), how will we restart? We
- have to restart from Table_map_log_event, but as this table is
- not transactional, the rows already inserted will still be
- present, and idempotency is not guaranteed (no PK) so we risk
- that repeating leads to double insert. So we desperately try to
- continue, hope we'll eventually leave this buggy situation (by
- executing the final Old_rows_log_event). If we are in a hopeless
- wait (reached end of last relay log and nothing gets appended
- there), we timeout after one minute, and notify DBA about the
- problem. When WL#2975 is implemented, just remove the member
- st_relay_log_info::last_event_start_time and all its occurences.
- */
- const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
- }
-
DBUG_RETURN(0);
}
#endif
@@ -1744,7 +1716,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
} // row processing loop
- DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
error= do_after_row_operations(rli, error);
if (!cache_stmt)
diff --git a/sql/message.h b/sql/message.h
index 0e7c282d5a1..97d039352b4 100644
--- a/sql/message.h
+++ b/sql/message.h
@@ -1,3 +1,6 @@
+#ifndef MESSAGE_INCLUDED
+#define MESSAGE_INCLUDED
+
/*
To change or add messages mysqld writes to the Windows error log, run
mc.exe message.mc
@@ -6,6 +9,8 @@
mc.exe can be installed with Windows SDK, some Visual Studio distributions
do not include it.
*/
+
+
//
// Values are 32 bit values layed out as follows:
//
@@ -53,3 +58,5 @@
//
#define MSG_DEFAULT 0xC0000064L
+#endif /* MESSAGE_INCLUDED */
+
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 128117e836d..05a79957963 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -963,100 +963,6 @@ struct Query_cache_query_flags
#define query_cache_is_cacheable_query(L) 0
#endif /*HAVE_QUERY_CACHE*/
-/*
- Error injector Macros to enable easy testing of recovery after failures
- in various error cases.
-*/
-#ifndef ERROR_INJECT_SUPPORT
-
-#define ERROR_INJECT(x) 0
-#define ERROR_INJECT_ACTION(x,action) 0
-#define ERROR_INJECT_CRASH(x) 0
-#define ERROR_INJECT_VALUE(x) 0
-#define ERROR_INJECT_VALUE_ACTION(x,action) 0
-#define ERROR_INJECT_VALUE_CRASH(x) 0
-#define SET_ERROR_INJECT_VALUE(x)
-
-#else
-
-inline bool check_and_unset_keyword(const char *dbug_str)
-{
- const char *extra_str= "-d,";
- char total_str[200];
- if (_db_strict_keyword_ (dbug_str))
- {
- strxmov(total_str, extra_str, dbug_str, NullS);
- DBUG_SET(total_str);
- return 1;
- }
- return 0;
-}
-
-
-inline bool
-check_and_unset_inject_value(int value)
-{
- THD *thd= current_thd;
- if (thd->error_inject_value == (uint)value)
- {
- thd->error_inject_value= 0;
- return 1;
- }
- return 0;
-}
-
-/*
- ERROR INJECT MODULE:
- --------------------
- These macros are used to insert macros from the application code.
- The event that activates those error injections can be activated
- from SQL by using:
- SET SESSION dbug=+d,code;
-
- After the error has been injected, the macros will automatically
- remove the debug code, thus similar to using:
- SET SESSION dbug=-d,code
- from SQL.
-
- ERROR_INJECT_CRASH will inject a crash of the MySQL Server if code
- is set when macro is called. ERROR_INJECT_CRASH can be used in
- if-statements, it will always return FALSE unless of course it
- crashes in which case it doesn't return at all.
-
- ERROR_INJECT_ACTION will inject the action specified in the action
- parameter of the macro, before performing the action the code will
- be removed such that no more events occur. ERROR_INJECT_ACTION
- can also be used in if-statements and always returns FALSE.
- ERROR_INJECT can be used in a normal if-statement, where the action
- part is performed in the if-block. The macro returns TRUE if the
- error was activated and otherwise returns FALSE. If activated the
- code is removed.
-
- Sometimes it is necessary to perform error inject actions as a serie
- of events. In this case one can use one variable on the THD object.
- Thus one sets this value by using e.g. SET_ERROR_INJECT_VALUE(100).
- Then one can later test for it by using ERROR_INJECT_CRASH_VALUE,
- ERROR_INJECT_ACTION_VALUE and ERROR_INJECT_VALUE. This have the same
- behaviour as the above described macros except that they use the
- error inject value instead of a code used by DBUG macros.
-*/
-#define SET_ERROR_INJECT_VALUE(x) \
- current_thd->error_inject_value= (x)
-#define ERROR_INJECT_CRASH(code) \
- DBUG_EVALUATE_IF(code, (abort(), 0), 0)
-#define ERROR_INJECT_ACTION(code, action) \
- (check_and_unset_keyword(code) ? ((action), 0) : 0)
-#define ERROR_INJECT(code) \
- check_and_unset_keyword(code)
-#define ERROR_INJECT_VALUE(value) \
- check_and_unset_inject_value(value)
-#define ERROR_INJECT_VALUE_ACTION(value,action) \
- (check_and_unset_inject_value(value) ? (action) : 0)
-#define ERROR_INJECT_VALUE_CRASH(value) \
- ERROR_INJECT_VALUE_ACTION(value, (abort(), 0))
-
-#endif
-
void write_bin_log(THD *thd, bool clear_error,
char const *query, ulong query_length);
@@ -1991,10 +1897,13 @@ extern ulong MYSQL_PLUGIN_IMPORT specialflag;
#endif /* MYSQL_SERVER || INNODB_COMPATIBILITY_HOOKS */
#ifdef MYSQL_SERVER
extern ulong current_pid;
-extern ulong expire_logs_days, sync_binlog_period, sync_binlog_counter;
+extern ulong expire_logs_days;
+extern uint sync_binlog_period, sync_relaylog_period,
+ sync_relayloginfo_period, sync_masterinfo_period;
extern ulong opt_tc_log_size, tc_log_max_pages_used, tc_log_page_size;
extern ulong tc_log_page_waits;
extern my_bool relay_log_purge, opt_innodb_safe_binlog, opt_innodb;
+extern my_bool relay_log_recovery;
extern uint test_flags,select_errors,ha_open_options;
extern uint protocol_version, mysqld_port, dropping_tables;
extern uint delay_key_write_options;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 3aeea9ee5fb..608934d6864 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -33,6 +33,8 @@
#include "rpl_injector.h"
+#include "rpl_handler.h"
+
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
@@ -490,6 +492,7 @@ extern const char *opt_ndb_distribution;
extern enum ndb_distribution opt_ndb_distribution_id;
#endif
my_bool opt_readonly, use_temp_pool, relay_log_purge;
+my_bool relay_log_recovery;
my_bool opt_sync_frm, opt_allow_suspicious_udfs;
my_bool opt_secure_auth= 0;
char* opt_secure_file_priv= 0;
@@ -569,7 +572,9 @@ ulong max_prepared_stmt_count;
*/
ulong prepared_stmt_count=0;
ulong thread_id=1L,current_pid;
-ulong slow_launch_threads = 0, sync_binlog_period;
+ulong slow_launch_threads = 0;
+uint sync_binlog_period= 0, sync_relaylog_period= 0,
+ sync_relayloginfo_period= 0, sync_masterinfo_period= 0;
ulong expire_logs_days = 0;
ulong rpl_recovery_rank=0;
const char *log_output_str= "FILE";
@@ -1317,6 +1322,7 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
+ delegates_destroy();
xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
multi_keycache_free();
@@ -3108,6 +3114,7 @@ SHOW_VAR com_status_vars[]= {
{"show_processlist", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PROCESSLIST]), SHOW_LONG_STATUS},
{"show_profile", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PROFILE]), SHOW_LONG_STATUS},
{"show_profiles", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PROFILES]), SHOW_LONG_STATUS},
+ {"show_relaylog_events", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_RELAYLOG_EVENTS]), SHOW_LONG_STATUS},
{"show_slave_hosts", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_SLAVE_HOSTS]), SHOW_LONG_STATUS},
{"show_slave_status", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_SLAVE_STAT]), SHOW_LONG_STATUS},
{"show_status", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_STATUS]), SHOW_LONG_STATUS},
@@ -3809,6 +3816,13 @@ static int init_server_components()
unireg_abort(1);
}
+ /* initialize delegates for extension observers */
+ if (delegates_init())
+ {
+ sql_print_error("Initialize extension delegates failed");
+ unireg_abort(1);
+ }
+
/* need to configure logging before initializing storage engines */
if (opt_update_log)
{
@@ -3867,17 +3881,17 @@ with --log-bin instead.");
}
if (opt_log_slave_updates && !opt_bin_log)
{
- sql_print_error("You need to use --log-bin to make "
+ sql_print_warning("You need to use --log-bin to make "
"--log-slave-updates work.");
- unireg_abort(1);
}
if (!opt_bin_log)
{
if (opt_binlog_format_id != BINLOG_FORMAT_UNSPEC)
{
- sql_print_error("You need to use --log-bin to make "
- "--binlog-format work.");
- unireg_abort(1);
+ sql_print_warning("You need to use --log-bin to make "
+ "--binlog-format work.");
+
+ global_system_variables.binlog_format= opt_binlog_format_id;
}
else
{
@@ -3899,11 +3913,17 @@ with --log-bin instead.");
#ifdef HAVE_REPLICATION
if (opt_log_slave_updates && replicate_same_server_id)
{
- sql_print_error("\
-using --replicate-same-server-id in conjunction with \
+ if (opt_bin_log)
+ {
+ sql_print_error("using --replicate-same-server-id in conjunction with \
--log-slave-updates is impossible, it would lead to infinite loops in this \
server.");
- unireg_abort(1);
+ unireg_abort(1);
+ }
+ else
+ sql_print_warning("using --replicate-same-server-id in conjunction with \
+--log-slave-updates would lead to infinite loops in this server. However this \
+will be ignored as the --log-bin option is not defined.");
}
#endif
@@ -5643,6 +5663,7 @@ enum options_mysqld
OPT_QUERY_CACHE_TYPE, OPT_QUERY_CACHE_WLOCK_INVALIDATE, OPT_RECORD_BUFFER,
OPT_RECORD_RND_BUFFER, OPT_DIV_PRECINCREMENT, OPT_RELAY_LOG_SPACE_LIMIT,
OPT_RELAY_LOG_PURGE,
+ OPT_RELAY_LOG_RECOVERY,
OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME,
OPT_SLAVE_TRANS_RETRIES, OPT_READONLY, OPT_DEBUGGING,
OPT_SORT_BUFFER, OPT_TABLE_OPEN_CACHE, OPT_TABLE_DEF_CACHE,
@@ -5711,7 +5732,10 @@ enum options_mysqld
OPT_SLAVE_EXEC_MODE,
OPT_GENERAL_LOG_FILE,
OPT_SLOW_QUERY_LOG_FILE,
- OPT_IGNORE_BUILTIN_INNODB
+ OPT_IGNORE_BUILTIN_INNODB,
+ OPT_SYNC_RELAY_LOG,
+ OPT_SYNC_RELAY_LOG_INFO,
+ OPT_SYNC_MASTER_INFO
};
@@ -6952,6 +6976,13 @@ The minimum value for this variable is 4096.",
(uchar**) &relay_log_purge,
(uchar**) &relay_log_purge, 0, GET_BOOL, NO_ARG,
1, 0, 1, 0, 1, 0},
+ {"relay_log_recovery", OPT_RELAY_LOG_RECOVERY,
+ "Enables automatic relay log recovery right after the database startup, "
+ "which means that the IO Thread starts re-fetching from the master "
+ "right after the last transaction processed.",
+ (uchar**) &relay_log_recovery,
+ (uchar**) &relay_log_recovery, 0, GET_BOOL, NO_ARG,
+ 0, 0, 1, 0, 1, 0},
{"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT,
"Maximum space to use for all relay logs.",
(uchar**) &relay_log_space_limit,
@@ -6986,8 +7017,23 @@ The minimum value for this variable is 4096.",
{"sync-binlog", OPT_SYNC_BINLOG,
"Synchronously flush binary log to disk after every #th event. "
"Use 0 (default) to disable synchronous flushing.",
- (uchar**) &sync_binlog_period, (uchar**) &sync_binlog_period, 0, GET_ULONG,
- REQUIRED_ARG, 0, 0, ULONG_MAX, 0, 1, 0},
+ (uchar**) &sync_binlog_period, (uchar**) &sync_binlog_period, 0, GET_UINT,
+ REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0},
+ {"sync-relay-log", OPT_SYNC_RELAY_LOG,
+ "Synchronously flush relay log to disk after every #th event. "
+ "Use 0 (default) to disable synchronous flushing.",
+ (uchar**) &sync_relaylog_period, (uchar**) &sync_relaylog_period, 0, GET_UINT,
+ REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0},
+ {"sync-relay-log-info", OPT_SYNC_RELAY_LOG_INFO,
+ "Synchronously flush relay log info to disk after #th transaction. "
+ "Use 0 (default) to disable synchronous flushing.",
+ (uchar**) &sync_relayloginfo_period, (uchar**) &sync_relayloginfo_period, 0, GET_UINT,
+ REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0},
+ {"sync-master-info", OPT_SYNC_MASTER_INFO,
+ "Synchronously flush master info to disk after every #th event. "
+ "Use 0 (default) to disable synchronous flushing.",
+ (uchar**) &sync_masterinfo_period, (uchar**) &sync_masterinfo_period, 0, GET_UINT,
+ REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0},
{"sync-frm", OPT_SYNC_FRM, "Sync .frm to disk on create. Enabled by default.",
(uchar**) &opt_sync_frm, (uchar**) &opt_sync_frm, 0, GET_BOOL, NO_ARG, 1, 0,
0, 0, 0, 0},
@@ -7113,7 +7159,8 @@ static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff)
var->type= SHOW_MY_BOOL;
pthread_mutex_lock(&LOCK_active_mi);
var->value= buff;
- *((my_bool *)buff)= (my_bool) (active_mi && active_mi->slave_running &&
+ *((my_bool *)buff)= (my_bool) (active_mi &&
+ active_mi->slave_running == MYSQL_SLAVE_RUN_CONNECT &&
active_mi->rli.slave_running);
pthread_mutex_unlock(&LOCK_active_mi);
return 0;
@@ -7139,6 +7186,40 @@ static int show_slave_retried_trans(THD *thd, SHOW_VAR *var, char *buff)
pthread_mutex_unlock(&LOCK_active_mi);
return 0;
}
+
+static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff)
+{
+ pthread_mutex_lock(&LOCK_active_mi);
+ if (active_mi)
+ {
+ var->type= SHOW_LONGLONG;
+ var->value= buff;
+ pthread_mutex_lock(&active_mi->rli.data_lock);
+ *((longlong *)buff)= active_mi->received_heartbeats;
+ pthread_mutex_unlock(&active_mi->rli.data_lock);
+ }
+ else
+ var->type= SHOW_UNDEF;
+ pthread_mutex_unlock(&LOCK_active_mi);
+ return 0;
+}
+
+static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff)
+{
+ pthread_mutex_lock(&LOCK_active_mi);
+ if (active_mi)
+ {
+ var->type= SHOW_CHAR;
+ var->value= buff;
+ my_sprintf(buff, (buff, "%.3f",active_mi->heartbeat_period));
+ }
+ else
+ var->type= SHOW_UNDEF;
+ pthread_mutex_unlock(&LOCK_active_mi);
+ return 0;
+}
+
+
#endif /* HAVE_REPLICATION */
static int show_open_tables(THD *thd, SHOW_VAR *var, char *buff)
@@ -7503,6 +7584,8 @@ SHOW_VAR status_vars[]= {
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG},
#ifdef HAVE_REPLICATION
{"Slave_retried_transactions",(char*) &show_slave_retried_trans, SHOW_FUNC},
+ {"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_FUNC},
+ {"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_FUNC},
{"Slave_running", (char*) &show_slave_running, SHOW_FUNC},
#endif
{"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG},
diff --git a/sql/mysqld_suffix.h b/sql/mysqld_suffix.h
index 654d7cf88c1..c7ab212f2a2 100644
--- a/sql/mysqld_suffix.h
+++ b/sql/mysqld_suffix.h
@@ -1,3 +1,6 @@
+#ifndef MYSQLD_SUFFIX_INCLUDED
+#define MYSQLD_SUFFIX_INCLUDED
+
/* Copyright (C) 2000-2004 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -27,3 +30,4 @@
#else
#define MYSQL_SERVER_SUFFIX_STR MYSQL_SERVER_SUFFIX_DEF
#endif
+#endif /* MYSQLD_SUFFIX_INCLUDED */
diff --git a/sql/nt_servc.h b/sql/nt_servc.h
index 2f0d07df543..5bee42dedf0 100644
--- a/sql/nt_servc.h
+++ b/sql/nt_servc.h
@@ -1,3 +1,6 @@
+#ifndef NT_SERVC_INCLUDED
+#define NT_SERVC_INCLUDED
+
/**
@file
@@ -98,3 +101,5 @@ class NTService
};
/* ------------------------- the end -------------------------------------- */
+
+#endif /* NT_SERVC_INCLUDED */
diff --git a/sql/partition_element.h b/sql/partition_element.h
index bede5264c71..5575eb93bfb 100644
--- a/sql/partition_element.h
+++ b/sql/partition_element.h
@@ -1,3 +1,6 @@
+#ifndef PARTITION_ELEMENT_INCLUDED
+#define PARTITION_ELEMENT_INCLUDED
+
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -98,3 +101,5 @@ public:
}
~partition_element() {}
};
+
+#endif /* PARTITION_ELEMENT_INCLUDED */
diff --git a/sql/partition_info.h b/sql/partition_info.h
index 9f438e8260b..b5a6c4a0961 100644
--- a/sql/partition_info.h
+++ b/sql/partition_info.h
@@ -1,3 +1,6 @@
+#ifndef PARTITION_INFO_INCLUDED
+#define PARTITION_INFO_INCLUDED
+
/* Copyright 2006-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
@@ -314,3 +317,5 @@ void init_all_partitions_iterator(partition_info *part_info,
part_iter->ret_null_part= part_iter->ret_null_part_orig= FALSE;
part_iter->get_next= get_next_partition_id_range;
}
+
+#endif /* PARTITION_INFO_INCLUDED */
diff --git a/sql/procedure.h b/sql/procedure.h
index ceb586766b1..c6f50493876 100644
--- a/sql/procedure.h
+++ b/sql/procedure.h
@@ -1,3 +1,6 @@
+#ifndef PROCEDURE_INCLUDED
+#define PROCEDURE_INCLUDED
+
/* Copyright (C) 2000-2005 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -149,3 +152,5 @@ public:
Procedure *setup_procedure(THD *thd,ORDER *proc_param,select_result *result,
List<Item> &field_list,int *error);
+
+#endif /* PROCEDURE_INCLUDED */
diff --git a/sql/protocol.h b/sql/protocol.h
index 3b6a3bdb863..2857ee2e3ca 100644
--- a/sql/protocol.h
+++ b/sql/protocol.h
@@ -1,3 +1,6 @@
+#ifndef PROTOCOL_INCLUDED
+#define PROTOCOL_INCLUDED
+
/* Copyright (C) 2002-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -184,3 +187,4 @@ uchar *net_store_data(uchar *to,const uchar *from, size_t length);
uchar *net_store_data(uchar *to,int32 from);
uchar *net_store_data(uchar *to,longlong from);
+#endif /* PROTOCOL_INCLUDED */
diff --git a/sql/repl_failsafe.h b/sql/repl_failsafe.h
index 6ff78067aca..bce2c727050 100644
--- a/sql/repl_failsafe.h
+++ b/sql/repl_failsafe.h
@@ -1,3 +1,6 @@
+#ifndef REPL_FAILSAFE_INCLUDED
+#define REPL_FAILSAFE_INCLUDED
+
/* Copyright (C) 2001-2005 MySQL AB & Sasha
This program is free software; you can redistribute it and/or modify
@@ -49,3 +52,4 @@ int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
#endif /* HAVE_REPLICATION */
+#endif /* REPL_FAILSAFE_INCLUDED */
diff --git a/sql/replication.h b/sql/replication.h
new file mode 100644
index 00000000000..6b7be58a5b1
--- /dev/null
+++ b/sql/replication.h
@@ -0,0 +1,490 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ Transaction observer flags.
+*/
+enum Trans_flags {
+ /** Transaction is a real transaction */
+ TRANS_IS_REAL_TRANS = 1
+};
+
+/**
+ Transaction observer parameter
+*/
+typedef struct Trans_param {
+ uint32 server_id;
+ uint32 flags;
+
+ /*
+ The latest binary log file name and position written by current
+ transaction, if binary log is disabled or no log event has been
+ written into binary log file by current transaction (events
+ written into transaction log cache are not counted), these two
+ member will be zero.
+ */
+ const char *log_file;
+ my_off_t log_pos;
+} Trans_param;
+
+/**
+ Observes and extends transaction execution
+*/
+typedef struct Trans_observer {
+ uint32 len;
+
+ /**
+ This callback is called after transaction commit
+
+ This callback is called right after commit to storage engines for
+ transactional tables.
+
+ For non-transactional tables, this is called at the end of the
+ statement, before sending statement status, if the statement
+ succeeded.
+
+ @note The return value is currently ignored by the server.
+
+ @param param The parameter for transaction observers
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_commit)(Trans_param *param);
+
+ /**
+ This callback is called after transaction rollback
+
+ This callback is called right after rollback to storage engines
+ for transactional tables.
+
+ For non-transactional tables, this is called at the end of the
+ statement, before sending statement status, if the statement
+ failed.
+
+ @note The return value is currently ignored by the server.
+
+ @param param The parameter for transaction observers
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+ Binlog storage flags
+*/
+enum Binlog_storage_flags {
+ /** Binary log was sync:ed */
+ BINLOG_STORAGE_IS_SYNCED = 1
+};
+
+/**
+ Binlog storage observer parameters
+ */
+typedef struct Binlog_storage_param {
+ uint32 server_id;
+} Binlog_storage_param;
+
+/**
+ Observe binlog logging storage
+*/
+typedef struct Binlog_storage_observer {
+ uint32 len;
+
+ /**
+ This callback is called after binlog has been flushed
+
+ This callback is called after cached events have been flushed to
+ binary log file. Whether the binary log file is synchronized to
+ disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
+
+ @param param Observer common parameter
+ @param log_file Binlog file name been updated
+ @param log_pos Binlog position after update
+ @param flags flags for binlog storage
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_flush)(Binlog_storage_param *param,
+ const char *log_file, my_off_t log_pos,
+ uint32 flags);
+} Binlog_storage_observer;
+
+/**
+ Replication binlog transmitter (binlog dump) observer parameter.
+*/
+typedef struct Binlog_transmit_param {
+ uint32 server_id;
+ uint32 flags;
+} Binlog_transmit_param;
+
+/**
+ Observe and extends the binlog dumping thread.
+*/
+typedef struct Binlog_transmit_observer {
+ uint32 len;
+
+ /**
+ This callback is called when binlog dumping starts
+
+
+ @param param Observer common parameter
+ @param log_file Binlog file name to transmit from
+ @param log_pos Binlog position to transmit from
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*transmit_start)(Binlog_transmit_param *param,
+ const char *log_file, my_off_t log_pos);
+
+ /**
+ This callback is called when binlog dumping stops
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*transmit_stop)(Binlog_transmit_param *param);
+
+ /**
+ This callback is called to reserve bytes in packet header for event transmission
+
+ This callback is called when resetting transmit packet header to
+ reserve bytes for this observer in packet header.
+
+ The @a header buffer is allocated by the server code, and @a size
+ is the size of the header buffer. Each observer can only reserve
+ a maximum size of @a size in the header.
+
+ @param param Observer common parameter
+ @param header Pointer of the header buffer
+ @param size Size of the header buffer
+ @param len Header length reserved by this observer
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*reserve_header)(Binlog_transmit_param *param,
+ unsigned char *header,
+ unsigned long size,
+ unsigned long *len);
+
+ /**
+ This callback is called before sending an event packet to slave
+
+ @param param Observer common parameter
+ @param packet Binlog event packet to send
+ @param len Length of the event packet
+ @param log_file Binlog file name of the event packet to send
+ @param log_pos Binlog position of the event packet to send
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*before_send_event)(Binlog_transmit_param *param,
+ unsigned char *packet, unsigned long len,
+ const char *log_file, my_off_t log_pos );
+
+ /**
+ This callback is called after sending an event packet to slave
+
+ @param param Observer common parameter
+ @param event_buf Binlog event packet buffer sent
+ @param len length of the event packet buffer
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_send_event)(Binlog_transmit_param *param,
+ const char *event_buf, unsigned long len);
+
+ /**
+ This callback is called after resetting master status
+
+ This is called when executing the command RESET MASTER, and is
+ used to reset status variables added by observers.
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+ Binlog relay IO flags
+*/
+enum Binlog_relay_IO_flags {
+ /** Binary relay log was sync:ed */
+ BINLOG_RELAY_IS_SYNCED = 1
+};
+
+
+/**
+ Replication binlog relay IO observer parameter
+*/
+typedef struct Binlog_relay_IO_param {
+ uint32 server_id;
+
+ /* Master host, user and port */
+ char *host;
+ char *user;
+ unsigned int port;
+
+ char *master_log_name;
+ my_off_t master_log_pos;
+
+ MYSQL *mysql; /* the connection to master */
+} Binlog_relay_IO_param;
+
+/**
+ Observes and extends the service of slave IO thread.
+*/
+typedef struct Binlog_relay_IO_observer {
+ uint32 len;
+
+ /**
+ This callback is called when slave IO thread starts
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*thread_start)(Binlog_relay_IO_param *param);
+
+ /**
+ This callback is called when slave IO thread stops
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*thread_stop)(Binlog_relay_IO_param *param);
+
+ /**
+ This callback is called before slave requesting binlog transmission from master
+
+ This is called before slave issuing BINLOG_DUMP command to master
+ to request binlog.
+
+ @param param Observer common parameter
+ @param flags binlog dump flags
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
+
+ /**
+ This callback is called after read an event packet from master
+
+ @param param Observer common parameter
+ @param packet The event packet read from master
+ @param len Length of the event packet read from master
+ @param event_buf The event packet return after process
+ @param event_len The length of event packet return after process
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_read_event)(Binlog_relay_IO_param *param,
+ const char *packet, unsigned long len,
+ const char **event_buf, unsigned long *event_len);
+
+ /**
+ This callback is called after written an event packet to relay log
+
+ @param param Observer common parameter
+ @param event_buf Event packet written to relay log
+ @param event_len Length of the event packet written to relay log
+ @param flags flags for relay log
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_queue_event)(Binlog_relay_IO_param *param,
+ const char *event_buf, unsigned long event_len,
+ uint32 flags);
+
+ /**
+ This callback is called after reset slave relay log IO status
+
+ @param param Observer common parameter
+
+ @retval 0 Sucess
+ @retval 1 Failure
+ */
+ int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+
+/**
+ Register a transaction observer
+
+ @param observer The transaction observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+ Unregister a transaction observer
+
+ @param observer The transaction observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+ Register a binlog storage observer
+
+ @param observer The binlog storage observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+ Unregister a binlog storage observer
+
+ @param observer The binlog storage observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+ Register a binlog transmit observer
+
+ @param observer The binlog transmit observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+ Unregister a binlog transmit observer
+
+ @param observer The binlog transmit observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+ Register a binlog relay IO (slave IO thread) observer
+
+ @param observer The binlog relay IO observer to register
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer already exists
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+ Unregister a binlog relay IO (slave IO thread) observer
+
+ @param observer The binlog relay IO observer to unregister
+ @param p pointer to the internal plugin structure
+
+ @retval 0 Sucess
+ @retval 1 Observer not exists
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+ Connect to master
+
+ This function can only used in the slave I/O thread context, and
+ will use the same master information to do the connection.
+
+ @code
+ MYSQL *mysql = mysql_init(NULL);
+ if (rpl_connect_master(mysql))
+ {
+ // do stuff with the connection
+ }
+ mysql_close(mysql); // close the connection
+ @endcode
+
+ @param mysql address of MYSQL structure to use, pass NULL will
+ create a new one
+
+ @return address of MYSQL structure on success, NULL on failure
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql);
+
+/**
+ Set thread entering a condition
+
+ This function should be called before putting a thread to wait for
+ a condition. @a mutex should be held before calling this
+ function. After being waken up, @f thd_exit_cond should be called.
+
+ @param thd The thread entering the condition, NULL means current thread
+ @param cond The condition the thread is going to wait for
+ @param mutex The mutex associated with the condition, this must be
+ held before call this function
+ @param msg The new process message for the thread
+*/
+const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
+ pthread_mutex_t *mutex, const char *msg);
+
+/**
+ Set thread leaving a condition
+
+ This function should be called after a thread being waken up for a
+ condition.
+
+ @param thd The thread entering the condition, NULL means current thread
+ @param old_msg The process message, ususally this should be the old process
+ message before calling @f thd_enter_cond
+*/
+void thd_exit_cond(MYSQL_THD thd, const char *old_msg);
+
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* REPLICATION_H */
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
new file mode 100644
index 00000000000..da7aade5b99
--- /dev/null
+++ b/sql/rpl_handler.cc
@@ -0,0 +1,493 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate *transaction_delegate;
+Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate *binlog_transmit_delegate;
+Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+ structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+ my_off_t log_pos;
+ char log_file[FN_REFLEN];
+} Trans_binlog_info;
+
+static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value)
+{
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_int(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int get_user_var_real(const char *name,
+ double *value, int *null_value)
+{
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ *value= entry->val_real(&null_val);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int get_user_var_str(const char *name, char *value,
+ size_t len, unsigned int precision, int *null_value)
+{
+ String str;
+ my_bool null_val;
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&current_thd->user_vars,
+ (uchar*) name, strlen(name));
+ if (!entry)
+ return 1;
+ entry->val_str(&null_val, &str, precision);
+ strncpy(value, str.c_ptr(), len);
+ if (null_value)
+ *null_value= null_val;
+ return 0;
+}
+
+int delegates_init()
+{
+ static unsigned long trans_mem[sizeof(Trans_delegate) / sizeof(unsigned long) + 1];
+ static unsigned long storage_mem[sizeof(Binlog_storage_delegate) / sizeof(unsigned long) + 1];
+#ifdef HAVE_REPLICATION
+ static unsigned long transmit_mem[sizeof(Binlog_transmit_delegate) / sizeof(unsigned long) + 1];
+ static unsigned long relay_io_mem[sizeof(Binlog_relay_IO_delegate)/ sizeof(unsigned long) + 1];
+#endif
+
+ if (!(transaction_delegate= new (trans_mem) Trans_delegate)
+ || (!transaction_delegate->is_inited())
+ || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
+ || (!binlog_storage_delegate->is_inited())
+#ifdef HAVE_REPLICATION
+ || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
+ || (!binlog_transmit_delegate->is_inited())
+ || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
+ || (!binlog_relay_io_delegate->is_inited())
+#endif /* HAVE_REPLICATION */
+ )
+ return 1;
+
+ if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
+ return 1;
+ return 0;
+}
+
+void delegates_destroy()
+{
+ if (transaction_delegate)
+ transaction_delegate->~Trans_delegate();
+ if (binlog_storage_delegate)
+ binlog_storage_delegate->~Binlog_storage_delegate();
+#ifdef HAVE_REPLICATION
+ if (binlog_transmit_delegate)
+ binlog_transmit_delegate->~Binlog_transmit_delegate();
+ if (binlog_relay_io_delegate)
+ binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+ This macro is used by almost all the Delegate methods to iterate
+ over all the observers running given callback function of the
+ delegate .
+
+ Add observer plugins to the thd->lex list, after each statement, all
+ plugins add to thd->lex will be automatically unlocked.
+ */
+#define FOREACH_OBSERVER(r, f, thd, args) \
+ param.server_id= thd->server_id; \
+ read_lock(); \
+ Observer_info_iterator iter= observer_info_iter(); \
+ Observer_info *info= iter++; \
+ for (; info; info= iter++) \
+ { \
+ plugin_ref plugin= \
+ my_plugin_lock(thd, &info->plugin); \
+ if (!plugin) \
+ { \
+ r= 1; \
+ break; \
+ } \
+ if (((Observer *)info->observer)->f \
+ && ((Observer *)info->observer)->f args) \
+ { \
+ r= 1; \
+ plugin_unlock(thd, plugin); \
+ break; \
+ } \
+ plugin_unlock(thd, plugin); \
+ } \
+ unlock()
+
+
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+ Trans_param param;
+ bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+ if (is_real_trans)
+ param.flags |= TRANS_IS_REAL_TRANS;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ param.log_file= log_info ? log_info->log_file : 0;
+ param.log_pos= log_info ? log_info->log_pos : 0;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+ /*
+ This is the end of a real transaction or autocommit statement, we
+ can free the memory allocated for binlog file and position.
+ */
+ if (is_real_trans && log_info)
+ {
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+ my_free(log_info, MYF(0));
+ }
+ return ret;
+}
+
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+ Trans_param param;
+ bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+ if (is_real_trans)
+ param.flags |= TRANS_IS_REAL_TRANS;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ param.log_file= log_info ? log_info->log_file : 0;
+ param.log_pos= log_info ? log_info->log_pos : 0;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+ /*
+ This is the end of a real transaction or autocommit statement, we
+ can free the memory allocated for binlog file and position.
+ */
+ if (is_real_trans && log_info)
+ {
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+ my_free(log_info, MYF(0));
+ }
+ return ret;
+}
+
+int Binlog_storage_delegate::after_flush(THD *thd,
+ const char *log_file,
+ my_off_t log_pos,
+ bool synced)
+{
+ Binlog_storage_param param;
+ uint32 flags=0;
+ if (synced)
+ flags |= BINLOG_STORAGE_IS_SYNCED;
+
+ Trans_binlog_info *log_info=
+ my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+ if (!log_info)
+ {
+ if(!(log_info=
+ (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+ return 1;
+ my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
+ }
+
+ strcpy(log_info->log_file, log_file+dirname_length(log_file));
+ log_info->log_pos = log_pos;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_flush, thd,
+ (&param, log_info->log_file, log_info->log_pos, flags));
+ return ret;
+}
+
+#ifdef HAVE_REPLICATION
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
+ return ret;
+}
+
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
+ return ret;
+}
+
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+ String *packet)
+{
+ /* NOTE2ME: Maximum extra header size for each observer, I hope 32
+ bytes should be enough for each Observer to reserve their extra
+ header. If later found this is not enough, we can increase this
+ /HEZX
+ */
+#define RESERVE_HEADER_SIZE 32
+ unsigned char header[RESERVE_HEADER_SIZE];
+ ulong hlen;
+ Binlog_transmit_param param;
+ param.flags= flags;
+ param.server_id= thd->server_id;
+
+ int ret= 0;
+ read_lock();
+ Observer_info_iterator iter= observer_info_iter();
+ Observer_info *info= iter++;
+ for (; info; info= iter++)
+ {
+ plugin_ref plugin=
+ my_plugin_lock(thd, &info->plugin);
+ if (!plugin)
+ {
+ ret= 1;
+ break;
+ }
+ hlen= 0;
+ if (((Observer *)info->observer)->reserve_header
+ && ((Observer *)info->observer)->reserve_header(&param,
+ header,
+ RESERVE_HEADER_SIZE,
+ &hlen))
+ {
+ ret= 1;
+ plugin_unlock(thd, plugin);
+ break;
+ }
+ plugin_unlock(thd, plugin);
+ if (hlen == 0)
+ continue;
+ if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
+ {
+ ret= 1;
+ break;
+ }
+ }
+ unlock();
+ return ret;
+}
+
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+ String *packet,
+ const char *log_file,
+ my_off_t log_pos)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, before_send_event, thd,
+ (&param, (uchar *)packet->c_ptr(),
+ packet->length(),
+ log_file+dirname_length(log_file), log_pos));
+ return ret;
+}
+
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+ String *packet)
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_send_event, thd,
+ (&param, packet->c_ptr(), packet->length()));
+ return ret;
+}
+
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+
+{
+ Binlog_transmit_param param;
+ param.flags= flags;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
+ return ret;
+}
+
+void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
+ Master_info *mi)
+{
+ param->mysql= mi->mysql;
+ param->user= mi->user;
+ param->host= mi->host;
+ param->port= mi->port;
+ param->master_log_name= mi->master_log_name;
+ param->master_log_pos= mi->master_log_pos;
+}
+
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, thread_start, thd, (&param));
+ return ret;
+}
+
+
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+ Master_info *mi,
+ ushort flags)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+ const char *packet, ulong len,
+ const char **event_buf,
+ ulong *event_len)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_read_event, thd,
+ (&param, packet, len, event_buf, event_len));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+ const char *event_buf,
+ ulong event_len,
+ bool synced)
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ uint32 flags=0;
+ if (synced)
+ flags |= BINLOG_STORAGE_IS_SYNCED;
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_queue_event, thd,
+ (&param, event_buf, event_len, flags));
+ return ret;
+}
+
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+
+{
+ Binlog_relay_IO_param param;
+ init_param(&param, mi);
+
+ int ret= 0;
+ FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
+ return ret;
+}
+#endif /* HAVE_REPLICATION */
+
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+ return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+ return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+ return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+ return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+#ifdef HAVE_REPLICATION
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+ return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+ return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+ return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+ return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+#endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
new file mode 100644
index 00000000000..4fb7b4e035b
--- /dev/null
+++ b/sql/rpl_handler.h
@@ -0,0 +1,213 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+class Observer_info {
+public:
+ void *observer;
+ st_plugin_int *plugin_int;
+ plugin_ref plugin;
+
+ Observer_info(void *ob, st_plugin_int *p)
+ :observer(ob), plugin_int(p)
+ {
+ plugin= plugin_int_to_ref(plugin_int);
+ }
+};
+
+class Delegate {
+public:
+ typedef List<Observer_info> Observer_info_list;
+ typedef List_iterator<Observer_info> Observer_info_iterator;
+
+ int add_observer(void *observer, st_plugin_int *plugin)
+ {
+ int ret= FALSE;
+ if (!inited)
+ return TRUE;
+ write_lock();
+ Observer_info_iterator iter(observer_info_list);
+ Observer_info *info= iter++;
+ while (info && info->observer != observer)
+ info= iter++;
+ if (!info)
+ {
+ info= new Observer_info(observer, plugin);
+ if (!info || observer_info_list.push_back(info, &memroot))
+ ret= TRUE;
+ }
+ else
+ ret= TRUE;
+ unlock();
+ return ret;
+ }
+
+ int remove_observer(void *observer, st_plugin_int *plugin)
+ {
+ int ret= FALSE;
+ if (!inited)
+ return TRUE;
+ write_lock();
+ Observer_info_iterator iter(observer_info_list);
+ Observer_info *info= iter++;
+ while (info && info->observer != observer)
+ info= iter++;
+ if (info)
+ iter.remove();
+ else
+ ret= TRUE;
+ unlock();
+ return ret;
+ }
+
+ inline Observer_info_iterator observer_info_iter()
+ {
+ return Observer_info_iterator(observer_info_list);
+ }
+
+ inline bool is_empty()
+ {
+ return observer_info_list.is_empty();
+ }
+
+ inline int read_lock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_rdlock(&lock);
+ }
+
+ inline int write_lock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_wrlock(&lock);
+ }
+
+ inline int unlock()
+ {
+ if (!inited)
+ return TRUE;
+ return rw_unlock(&lock);
+ }
+
+ inline bool is_inited()
+ {
+ return inited;
+ }
+
+ Delegate()
+ {
+ inited= FALSE;
+ if (my_rwlock_init(&lock, NULL))
+ return;
+ init_sql_alloc(&memroot, 1024, 0);
+ inited= TRUE;
+ }
+ ~Delegate()
+ {
+ inited= FALSE;
+ rwlock_destroy(&lock);
+ free_root(&memroot, MYF(0));
+ }
+
+private:
+ Observer_info_list observer_info_list;
+ rw_lock_t lock;
+ MEM_ROOT memroot;
+ bool inited;
+};
+
+class Trans_delegate
+ :public Delegate {
+public:
+ typedef Trans_observer Observer;
+ int before_commit(THD *thd, bool all);
+ int before_rollback(THD *thd, bool all);
+ int after_commit(THD *thd, bool all);
+ int after_rollback(THD *thd, bool all);
+};
+
+class Binlog_storage_delegate
+ :public Delegate {
+public:
+ typedef Binlog_storage_observer Observer;
+ int after_flush(THD *thd, const char *log_file,
+ my_off_t log_pos, bool synced);
+};
+
+#ifdef HAVE_REPLICATION
+class Binlog_transmit_delegate
+ :public Delegate {
+public:
+ typedef Binlog_transmit_observer Observer;
+ int transmit_start(THD *thd, ushort flags,
+ const char *log_file, my_off_t log_pos);
+ int transmit_stop(THD *thd, ushort flags);
+ int reserve_header(THD *thd, ushort flags, String *packet);
+ int before_send_event(THD *thd, ushort flags,
+ String *packet, const
+ char *log_file, my_off_t log_pos );
+ int after_send_event(THD *thd, ushort flags,
+ String *packet);
+ int after_reset_master(THD *thd, ushort flags);
+};
+
+class Binlog_relay_IO_delegate
+ :public Delegate {
+public:
+ typedef Binlog_relay_IO_observer Observer;
+ int thread_start(THD *thd, Master_info *mi);
+ int thread_stop(THD *thd, Master_info *mi);
+ int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
+ int after_read_event(THD *thd, Master_info *mi,
+ const char *packet, ulong len,
+ const char **event_buf, ulong *event_len);
+ int after_queue_event(THD *thd, Master_info *mi,
+ const char *event_buf, ulong event_len,
+ bool synced);
+ int after_reset_slave(THD *thd, Master_info *mi);
+private:
+ void init_param(Binlog_relay_IO_param *param, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+int delegates_init();
+void delegates_destroy();
+
+extern Trans_delegate *transaction_delegate;
+extern Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate *binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+ if there is no observers in the delegate, we can return 0
+ immediately.
+*/
+#define RUN_HOOK(group, hook, args) \
+ (group ##_delegate->is_empty() ? \
+ 0 : group ##_delegate->hook args)
+
+#endif /* RPL_HANDLER_H */
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 5e46837e948..e83e0ad0ba9 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -26,17 +26,21 @@
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val);
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val);
+int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
-Master_info::Master_info()
+Master_info::Master_info(bool is_slave_recovery)
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0), inited(0),
- abort_slave(0),slave_running(0),
- slave_run_id(0)
+ rli(is_slave_recovery), abort_slave(0), slave_running(0),
+ slave_run_id(0), sync_counter(0),
+ heartbeat_period(0), received_heartbeats(0), master_id(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
+ my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16);
bzero((char*) &file, sizeof(file));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
@@ -47,6 +51,7 @@ Master_info::Master_info()
Master_info::~Master_info()
{
+ delete_dynamic(&ignore_server_ids);
pthread_mutex_destroy(&run_lock);
pthread_mutex_destroy(&data_lock);
pthread_cond_destroy(&data_cond);
@@ -54,6 +59,43 @@ Master_info::~Master_info()
pthread_cond_destroy(&stop_cond);
}
+/**
+ A comparison function to be supplied as argument to @c sort_dynamic()
+ and @c bsearch()
+
+ @return -1 if first argument is less, 0 if it equal to, 1 if it is greater
+ than the second
+*/
+int change_master_server_id_cmp(ulong *id1, ulong *id2)
+{
+ return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+}
+
+
+/**
+ Reports if the s_id server has been configured to ignore events
+ it generates with
+
+ CHANGE MASTER IGNORE_SERVER_IDS= ( list of server ids )
+
+ Method is called from the io thread event receiver filtering.
+
+ @param s_id the master server identifier
+
+ @retval TRUE if s_id is in the list of ignored master servers,
+ @retval FALSE otherwise.
+ */
+bool Master_info::shall_ignore_server_id(ulong s_id)
+{
+ if (likely(ignore_server_ids.elements == 1))
+ return (* (ulong*) dynamic_array_ptr(&ignore_server_ids, 0)) == s_id;
+ else
+ return bsearch((const ulong *) &s_id,
+ ignore_server_ids.buffer,
+ ignore_server_ids.elements, sizeof(ulong),
+ (int (*) (const void*, const void*)) change_master_server_id_cmp)
+ != NULL;
+}
void init_master_info_with_options(Master_info* mi)
{
@@ -84,6 +126,17 @@ void init_master_info_with_options(Master_info* mi)
strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1);
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
+ /*
+ always request heartbeat unless master_heartbeat_period is set
+ explicitly zero. Here is the default value for heartbeat period
+ if CHANGE MASTER did not specify it. (no data loss in conversion
+ as hb period has a max)
+ */
+ mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ (slave_net_timeout/2.0));
+ DBUG_ASSERT(mi->heartbeat_period > (float) 0.001
+ || mi->heartbeat_period == 0);
+
DBUG_VOID_RETURN;
}
@@ -93,9 +146,12 @@ enum {
/* 5.1.16 added value of master_ssl_verify_server_cert */
LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15,
-
+ /* 6.0 added value of master_heartbeat_period */
+ LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16,
+ /* 6.0 added value of master_ignore_server_id */
+ LINE_FOR_REPLICATE_IGNORE_SERVER_IDS= 17,
/* Number of lines currently used when saving master info file */
- LINES_IN_MASTER_INFO= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT
+ LINES_IN_MASTER_INFO= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS
};
int init_master_info(Master_info* mi, const char* master_info_fname,
@@ -197,6 +253,7 @@ file '%s')", fname);
mi->fd = fd;
int port, connect_retry, master_log_pos, lines;
int ssl= 0, ssl_verify_server_cert= 0;
+ float master_heartbeat_period= 0.0;
char *first_non_digit;
/*
@@ -281,7 +338,23 @@ file '%s')", fname);
if (lines >= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT &&
init_intvar_from_file(&ssl_verify_server_cert, &mi->file, 0))
goto errwithmsg;
-
+ /*
+ Starting from 6.0 master_heartbeat_period might be
+ in the file
+ */
+ if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD &&
+ init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0))
+ goto errwithmsg;
+ /*
+ Starting from 6.0 list of server_id of ignorable servers might be
+ in the file
+ */
+ if (lines >= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS &&
+ init_dynarray_intvar_from_file(&mi->ignore_server_ids, &mi->file))
+ {
+ sql_print_error("Failed to initialize master info ignore_server_ids");
+ goto errwithmsg;
+ }
}
#ifndef HAVE_OPENSSL
@@ -300,6 +373,7 @@ file '%s')", fname);
mi->connect_retry= (uint) connect_retry;
mi->ssl= (my_bool) ssl;
mi->ssl_verify_server_cert= ssl_verify_server_cert;
+ mi->heartbeat_period= master_heartbeat_period;
}
DBUG_PRINT("master_info",("log_file_name: %s position: %ld",
mi->master_log_name,
@@ -310,6 +384,7 @@ file '%s')", fname);
goto err;
mi->inited = 1;
+ mi->rli.is_relay_log_recovery= FALSE;
// now change cache READ -> WRITE - must do this before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
if ((error=test(flush_master_info(mi, 1))))
@@ -342,6 +417,7 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
+ int err= 0;
DBUG_ENTER("flush_master_info");
DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
@@ -358,9 +434,35 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
When we come to this place in code, relay log may or not be initialized;
the caller is responsible for setting 'flush_relay_log_cache' accordingly.
*/
- if (flush_relay_log_cache &&
- flush_io_cache(mi->rli.relay_log.get_log_file()))
- DBUG_RETURN(2);
+ if (flush_relay_log_cache)
+ {
+ IO_CACHE *log_file= mi->rli.relay_log.get_log_file();
+ if (flush_io_cache(log_file))
+ DBUG_RETURN(2);
+ }
+
+ /*
+ produce a line listing the total number and all the ignored server_id:s
+ */
+ char* ignore_server_ids_buf;
+ {
+ ignore_server_ids_buf=
+ (char *) my_malloc((sizeof(::server_id) * 3 + 1) *
+ (1 + mi->ignore_server_ids.elements), MYF(MY_WME));
+ if (!ignore_server_ids_buf)
+ DBUG_RETURN(1);
+ for (ulong i= 0, cur_len= my_sprintf(ignore_server_ids_buf,
+ (ignore_server_ids_buf, "%u",
+ mi->ignore_server_ids.elements));
+ i < mi->ignore_server_ids.elements; i++)
+ {
+ ulong s_id;
+ get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
+ cur_len +=my_sprintf(ignore_server_ids_buf + cur_len,
+ (ignore_server_ids_buf + cur_len,
+ " %lu", s_id));
+ }
+ }
/*
We flushed the relay log BEFORE the master.info file, because if we crash
@@ -378,17 +480,27 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
contents of file). But because of number of lines in the first line
of file we don't care about this garbage.
*/
-
+ char heartbeat_buf[sizeof(mi->heartbeat_period) * 4]; // buffer to suffice always
+ my_sprintf(heartbeat_buf, (heartbeat_buf, "%.3f", mi->heartbeat_period));
my_b_seek(file, 0L);
my_b_printf(file,
- "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n",
+ "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
mi->password, mi->port, mi->connect_retry,
(int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
- mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert);
- DBUG_RETURN(-flush_io_cache(file));
+ mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
+ heartbeat_buf, ignore_server_ids_buf);
+ my_free(ignore_server_ids_buf, MYF(0));
+ err= flush_io_cache(file);
+ if (sync_masterinfo_period && !err &&
+ ++(mi->sync_counter) >= sync_masterinfo_period)
+ {
+ err= my_sync(mi->fd, MYF(MY_WME));
+ mi->sync_counter= 0;
+ }
+ DBUG_RETURN(-err);
}
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 93fb0a98198..f822a6bc1b1 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -20,6 +20,7 @@
#include "rpl_rli.h"
#include "rpl_reporting.h"
+#include "my_sys.h"
/*****************************************************************************
@@ -58,8 +59,9 @@
class Master_info : public Slave_reporting_capability
{
public:
- Master_info();
+ Master_info(bool is_slave_recovery);
~Master_info();
+ bool shall_ignore_server_id(ulong s_id);
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN];
@@ -100,6 +102,16 @@ class Master_info : public Slave_reporting_capability
*/
long clock_diff_with_master;
+ /*
+ Keeps track of the number of events before fsyncing.
+ The option --sync-master-info determines how many
+ events should happen before fsyncing.
+ */
+ uint sync_counter;
+ float heartbeat_period; // interface with CHANGE MASTER or master.info
+ ulonglong received_heartbeats; // counter of received heartbeat events
+ DYNAMIC_ARRAY ignore_server_ids;
+ ulong master_id;
};
void init_master_info_with_options(Master_info* mi);
@@ -109,6 +121,7 @@ int init_master_info(Master_info* mi, const char* master_info_fname,
int thread_mask);
void end_master_info(Master_info* mi);
int flush_master_info(Master_info* mi, bool flush_relay_log_cache);
+int change_master_server_id_cmp(ulong *id1, ulong *id2);
#endif /* HAVE_REPLICATION */
#endif /* RPL_MI_H */
diff --git a/sql/rpl_record.cc b/sql/rpl_record.cc
index 14a80cbb4b6..8e80620dd2c 100644
--- a/sql/rpl_record.cc
+++ b/sql/rpl_record.cc
@@ -180,7 +180,8 @@ int
unpack_row(Relay_log_info const *rli,
TABLE *table, uint const colcnt,
uchar const *const row_data, MY_BITMAP const *cols,
- uchar const **const row_end, ulong *const master_reclength)
+ uchar const **const row_end, ulong *const master_reclength,
+ const bool abort_on_warning, const bool first_row)
{
DBUG_ENTER("unpack_row");
DBUG_ASSERT(row_data);
@@ -224,8 +225,35 @@ unpack_row(Relay_log_info const *rli,
/* Field...::unpack() cannot return 0 */
DBUG_ASSERT(pack_ptr != NULL);
- if ((null_bits & null_mask) && f->maybe_null())
- f->set_null();
+ if (null_bits & null_mask)
+ {
+ if (f->maybe_null())
+ {
+ DBUG_PRINT("debug", ("Was NULL; null mask: 0x%x; null bits: 0x%x",
+ null_mask, null_bits));
+ f->set_null();
+ }
+ else
+ {
+ MYSQL_ERROR::enum_warning_level error_type=
+ MYSQL_ERROR::WARN_LEVEL_NOTE;
+ if (abort_on_warning && (table->file->has_transactions() ||
+ first_row))
+ {
+ error = HA_ERR_ROWS_EVENT_APPLY;
+ error_type= MYSQL_ERROR::WARN_LEVEL_ERROR;
+ }
+ else
+ {
+ f->set_default();
+ error_type= MYSQL_ERROR::WARN_LEVEL_WARN;
+ }
+ push_warning_printf(current_thd, error_type,
+ ER_BAD_NULL_ERROR,
+ ER(ER_BAD_NULL_ERROR),
+ f->field_name);
+ }
+ }
else
{
f->set_notnull();
@@ -305,13 +333,17 @@ unpack_row(Relay_log_info const *rli,
@param table Table whose record[0] buffer is prepared.
@param skip Number of columns for which default/nullable check
should be skipped.
- @param check Indicates if errors should be raised when checking
- default/nullable field properties.
+ @param check Specifies if lack of default error needs checking.
+ @param abort_on_warning
+ Controls how to react on lack of a field's default.
+ The parameter mimics the master side one for
+ @c check_that_all_fields_are_given_values.
@returns 0 on success or a handler level error code
*/
int prepare_record(TABLE *const table,
- const uint skip, const bool check)
+ const uint skip, const bool check,
+ const bool abort_on_warning, const bool first_row)
{
DBUG_ENTER("prepare_record");
@@ -326,17 +358,37 @@ int prepare_record(TABLE *const table,
if (skip >= table->s->fields || !check)
DBUG_RETURN(0);
- /* Checking if exists default/nullable fields in the default values. */
-
- for (Field **field_ptr= table->field+skip ; *field_ptr ; ++field_ptr)
+ /*
+ For fields the extra fields on the slave, we check if they have a default.
+ The check follows the same rules as the INSERT query without specifying an
+ explicit value for a field not having the explicit default
+ (@c check_that_all_fields_are_given_values()).
+ */
+ for (Field **field_ptr= table->field+skip; *field_ptr; ++field_ptr)
{
uint32 const mask= NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG;
Field *const f= *field_ptr;
-
- if (((f->flags & mask) == mask))
+ if ((f->flags & NO_DEFAULT_VALUE_FLAG) &&
+ (f->real_type() != MYSQL_TYPE_ENUM))
{
- my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), f->field_name);
- error = HA_ERR_ROWS_EVENT_APPLY;
+
+ MYSQL_ERROR::enum_warning_level error_type=
+ MYSQL_ERROR::WARN_LEVEL_NOTE;
+ if (abort_on_warning && (table->file->has_transactions() ||
+ first_row))
+ {
+ error= HA_ERR_ROWS_EVENT_APPLY;
+ error_type= MYSQL_ERROR::WARN_LEVEL_ERROR;
+ }
+ else
+ {
+ f->set_default();
+ error_type= MYSQL_ERROR::WARN_LEVEL_WARN;
+ }
+ push_warning_printf(current_thd, error_type,
+ ER_NO_DEFAULT_FOR_FIELD,
+ ER(ER_NO_DEFAULT_FOR_FIELD),
+ f->field_name);
}
}
diff --git a/sql/rpl_record.h b/sql/rpl_record.h
index f9e64f0ab1d..6e8838f82b3 100644
--- a/sql/rpl_record.h
+++ b/sql/rpl_record.h
@@ -27,10 +27,13 @@ size_t pack_row(TABLE* table, MY_BITMAP const* cols,
int unpack_row(Relay_log_info const *rli,
TABLE *table, uint const colcnt,
uchar const *const row_data, MY_BITMAP const *cols,
- uchar const **const row_end, ulong *const master_reclength);
+ uchar const **const row_end, ulong *const master_reclength,
+ const bool abort_on_warning= TRUE, const bool first_row= TRUE);
// Fill table's record[0] with default values.
-int prepare_record(TABLE *const, const uint =0, const bool =FALSE);
+int prepare_record(TABLE *const table, const uint skip, const bool check,
+ const bool abort_on_warning= TRUE,
+ const bool first_row= TRUE);
#endif
#endif
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 82bd137b5b0..ae3cbf789a5 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -28,12 +28,13 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val);
-
-Relay_log_info::Relay_log_info()
+Relay_log_info::Relay_log_info(bool is_slave_recovery)
:Slave_reporting_capability("SQL"),
no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
- info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
- cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
+ info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
+ sync_counter(0), is_relay_log_recovery(is_slave_recovery),
+ save_temporary_tables(0), cur_log_old_open_count(0), group_relay_log_pos(0),
+ event_relay_log_pos(0),
#if HAVE_purify
is_fake(FALSE),
#endif
@@ -258,6 +259,9 @@ Failed to open the existing relay log info file '%s' (errno %d)",
rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
rli->group_master_log_pos= master_log_pos;
+ if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg))
+ goto err;
+
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
@@ -289,7 +293,10 @@ Failed to open the existing relay log info file '%s' (errno %d)",
*/
reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
if ((error= flush_relay_log_info(rli)))
- sql_print_error("Failed to flush relay log info file");
+ {
+ msg= "Failed to flush relay log info file";
+ goto err;
+ }
if (count_relay_log_space(rli))
{
msg="Error counting relay log space";
@@ -1188,7 +1195,6 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
*/
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
- last_event_start_time= 0;
DBUG_VOID_RETURN;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 171778d9675..fd36d18adae 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -96,6 +96,19 @@ public:
LOG_INFO linfo;
IO_CACHE cache_buf,*cur_log;
+ /*
+ Keeps track of the number of transactions that commits
+ before fsyncing. The option --sync-relay-log-info determines
+ how many transactions should commit before fsyncing.
+ */
+ uint sync_counter;
+
+ /*
+ Identifies when the recovery process is going on.
+ See sql/slave.cc:init_recovery for further details.
+ */
+ bool is_relay_log_recovery;
+
/* The following variables are safe to read any time */
/* IO_CACHE of the info file - set only during init or end */
@@ -267,7 +280,7 @@ public:
char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size;
- Relay_log_info();
+ Relay_log_info(bool is_slave_recovery);
~Relay_log_info();
/*
@@ -336,12 +349,10 @@ public:
void clear_tables_to_lock();
/*
- Used by row-based replication to detect that it should not stop at
- this event, but give it a chance to send more events. The time
- where the last event inside a group started is stored here. If the
- variable is zero, we are not in a group (but may be in a
- transaction).
- */
+ Used to defer stopping the SQL thread to give it a chance
+ to finish up the current group of events.
+ The timestamp is set and reset in @c sql_slave_killed().
+ */
time_t last_event_start_time;
/**
diff --git a/sql/scheduler.h b/sql/scheduler.h
index 46bbd300cbb..e7916031a27 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -1,3 +1,6 @@
+#ifndef SCHEDULER_INCLUDED
+#define SCHEDULER_INCLUDED
+
/* Copyright (C) 2007 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -58,3 +61,5 @@ enum pool_command_op
class thd_scheduler
{};
+
+#endif /* SCHEDULER_INCLUDED */
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 8f246caa758..29ed552bfb0 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -1546,6 +1546,23 @@ static bool get_unsigned(THD *thd, set_var *var, ulonglong user_max,
}
+bool sys_var_uint_ptr::check(THD *thd, set_var *var)
+{
+ var->save_result.ulong_value= (ulong) var->value->val_uint();
+ return 0;
+}
+
+bool sys_var_uint_ptr::update(THD *thd, set_var *var)
+{
+ *value= (uint) var->save_result.ulong_value;
+ return 0;
+}
+
+void sys_var_uint_ptr::set_default(THD *thd, enum_var_type type)
+{
+ *value= (uint) option_limits->def_value;
+}
+
sys_var_long_ptr::
sys_var_long_ptr(sys_var_chain *chain, const char *name_arg, ulong *value_ptr_arg,
sys_after_update_func after_update_arg)
@@ -3135,6 +3152,15 @@ static bool set_option_autocommit(THD *thd, set_var *var)
ulonglong org_options= thd->options;
+ /*
+ If we are setting AUTOCOMMIT=1 and it was not already 1, then we
+ need to commit any outstanding transactions.
+ */
+ if (var->save_result.ulong_value != 0 &&
+ (thd->options & OPTION_NOT_AUTOCOMMIT) &&
+ ha_commit(thd))
+ return 1;
+
if (var->save_result.ulong_value != 0)
thd->options&= ~((sys_var_thd_bit*) var->var)->bit_flag;
else
@@ -3148,8 +3174,6 @@ static bool set_option_autocommit(THD *thd, set_var *var)
thd->options&= ~(ulonglong) (OPTION_BEGIN | OPTION_KEEP_LOG);
thd->transaction.all.modified_non_trans_table= FALSE;
thd->server_status|= SERVER_STATUS_AUTOCOMMIT;
- if (ha_commit(thd))
- return 1;
}
else
{
diff --git a/sql/set_var.h b/sql/set_var.h
index d52679550ab..a83de6425ae 100644
--- a/sql/set_var.h
+++ b/sql/set_var.h
@@ -1,3 +1,6 @@
+#ifndef SET_VAR_INCLUDED
+#define SET_VAR_INCLUDED
+
/* Copyright (C) 2002-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -172,6 +175,27 @@ public:
{ return (uchar*) value; }
};
+/**
+ Unsigned int system variable class
+ */
+class sys_var_uint_ptr :public sys_var
+{
+public:
+ sys_var_uint_ptr(sys_var_chain *chain, const char *name_arg,
+ uint *value_ptr_arg,
+ sys_after_update_func after_update_arg= NULL)
+ :sys_var(name_arg, after_update_arg),
+ value(value_ptr_arg)
+ { chain_sys_var(chain); }
+ bool check(THD *thd, set_var *var);
+ bool update(THD *thd, set_var *var);
+ void set_default(THD *thd, enum_var_type type);
+ SHOW_TYPE show_type() { return SHOW_INT; }
+ uchar *value_ptr(THD *thd, enum_var_type type, LEX_STRING *base)
+ { return (uchar*) value; }
+private:
+ uint *value;
+};
/*
A global ulong variable that is protected by LOCK_global_system_variables
@@ -1490,3 +1514,5 @@ void free_key_cache(const char *name, KEY_CACHE *key_cache);
bool process_key_caches(process_key_cache_t func);
void delete_elements(I_List<NAMED_LIST> *list,
void (*free_element)(const char*, uchar*));
+
+#endif /* SET_VAR_INCLUDED */
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 62f49250a1a..2b2d60c0657 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -4620,7 +4620,7 @@ ER_USER_LIMIT_REACHED 42000
swe "Användare '%-.64s' har överskridit '%s' (nuvarande värde: %ld)"
ER_SPECIFIC_ACCESS_DENIED_ERROR 42000
nla "Toegang geweigerd. U moet het %-.128s privilege hebben voor deze operatie"
- eng "Access denied; you need the %-.128s privilege for this operation"
+ eng "Access denied; you need (at least one of) the %-.128s privilege(s) for this operation"
ger "Kein Zugriff. Hierfür wird die Berechtigung %-.128s benötigt"
ita "Accesso non consentito. Serve il privilegio %-.128s per questa operazione"
por "Acesso negado. Você precisa o privilégio %-.128s para essa operação"
@@ -6237,3 +6237,7 @@ ER_COND_ITEM_TOO_LONG
ER_UNKNOWN_LOCALE
eng "Unknown locale: '%-.64s'"
+
+ER_SLAVE_IGNORE_SERVER_IDS
+ eng "The requested server id %d clashes with the slave startup option --replicate-same-server-id"
+
diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt
index bec7536b5d1..317087854c1 100644
--- a/sql/share/errmsg.txt
+++ b/sql/share/errmsg.txt
@@ -4620,7 +4620,7 @@ ER_USER_LIMIT_REACHED 42000
swe "Användare '%-.64s' har överskridit '%s' (nuvarande värde: %ld)"
ER_SPECIFIC_ACCESS_DENIED_ERROR 42000
nla "Toegang geweigerd. U moet het %-.128s privilege hebben voor deze operatie"
- eng "Access denied; you need the %-.128s privilege for this operation"
+ eng "Access denied; you need (at least one of) the %-.128s privilege(s) for this operation"
ger "Kein Zugriff. Hierfür wird die Berechtigung %-.128s benötigt"
ita "Accesso non consentito. Serve il privilegio %-.128s per questa operazione"
por "Acesso negado. Você precisa o privilégio %-.128s para essa operação"
@@ -6237,3 +6237,7 @@ ER_COND_ITEM_TOO_LONG
ER_UNKNOWN_LOCALE
eng "Unknown locale: '%-.64s'"
+
+ER_SLAVE_IGNORE_SERVER_IDS
+ eng "The requested server id %d clashes with the slave startup option --replicate-same-server-id"
+
diff --git a/sql/slave.cc b/sql/slave.cc
index 50742817f09..8c82dcaabe0 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -40,6 +40,7 @@
#include <errmsg.h>
#include <mysqld_error.h>
#include <mysys_err.h>
+#include "rpl_handler.h"
#ifdef HAVE_REPLICATION
@@ -48,6 +49,10 @@
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
#define MAX_SLAVE_RETRY_PAUSE 5
+/*
+ a parameter of sql_slave_killed() to defer the killed status
+*/
+#define SLAVE_WAIT_GROUP_DONE 60
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;
char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
@@ -69,6 +74,8 @@ ulonglong relay_log_space_limit = 0;
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
int events_till_abort = -1;
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
enum enum_slave_reconnect_actions
{
SLAVE_RECON_ACT_REG= 0,
@@ -220,6 +227,7 @@ void unlock_slave_threads(Master_info* mi)
int init_slave()
{
DBUG_ENTER("init_slave");
+ int error= 0;
/*
This is called when mysqld starts. Before client connections are
@@ -231,7 +239,10 @@ int init_slave()
TODO: re-write this to interate through the list of files
for multi-master
*/
- active_mi= new Master_info;
+ active_mi= new Master_info(relay_log_recovery);
+
+ if (pthread_key_create(&RPL_MASTER_INFO, NULL))
+ goto err;
/*
If --slave-skip-errors=... was not used, the string value for the
@@ -250,6 +261,7 @@ int init_slave()
if (!active_mi)
{
sql_print_error("Failed to allocate memory for the master info structure");
+ error= 1;
goto err;
}
@@ -257,6 +269,7 @@ int init_slave()
!master_host, (SLAVE_IO | SLAVE_SQL)))
{
sql_print_error("Failed to initialize the master info structure");
+ error= 1;
goto err;
}
@@ -275,18 +288,69 @@ int init_slave()
SLAVE_IO | SLAVE_SQL))
{
sql_print_error("Failed to create slave threads");
+ error= 1;
goto err;
}
}
- pthread_mutex_unlock(&LOCK_active_mi);
- DBUG_RETURN(0);
err:
pthread_mutex_unlock(&LOCK_active_mi);
- DBUG_RETURN(1);
+ DBUG_RETURN(error);
}
+/*
+ Updates the master info based on the information stored in the
+ relay info and ignores relay logs previously retrieved by the IO
+ thread, which thus starts fetching again based on to the
+ group_master_log_pos and group_master_log_name. Eventually, the old
+ relay logs will be purged by the normal purge mechanism.
+
+ In the feature, we should improve this routine in order to avoid throwing
+ away logs that are safely stored in the disk. Note also that this recovery
+ routine relies on the correctness of the relay-log.info and only tolerates
+ coordinate problems in master.info.
+
+ In this function, there is no need for a mutex as the caller
+ (i.e. init_slave) already has one acquired.
+
+ Specifically, the following structures are updated:
+
+ 1 - mi->master_log_pos <-- rli->group_master_log_pos
+ 2 - mi->master_log_name <-- rli->group_master_log_name
+ 3 - It moves the relay log to the new relay log file, by
+ rli->group_relay_log_pos <-- BIN_LOG_HEADER_SIZE;
+ rli->event_relay_log_pos <-- BIN_LOG_HEADER_SIZE;
+ rli->group_relay_log_name <-- rli->relay_log.get_log_fname();
+ rli->event_relay_log_name <-- rli->relay_log.get_log_fname();
+
+ If there is an error, it returns (1), otherwise returns (0).
+ */
+int init_recovery(Master_info* mi, const char** errmsg)
+{
+ DBUG_ENTER("init_recovery");
+
+ Relay_log_info *rli= &mi->rli;
+ if (rli->group_master_log_name[0])
+ {
+ mi->master_log_pos= max(BIN_LOG_HEADER_SIZE,
+ rli->group_master_log_pos);
+ strmake(mi->master_log_name, rli->group_master_log_name,
+ sizeof(mi->master_log_name)-1);
+
+ sql_print_warning("Recovery from master pos %ld and file %s.",
+ (ulong) mi->master_log_pos, mi->master_log_name);
+
+ strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(rli->group_relay_log_name)-1);
+ strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(mi->rli.event_relay_log_name)-1);
+
+ rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ }
+ DBUG_RETURN(0);
+}
+
/**
Convert slave skip errors bitmap into a printable string.
*/
@@ -730,44 +794,92 @@ static bool io_slave_killed(THD* thd, Master_info* mi)
DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
}
+/**
+ The function analyzes a possible killed status and makes
+ a decision whether to accept it or not.
+ Normally upon accepting the sql thread goes to shutdown.
+ In the event of deffering decision @rli->last_event_start_time waiting
+ timer is set to force the killed status be accepted upon its expiration.
+
+ @param thd pointer to a THD instance
+ @param rli pointer to Relay_log_info instance
+ @return TRUE the killed status is recognized, FALSE a possible killed
+ status is deferred.
+*/
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
{
+ bool ret= FALSE;
DBUG_ENTER("sql_slave_killed");
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
if (abort_loop || thd->killed || rli->abort_slave)
{
- if (rli->abort_slave && rli->is_in_group() &&
- thd->transaction.all.modified_non_trans_table)
- DBUG_RETURN(0);
- /*
- If we are in an unsafe situation (stopping could corrupt replication),
- we give one minute to the slave SQL thread of grace before really
- terminating, in the hope that it will be able to read more events and
- the unsafe situation will soon be left. Note that this one minute starts
- from the last time anything happened in the slave SQL thread. So it's
- really one minute of idleness, we don't timeout if the slave SQL thread
- is actively working.
- */
- if (rli->last_event_start_time == 0)
- DBUG_RETURN(1);
- DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
- "it some grace period"));
- if (difftime(time(0), rli->last_event_start_time) > 60)
+ if (thd->transaction.all.modified_non_trans_table && rli->is_in_group())
{
- rli->report(ERROR_LEVEL, 0,
- "SQL thread had to stop in an unsafe situation, in "
- "the middle of applying updates to a "
- "non-transactional table without any primary key. "
- "There is a risk of duplicate updates when the slave "
- "SQL thread is restarted. Please check your tables' "
- "contents after restart.");
- DBUG_RETURN(1);
+ char msg_stopped[]=
+ "... The slave SQL is stopped, leaving the current group "
+ "of events unfinished with a non-transaction table changed. "
+ "If the group consists solely of Row-based events, you can try "
+ "restarting the slave with --slave-exec-mode=IDEMPOTENT, which "
+ "ignores duplicate key, key not found, and similar errors (see "
+ "documentation for details).";
+
+ if (rli->abort_slave)
+ {
+ DBUG_PRINT("info", ("Slave SQL thread is being stopped in the middle of"
+ " a group having updated a non-trans table, giving"
+ " it some grace period"));
+
+ /*
+ Slave sql thread shutdown in face of unfinished group modified
+ Non-trans table is handled via a timer. The slave may eventually
+ give out to complete the current group and in that case there
+ might be issues at consequent slave restart, see the error message.
+ WL#2975 offers a robust solution requiring to store the last exectuted
+ event's coordinates along with the group's coordianates
+ instead of waiting with @c last_event_start_time the timer.
+ */
+
+ if (rli->last_event_start_time == 0)
+ rli->last_event_start_time= my_time(0);
+ ret= difftime(my_time(0), rli->last_event_start_time) <=
+ SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE;
+
+ DBUG_EXECUTE_IF("stop_slave_middle_group",
+ DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
+ ret= TRUE;);); // time is over
+
+ if (ret == 0)
+ {
+ rli->report(WARNING_LEVEL, 0,
+ "slave SQL thread is being stopped in the middle "
+ "of applying of a group having updated a non-transaction "
+ "table; waiting for the group completion ... ");
+ }
+ else
+ {
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR), msg_stopped);
+ }
+ }
+ else
+ {
+ ret= TRUE;
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
+ msg_stopped);
+ }
+ }
+ else
+ {
+ ret= TRUE;
}
}
- DBUG_RETURN(0);
+ if (ret)
+ rli->last_event_start_time= 0;
+
+ DBUG_RETURN(ret);
}
@@ -860,6 +972,126 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
DBUG_RETURN(1);
}
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
+{
+ char buf[16];
+ DBUG_ENTER("init_floatvar_from_file");
+
+
+ if (my_b_gets(f, buf, sizeof(buf)))
+ {
+ if (sscanf(buf, "%f", var) != 1)
+ DBUG_RETURN(1);
+ else
+ DBUG_RETURN(0);
+ }
+ else if (default_val != 0.0)
+ {
+ *var = default_val;
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(1);
+}
+
+
+/**
+ A master info read method
+
+ This function is called from @c init_master_info() along with
+ relatives to restore some of @c active_mi members.
+ Particularly, this function is responsible for restoring
+ IGNORE_SERVER_IDS list of servers whose events the slave is
+ going to ignore (to not log them in the relay log).
+ Items being read are supposed to be decimal output of values of a
+ type shorter or equal of @c long and separated by the single space.
+
+ @param arr @c DYNAMIC_ARRAY pointer to storage for servers id
+ @param f @c IO_CACHE pointer to the source file
+
+ @retval 0 All OK
+ @retval non-zero An error
+*/
+
+int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f)
+{
+ int ret= 0;
+ char buf[16 * (sizeof(long)*4 + 1)]; // static buffer to use most of times
+ char *buf_act= buf; // actual buffer can be dynamic if static is short
+ char *token, *last;
+ uint num_items; // number of items of `arr'
+ size_t read_size;
+ DBUG_ENTER("init_dynarray_intvar_from_file");
+
+ if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0)
+ {
+ return 0; // no line in master.info
+ }
+ if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n')
+ {
+ /*
+ short read happend; allocate sufficient memory and make the 2nd read
+ */
+ char buf_work[(sizeof(long)*3 + 1)*16];
+ memcpy(buf_work, buf, sizeof(buf_work));
+ num_items= atoi(strtok_r(buf_work, " ", &last));
+ size_t snd_size;
+ /*
+ max size lower bound approximate estimation bases on the formula:
+ (the items number + items themselves) *
+ (decimal size + space) - 1 + `\n' + '\0'
+ */
+ size_t max_size= (1 + num_items) * (sizeof(long)*3 + 1) + 1;
+ buf_act= (char*) my_malloc(max_size, MYF(MY_WME));
+ memcpy(buf_act, buf, read_size);
+ snd_size= my_b_gets(f, buf_act + read_size, max_size - read_size);
+ if (snd_size == 0 ||
+ (snd_size + 1 == max_size - read_size) && buf[max_size - 2] != '\n')
+ {
+ /*
+ failure to make the 2nd read or short read again
+ */
+ ret= 1;
+ goto err;
+ }
+ }
+ token= strtok_r(buf_act, " ", &last);
+ if (token == NULL)
+ {
+ ret= 1;
+ goto err;
+ }
+ num_items= atoi(token);
+ for (uint i=0; i < num_items; i++)
+ {
+ token= strtok_r(NULL, " ", &last);
+ if (token == NULL)
+ {
+ ret= 1;
+ goto err;
+ }
+ else
+ {
+ ulong val= atol(token);
+ insert_dynamic(arr, (uchar *) &val);
+ }
+ }
+err:
+ if (buf_act != buf)
+ my_free(buf_act, MYF(0));
+ DBUG_RETURN(ret);
+}
+
+
+static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
+{
+ if (io_slave_killed(thd, mi))
+ {
+ if (info && global_system_variables.log_warnings)
+ sql_print_information(info);
+ return TRUE;
+ }
+ return FALSE;
+}
/*
Check if the error is caused by network.
@@ -1028,7 +1260,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
- if ((::server_id == strtoul(master_row[1], 0, 10)) &&
+ if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) &&
!mi->rli.replicate_same_server_id)
{
errmsg= "The slave I/O thread stops because master and slave have equal \
@@ -1066,6 +1298,13 @@ maybe it is a *VERY OLD MASTER*.");
mysql_free_result(master_res);
master_res= NULL;
}
+ if (mi->master_id == 0 && mi->ignore_server_ids.elements > 0)
+ {
+ errmsg= "Slave configured with server id filtering could not detect the master server id.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, ER(err_code), errmsg);
+ goto err;
+ }
/*
Check that the master's global character_set_server and ours are the same.
@@ -1189,6 +1428,31 @@ when it try to get the value of TIME_ZONE global variable from master.";
}
}
+ if (mi->heartbeat_period != 0.0)
+ {
+ char llbuf[22];
+ const char query_format[]= "SET @master_heartbeat_period= %s";
+ char query[sizeof(query_format) - 2 + sizeof(llbuf)];
+ /*
+ the period is an ulonglong of nano-secs.
+ */
+ llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
+ my_sprintf(query, (query, query_format, llbuf));
+
+ if (mysql_real_query(mysql, query, strlen(query))
+ && !check_io_slave_killed(mi->io_thd, mi, NULL))
+ {
+ errmsg= "The slave I/O thread stops because SET @master_heartbeat_period "
+ "on master failed.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto err;
+ }
+ mysql_free_result(mysql_store_result(mysql));
+ }
+
+
err:
if (errmsg)
{
@@ -1604,6 +1868,10 @@ bool show_master_info(THD* thd, Master_info* mi)
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
+ field_list.push_back(new Item_empty_string("Replicate_Ignore_Server_Ids",
+ FN_REFLEN));
+ field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong),
+ MYSQL_TYPE_LONG));
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
@@ -1639,7 +1907,8 @@ bool show_master_info(THD* thd, Master_info* mi)
protocol->store((ulonglong) mi->rli.group_relay_log_pos);
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
- "Yes" : "No", &my_charset_bin);
+ "Yes" : (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT ?
+ "Connecting" : "No"), &my_charset_bin);
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(rpl_filter->get_do_db());
protocol->store(rpl_filter->get_ignore_db());
@@ -1725,6 +1994,32 @@ bool show_master_info(THD* thd, Master_info* mi)
protocol->store(mi->rli.last_error().number);
// Last_SQL_Error
protocol->store(mi->rli.last_error().message, &my_charset_bin);
+ // Replicate_Ignore_Server_Ids
+ {
+ char buff[FN_REFLEN];
+ ulong i, cur_len;
+ for (i= 0, buff[0]= 0, cur_len= 0;
+ i < mi->ignore_server_ids.elements; i++)
+ {
+ ulong s_id, slen;
+ char sbuff[FN_REFLEN];
+ get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
+ slen= my_sprintf(sbuff, (sbuff, (i==0? "%lu" : ", %lu"), s_id));
+ if (cur_len + slen + 4 > FN_REFLEN)
+ {
+ /*
+ break the loop whenever remained space could not fit
+ ellipses on the next cycle
+ */
+ my_sprintf(buff + cur_len, (buff + cur_len, "..."));
+ break;
+ }
+ cur_len += my_sprintf(buff + cur_len, (buff + cur_len, "%s", sbuff));
+ }
+ protocol->store(buff, &my_charset_bin);
+ }
+ // Master_Server_id
+ protocol->store((uint32) mi->master_id);
pthread_mutex_unlock(&mi->rli.err_lock);
pthread_mutex_unlock(&mi->err_lock);
@@ -1868,17 +2163,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
}
-static int request_dump(MYSQL* mysql, Master_info* mi,
- bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+ bool *suppress_warnings)
{
uchar buf[FN_REFLEN + 10];
int len;
- int binlog_flags = 0; // for now
+ ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name;
DBUG_ENTER("request_dump");
*suppress_warnings= FALSE;
+ if (RUN_HOOK(binlog_relay_io,
+ before_request_transmit,
+ (thd, mi, binlog_flags)))
+ DBUG_RETURN(1);
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -2274,6 +2574,27 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
delete ev;
DBUG_RETURN(1);
}
+
+ { /**
+ The following failure injecion works in cooperation with tests
+ setting @@global.debug= 'd,incomplete_group_in_relay_log'.
+ Xid or Commit events are not executed to force the slave sql
+ read hanging if the realy log does not have any more events.
+ */
+ DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
+ if ((ev->get_type_code() == XID_EVENT) ||
+ ((ev->get_type_code() == QUERY_EVENT) &&
+ strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
+ {
+ DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
+ rli->abort_slave= 1;
+ pthread_mutex_unlock(&rli->data_lock);
+ delete ev;
+ rli->inc_event_relay_log_pos();
+ DBUG_RETURN(0);
+ };);
+ }
+
exec_res= apply_event_and_update_pos(ev, thd, rli);
/*
@@ -2377,18 +2698,6 @@ on this slave.\
}
-static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
-{
- if (io_slave_killed(thd, mi))
- {
- if (info && global_system_variables.log_warnings)
- sql_print_information("%s", info);
- return TRUE;
- }
- return FALSE;
-}
-
-
/**
@brief Try to reconnect slave IO thread.
@@ -2528,6 +2837,16 @@ pthread_handler_t handle_slave_io(void *arg)
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
+ /* This must be called before run any binlog_relay_io hooks */
+ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+ if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
+ goto err;
+ }
+
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2617,7 +2936,7 @@ connected:
while (!io_slave_killed(thd,mi))
{
thd_proc_info(thd, "Requesting binlog dump");
- if (request_dump(mysql, mi, &suppress_warnings))
+ if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2637,6 +2956,7 @@ requesting master dump") ||
goto err;
goto connected;
});
+ const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(thd,mi))
@@ -2697,14 +3017,37 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log");
- if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
- event_len))
+ event_buf= (const char*)mysql->net.read_pos + 1;
+ if (RUN_HOOK(binlog_relay_io, after_read_event,
+ (thd, mi,(const char*)mysql->net.read_pos + 1,
+ event_len, &event_buf, &event_len)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_read_event' hook");
+ goto err;
+ }
+
+ /* XXX: 'synced' should be updated by queue_event to indicate
+ whether event has been synced to disk */
+ bool synced= 0;
+ if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master");
goto err;
}
+
+ if (RUN_HOOK(binlog_relay_io, after_queue_event,
+ (thd, mi, event_buf, event_len, synced)))
+ {
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'after_queue_event' hook");
+ goto err;
+ }
+
if (flush_master_info(mi, 1))
{
sql_print_error("Failed to flush master info file");
@@ -2750,6 +3093,7 @@ err:
// print the current replication position
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+ RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->set_query(NULL, 0);
thd->reset_db(NULL, 0);
if (mysql)
@@ -3555,9 +3899,11 @@ static int queue_old_event(Master_info *mi, const char *buf,
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
int error= 0;
+ String error_msg;
ulong inc_pos;
Relay_log_info *rli= &mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
+ ulong s_id;
DBUG_ENTER("queue_event");
LINT_INIT(inc_pos);
@@ -3589,7 +3935,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
if (unlikely(process_io_rotate(mi,&rev)))
{
- error= 1;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
/*
@@ -3616,7 +3962,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Log_event::read_log_event(buf, event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue)))
{
- error= 2;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
delete mi->rli.relay_log.description_event_for_queue;
@@ -3635,6 +3981,56 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+
+ case HEARTBEAT_LOG_EVENT:
+ {
+ /*
+ HB (heartbeat) cannot come before RL (Relay)
+ */
+ char llbuf[22];
+ Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
+ if (!hb.is_valid())
+ {
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ mi->received_heartbeats++;
+ /*
+ compare local and event's versions of log_file, log_pos.
+
+ Heartbeat is sent only after an event corresponding to the corrdinates
+ the heartbeat carries.
+ Slave can not have a difference in coordinates except in the only
+ special case when mi->master_log_name, master_log_pos have never
+ been updated by Rotate event i.e when slave does not have any history
+ with the master (and thereafter mi->master_log_pos is NULL).
+
+ TODO: handling `when' for SHOW SLAVE STATUS' snds behind
+ */
+ if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
+ && mi->master_log_name != NULL)
+ || mi->master_log_pos != hb.log_pos)
+ {
+ /* missed events of heartbeat from the past */
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ goto skip_relay_logging;
+ }
+ break;
+
default:
inc_pos= event_len;
break;
@@ -3654,9 +4050,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
pthread_mutex_lock(log_lock);
-
- if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
- !mi->rli.replicate_same_server_id)
+ s_id= uint4korr(buf + SERVER_ID_OFFSET);
+ if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ /*
+ the following conjunction deals with IGNORE_SERVER_IDS, if set
+ If the master is on the ignore list, execution of
+ format description log events and rotate events is necessary.
+ */
+ (mi->ignore_server_ids.elements > 0 &&
+ mi->shall_ignore_server_id(s_id) &&
+ /* everything is filtered out from non-master */
+ (s_id != mi->master_id ||
+ /* for the master meta information is necessary */
+ buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT)))
{
/*
Do not write it to the relay log.
@@ -3671,10 +4078,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
But events which were generated by this slave and which do not exist in
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
mi->master_log_pos.
+ If the event is originated remotely and is being filtered out by
+ IGNORE_SERVER_IDS it increments mi->master_log_pos
+ as well as rli->group_relay_log_pos.
*/
- if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
+ if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
+ buf[EVENT_TYPE_OFFSET] != STOP_EVENT)
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
@@ -3682,8 +4093,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->ign_master_log_pos_end= mi->master_log_pos;
}
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
- DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
- (ulong) mi->master_log_pos));
+ DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
+ (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET)));
}
else
{
@@ -3695,15 +4106,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
else
- error= 3;
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ }
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
}
pthread_mutex_unlock(log_lock);
-
+skip_relay_logging:
+
err:
pthread_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
+ if (error)
+ mi->report(ERROR_LEVEL, error, ER(error),
+ (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
+ "could not queue event from master" :
+ error_msg.ptr());
DBUG_RETURN(error);
}
@@ -3909,6 +4328,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
}
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+ THD *thd= current_thd;
+ Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+ if (!mi)
+ {
+ sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+ return NULL;
+ }
+
+ bool allocated= false;
+
+ if (!mysql)
+ {
+ if(!(mysql= mysql_init(NULL)))
+ {
+ sql_print_error("rpl_connect_master: failed in mysql_init()");
+ return NULL;
+ }
+ allocated= true;
+ }
+
+ /*
+ XXX: copied from connect_to_master, this function should not
+ change the slave status, so we cannot use connect_to_master
+ directly
+
+ TODO: make this part a seperate function to eliminate duplication
+ */
+ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+ mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+ if (mi->ssl)
+ {
+ mysql_ssl_set(mysql,
+ mi->ssl_key[0]?mi->ssl_key:0,
+ mi->ssl_cert[0]?mi->ssl_cert:0,
+ mi->ssl_ca[0]?mi->ssl_ca:0,
+ mi->ssl_capath[0]?mi->ssl_capath:0,
+ mi->ssl_cipher[0]?mi->ssl_cipher:0);
+ mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+ &mi->ssl_verify_server_cert);
+ }
+#endif
+
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /* This one is not strictly needed but we have it here for completeness */
+ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+ if (io_slave_killed(thd, mi)
+ || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ if (!io_slave_killed(thd, mi))
+ sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
+ mysql_error(mysql), mysql_errno(mysql));
+
+ if (allocated)
+ mysql_close(mysql); // this will free the object
+ return NULL;
+ }
+ return mysql;
+}
+
/*
Store the file and position where the execute-slave thread are in the
relay log.
@@ -3962,7 +4446,14 @@ bool flush_relay_log_info(Relay_log_info* rli)
error=1;
if (flush_io_cache(file))
error=1;
-
+ if (sync_relayloginfo_period &&
+ !error &&
+ ++(rli->sync_counter) >= sync_relayloginfo_period)
+ {
+ if (my_sync(rli->info_fd, MYF(MY_WME)))
+ error=1;
+ rli->sync_counter= 0;
+ }
/* Flushing the relay log is done by the slave I/O thread */
DBUG_RETURN(error);
}
@@ -4211,8 +4702,8 @@ static Log_event* next_event(Relay_log_info* rli)
*/
pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond);
- // Note that wait_for_update unlocks lock_log !
- rli->relay_log.wait_for_update(rli->sql_thd, 1);
+ // Note that wait_for_update_relay_log unlocks lock_log !
+ rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
// re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock);
rli->last_master_timestamp= save_timestamp;
@@ -4369,6 +4860,8 @@ void rotate_relay_log(Master_info* mi)
DBUG_ENTER("rotate_relay_log");
Relay_log_info* rli= &mi->rli;
+ DBUG_EXECUTE_IF("crash_before_rotate_relaylog", abort(););
+
/* We don't lock rli->run_lock. This would lead to deadlocks. */
pthread_mutex_lock(&mi->run_lock);
diff --git a/sql/slave.h b/sql/slave.h
index f356d28b626..eff0fa49f61 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -22,6 +22,17 @@
@file
*/
+
+/**
+ Some of defines are need in parser even though replication is not
+ compiled in (embedded).
+*/
+
+/**
+ The maximum is defined as (ULONG_MAX/1000) with 4 bytes ulong
+*/
+#define SLAVE_MAX_HEARTBEAT_PERIOD 4294967
+
#ifdef HAVE_REPLICATION
#include "log.h"
@@ -33,7 +44,6 @@
#define MAX_SLAVE_ERROR 2000
-
// Forward declarations
class Relay_log_info;
class Master_info;
@@ -134,6 +144,7 @@ extern ulonglong relay_log_space_limit;
#define SLAVE_FORCE_ALL 4
int init_slave();
+int init_recovery(Master_info* mi, const char** errmsg);
void init_slave_skip_errors(const char* arg);
bool flush_relay_log_info(Relay_log_info* rli);
int register_slave_on_master(MYSQL* mysql);
diff --git a/sql/sp_head.cc b/sql/sp_head.cc
index 908f0997be6..a95b27ef9cd 100644
--- a/sql/sp_head.cc
+++ b/sql/sp_head.cc
@@ -175,6 +175,7 @@ sp_get_flags_for_command(LEX *lex)
case SQLCOM_SHOW_AUTHORS:
case SQLCOM_SHOW_BINLOGS:
case SQLCOM_SHOW_BINLOG_EVENTS:
+ case SQLCOM_SHOW_RELAYLOG_EVENTS:
case SQLCOM_SHOW_CHARSETS:
case SQLCOM_SHOW_COLLATIONS:
case SQLCOM_SHOW_COLUMN_TYPES:
diff --git a/sql/sql_acl.h b/sql/sql_acl.h
index a8090fba2e7..c0622bd747c 100644
--- a/sql/sql_acl.h
+++ b/sql/sql_acl.h
@@ -1,3 +1,6 @@
+#ifndef SQL_ACL_INCLUDED
+#define SQL_ACL_INCLUDED
+
/* Copyright (C) 2000-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -273,3 +276,4 @@ bool is_acl_user(const char *host, const char *user);
#define check_grant(A,B,C,D,E,F) 0
#define check_grant_db(A,B) 0
#endif
+#endif /* SQL_ACL_INCLUDED */
diff --git a/sql/sql_analyse.h b/sql/sql_analyse.h
index 8807b40857e..8f52b90c874 100644
--- a/sql/sql_analyse.h
+++ b/sql/sql_analyse.h
@@ -1,3 +1,6 @@
+#ifndef SQL_ANALYSE_INCLUDED
+#define SQL_ANALYSE_INCLUDED
+
/* Copyright (C) 2000-2003, 2005 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -355,3 +358,5 @@ public:
select_result *result,
List<Item> &field_list);
};
+
+#endif /* SQL_ANALYSE_INCLUDED */
diff --git a/sql/sql_array.h b/sql/sql_array.h
index e1b22921519..dfaa9b02947 100644
--- a/sql/sql_array.h
+++ b/sql/sql_array.h
@@ -1,3 +1,6 @@
+#ifndef SQL_ARRAY_INCLUDED
+#define SQL_ARRAY_INCLUDED
+
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -66,3 +69,4 @@ public:
}
};
+#endif /* SQL_ARRAY_INCLUDED */
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index ee51480411b..58c309ef57b 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -61,7 +61,7 @@ void mysql_client_binlog_statement(THD* thd)
rli= thd->rli_fake;
if (!rli)
{
- rli= thd->rli_fake= new Relay_log_info;
+ rli= thd->rli_fake= new Relay_log_info(FALSE);
#ifdef HAVE_purify
rli->is_fake= TRUE;
#endif
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 4ed25b03ed7..757f6abb00b 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -266,6 +266,42 @@ const char *set_thd_proc_info(THD *thd, const char *info,
}
extern "C"
+const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond,
+ pthread_mutex_t *mutex, const char *msg)
+{
+ if (!thd)
+ thd= current_thd;
+
+ const char* old_msg = thd->proc_info;
+ safe_mutex_assert_owner(mutex);
+ thd->mysys_var->current_mutex = mutex;
+ thd->mysys_var->current_cond = cond;
+ thd->proc_info = msg;
+ return old_msg;
+}
+
+extern "C"
+void thd_exit_cond(MYSQL_THD thd, const char *old_msg)
+{
+ if (!thd)
+ thd= current_thd;
+
+ /*
+ Putting the mutex unlock in thd_exit_cond() ensures that
+ mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
+ locked (if that would not be the case, you'll get a deadlock if someone
+ does a THD::awake() on you).
+ */
+ pthread_mutex_unlock(thd->mysys_var->current_mutex);
+ pthread_mutex_lock(&thd->mysys_var->mutex);
+ thd->mysys_var->current_mutex = 0;
+ thd->mysys_var->current_cond = 0;
+ thd->proc_info = old_msg;
+ pthread_mutex_unlock(&thd->mysys_var->mutex);
+ return;
+}
+
+extern "C"
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
{
return (void **) &thd->ha_data[hton->slot].ha_ptr;
@@ -475,9 +511,6 @@ THD::THD()
limit_found_rows= 0;
row_count_func= -1;
statement_id_counter= 0UL;
-#ifdef ERROR_INJECT_SUPPORT
- error_inject_value= 0UL;
-#endif
// Must be reset to handle error with THD's created for init of mysqld
lex->current_select= 0;
start_time=(time_t) 0;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index ea2c63cdc97..aaa959feae9 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -14,6 +14,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#ifndef SQL_CLASS_INCLUDED
+#define SQL_CLASS_INCLUDED
+
/* Classes in mysql */
#ifdef USE_PRAGMA_INTERFACE
@@ -22,6 +25,7 @@
#include "log.h"
#include "rpl_tblmap.h"
+#include "replication.h"
/**
An interface that is used to take an action when
@@ -1662,9 +1666,6 @@ public:
query_id_t query_id;
ulong col_access;
-#ifdef ERROR_INJECT_SUPPORT
- ulong error_inject_value;
-#endif
/* Statement id is thread-wide. This counter is used to generate ids */
ulong statement_id_counter;
ulong rand_saved_seed1, rand_saved_seed2;
@@ -1885,27 +1886,11 @@ public:
inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
const char* msg)
{
- const char* old_msg = proc_info;
- safe_mutex_assert_owner(mutex);
- mysys_var->current_mutex = mutex;
- mysys_var->current_cond = cond;
- proc_info = msg;
- return old_msg;
+ return thd_enter_cond(this, cond, mutex, msg);
}
inline void exit_cond(const char* old_msg)
{
- /*
- Putting the mutex unlock in exit_cond() ensures that
- mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
- locked (if that would not be the case, you'll get a deadlock if someone
- does a THD::awake() on you).
- */
- pthread_mutex_unlock(mysys_var->current_mutex);
- pthread_mutex_lock(&mysys_var->mutex);
- mysys_var->current_mutex = 0;
- mysys_var->current_cond = 0;
- proc_info = old_msg;
- pthread_mutex_unlock(&mysys_var->mutex);
+ thd_exit_cond(this, old_msg);
}
inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time()
@@ -3088,3 +3073,4 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
void mark_transaction_to_rollback(THD *thd, bool all);
#endif /* MYSQL_SERVER */
+#endif /* SQL_CLASS_INCLUDED */
diff --git a/sql/sql_crypt.h b/sql/sql_crypt.h
index a5a6bee8a58..8d5a761cbdf 100644
--- a/sql/sql_crypt.h
+++ b/sql/sql_crypt.h
@@ -1,3 +1,6 @@
+#ifndef SQL_CRYPT_INCLUDED
+#define SQL_CRYPT_INCLUDED
+
/* Copyright (C) 2000-2001, 2005 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -35,3 +38,5 @@ class SQL_CRYPT :public Sql_alloc
void encode(char *str, uint length);
void decode(char *str, uint length);
};
+
+#endif /* SQL_CRYPT_INCLUDED */
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 02f337d7015..05a4d1d78bf 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -17,6 +17,9 @@
@defgroup Semantic_Analysis Semantic Analysis
*/
+#ifndef SQL_LEX_INCLUDED
+#define SQL_LEX_INCLUDED
+
/* YACC and LEX Definitions */
/* These may not be declared yet */
@@ -119,7 +122,7 @@ enum enum_sql_command {
SQLCOM_ALTER_DB_UPGRADE,
SQLCOM_SHOW_PROFILE, SQLCOM_SHOW_PROFILES,
SQLCOM_SIGNAL, SQLCOM_RESIGNAL,
-
+ SQLCOM_SHOW_RELAYLOG_EVENTS,
/*
When a command is added here, be sure it's also added in mysqld.cc
in "struct show_var_st status_vars[]= {" ...
@@ -204,17 +207,19 @@ typedef struct st_lex_master_info
{
char *host, *user, *password, *log_file_name;
uint port, connect_retry;
+ float heartbeat_period;
ulonglong pos;
ulong server_id;
/*
Enum is used for making it possible to detect if the user
changed variable or if it should be left at old value
*/
- enum {SSL_UNCHANGED, SSL_DISABLE, SSL_ENABLE}
- ssl, ssl_verify_server_cert;
+ enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
+ ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt;
char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
char *relay_log_name;
ulong relay_log_pos;
+ DYNAMIC_ARRAY repl_ignore_server_ids;
} LEX_MASTER_INFO;
@@ -2068,3 +2073,4 @@ extern bool is_lex_native_function(const LEX_STRING *name);
void my_missing_function_error(const LEX_STRING &token, const char *name);
#endif /* MYSQL_SERVER */
+#endif /* SQL_LEX_INCLUDED */
diff --git a/sql/sql_map.h b/sql/sql_map.h
index a1efba0da6f..5ae260841e0 100644
--- a/sql/sql_map.h
+++ b/sql/sql_map.h
@@ -1,3 +1,6 @@
+#ifndef SQL_MAP_INCLUDED
+#define SQL_MAP_INCLUDED
+
/* Copyright (C) 2000-2001, 2005 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -60,3 +63,5 @@ public:
return file->map;
}
};
+
+#endif /* SQL_MAP_INCLUDED */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 7b9e31e84f0..d677414070f 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -21,6 +21,7 @@
#include <m_ctype.h>
#include <myisam.h>
#include <my_dir.h>
+#include "rpl_handler.h"
#include "sp_head.h"
#include "sp.h"
@@ -1677,9 +1678,9 @@ void log_slow_statement(THD *thd)
/*
Do not log administrative statements unless the appropriate option is
- set; do not log into slow log if reading from backup.
+ set.
*/
- if (thd->enable_slow_log && !thd->user_time)
+ if (thd->enable_slow_log)
{
ulonglong end_utime_of_query= thd->current_utime();
thd_proc_info(thd, "logging slow query");
@@ -2387,6 +2388,7 @@ mysql_execute_command(THD *thd)
res = show_slave_hosts(thd);
break;
}
+ case SQLCOM_SHOW_RELAYLOG_EVENTS: /* fall through */
case SQLCOM_SHOW_BINLOG_EVENTS:
{
if (check_global_access(thd, REPL_SLAVE_ACL))
diff --git a/sql/sql_partition.cc b/sql/sql_partition.cc
index 24bb99c97a7..eb711001ef6 100644
--- a/sql/sql_partition.cc
+++ b/sql/sql_partition.cc
@@ -40,6 +40,10 @@
#ifdef WITH_PARTITION_STORAGE_ENGINE
#include "ha_partition.h"
+
+#define ERROR_INJECT_CRASH(code) \
+ DBUG_EVALUATE_IF(code, (abort(), 0), 0)
+
/*
Partition related functions declarations and some static constants;
*/
diff --git a/sql/sql_partition.h b/sql/sql_partition.h
index 282e24f1853..0c47340016c 100644
--- a/sql/sql_partition.h
+++ b/sql/sql_partition.h
@@ -1,3 +1,6 @@
+#ifndef SQL_PARTITION_INCLUDED
+#define SQL_PARTITION_INCLUDED
+
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -209,3 +212,4 @@ typedef int (*get_partitions_in_range_iter)(partition_info *part_info,
#include "partition_info.h"
+#endif /* SQL_PARTITION_INCLUDED */
diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc
index 831ef778f37..1278eafce92 100644
--- a/sql/sql_plugin.cc
+++ b/sql/sql_plugin.cc
@@ -19,14 +19,6 @@
#define REPORT_TO_LOG 1
#define REPORT_TO_USER 2
-#ifdef DBUG_OFF
-#define plugin_ref_to_int(A) A
-#define plugin_int_to_ref(A) A
-#else
-#define plugin_ref_to_int(A) (A ? A[0] : NULL)
-#define plugin_int_to_ref(A) &(A)
-#endif
-
extern struct st_mysql_plugin *mysqld_builtins[];
/**
@@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{ C_STRING_WITH_LEN("STORAGE ENGINE") },
{ C_STRING_WITH_LEN("FTPARSER") },
{ C_STRING_WITH_LEN("DAEMON") },
- { C_STRING_WITH_LEN("INFORMATION SCHEMA") }
+ { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
+ { C_STRING_WITH_LEN("REPLICATION") },
};
extern int initialize_schema_table(st_plugin_int *plugin);
@@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
- MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
+ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION,
};
static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{
@@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
MYSQL_HANDLERTON_INTERFACE_VERSION,
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
- MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION
+ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION,
};
static bool initialized= 0;
diff --git a/sql/sql_plugin.h b/sql/sql_plugin.h
index 004d0d5abb7..c6ad846943c 100644
--- a/sql/sql_plugin.h
+++ b/sql/sql_plugin.h
@@ -18,6 +18,14 @@
class sys_var;
+#ifdef DBUG_OFF
+#define plugin_ref_to_int(A) A
+#define plugin_int_to_ref(A) A
+#else
+#define plugin_ref_to_int(A) (A ? A[0] : NULL)
+#define plugin_int_to_ref(A) &(A)
+#endif
+
/*
the following flags are valid for plugin_init()
*/
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 84781a3a0ae..5f07df64905 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -21,6 +21,7 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
+#include "rpl_handler.h"
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
DBUG_RETURN(0);
}
+/*
+ Reset thread transmit packet buffer for event sending
+
+ This function allocates header bytes for event transmission, and
+ should be called before store the event data to the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+ ulong *ev_offset, const char **errmsg)
+{
+ int ret= 0;
+ String *packet= &thd->packet;
+
+ /* reserve and set default header */
+ packet->length(0);
+ packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+ {
+ *errmsg= "Failed to run hook 'reserve_header'";
+ my_errno= ER_UNKNOWN_ERROR;
+ ret= 1;
+ }
+ *ev_offset= packet->length();
+ return ret;
+}
+
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -336,6 +363,73 @@ Increase max_allowed_packet on master";
}
+/**
+ An auxiliary function for calling in mysql_binlog_send
+ to initialize the heartbeat timeout in waiting for a binlogged event.
+
+ @param[in] thd THD to access a user variable
+
+ @return heartbeat period an ulonglong of nanoseconds
+ or zero if heartbeat was not demanded by slave
+*/
+static ulonglong get_heartbeat_period(THD * thd)
+{
+ my_bool null_value;
+ LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry? entry->val_int(&null_value) : 0;
+}
+
+/*
+ Function prepares and sends repliation heartbeat event.
+
+ @param net net object of THD
+ @param packet buffer to store the heartbeat instance
+ @param event_coordinates binlog file name and position of the last
+ real event master sent from binlog
+
+ @note
+ Among three essential pieces of heartbeat data Log_event::when
+ is computed locally.
+ The error to send is serious and should force terminating
+ the dump thread.
+*/
+static int send_heartbeat_event(NET* net, String* packet,
+ const struct event_coordinates *coord)
+{
+ DBUG_ENTER("send_heartbeat_event");
+ char header[LOG_EVENT_HEADER_LEN];
+ /*
+ 'when' (the timestamp) is set to 0 so that slave could distinguish between
+ real and fake Rotate events (if necessary)
+ */
+ memset(header, 0, 4); // when
+
+ header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
+
+ char* p= coord->file_name + dirname_length(coord->file_name);
+
+ uint ident_len = strlen(p);
+ ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
+ int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ int2store(header + FLAGS_OFFSET, 0);
+
+ int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
+
+ packet->append(header, sizeof(header));
+ packet->append(p, ident_len); // log_file_name
+
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
+ net_flush(net))
+ {
+ DBUG_RETURN(-1);
+ }
+ DBUG_RETURN(0);
+}
+
/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -346,6 +440,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
+
+ ulong ev_offset;
+
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
@@ -361,6 +458,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ /*
+ heartbeat_period from @master_heartbeat_period user variable
+ */
+ ulonglong heartbeat_period= get_heartbeat_period(thd);
+ struct timespec heartbeat_buf;
+ struct event_coordinates coord_buf;
+ struct timespec *heartbeat_ts= NULL;
+ struct event_coordinates *coord= NULL;
+ if (heartbeat_period != LL(0))
+ {
+ heartbeat_ts= &heartbeat_buf;
+ set_timespec_nsec(*heartbeat_ts, 0);
+ coord= &coord_buf;
+ coord->file_name= log_file_name; // initialization basing on what slave remembers
+ coord->pos= pos;
+ }
+ sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+ thd->server_id, log_ident, (ulong)pos);
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ errmsg= "Failed to run hook 'transmit_start'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
@@ -416,11 +537,9 @@ impossible position";
goto err;
}
- /*
- We need to start a packet with something other than 255
- to distinguish it from error
- */
- packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+ /* reset transmit packet for the fake rotate event below */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
/*
Tell the client about the log name with a fake Rotate event;
@@ -460,7 +579,7 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
- packet->set("\0", 1, &my_charset_bin);
+
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
@@ -476,6 +595,11 @@ impossible position";
log_lock = mysql_bin_log.get_log_lock();
if (pos > BIN_LOG_HEADER_SIZE)
{
+ /* reset transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Try to find a Format_description_log_event at the beginning of
the binlog
@@ -483,29 +607,30 @@ impossible position";
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
/*
- The packet has offsets equal to the normal offsets in a binlog
- event +1 (the first character is \0).
+ The packet has offsets equal to the normal offsets in a
+ binlog event + ev_offset (the first ev_offset characters are
+ the header (default \0)).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
- (*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+ if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
- int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
- ST_CREATED_OFFSET+1, (ulong) 0);
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -531,8 +656,6 @@ impossible position";
Format_description_log_event will be found naturally if it is written.
*/
}
- /* reset the packet as we wrote to it in any case */
- packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
@@ -544,6 +667,12 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
+ Log_event_type event_type= UNKNOWN_EVENT;
+
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
#ifndef DBUG_OFF
@@ -555,16 +684,31 @@ impossible position";
goto err;
}
#endif
+ /*
+ log's filename does not change while it's active
+ */
+ if (coord)
+ coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+ if (event_type == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+ else if (event_type == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
@@ -572,9 +716,8 @@ impossible position";
goto err;
}
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ DBUG_PRINT("info", ("log event code %d", event_type));
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -583,7 +726,17 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ errmsg= "Failed to run hook 'after_send_event'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
+ /* reset transmit packet for next loop */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
}
/*
@@ -634,6 +787,11 @@ impossible position";
}
#endif
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
@@ -650,34 +808,86 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
+ if (coord)
+ coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
break;
case LOG_READ_EOF:
+ {
+ int ret;
+ ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
{
pthread_mutex_unlock(log_lock);
goto end;
}
- if (!thd->killed)
- {
- /* Note that the following call unlocks lock_log */
- mysql_bin_log.wait_for_update(thd, 0);
- }
- else
- pthread_mutex_unlock(log_lock);
- DBUG_PRINT("wait",("binary log received update"));
- break;
- default:
+#ifndef DBUG_OFF
+ ulong hb_info_counter= 0;
+#endif
+ signal_cnt= mysql_bin_log.signal_cnt;
+ do
+ {
+ if (coord)
+ {
+ DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
+ set_timespec_nsec(*heartbeat_ts, heartbeat_period);
+ }
+ ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
+ DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
+ if (ret == ETIMEDOUT || ret == ETIME)
+ {
+#ifndef DBUG_OFF
+ if (hb_info_counter < 3)
+ {
+ sql_print_information("master sends heartbeat message");
+ hb_info_counter++;
+ if (hb_info_counter == 3)
+ sql_print_information("the rest of heartbeat info skipped ...");
+ }
+#endif
+ /* reset transmit packet for the heartbeat event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+ if (send_heartbeat_event(net, packet, coord))
+ {
+ errmsg = "Failed on my_net_write()";
+ my_errno= ER_UNKNOWN_ERROR;
+ pthread_mutex_unlock(log_lock);
+ goto err;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(ret == 0 && signal_cnt != mysql_bin_log.signal_cnt ||
+ thd->killed);
+ DBUG_PRINT("wait",("binary log received update"));
+ }
+ } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
+ pthread_mutex_unlock(log_lock);
+ }
+ break;
+
+ default:
pthread_mutex_unlock(log_lock);
test_for_non_eof_log_read_errors(error, &errmsg);
goto err;
}
if (read_packet)
- {
- thd_proc_info(thd, "Sending binlog event to slave");
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
@@ -685,7 +895,7 @@ impossible position";
goto err;
}
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -694,11 +904,13 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
- /*
- No need to net_flush because we will get to flush later when
- we hit EOF pretty quick
- */
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "Failed to run hook 'after_send_event'";
+ goto err;
+ }
}
log.error=0;
@@ -728,6 +940,10 @@ impossible position";
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
+ /* reset transmit packet for the possible fake rotate event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
@@ -745,8 +961,8 @@ impossible position";
goto err;
}
- packet->length(0);
- packet->append('\0');
+ if (coord)
+ coord->file_name= log_file_name; // reset to the next
}
}
@@ -754,6 +970,7 @@ end:
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
thd_proc_info(thd, "Waiting to finalize termination");
pthread_mutex_lock(&LOCK_thread_count);
@@ -764,6 +981,7 @@ end:
err:
thd_proc_info(thd, "Waiting to finalize termination");
end_io_cache(&log);
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -1058,6 +1276,7 @@ int reset_slave(THD *thd, Master_info* mi)
goto err;
}
+ RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
unlock_slave_threads(mi);
if (error)
@@ -1131,26 +1350,40 @@ bool change_master(THD* thd, Master_info* mi)
int thread_mask;
const char* errmsg= 0;
bool need_relay_log_purge= 1;
+ bool ret= FALSE;
DBUG_ENTER("change_master");
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
if (thread_mask) // We refuse if any slave thread is running
{
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
thd_proc_info(thd, "Changing master");
- LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
+ /*
+ We need to check if there is an empty master_host. Otherwise
+ change master succeeds, a master.info file is created containing
+ empty master_host string and when issuing: start slave; an error
+ is thrown stating that the server is not configured as slave.
+ (See BUG#28796).
+ */
+ if(lex_mi->host && !*lex_mi->host)
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
+ unlock_slave_threads(mi);
+ DBUG_RETURN(TRUE);
+ }
// TODO: see if needs re-write
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
thread_mask))
{
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
/*
@@ -1189,13 +1422,46 @@ bool change_master(THD* thd, Master_info* mi)
mi->port = lex_mi->port;
if (lex_mi->connect_retry)
mi->connect_retry = lex_mi->connect_retry;
+ if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->heartbeat_period = lex_mi->heartbeat_period;
+ else
+ mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ (slave_net_timeout/2.0));
+ mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
+ /*
+ reset the last time server_id list if the current CHANGE MASTER
+ is mentioning IGNORE_SERVER_IDS= (...)
+ */
+ if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
+ reset_dynamic(&mi->ignore_server_ids);
+ for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++)
+ {
+ ulong s_id;
+ get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
+ if (s_id == ::server_id && replicate_same_server_id)
+ {
+ my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), s_id);
+ ret= TRUE;
+ goto err;
+ }
+ else
+ {
+ if (bsearch((const ulong *) &s_id,
+ mi->ignore_server_ids.buffer,
+ mi->ignore_server_ids.elements, sizeof(ulong),
+ (int (*) (const void*, const void*))
+ change_master_server_id_cmp) == NULL)
+ insert_dynamic(&mi->ignore_server_ids, (uchar*) &s_id);
+ }
+ }
+ sort_dynamic(&mi->ignore_server_ids, (qsort_cmp) change_master_server_id_cmp);
- if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
- mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
+ if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
- if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED)
+ if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl_verify_server_cert=
- (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE);
+ (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
if (lex_mi->ssl_ca)
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
@@ -1218,9 +1484,11 @@ bool change_master(THD* thd, Master_info* mi)
if (lex_mi->relay_log_name)
{
need_relay_log_purge= 0;
- strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
+ char relay_log_name[FN_REFLEN];
+ mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name);
+ strmake(mi->rli.group_relay_log_name, relay_log_name,
sizeof(mi->rli.group_relay_log_name)-1);
- strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
+ strmake(mi->rli.event_relay_log_name, relay_log_name,
sizeof(mi->rli.event_relay_log_name)-1);
}
@@ -1267,8 +1535,8 @@ bool change_master(THD* thd, Master_info* mi)
if (flush_master_info(mi, 0))
{
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
if (need_relay_log_purge)
{
@@ -1279,8 +1547,8 @@ bool change_master(THD* thd, Master_info* mi)
&errmsg))
{
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
}
else
@@ -1295,8 +1563,8 @@ bool change_master(THD* thd, Master_info* mi)
&msg, 0))
{
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
}
/*
@@ -1333,10 +1601,13 @@ bool change_master(THD* thd, Master_info* mi)
pthread_cond_broadcast(&mi->data_cond);
pthread_mutex_unlock(&mi->rli.data_lock);
+err:
unlock_slave_threads(mi);
thd_proc_info(thd, 0);
- my_ok(thd);
- DBUG_RETURN(FALSE);
+ if (ret == FALSE)
+ my_ok(thd);
+ delete_dynamic(&lex_mi->repl_ignore_server_ids); //freeing of parser-time alloc
+ DBUG_RETURN(ret);
}
@@ -1357,7 +1628,11 @@ int reset_master(THD* thd)
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
return 1;
}
- return mysql_bin_log.reset_logs(thd);
+
+ if (mysql_bin_log.reset_logs(thd))
+ return 1;
+ RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+ return 0;
}
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1395,6 +1670,7 @@ bool mysql_show_binlog_events(THD* thd)
bool ret = TRUE;
IO_CACHE log;
File file = -1;
+ MYSQL_BIN_LOG *binary_log= NULL;
DBUG_ENTER("mysql_show_binlog_events");
Log_event::init_show_field_list(&field_list);
@@ -1405,14 +1681,30 @@ bool mysql_show_binlog_events(THD* thd)
Format_description_log_event *description_event= new
Format_description_log_event(3); /* MySQL 4.0 by default */
- /*
- Wait for handlers to insert any pending information
- into the binlog. For e.g. ndb which updates the binlog asynchronously
- this is needed so that the uses sees all its own commands in the binlog
- */
- ha_binlog_wait(thd);
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
+ thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
- if (mysql_bin_log.is_open())
+ /* select wich binary log to use: binlog or relay */
+ if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS )
+ {
+ /*
+ Wait for handlers to insert any pending information
+ into the binlog. For e.g. ndb which updates the binlog asynchronously
+ this is needed so that the uses sees all its own commands in the binlog
+ */
+ ha_binlog_wait(thd);
+
+ binary_log= &mysql_bin_log;
+ }
+ else /* showing relay log contents */
+ {
+ if (!active_mi)
+ DBUG_RETURN(TRUE);
+
+ binary_log= &(active_mi->rli.relay_log);
+ }
+
+ if (binary_log->is_open())
{
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
SELECT_LEX_UNIT *unit= &thd->lex->unit;
@@ -1420,7 +1712,7 @@ bool mysql_show_binlog_events(THD* thd)
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
char search_file_name[FN_REFLEN], *name;
const char *log_file_name = lex_mi->log_file_name;
- pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
+ pthread_mutex_t *log_lock = binary_log->get_log_lock();
LOG_INFO linfo;
Log_event* ev;
@@ -1430,13 +1722,13 @@ bool mysql_show_binlog_events(THD* thd)
name= search_file_name;
if (log_file_name)
- mysql_bin_log.make_log_name(search_file_name, log_file_name);
+ binary_log->make_log_name(search_file_name, log_file_name);
else
name=0; // Find first log
linfo.index_file_offset = 0;
- if (mysql_bin_log.find_log_pos(&linfo, name, 1))
+ if (binary_log->find_log_pos(&linfo, name, 1))
{
errmsg = "Could not find target log";
goto err;
@@ -1739,6 +2031,26 @@ public:
bool update(THD *thd, set_var *var);
};
+static void fix_slave_net_timeout(THD *thd, enum_var_type type)
+{
+ DBUG_ENTER("fix_slave_net_timeout");
+#ifdef HAVE_REPLICATION
+ pthread_mutex_lock(&LOCK_active_mi);
+ DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
+ slave_net_timeout,
+ (active_mi? active_mi->heartbeat_period : 0.0)));
+ if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ "The currect value for master_heartbeat_period"
+ " exceeds the new value of `slave_net_timeout' sec."
+ " A sensible value for the period should be"
+ " less than the timeout.");
+ pthread_mutex_unlock(&LOCK_active_mi);
+#endif
+ DBUG_VOID_RETURN;
+}
+
static sys_var_chain vars = { NULL, NULL };
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
@@ -1755,6 +2067,16 @@ static sys_var_const sys_relay_log_info_file(&vars, "relay_log_info_file",
(uchar*) &relay_log_info_file);
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
&relay_log_purge);
+static sys_var_bool_ptr sys_relay_log_recovery(&vars, "relay_log_recovery",
+ &relay_log_recovery);
+static sys_var_uint_ptr sys_sync_binlog_period(&vars, "sync_binlog",
+ &sync_binlog_period);
+static sys_var_uint_ptr sys_sync_relaylog_period(&vars, "sync_relay_log",
+ &sync_relaylog_period);
+static sys_var_uint_ptr sys_sync_relayloginfo_period(&vars, "sync_relay_log_info",
+ &sync_relayloginfo_period);
+static sys_var_uint_ptr sys_sync_masterinfo_period(&vars, "sync_master_info",
+ &sync_masterinfo_period);
static sys_var_const sys_relay_log_space_limit(&vars,
"relay_log_space_limit",
OPT_GLOBAL, SHOW_LONGLONG,
@@ -1764,13 +2086,13 @@ static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir",
OPT_GLOBAL, SHOW_CHAR_PTR,
(uchar*) &slave_load_tmpdir);
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
- &slave_net_timeout);
+ &slave_net_timeout,
+ fix_slave_net_timeout);
static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors",
OPT_GLOBAL, SHOW_CHAR,
(uchar*) slave_skip_error_names);
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
&slave_trans_retries);
-static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
@@ -1812,12 +2134,6 @@ bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
}
-bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
-{
- sync_binlog_period= (ulong) var->save_result.ulonglong_value;
- return 0;
-}
-
int init_replication_sys_vars()
{
if (mysql_add_sys_var_chain(vars.first, my_long_options))
@@ -1829,6 +2145,5 @@ int init_replication_sys_vars()
return 0;
}
-#endif /* HAVE_REPLICATION */
-
+#endif /* HAVE_REPLICATION */
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index d5c9040f8dc..aa71ac96ff8 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -13,6 +13,9 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#ifndef SQL_REPL_INCLUDED
+#define SQL_REPL_INCLUDED
+
#include "rpl_filter.h"
#ifdef HAVE_REPLICATION
@@ -65,3 +68,4 @@ int init_replication_sys_vars();
#endif /* HAVE_REPLICATION */
+#endif /* SQL_REPL_INCLUDED */
diff --git a/sql/sql_select.h b/sql/sql_select.h
index 4eb6548d23b..f7dc09ddc62 100644
--- a/sql/sql_select.h
+++ b/sql/sql_select.h
@@ -1,3 +1,6 @@
+#ifndef SQL_SELECT_INCLUDED
+#define SQL_SELECT_INCLUDED
+
/* Copyright (C) 2000-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -758,3 +761,4 @@ inline bool optimizer_flag(THD *thd, uint flag)
return (thd->variables.optimizer_switch & flag);
}
+#endif /* SQL_SELECT_INCLUDED */
diff --git a/sql/sql_servers.h b/sql/sql_servers.h
index 63c691893d1..12855f8473c 100644
--- a/sql/sql_servers.h
+++ b/sql/sql_servers.h
@@ -1,3 +1,6 @@
+#ifndef SQL_SERVERS_INCLUDED
+#define SQL_SERVERS_INCLUDED
+
/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -41,3 +44,5 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options);
/* lookup functions */
FOREIGN_SERVER *get_server_by_name(MEM_ROOT *mem, const char *server_name,
FOREIGN_SERVER *server_buffer);
+
+#endif /* SQL_SERVERS_INCLUDED */
diff --git a/sql/sql_sort.h b/sql/sql_sort.h
index 1e9322f7f5b..102b3ef0a11 100644
--- a/sql/sql_sort.h
+++ b/sql/sql_sort.h
@@ -1,3 +1,6 @@
+#ifndef SQL_SORT_INCLUDED
+#define SQL_SORT_INCLUDED
+
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -87,3 +90,5 @@ int merge_buffers(SORTPARAM *param,IO_CACHE *from_file,
BUFFPEK *lastbuff,BUFFPEK *Fb,
BUFFPEK *Tb,int flag);
void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint key_length);
+
+#endif /* SQL_SORT_INCLUDED */
diff --git a/sql/sql_string.h b/sql/sql_string.h
index d62908e5d66..7b10aafbff1 100644
--- a/sql/sql_string.h
+++ b/sql/sql_string.h
@@ -1,3 +1,6 @@
+#ifndef SQL_STRING_INCLUDED
+#define SQL_STRING_INCLUDED
+
/* Copyright (C) 2000 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -389,3 +392,5 @@ static inline bool check_if_only_end_space(CHARSET_INFO *cs, char *str,
{
return str+ cs->cset->scan(cs, str, end, MY_SEQ_SPACES) == end;
}
+
+#endif /* SQL_STRING_INCLUDED */
diff --git a/sql/sql_trigger.h b/sql/sql_trigger.h
index f6754a75284..b411acf2ac5 100644
--- a/sql/sql_trigger.h
+++ b/sql/sql_trigger.h
@@ -1,3 +1,6 @@
+#ifndef SQL_TRIGGER_INCLUDED
+#define SQL_TRIGGER_INCLUDED
+
/* Copyright (C) 2004-2005 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -174,3 +177,4 @@ bool load_table_name_for_trigger(THD *thd,
const LEX_STRING *trn_path,
LEX_STRING *tbl_name);
+#endif /* SQL_TRIGGER_INCLUDED */
diff --git a/sql/sql_udf.h b/sql/sql_udf.h
index 4b8b492698e..95cb167869e 100644
--- a/sql/sql_udf.h
+++ b/sql/sql_udf.h
@@ -1,3 +1,6 @@
+#ifndef SQL_UDF_INCLUDED
+#define SQL_UDF_INCLUDED
+
/* Copyright (C) 2000-2001, 2003-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -140,3 +143,4 @@ void free_udf(udf_func *udf);
int mysql_create_function(THD *thd,udf_func *udf);
int mysql_drop_function(THD *thd,const LEX_STRING *name);
#endif
+#endif /* SQL_UDF_INCLUDED */
diff --git a/sql/sql_view.h b/sql/sql_view.h
index e08c2168e14..3de51c3264e 100644
--- a/sql/sql_view.h
+++ b/sql/sql_view.h
@@ -1,3 +1,6 @@
+#ifndef SQL_VIEW_INCLUDED
+#define SQL_VIEW_INCLUDED
+
/* -*- C++ -*- */
/* Copyright (C) 2004 MySQL AB
@@ -42,3 +45,4 @@ bool mysql_rename_view(THD *thd, const char *new_db, const char *new_name,
#define VIEW_ANY_ACL (SELECT_ACL | UPDATE_ACL | INSERT_ACL | DELETE_ACL)
+#endif /* SQL_VIEW_INCLUDED */
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 66ce6479500..bf8f7a0ebe7 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -753,6 +753,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token IDENT_QUOTED
%token IF
%token IGNORE_SYM
+%token IGNORE_SERVER_IDS_SYM
%token IMPORT
%token INDEXES
%token INDEX_SYM
@@ -827,6 +828,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token MASTER_SSL_VERIFY_SERVER_CERT_SYM
%token MASTER_SYM
%token MASTER_USER_SYM
+%token MASTER_HEARTBEAT_PERIOD_SYM
%token MATCH /* SQL-2003-R */
%token MAX_CONNECTIONS_PER_HOUR
%token MAX_QUERIES_PER_HOUR
@@ -945,6 +947,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token REDUNDANT_SYM
%token REFERENCES /* SQL-2003-R */
%token REGEXP
+%token RELAYLOG_SYM
%token RELAY_LOG_FILE_SYM
%token RELAY_LOG_POS_SYM
%token RELAY_THREAD
@@ -1588,6 +1591,12 @@ change:
LEX *lex = Lex;
lex->sql_command = SQLCOM_CHANGE_MASTER;
bzero((char*) &lex->mi, sizeof(lex->mi));
+ /*
+ resetting flags that can left from the previous CHANGE MASTER
+ */
+ lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_UNCHANGED;
+ my_init_dynamic_array(&Lex->mi.repl_ignore_server_ids,
+ sizeof(::server_id), 16, 16);
}
master_defs
{}
@@ -1622,7 +1631,7 @@ master_def:
| MASTER_SSL_SYM EQ ulong_num
{
Lex->mi.ssl= $3 ?
- LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE;
+ LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE;
}
| MASTER_SSL_CA_SYM EQ TEXT_STRING_sys
{
@@ -1647,11 +1656,69 @@ master_def:
| MASTER_SSL_VERIFY_SERVER_CERT_SYM EQ ulong_num
{
Lex->mi.ssl_verify_server_cert= $3 ?
- LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE;
+ LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE;
}
- | master_file_def
+
+ | MASTER_HEARTBEAT_PERIOD_SYM EQ NUM_literal
+ {
+ Lex->mi.heartbeat_period= (float) $3->val_real();
+ if (Lex->mi.heartbeat_period > SLAVE_MAX_HEARTBEAT_PERIOD ||
+ Lex->mi.heartbeat_period < 0.0)
+ {
+ const char format[]= "%d seconds";
+ char buf[4*sizeof(SLAVE_MAX_HEARTBEAT_PERIOD) + sizeof(format)];
+ my_sprintf(buf, (buf, format, SLAVE_MAX_HEARTBEAT_PERIOD));
+ my_error(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ MYF(0),
+ " is negative or exceeds the maximum ",
+ buf);
+ MYSQL_YYABORT;
+ }
+ if (Lex->mi.heartbeat_period > slave_net_timeout)
+ {
+ push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE),
+ " exceeds the value of `slave_net_timeout' sec.",
+ " A sensible value for the period should be"
+ " less than the timeout.");
+ }
+ if (Lex->mi.heartbeat_period < 0.001)
+ {
+ if (Lex->mi.heartbeat_period != 0.0)
+ {
+ push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE),
+ " is less than 1 msec.",
+ " The period is reset to zero which means"
+ " no heartbeats will be sending");
+ Lex->mi.heartbeat_period= 0.0;
+ }
+ Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_DISABLE;
+ }
+ Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
+ }
+ | IGNORE_SERVER_IDS_SYM EQ '(' ignore_server_id_list ')'
+ {
+ Lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
+ }
+ |
+ master_file_def
;
+ignore_server_id_list:
+ /* Empty */
+ | ignore_server_id
+ | ignore_server_id_list ',' ignore_server_id
+ ;
+
+ignore_server_id:
+ ulong_num
+ {
+ insert_dynamic(&Lex->mi.repl_ignore_server_ids, (uchar*) &($1));
+ }
+
master_file_def:
MASTER_LOG_FILE_SYM EQ TEXT_STRING_sys
{
@@ -10201,6 +10268,11 @@ show_param:
LEX *lex= Lex;
lex->sql_command= SQLCOM_SHOW_BINLOG_EVENTS;
} opt_limit_clause_init
+ | RELAYLOG_SYM EVENTS_SYM binlog_in binlog_from
+ {
+ LEX *lex= Lex;
+ lex->sql_command= SQLCOM_SHOW_RELAYLOG_EVENTS;
+ } opt_limit_clause_init
| keys_or_index from_or_in table_ident opt_db where_clause
{
LEX *lex= Lex;
@@ -11867,6 +11939,7 @@ keyword_sp:
| REDO_BUFFER_SIZE_SYM {}
| REDOFILE_SYM {}
| REDUNDANT_SYM {}
+ | RELAYLOG_SYM {}
| RELAY_LOG_FILE_SYM {}
| RELAY_LOG_POS_SYM {}
| RELAY_THREAD {}
diff --git a/sql/structs.h b/sql/structs.h
index a58c18f97c5..bcda7b1e787 100644
--- a/sql/structs.h
+++ b/sql/structs.h
@@ -1,3 +1,6 @@
+#ifndef STRUCTS_INCLUDED
+#define STRUCTS_INCLUDED
+
/* Copyright (C) 2000-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -380,3 +383,5 @@ public:
Discrete_interval* get_tail() const { return tail; };
Discrete_interval* get_current() const { return current; };
};
+
+#endif /* STRUCTS_INCLUDED */
diff --git a/sql/table.h b/sql/table.h
index e4a382c799f..f9da0637b4b 100644
--- a/sql/table.h
+++ b/sql/table.h
@@ -1,3 +1,6 @@
+#ifndef TABLE_INCLUDED
+#define TABLE_INCLUDED
+
/* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
@@ -1741,3 +1744,4 @@ static inline void dbug_tmp_restore_column_maps(MY_BITMAP *read_set,
size_t max_row_length(TABLE *table, const uchar *data);
+#endif /* TABLE_INCLUDED */
diff --git a/sql/tzfile.h b/sql/tzfile.h
index 1ff82d62329..1c1800ba1ed 100644
--- a/sql/tzfile.h
+++ b/sql/tzfile.h
@@ -1,3 +1,6 @@
+#ifndef TZFILE_INCLUDED
+#define TZFILE_INCLUDED
+
/* Copyright (C) 2004 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -134,3 +137,5 @@ struct tzhead {
*/
#define isleap(y) (((y) % 4) == 0 && (((y) % 100) != 0 || ((y) % 400) == 0))
+
+#endif
diff --git a/sql/tztime.h b/sql/tztime.h
index 9bf103519c4..9990e91f17b 100644
--- a/sql/tztime.h
+++ b/sql/tztime.h
@@ -1,3 +1,6 @@
+#ifndef TZTIME_INCLUDED
+#define TZTIME_INCLUDED
+
/* Copyright (C) 2004 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -79,3 +82,4 @@ static const int MY_TZ_TABLES_COUNT= 4;
#endif /* !defined(TESTTIME) && !defined(TZINFO2SQL) */
+#endif /* TZTIME_INCLUDED */
diff --git a/sql/unireg.h b/sql/unireg.h
index f22be194952..a390b755772 100644
--- a/sql/unireg.h
+++ b/sql/unireg.h
@@ -1,3 +1,6 @@
+#ifndef UNIREG_INCLUDED
+#define UNIREG_INCLUDED
+
/* Copyright (C) 2000-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
@@ -16,8 +19,6 @@
/* Extra functions used by unireg library */
-#ifndef _unireg_h
-
#ifndef NO_ALARM_LOOP
#define NO_ALARM_LOOP /* lib5 and popen can't use alarm */
#endif